diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 0f4364b3a..b58aad0c4 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; @@ -473,38 +474,37 @@ public class QueueTest { // allow 10 elements per page but only 100 events in total Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100); - - TestQueue q = new TestQueue(settings); - q.open(); - - int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full - for (int i = 0; i < ELEMENT_COUNT; i++) { - long seqNum = q.write(element); - } - - assertThat(q.isFull(), is(false)); - - // we expect this next write call to block so let's wrap it in a Future - Callable write = () -> { - return q.write(element); - }; ExecutorService executor = Executors.newFixedThreadPool(1); - Future future = executor.submit(write); - assertThat(future.isDone(), is(false)); - - while (!q.isFull()) { Thread.sleep(10); } - assertThat(q.isFull(), is(true)); - - Batch b = q.readBatch(10); // read 1 page (10 events) - b.close(); // purge 1 page - - while (q.isFull()) { Thread.sleep(10); } - assertThat(q.isFull(), is(false)); - - // will not complete because write will not unblock until the page is purge with a batch close/acking. - assertThat(future.isDone(), is(false)); - - q.close(); + try (TestQueue q = new TestQueue(settings)) { + q.open(); + // should be able to write 90 events (9 pages) before getting full + final long ELEMENT_COUNT = 90; + for (int i = 0; i < ELEMENT_COUNT; i++) { + q.write(element); + } + assertThat(q.isFull(), is(false)); + + // we expect this next write call to block so let's wrap it in a Future + Callable write = () -> q.write(element); + Future future = executor.submit(write); + assertThat(future.isDone(), is(false)); + + while (!q.isFull()) { + Thread.sleep(10); + } + assertThat(q.isFull(), is(true)); + + Batch b = q.readBatch(10); // read 1 page (10 events) + b.close(); // purge 1 page + + while (q.isFull()) { Thread.sleep(10); } + assertThat(q.isFull(), is(false)); + + assertThat(future.get(), is(ELEMENT_COUNT + 1)); + } finally { + executor.shutdownNow(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + } } @Test(timeout = 5000)