diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 2e3ed5d08..08c5fdfb4 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -382,6 +382,7 @@ public class Queue implements Closeable { // not trigger a checkpoint creation in itself TailPage tailPage = new TailPage(this.headPage); tailPage.purge(); + currentByteSize -= tailPage.getPageIO().getCapacity(); } else { // beheading includes checkpoint+fsync if required TailPage tailPage = this.headPage.behead(); diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index bc66181c7..76b527823 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -23,6 +23,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.logstash.ackedqueue.io.AbstractByteBufferPageIO; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; @@ -80,6 +81,31 @@ public class QueueTest { } } + /** + * This test guards against issue https://github.com/elastic/logstash/pull/8186 by ensuring + * that repeated writes to an already fully acknowledged headpage do not corrupt the queue's + * internal bytesize counter. + * @throws IOException On Failure + */ + @Test(timeout = 5000) + public void writeToFullyAckedHeadpage() throws IOException { + final Queueable element = new StringElement("foobarbaz"); + final int page = element.serialize().length * 2 + AbstractByteBufferPageIO.MIN_CAPACITY; + // Queue that can only hold one element per page. + try (Queue q = new TestQueue( + TestSettings.volatileQueueSettings(page, page * 2 - 1))) { + q.open(); + for (int i = 0; i < 5; ++i) { + q.write(element); + try (Batch b = q.readBatch(1, 500L)) { + assertThat(b.getElements().size(), is(1)); + assertThat(b.getElements().get(0).toString(), is(element.toString())); + } + } + assertThat(q.nonBlockReadBatch(1), nullValue()); + } + } + @Test public void singleWriteMultiRead() throws IOException { try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {