mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
handle fully acked zero byte page at queue open (#7810)
This commit is contained in:
parent
64388f34a5
commit
73f00842fa
3 changed files with 103 additions and 5 deletions
|
@ -38,6 +38,8 @@ public class Batch implements Closeable {
|
|||
return elements;
|
||||
}
|
||||
|
||||
public List<Long> getSeqNums() { return this.seqNums; }
|
||||
|
||||
public Queue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
|
|
@ -198,9 +198,7 @@ public class Queue implements Closeable {
|
|||
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
|
||||
|
||||
add(cp, new TailPage(cp, this, pageIO));
|
||||
addIO(cp, pageIO);
|
||||
}
|
||||
|
||||
// transform the head page into a tail page only if the headpage is non-empty
|
||||
|
@ -235,7 +233,7 @@ public class Queue implements Closeable {
|
|||
this.headPage.checkpoint();
|
||||
} else {
|
||||
// head page is non-empty, transform it into a tail page and create a new empty head page
|
||||
add(headCheckpoint, this.headPage.behead());
|
||||
addPage(headCheckpoint, this.headPage.behead());
|
||||
|
||||
headPageNum = headCheckpoint.getPageNum() + 1;
|
||||
newCheckpointedHeadpage(headPageNum);
|
||||
|
@ -261,9 +259,53 @@ public class Queue implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: addIO and addPage are almost identical - we should refactor to DRY it up.
|
||||
|
||||
// addIO is basically the same as addPage except that it avoid calling PageIO.open
|
||||
// before actually purging the page if it is fully acked. This avoid dealing with
|
||||
// zero byte page files that are fully acked.
|
||||
// see issue #7809
|
||||
private void addIO(Checkpoint checkpoint, PageIO pageIO) throws IOException {
|
||||
if (checkpoint.isFullyAcked()) {
|
||||
// first make sure any fully acked page per the checkpoint is purged if not already
|
||||
try { pageIO.purge(); } catch (NoSuchFileException e) { /* ignore */ }
|
||||
|
||||
// we want to keep all the "middle" checkpoints between the first unacked tail page and the head page
|
||||
// to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this
|
||||
// we will remove any prepended fully acked tail pages but keep all other checkpoints between the first
|
||||
// unacked tail page and the head page. we did however purge the data file to free disk resources.
|
||||
|
||||
if (this.tailPages.size() == 0) {
|
||||
// this is the first tail page and it is fully acked so just purge it
|
||||
this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum()));
|
||||
} else {
|
||||
// create a tail page with a null PageIO and add it to tail pages but not unreadTailPages
|
||||
// since it is fully read because also fully acked
|
||||
// TODO: I don't like this null pageIO tail page...
|
||||
this.tailPages.add(new TailPage(checkpoint, this, null));
|
||||
}
|
||||
} else {
|
||||
pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount());
|
||||
TailPage page = new TailPage(checkpoint, this, pageIO);
|
||||
|
||||
this.tailPages.add(page);
|
||||
this.unreadTailPages.add(page);
|
||||
this.unreadCount += page.unreadCount();
|
||||
this.currentByteSize += page.getPageIO().getCapacity();
|
||||
|
||||
// for now deactivate all tail pages, we will only reactivate the first one at the end
|
||||
page.getPageIO().deactivate();
|
||||
}
|
||||
|
||||
// track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum
|
||||
if (checkpoint.maxSeqNum() > this.seqNum) {
|
||||
this.seqNum = checkpoint.maxSeqNum();
|
||||
}
|
||||
}
|
||||
|
||||
// add a read tail page into this queue structures but also verify that this tail page
|
||||
// is not fully acked in which case it will be purged
|
||||
private void add(Checkpoint checkpoint, TailPage page) throws IOException {
|
||||
private void addPage(Checkpoint checkpoint, TailPage page) throws IOException {
|
||||
if (checkpoint.isFullyAcked()) {
|
||||
// first make sure any fully acked page per the checkpoint is purged if not already
|
||||
try { page.getPageIO().purge(); } catch (NoSuchFileException e) { /* ignore */ }
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
package org.logstash.ackedqueue;
|
||||
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -782,4 +785,55 @@ public class QueueTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZeroByteFullyAckedPageOnOpen() throws IOException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element);
|
||||
Settings settings = TestSettings.persistedQueueSettings(singleElementCapacity, dataPath);
|
||||
|
||||
// the goal here is to recreate a condition where the queue has a tail page of size zero with
|
||||
// a checkpoint that indicates it is full acknowledged
|
||||
// see issue #7809
|
||||
|
||||
try(TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
Queueable element1 = new StringElement("0123456789");
|
||||
Queueable element2 = new StringElement("9876543210");
|
||||
|
||||
// write 2 elements to force a new page.
|
||||
q.write(element1);
|
||||
q.write(element2);
|
||||
assertThat(q.getTailPages().size(), is(1));
|
||||
|
||||
// work directly on the tail page and not the queue to avoid habing the queue purge the page
|
||||
// but make sure the tail page checkpoint marks it as fully acked
|
||||
TailPage tp = q.getTailPages().get(0);
|
||||
Batch b = tp.readBatch(1);
|
||||
assertThat(b.getElements().get(0), is(element1));
|
||||
tp.ack(b.getSeqNums(), 1);
|
||||
assertThat(tp.isFullyAcked(), is(true));
|
||||
|
||||
}
|
||||
// now we have a queue state where page 0 is fully acked but not purged
|
||||
// manually truncate page 0 to zero byte.
|
||||
|
||||
// TODO page.0 file name is hard coded here because we did not expose the page file naming.
|
||||
FileChannel c = new FileOutputStream(Paths.get(dataPath, "page.0").toFile(), true).getChannel();
|
||||
c.truncate(0);
|
||||
c.close();
|
||||
|
||||
try(TestQueue q = new TestQueue(settings)) {
|
||||
// here q.open used to crash with:
|
||||
// java.io.IOException: Page file size 0 different to configured page capacity 27 for ...
|
||||
// because page.0 ended up as a zero byte file but its checkpoint says it's fully acked
|
||||
q.open();
|
||||
assertThat(q.getUnackedCount(), is(1L));
|
||||
assertThat(q.getTailPages().size(), is(1));
|
||||
assertThat(q.getTailPages().get(0).isFullyAcked(), is(false));
|
||||
assertThat(q.getTailPages().get(0).elementCount, is(1));
|
||||
assertThat(q.getHeadPage().elementCount, is(0));
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue