From 882acda2391f9e9afcce6757c3a89e98b75bcc67 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Thu, 24 Nov 2016 15:14:51 +0000 Subject: [PATCH] add failing test for recovery from a full queue Fixes #6297 --- .../org/logstash/ackedqueue/QueueTest.java | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 47261bb9b..27fb42168 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -420,4 +420,43 @@ public class QueueTest { executor.shutdown(); } + @Test(timeout = 5000) + public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException { + + Queueable element = new StringElement("0123456789"); // 10 bytes + + Settings settings = TestSettings.getSettings(256, 1000); // allow 10 elements per page but only 1024 bytes in total + + TestQueue q = new TestQueue(settings); + q.open(); + + int ELEMENT_COUNT = 99; // should be able to write 99 events before getting full + for (int i = 0; i < ELEMENT_COUNT; i++) { + long seqNum = q.write(element); + } + + assertThat(q.isFull(), is(false)); + + // we expect this next write call to block so let's wrap it in a Future + Callable write = () -> { + return q.write(element); + }; + + ExecutorService executor = Executors.newFixedThreadPool(1); + Future future = executor.submit(write); + + Thread.sleep(1); + + assertThat(q.isFull(), is(true)); + + Batch b = q.readBatch(99); + + // spin wait until data is written and write blocks + while (!q.isFull()) { Thread.sleep(1); } + + assertThat(q.isFull(), is(false)); + + executor.shutdown(); + } + } \ No newline at end of file