diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java index 7ff83a1c7..3fb6fb510 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Batch.java @@ -38,6 +38,8 @@ public class Batch implements Closeable { return elements; } + public List getSeqNums() { return this.seqNums; } + public Queue getQueue() { return queue; } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 64ec027a1..dad8d0f9d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -198,9 +198,7 @@ public class Queue implements Closeable { logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString()); PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath); - pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); - - add(cp, new TailPage(cp, this, pageIO)); + addIO(cp, pageIO); } // transform the head page into a tail page only if the headpage is non-empty @@ -235,7 +233,7 @@ public class Queue implements Closeable { this.headPage.checkpoint(); } else { // head page is non-empty, transform it into a tail page and create a new empty head page - add(headCheckpoint, this.headPage.behead()); + addPage(headCheckpoint, this.headPage.behead()); headPageNum = headCheckpoint.getPageNum() + 1; newCheckpointedHeadpage(headPageNum); @@ -261,9 +259,53 @@ public class Queue implements Closeable { } } + // TODO: addIO and addPage are almost identical - we should refactor to DRY it up. + + // addIO is basically the same as addPage except that it avoid calling PageIO.open + // before actually purging the page if it is fully acked. This avoid dealing with + // zero byte page files that are fully acked. + // see issue #7809 + private void addIO(Checkpoint checkpoint, PageIO pageIO) throws IOException { + if (checkpoint.isFullyAcked()) { + // first make sure any fully acked page per the checkpoint is purged if not already + try { pageIO.purge(); } catch (NoSuchFileException e) { /* ignore */ } + + // we want to keep all the "middle" checkpoints between the first unacked tail page and the head page + // to always have a contiguous sequence of checkpoints which helps figuring queue integrity. for this + // we will remove any prepended fully acked tail pages but keep all other checkpoints between the first + // unacked tail page and the head page. we did however purge the data file to free disk resources. + + if (this.tailPages.size() == 0) { + // this is the first tail page and it is fully acked so just purge it + this.checkpointIO.purge(this.checkpointIO.tailFileName(checkpoint.getPageNum())); + } else { + // create a tail page with a null PageIO and add it to tail pages but not unreadTailPages + // since it is fully read because also fully acked + // TODO: I don't like this null pageIO tail page... + this.tailPages.add(new TailPage(checkpoint, this, null)); + } + } else { + pageIO.open(checkpoint.getMinSeqNum(), checkpoint.getElementCount()); + TailPage page = new TailPage(checkpoint, this, pageIO); + + this.tailPages.add(page); + this.unreadTailPages.add(page); + this.unreadCount += page.unreadCount(); + this.currentByteSize += page.getPageIO().getCapacity(); + + // for now deactivate all tail pages, we will only reactivate the first one at the end + page.getPageIO().deactivate(); + } + + // track the seqNum as we rebuild tail pages, prevent empty pages with a minSeqNum of 0 to reset seqNum + if (checkpoint.maxSeqNum() > this.seqNum) { + this.seqNum = checkpoint.maxSeqNum(); + } + } + // add a read tail page into this queue structures but also verify that this tail page // is not fully acked in which case it will be purged - private void add(Checkpoint checkpoint, TailPage page) throws IOException { + private void addPage(Checkpoint checkpoint, TailPage page) throws IOException { if (checkpoint.isFullyAcked()) { // first make sure any fully acked page per the checkpoint is purged if not already try { page.getPageIO().purge(); } catch (NoSuchFileException e) { /* ignore */ } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 6bd4bb987..bc66181c7 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -1,7 +1,10 @@ package org.logstash.ackedqueue; +import java.io.FileOutputStream; import java.io.IOException; +import java.nio.channels.FileChannel; import java.nio.file.NoSuchFileException; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -782,4 +785,55 @@ public class QueueTest { } } + @Test + public void testZeroByteFullyAckedPageOnOpen() throws IOException { + Queueable element = new StringElement("0123456789"); // 10 bytes + int singleElementCapacity = singleElementCapacityForByteBufferPageIO(element); + Settings settings = TestSettings.persistedQueueSettings(singleElementCapacity, dataPath); + + // the goal here is to recreate a condition where the queue has a tail page of size zero with + // a checkpoint that indicates it is full acknowledged + // see issue #7809 + + try(TestQueue q = new TestQueue(settings)) { + q.open(); + + Queueable element1 = new StringElement("0123456789"); + Queueable element2 = new StringElement("9876543210"); + + // write 2 elements to force a new page. + q.write(element1); + q.write(element2); + assertThat(q.getTailPages().size(), is(1)); + + // work directly on the tail page and not the queue to avoid habing the queue purge the page + // but make sure the tail page checkpoint marks it as fully acked + TailPage tp = q.getTailPages().get(0); + Batch b = tp.readBatch(1); + assertThat(b.getElements().get(0), is(element1)); + tp.ack(b.getSeqNums(), 1); + assertThat(tp.isFullyAcked(), is(true)); + + } + // now we have a queue state where page 0 is fully acked but not purged + // manually truncate page 0 to zero byte. + + // TODO page.0 file name is hard coded here because we did not expose the page file naming. + FileChannel c = new FileOutputStream(Paths.get(dataPath, "page.0").toFile(), true).getChannel(); + c.truncate(0); + c.close(); + + try(TestQueue q = new TestQueue(settings)) { + // here q.open used to crash with: + // java.io.IOException: Page file size 0 different to configured page capacity 27 for ... + // because page.0 ended up as a zero byte file but its checkpoint says it's fully acked + q.open(); + assertThat(q.getUnackedCount(), is(1L)); + assertThat(q.getTailPages().size(), is(1)); + assertThat(q.getTailPages().get(0).isFullyAcked(), is(false)); + assertThat(q.getTailPages().get(0).elementCount, is(1)); + assertThat(q.getHeadPage().elementCount, is(0)); + } + } + }