diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 4bcef21c7..06dc377b7 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -45,7 +45,6 @@ public class Queue implements Closeable { protected final List unreadTailPages; protected volatile long unreadCount; - protected volatile long currentByteSize; private final CheckpointIO checkpointIO; @@ -331,7 +330,7 @@ public class Queue implements Closeable { this.unreadCount++; // if the queue was empty before write, signal non emptiness - if (wasEmpty) { notEmpty.signal(); } + if (wasEmpty) { notEmpty.signalAll(); } // now check if we reached a queue full state and block here until it is not full // for the next write or the queue was closed. @@ -481,7 +480,7 @@ public class Queue implements Closeable { this.unreadCount -= b.size(); if (p.isFullyRead()) { removeUnreadPage(p); } - if (wasFull) { notFull.signal(); } + if (wasFull) { notFull.signalAll(); } return b; } @@ -565,6 +564,8 @@ public class Queue implements Closeable { // cleanup fully acked tail page if (result.page.isFullyAcked()) { + boolean wasFull = isFull(); + this.tailPages.remove(result.index); // remove page data file regardless if it is the first or a middle tail page to free resources @@ -583,6 +584,8 @@ public class Queue implements Closeable { this.tailPages.remove(0); } } + + if (wasFull) { notFull.signalAll(); } } this.headPage.checkpoint(); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index c3bdd4960..f1e0ff4b4 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -464,7 +464,7 @@ public class QueueTest { } @Test(timeout = 5000) - public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException { + public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedException, ExecutionException { Queueable element = new StringElement("0123456789"); // 10 bytes @@ -487,22 +487,64 @@ public class QueueTest { Callable write = () -> { return q.write(element); }; - ExecutorService executor = Executors.newFixedThreadPool(1); Future future = executor.submit(write); + assertThat(future.isDone(), is(false)); while (!q.isFull()) { Thread.sleep(10); } - assertThat(q.isFull(), is(true)); Batch b = q.readBatch(10); // read 1 page (10 events) b.close(); // purge 1 page - // spin wait until data is written and write blocks while (q.isFull()) { Thread.sleep(10); } + assertThat(q.isFull(), is(false)); + + // will not complete because write will not unblock until the page is purge with a batch close/acking. + assertThat(future.isDone(), is(false)); + + q.close(); + } + + @Test(timeout = 5000) + public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException { + Queueable element = new StringElement("0123456789"); // 10 bytes + + int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length); + + // allow 10 elements per page but only 100 events in total + Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100); + + TestQueue q = new TestQueue(settings); + q.open(); + + int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full + for (int i = 0; i < ELEMENT_COUNT; i++) { + long seqNum = q.write(element); + } assertThat(q.isFull(), is(false)); + // read 1 page (10 events) here while not full yet so that the read will not singal the not full state + // we want the batch closing below to signal the not full state + Batch b = q.readBatch(10); + + // we expect this next write call to block so let's wrap it in a Future + Callable write = () -> { + return q.write(element); + }; + ExecutorService executor = Executors.newFixedThreadPool(1); + Future future = executor.submit(write); + assertThat(future.isDone(), is(false)); + + while (!q.isFull()) { Thread.sleep(10); } + assertThat(q.isFull(), is(true)); + assertThat(future.isDone(), is(false)); + + b.close(); // purge 1 page + + assertThat(future.get(), is(equalTo(ELEMENT_COUNT + 1L))); + executor.shutdown(); q.close(); }