Purging headpages needs to be reflected in the currentByteSize of the Queue

Fixes #8186
Authored by Armin on 2017-09-13 10:32:12 +02:00, committed by Armin Braun
parent 0fe8b38280
commit f7eca432c3
2 changed files with 27 additions and 0 deletions
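
For context on the bug: judging from this fix and the test below, currentByteSize tracks the queue's byte footprint against its maxBytes cap, growing when a headpage is allocated, but the purge path for a fully acknowledged headpage never gave those bytes back, so the counter drifted upward until writes blocked on a logically empty queue. A minimal, self-contained sketch of that drift follows; PAGE_CAPACITY, MAX_BYTES, rotateHeadpage, and purgeFullyAckedPage are hypothetical stand-ins, not the Logstash API.

// Hypothetical illustration of the accounting drift; none of these names are Logstash code.
public class ByteSizeDriftSketch {

    static final long PAGE_CAPACITY = 64;                 // bytes per page (made up)
    static final long MAX_BYTES = 2 * PAGE_CAPACITY - 1;  // mirrors the test's sizing

    long currentByteSize = PAGE_CAPACITY;                 // one open headpage

    // Writing to a full headpage allocates a fresh one, growing the counter.
    void rotateHeadpage() {
        currentByteSize += PAGE_CAPACITY;
    }

    // The old headpage was already fully acked and is purged in place.
    void purgeFullyAckedPage(boolean withFix) {
        if (withFix) {
            currentByteSize -= PAGE_CAPACITY;  // the one-line fix in the diff below
        }
    }

    public static void main(String[] args) {
        ByteSizeDriftSketch q = new ByteSizeDriftSketch();
        for (int i = 0; i < 5; i++) {
            q.rotateHeadpage();
            q.purgeFullyAckedPage(false);      // pre-fix behavior
        }
        // Every event was acked, yet the counter claims the queue is over budget,
        // so the next write would block. Prints "384 > 127: true".
        System.out.println(q.currentByteSize + " > " + MAX_BYTES + ": "
            + (q.currentByteSize > MAX_BYTES));
    }
}

With withFix set to true, the counter returns to PAGE_CAPACITY after each cycle, matching the single live headpage.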

@@ -382,6 +382,7 @@ public class Queue implements Closeable {
             // not trigger a checkpoint creation in itself
             TailPage tailPage = new TailPage(this.headPage);
             tailPage.purge();
+            currentByteSize -= tailPage.getPageIO().getCapacity();
         } else {
             // beheading includes checkpoint+fsync if required
             TailPage tailPage = this.headPage.behead();
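
The added line pairs the purge with its accounting: when every event on the old headpage has already been acknowledged, the page is purged rather than kept around, and its capacity is handed back to currentByteSize. The else branch instead beheads the page into a tail page that still holds live events, so its bytes rightly remain counted. (That the counter was incremented when the page was allocated is an inference from this fix and the test below; the allocation site is not part of this diff.)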

@@ -23,6 +23,7 @@
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -80,6 +81,31 @@ public class QueueTest {
         }
     }
 
+    /**
+     * This test guards against issue https://github.com/elastic/logstash/pull/8186 by ensuring
+     * that repeated writes to an already fully acknowledged headpage do not corrupt the queue's
+     * internal bytesize counter.
+     * @throws IOException On Failure
+     */
+    @Test(timeout = 5000)
+    public void writeToFullyAckedHeadpage() throws IOException {
+        final Queueable element = new StringElement("foobarbaz");
+        final int page = element.serialize().length * 2 + AbstractByteBufferPageIO.MIN_CAPACITY;
+        // Queue that can only hold one element per page.
+        try (Queue q = new TestQueue(
+            TestSettings.volatileQueueSettings(page, page * 2 - 1))) {
+            q.open();
+            for (int i = 0; i < 5; ++i) {
+                q.write(element);
+                try (Batch b = q.readBatch(1, 500L)) {
+                    assertThat(b.getElements().size(), is(1));
+                    assertThat(b.getElements().get(0).toString(), is(element.toString()));
+                }
+            }
+            assertThat(q.nonBlockReadBatch(1), nullValue());
+        }
+    }
+
     @Test
     public void singleWriteMultiRead() throws IOException {
         try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
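
How the new test provokes the original bug: page is sized so that a page holds exactly one serialized element, and the maxBytes argument of page * 2 - 1 leaves room for less than two full pages. Each iteration writes one element and reads it back in a batch; closing the batch acknowledges the element (an inference from the test's structure, since the ack call is not visible in this diff), so the headpage is always fully acked when the next write forces a rotation and purge. Without the fix, currentByteSize grows by one page capacity per iteration, crosses maxBytes, and a subsequent q.write(element) blocks indefinitely; the timeout = 5000 annotation turns that hang into a test failure.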