fix bad head positioning on recovery & tests

This commit is contained in:
Colin Surprenant 2017-02-13 18:54:35 -05:00
parent f323e68ac7
commit 971537c27e
4 changed files with 60 additions and 5 deletions

View file

@ -160,6 +160,8 @@ public class Queue implements Closeable {
} catch (NoSuchFileException e) {
// if there is no head checkpoint, create a new headpage and checkpoint it and exit method
logger.debug("No head checkpoint found at: {}, creating new head page", checkpointIO.headFileName());
this.seqNum = 0;
headPageNum = 0;
@ -177,6 +179,8 @@ public class Queue implements Closeable {
// all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException
Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
logger.debug("opening tail page: {}, in: {}, with checkpoint: {}", pageNum, this.dirPath, cp.toString());
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
pageIO.open(cp.getMinSeqNum(), cp.getElementCount());
@ -186,6 +190,8 @@ public class Queue implements Closeable {
// transform the head page into a tail page only if the headpage is non-empty
// in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages
logger.debug("opening head page: {}, in: {}, with checkpoint: {}", headCheckpoint.getPageNum(), this.dirPath, headCheckpoint.toString());
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data

View file

@ -1,5 +1,7 @@
package org.logstash.common.io;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ackedqueue.SequencedList;
import java.io.IOException;
@ -33,6 +35,8 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
public static final boolean VERIFY_CHECKSUM = true;
public static final boolean STRICT_CAPACITY = true;
private static final Logger logger = LogManager.getLogger(AbstractByteBufferPageIO.class);
protected int capacity; // page capacity is an int per the ByteBuffer class.
protected final int pageNum;
protected final List<Integer> offsetMap; // has to be extendable
@ -105,6 +109,7 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
this.elementCount += 1;
} catch (PageIOInvalidElementException e) {
// simply stop at first invalid element
logger.debug("PageIO recovery element index:{}, readNextElement exception: {}", i, e.getMessage());
break;
}
}
@ -131,21 +136,22 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
if (this.head + SEQNUM_SIZE + LENGTH_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read seqNum and length bytes past buffer capacity"); }
int elementOffset = this.head;
int newHead = this.head;
ByteBuffer buffer = getBuffer();
long seqNum = buffer.getLong();
this.head += SEQNUM_SIZE;
newHead += SEQNUM_SIZE;
if (seqNum != expectedSeqNum) { throw new PageIOInvalidElementException(String.format("Element seqNum %d is expected to be %d", seqNum, expectedSeqNum)); }
int length = buffer.getInt();
this.head += LENGTH_SIZE;
newHead += LENGTH_SIZE;
// length must be > 0
if (length <= 0) { throw new PageIOInvalidElementException("Element invalid length"); }
// if there is no room for the proposed data length and checksum just stop here
if (this.head + length + CHECKSUM_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read element payload and checksum past buffer capacity"); }
if (newHead + length + CHECKSUM_SIZE > capacity) { throw new PageIOInvalidElementException("cannot read element payload and checksum past buffer capacity"); }
if (verifyChecksum) {
// read data and compute checksum;
@ -158,9 +164,9 @@ public abstract class AbstractByteBufferPageIO implements PageIO {
// at this point we recovered a valid element
this.offsetMap.add(elementOffset);
this.head += length + CHECKSUM_SIZE;
this.head = newHead + length + CHECKSUM_SIZE;
buffer.position(head);
buffer.position(this.head);
}
@Override

View file

@ -101,6 +101,9 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
@Override
public void close() throws IOException {
if (this.buffer != null) {
this.buffer.force();
}
if (this.channel != null && this.channel.isOpen()) {
this.channel.close();
}

View file

@ -134,6 +134,46 @@ public class ByteBufferPageIOTest {
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
}
@Test
public void recoverEmptyWriteRecover() throws IOException {
Queueable element = new StringElement("foobarbaz");
long seqNum = 42L;
ByteBufferPageIO io = newEmptyPageIO();
byte[] inititalState = io.dump();
io = newPageIO(inititalState.length, inititalState);
io.recover();
assertThat(io.getElementCount(), is(equalTo(0)));
io.write(element.serialize(), seqNum);
inititalState = io.dump();
io = newPageIO(inititalState.length, inititalState);
io.recover();
assertThat(io.getElementCount(), is(equalTo(1)));
assertThat(io.getMinSeqNum(), is(equalTo(seqNum)));
}
@Test
public void recoverNonEmptyWriteRecover() throws IOException {
Queueable element = new StringElement("foobarbaz");
ByteBufferPageIO io = newEmptyPageIO();
io.write(element.serialize(), 1L);
byte[] inititalState = io.dump();
io = newPageIO(inititalState.length, inititalState);
io.recover();
assertThat(io.getElementCount(), is(equalTo(1)));
io.write(element.serialize(), 2L);
inititalState = io.dump();
io = newPageIO(inititalState.length, inititalState);
io.recover();
assertThat(io.getElementCount(), is(equalTo(2)));
}
@Test(expected = IOException.class)
public void openUnexpectedSeqNum() throws IOException {
Queueable element = new StringElement("foobarbaz");