Purging headpages needs to be reflected in the currentByteSize of the Queue
Fixes #8186
This commit is contained in:
parent ee1a2825f8
commit b23edffb63

2 changed files with 27 additions and 0 deletions
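Judging from the one-line fix below, the queue counts a page's full capacity toward currentByteSize when a head page is allocated, but the purge path taken for a fully acknowledged headpage never subtracted that capacity back, so the counter drifted upward with every purged page and a max-bytes limit could eventually be "filled" by data that no longer existed. A minimal standalone sketch of that failure mode (all names below are hypothetical stand-ins, not the real Queue/TailPage/PageIO API):

// Hypothetical sketch of the accounting drift fixed by this commit.
// PAGE_CAPACITY, newHeadPage and purgeHeadPage are illustrative stand-ins,
// not the actual Logstash classes or methods.
public class ByteSizeDriftSketch {
    static final int PAGE_CAPACITY = 1024;
    static long currentByteSize = 0;

    static void newHeadPage() {
        // capacity is counted as soon as a head page is allocated
        currentByteSize += PAGE_CAPACITY;
    }

    static void purgeHeadPage(boolean withFix) {
        // before the fix, purging released the page but left the counter alone
        if (withFix) {
            currentByteSize -= PAGE_CAPACITY; // analogue of the one-line fix below
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            newHeadPage();
            purgeHeadPage(false); // pre-fix behavior
        }
        // Prints 5120 even though no data is retained; with the fix it stays 0,
        // so already-purged pages can no longer exhaust a max-bytes limit.
        System.out.println("currentByteSize = " + currentByteSize);
    }
}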
Queue.java:

@@ -382,6 +382,7 @@ public class Queue implements Closeable {
                 // not trigger a checkpoint creation in itself
                 TailPage tailPage = new TailPage(this.headPage);
                 tailPage.purge();
+                currentByteSize -= tailPage.getPageIO().getCapacity();
             } else {
                 // beheading includes checkpoint+fsync if required
                 TailPage tailPage = this.headPage.behead();
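Note that the fix subtracts getPageIO().getCapacity(), the page's allocated capacity, rather than the number of bytes actually written to the page. Judging from the change, currentByteSize tracks allocated capacity, so the decrement on purge mirrors the increment made when a page is created, and the counter once again reflects only live pages.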
QueueTest.java:

@@ -23,6 +23,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -80,6 +81,31 @@ public class QueueTest {
         }
     }
 
+    /**
+     * This test guards against issue https://github.com/elastic/logstash/pull/8186 by ensuring
+     * that repeated writes to an already fully acknowledged headpage do not corrupt the queue's
+     * internal bytesize counter.
+     * @throws IOException On Failure
+     */
+    @Test(timeout = 5000)
+    public void writeToFullyAckedHeadpage() throws IOException {
+        final Queueable element = new StringElement("foobarbaz");
+        final int page = element.serialize().length * 2 + AbstractByteBufferPageIO.MIN_CAPACITY;
+        // Queue that can only hold one element per page.
+        try (Queue q = new TestQueue(
+            TestSettings.volatileQueueSettings(page, page * 2 - 1))) {
+            q.open();
+            for (int i = 0; i < 5; ++i) {
+                q.write(element);
+                try (Batch b = q.readBatch(1, 500L)) {
+                    assertThat(b.getElements().size(), is(1));
+                    assertThat(b.getElements().get(0).toString(), is(element.toString()));
+                }
+            }
+            assertThat(q.nonBlockReadBatch(1), nullValue());
+        }
+    }
+
     @Test
     public void singleWriteMultiRead() throws IOException {
         try (Queue q = new TestQueue(TestSettings.volatileQueueSettings(100))) {
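A note on the test's construction: the page size is chosen so that a page holds exactly one serialized element (twice the element's serialized length, apparently to leave room for per-element overhead, plus AbstractByteBufferPageIO.MIN_CAPACITY), and the queue's byte limit is set just under two pages. Each loop iteration writes one element and then reads and closes a one-element batch, which acknowledges it and allows the headpage to be purged. Without the fix, each purge would presumably leak one page's capacity into currentByteSize, the max-bytes check would soon treat the queue as full, and the next write would block; the 5-second timeout turns that hang into a test failure.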