diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 3a6a4e77a..bcbfe988a 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -160,6 +160,8 @@ public class Queue implements Closeable { } catch (NoSuchFileException e) { // if there is no head checkpoint, create a new headpage and checkpoint it and exit method + logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName()); + this.seqNum = 0; headPageNum = 0; @@ -177,6 +179,8 @@ public class Queue implements Closeable { // all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum)); + logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString()); + PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath); pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); @@ -186,6 +190,8 @@ public class Queue implements Closeable { // transform the head page into a tail page only if the headpage is non-empty // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages + logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString()); + PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data diff --git a/logstash-core/src/main/java/org/logstash/common/io/AbstractByteBufferPageIO.java b/logstash-core/src/main/java/org/logstash/common/io/AbstractByteBufferPageIO.java index bdc7714f6..19282e37c 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/AbstractByteBufferPageIO.java +++ b/logstash-core/src/main/java/org/logstash/common/io/AbstractByteBufferPageIO.java @@ -1,5 +1,7 @@ package org.logstash.common.io; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.logstash.ackedqueue.SequencedList; import java.io.IOException; @@ -33,6 +35,8 @@ public abstract class AbstractByteBufferPageIO implements PageIO { public static final boolean VERIFY_CHECKSUM = true; public static final boolean STRICT_CAPACITY = true; + private static final Logger logger = LogManager.getLogger(AbstractByteBufferPageIO.class); + protected int capacity; // page capacity is an int per the ByteBuffer class. protected final int pageNum; protected final List offsetMap; // has to be extendable @@ -105,6 +109,7 @@ public abstract class AbstractByteBufferPageIO implements PageIO { this.elementCount += 1; } catch (PageIOInvalidElementException e) { // simply stop at first invalid element + logger.debug("PageIO recovery element index:{}, readNextElement exception: {}", i, e.getMessage()); break; } } @@ -131,21 +136,22 @@ public abstract class AbstractByteBufferPageIO implements PageIO { if (this.head + SEQNUM_SIZE + LENGTH_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read seqNum and length bytes past buffer capacity"); } int elementOffset = this.head; + int newHead = this.head; ByteBuffer buffer = getBuffer(); long seqNum = buffer.getLong(); - this.head += SEQNUM_SIZE; + newHead += SEQNUM_SIZE; if (seqNum != expectedSeqNum) { throw new PageIOInvalidElementException(String.format("Element seqNum %d is expected to be %d", seqNum, expectedSeqNum)); } int length = buffer.getInt(); - this.head += LENGTH_SIZE; + newHead += LENGTH_SIZE; // length must be > 0 if (length <= 0) { throw new PageIOInvalidElementException("Element invalid length"); } // if there is no room for the proposed data length and checksum just stop here - if (this.head + length + CHECKSUM_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read element payload and checksum past buffer capacity"); } + if (newHead + length + CHECKSUM_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read element payload and checksum past buffer capacity"); } if (verifyChecksum) { // read data and compute checksum; @@ -158,9 +164,9 @@ public abstract class AbstractByteBufferPageIO implements PageIO { // at this point we recovered a valid element this.offsetMap.add(elementOffset); - this.head += length + CHECKSUM_SIZE; + this.head = newHead + length + CHECKSUM_SIZE; - buffer.position(head); + buffer.position(this.head); } @Override diff --git a/logstash-core/src/main/java/org/logstash/common/io/MmapPageIO.java b/logstash-core/src/main/java/org/logstash/common/io/MmapPageIO.java index a3d2763dd..12d316e8d 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/MmapPageIO.java +++ b/logstash-core/src/main/java/org/logstash/common/io/MmapPageIO.java @@ -101,6 +101,9 @@ public class MmapPageIO extends AbstractByteBufferPageIO { @Override public void close() throws IOException { + if (this.buffer != null) { + this.buffer.force(); + } if (this.channel != null && this.channel.isOpen()) { this.channel.close(); } diff --git a/logstash-core/src/test/java/org/logstash/common/io/ByteBufferPageIOTest.java b/logstash-core/src/test/java/org/logstash/common/io/ByteBufferPageIOTest.java index 5dc7713ba..c238c8c36 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/ByteBufferPageIOTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/ByteBufferPageIOTest.java @@ -134,6 +134,46 @@ public class ByteBufferPageIOTest { assertThat(io.getMinSeqNum(), is(equalTo(seqNum))); } + @Test + public void recoverEmptyWriteRecover() throws IOException { + Queueable element = new StringElement("foobarbaz"); + long seqNum = 42L; + ByteBufferPageIO io = newEmptyPageIO(); + byte[] inititalState = io.dump(); + + io = newPageIO(inititalState.length, inititalState); + io.recover(); + assertThat(io.getElementCount(), is(equalTo(0))); + + io.write(element.serialize(), seqNum); + inititalState = io.dump(); + + io = newPageIO(inititalState.length, inititalState); + io.recover(); + assertThat(io.getElementCount(), is(equalTo(1))); + assertThat(io.getMinSeqNum(), is(equalTo(seqNum))); + } + + @Test + public void recoverNonEmptyWriteRecover() throws IOException { + Queueable element = new StringElement("foobarbaz"); + + ByteBufferPageIO io = newEmptyPageIO(); + io.write(element.serialize(), 1L); + byte[] inititalState = io.dump(); + + io = newPageIO(inititalState.length, inititalState); + io.recover(); + assertThat(io.getElementCount(), is(equalTo(1))); + + io.write(element.serialize(), 2L); + inititalState = io.dump(); + + io = newPageIO(inititalState.length, inititalState); + io.recover(); + assertThat(io.getElementCount(), is(equalTo(2))); + } + @Test(expected = IOException.class) public void openUnexpectedSeqNum() throws IOException { Queueable element = new StringElement("foobarbaz");