mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
fix queue open bug for fully acked in-between pages plus new test
This commit is contained in:
parent
c2e922f9c5
commit
400075d715
2 changed files with 37 additions and 11 deletions
|
@ -282,11 +282,6 @@ public class Queue implements Closeable {
|
|||
if (this.tailPages.size() == 0) {
|
||||
// this is the first tail page and it is fully acked so just purge it
|
||||
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
|
||||
} else {
|
||||
// create a tail page with a null PageIO and add it to tail pages but not unreadTailPages
|
||||
// since it is fully read because also fully acked
|
||||
// TODO: I don't like this null pageIO tail page...
|
||||
this.tailPages.add(new TailPage(checkpoint, this, null));
|
||||
}
|
||||
} else {
|
||||
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
|
||||
|
@ -322,11 +317,6 @@ public class Queue implements Closeable {
|
|||
if (this.tailPages.size() == 0) {
|
||||
// this is the first tail page and it is fully acked so just purge it
|
||||
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
|
||||
} else {
|
||||
// create a tail page with a null PageIO and add it to tail pages but not unreadTailPages
|
||||
// since it is fully read because also fully acked
|
||||
// TODO: I don't like this null pageIO tail page...
|
||||
this.tailPages.add(new TailPage(checkpoint, this, null));
|
||||
}
|
||||
} else {
|
||||
this.tailPages.add(page);
|
||||
|
|
|
@ -775,7 +775,43 @@ public class QueueTest {
|
|||
assertThat(q.getPersistedByteSize(), is(0L));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void getsPersistedByteSizeCorrectlyForFullyAckedDeletedTailPages() throws Exception {
|
||||
final Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
final int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
final Settings settings = TestSettings.persistedQueueSettings(singleElementCapacity, dataPath);
|
||||
|
||||
try (Queue q = new Queue(settings)) {
|
||||
q.open();
|
||||
|
||||
q.write(element);
|
||||
Batch b1 = q.readBatch(2);
|
||||
q.write(element);
|
||||
Batch b2 = q.readBatch(2);
|
||||
q.write(element);
|
||||
Batch b3 = q.readBatch(2);
|
||||
q.write(element);
|
||||
Batch b4 = q.readBatch(2);
|
||||
|
||||
assertThat(q.tailPages.size(), is(3));
|
||||
assertThat(q.getPersistedByteSize() > 0, is(true));
|
||||
|
||||
// fully ack middle page and head page
|
||||
b2.close();
|
||||
b4.close();
|
||||
|
||||
assertThat(q.tailPages.size(), is(2));
|
||||
assertThat(q.getPersistedByteSize() > 0, is(true));
|
||||
|
||||
q.close();
|
||||
q.open();
|
||||
|
||||
assertThat(q.tailPages.size(), is(2));
|
||||
assertThat(q.getPersistedByteSize() > 0, is(true));
|
||||
}
|
||||
}
|
||||
|
||||
private void stableUnderStress(final int capacity) throws IOException {
|
||||
Settings settings = TestSettings.persistedQueueSettings(capacity, dataPath);
|
||||
final ExecutorService exec = Executors.newScheduledThreadPool(2);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue