CLEANUP: Flatten AbstractByteBufferPageIO + MMapPageIO

Fixes #9011
This commit is contained in:
Armin 2018-01-23 10:36:17 +01:00 committed by Armin Braun
parent 2026b071da
commit 54da9f9024
4 changed files with 316 additions and 326 deletions

View file

@ -1,293 +0,0 @@
package org.logstash.ackedqueue.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ackedqueue.SequencedList;
public abstract class AbstractByteBufferPageIO implements PageIO {
public static class PageIOInvalidElementException extends IOException {
public PageIOInvalidElementException(String message) { super(message); }
}
public static class PageIOInvalidVersionException extends IOException {
public PageIOInvalidVersionException(String message) { super(message); }
}
public static final byte VERSION_ONE = 1;
public static final int VERSION_SIZE = Byte.BYTES;
public static final int CHECKSUM_SIZE = Integer.BYTES;
public static final int LENGTH_SIZE = Integer.BYTES;
public static final int SEQNUM_SIZE = Long.BYTES;
public static final int HEADER_SIZE = 1; // version byte
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
// Size of: Header + Sequence Number + Length + Checksum
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
public static final boolean VERIFY_CHECKSUM = true;
private static final Logger logger = LogManager.getLogger(AbstractByteBufferPageIO.class);
protected int capacity; // page capacity is an int per the ByteBuffer class.
protected final int pageNum;
protected long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
protected int elementCount;
protected int head; // head is the write position and is an int per ByteBuffer class position
protected byte version;
private CRC32 checkSummer;
private final IntVector offsetMap;
public AbstractByteBufferPageIO(int pageNum, int capacity) {
this.minSeqNum = 0;
this.elementCount = 0;
this.version = 0;
this.head = 0;
this.pageNum = pageNum;
this.capacity = capacity;
this.offsetMap = new IntVector();
this.checkSummer = new CRC32();
}
// @return the concrete class buffer
protected abstract ByteBuffer getBuffer();
@Override
public void open(long minSeqNum, int elementCount) throws IOException {
getBuffer().position(0);
this.version = getBuffer().get();
validateVersion(this.version);
this.head = 1;
this.minSeqNum = minSeqNum;
this.elementCount = elementCount;
if (this.elementCount > 0) {
// verify first seqNum to be same as expected minSeqNum
long seqNum = getBuffer().getLong();
if (seqNum != this.minSeqNum) { throw new IOException(String.format("first seqNum=%d is different than minSeqNum=%d", seqNum, this.minSeqNum)); }
// reset back position to first seqNum
getBuffer().position(this.head);
for (int i = 0; i < this.elementCount; i++) {
// verify that seqNum must be of strict + 1 increasing order
readNextElement(this.minSeqNum + i, !VERIFY_CHECKSUM);
}
}
}
// recover will overwrite/update/set this object minSeqNum, capacity and elementCount attributes
// to reflect what it recovered from the page
@Override
public void recover() throws IOException {
getBuffer().position(0);
this.version = getBuffer().get();
validateVersion(this.version);
this.head = 1;
// force minSeqNum to actual first element seqNum
this.minSeqNum = getBuffer().getLong();
// reset back position to first seqNum
getBuffer().position(this.head);
// reset elementCount to 0 and increment to octal number of valid elements found
this.elementCount = 0;
for (int i = 0; ; i++) {
try {
// verify that seqNum must be of strict + 1 increasing order
readNextElement(this.minSeqNum + i, VERIFY_CHECKSUM);
this.elementCount += 1;
} catch (PageIOInvalidElementException e) {
// simply stop at first invalid element
logger.debug("PageIO recovery element index:{}, readNextElement exception: {}", i, e.getMessage());
break;
}
}
// if we were not able to read any element just reset minSeqNum to zero
if (this.elementCount <= 0) {
this.minSeqNum = 0;
}
}
// we don't have different versions yet so simply check if the version is VERSION_ONE for basic integrity check
// and if an unexpected version byte is read throw PageIOInvalidVersionException
private static void validateVersion(byte version) throws PageIOInvalidVersionException {
if (version != VERSION_ONE) {
throw new PageIOInvalidVersionException(String
.format("Expected page version=%d but found version=%d", VERSION_ONE, version));
}
}
// read and validate next element at page head
// @param verifyChecksum if true the actual element data will be read + checksumed and compared to written checksum
private void readNextElement(long expectedSeqNum, boolean verifyChecksum) throws PageIOInvalidElementException {
// if there is no room for the seqNum and length bytes stop here
// TODO: I know this isn't a great exception message but at the time of writing I couldn't come up with anything better :P
if (this.head + SEQNUM_SIZE + LENGTH_SIZE > capacity) { throw new PageIOInvalidElementException(
"cannot read seqNum and length bytes past buffer capacity"); }
int elementOffset = this.head;
int newHead = this.head;
ByteBuffer buffer = getBuffer();
long seqNum = buffer.getLong();
newHead += SEQNUM_SIZE;
if (seqNum != expectedSeqNum) { throw new PageIOInvalidElementException(
String.format("Element seqNum %d is expected to be %d", seqNum, expectedSeqNum)); }
int length = buffer.getInt();
newHead += LENGTH_SIZE;
// length must be > 0
if (length <= 0) { throw new PageIOInvalidElementException("Element invalid length"); }
// if there is no room for the proposed data length and checksum just stop here
if (newHead + length + CHECKSUM_SIZE > capacity) { throw new PageIOInvalidElementException(
"cannot read element payload and checksum past buffer capacity"); }
if (verifyChecksum) {
// read data and compute checksum;
this.checkSummer.reset();
final int prevLimit = buffer.limit();
buffer.limit(buffer.position() + length);
this.checkSummer.update(buffer);
buffer.limit(prevLimit);
int checksum = buffer.getInt();
int computedChecksum = (int) this.checkSummer.getValue();
if (computedChecksum != checksum) { throw new PageIOInvalidElementException(
"Element invalid checksum"); }
}
// at this point we recovered a valid element
this.offsetMap.add(elementOffset);
this.head = newHead + length + CHECKSUM_SIZE;
buffer.position(this.head);
}
@Override
public void create() throws IOException {
getBuffer().position(0);
getBuffer().put(VERSION_ONE);
this.head = 1;
this.minSeqNum = 0L;
this.elementCount = 0;
}
@Override
public void write(byte[] bytes, long seqNum) {
write(bytes, seqNum, bytes.length, checksum(bytes));
}
protected int write(byte[] bytes, long seqNum, int length, int checksum) {
// since writes always happen at head, we can just append head to the offsetMap
assert this.offsetMap.size() == this.elementCount :
String.format("offsetMap size=%d != elementCount=%d", this.offsetMap.size(), this.elementCount);
int initialHead = this.head;
ByteBuffer buffer = getBuffer();
buffer.position(this.head);
buffer.putLong(seqNum);
buffer.putInt(length);
buffer.put(bytes);
buffer.putInt(checksum);
this.head += persistedByteCount(bytes.length);
assert this.head == buffer.position() :
String.format("head=%d != buffer position=%d", this.head, buffer.position());
if (this.elementCount <= 0) {
this.minSeqNum = seqNum;
}
this.offsetMap.add(initialHead);
this.elementCount++;
return initialHead;
}
@Override
public SequencedList<byte[]> read(long seqNum, int limit) throws IOException {
assert seqNum >= this.minSeqNum :
String.format("seqNum=%d < minSeqNum=%d", seqNum, this.minSeqNum);
assert seqNum <= maxSeqNum() :
String.format("seqNum=%d is > maxSeqNum=%d", seqNum, maxSeqNum());
List<byte[]> elements = new ArrayList<>();
final LongVector seqNums = new LongVector(limit);
int offset = this.offsetMap.get((int)(seqNum - this.minSeqNum));
ByteBuffer buffer = getBuffer();
buffer.position(offset);
for (int i = 0; i < limit; i++) {
long readSeqNum = buffer.getLong();
assert readSeqNum == (seqNum + i) :
String.format("unmatched seqNum=%d to readSeqNum=%d", seqNum + i, readSeqNum);
int readLength = buffer.getInt();
byte[] readBytes = new byte[readLength];
buffer.get(readBytes);
int checksum = buffer.getInt();
int computedChecksum = checksum(readBytes);
if (computedChecksum != checksum) {
throw new IOException(String.format("computed checksum=%d != checksum for file=%d", computedChecksum, checksum));
}
elements.add(readBytes);
seqNums.add(readSeqNum);
if (seqNum + i >= maxSeqNum()) {
break;
}
}
return new SequencedList<>(elements, seqNums);
}
@Override
public int getCapacity() { return this.capacity; }
@Override
public long getMinSeqNum() { return this.minSeqNum; }
@Override
public int getElementCount() { return this.elementCount; }
@Override
public boolean hasSpace(int bytes) {
int bytesLeft = this.capacity - this.head;
return persistedByteCount(bytes) <= bytesLeft;
}
@Override
public int persistedByteCount(int byteCount) {
return SEQNUM_SIZE + LENGTH_SIZE + byteCount + CHECKSUM_SIZE;
}
@Override
public int getHead() {
return this.head;
}
protected int checksum(byte[] bytes) {
checkSummer.reset();
checkSummer.update(bytes, 0, bytes.length);
return (int) checkSummer.getValue();
}
private long maxSeqNum() {
return this.minSeqNum + this.elementCount - 1;
}
}

View file

@ -7,9 +7,28 @@ import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.LogstashJavaCompat;
// TODO: this essentially a copy of ByteBufferPageIO and should be DRY'ed - temp impl to test file based stress test
public class MmapPageIO extends AbstractByteBufferPageIO {
import org.logstash.ackedqueue.SequencedList;
public final class MmapPageIO implements PageIO {
public static final byte VERSION_ONE = 1;
public static final int VERSION_SIZE = Byte.BYTES;
public static final int CHECKSUM_SIZE = Integer.BYTES;
public static final int LENGTH_SIZE = Integer.BYTES;
public static final int SEQNUM_SIZE = Long.BYTES;
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
public static final int HEADER_SIZE = 1; // version byte
// Size of: Header + Sequence Number + Length + Checksum
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
public static final boolean VERIFY_CHECKSUM = true;
private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class);
/**
* Cleaner function for forcing unmapping of backing {@link MmapPageIO#buffer}.
@ -17,21 +36,99 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
private static final ByteBufferCleaner BUFFER_CLEANER =
LogstashJavaCompat.setupBytebufferCleaner();
private File file;
private final File file;
private final CRC32 checkSummer;
private final IntVector offsetMap;
private FileChannel channel;
protected MappedByteBuffer buffer;
private int capacity; // page capacity is an int per the ByteBuffer class.
private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
private int elementCount;
private int head; // head is the write position and is an int per ByteBuffer class position
private byte version;
private MappedByteBuffer buffer;
public MmapPageIO(int pageNum, int capacity, String dirPath) {
super(pageNum, capacity);
this.minSeqNum = 0;
this.elementCount = 0;
this.version = 0;
this.head = 0;
this.capacity = capacity;
this.offsetMap = new IntVector();
this.checkSummer = new CRC32();
this.file = Paths.get(dirPath, "page." + pageNum).toFile();
}
@Override
public void open(long minSeqNum, int elementCount) throws IOException {
mapFile();
super.open(minSeqNum, elementCount);
buffer.position(0);
this.version = buffer.get();
validateVersion(this.version);
this.head = 1;
this.minSeqNum = minSeqNum;
this.elementCount = elementCount;
if (this.elementCount > 0) {
// verify first seqNum to be same as expected minSeqNum
long seqNum = buffer.getLong();
if (seqNum != this.minSeqNum) {
throw new IOException(String.format("first seqNum=%d is different than minSeqNum=%d", seqNum, this.minSeqNum));
}
// reset back position to first seqNum
buffer.position(this.head);
for (int i = 0; i < this.elementCount; i++) {
// verify that seqNum must be of strict + 1 increasing order
readNextElement(this.minSeqNum + i, !VERIFY_CHECKSUM);
}
}
}
@Override
public SequencedList<byte[]> read(long seqNum, int limit) throws IOException {
assert seqNum >= this.minSeqNum :
String.format("seqNum=%d < minSeqNum=%d", seqNum, this.minSeqNum);
assert seqNum <= maxSeqNum() :
String.format("seqNum=%d is > maxSeqNum=%d", seqNum, maxSeqNum());
List<byte[]> elements = new ArrayList<>();
final LongVector seqNums = new LongVector(limit);
int offset = this.offsetMap.get((int) (seqNum - this.minSeqNum));
buffer.position(offset);
for (int i = 0; i < limit; i++) {
long readSeqNum = buffer.getLong();
assert readSeqNum == (seqNum + i) :
String.format("unmatched seqNum=%d to readSeqNum=%d", seqNum + i, readSeqNum);
int readLength = buffer.getInt();
byte[] readBytes = new byte[readLength];
buffer.get(readBytes);
int checksum = buffer.getInt();
int computedChecksum = checksum(readBytes);
if (computedChecksum != checksum) {
throw new IOException(String.format("computed checksum=%d != checksum for file=%d", computedChecksum, checksum));
}
elements.add(readBytes);
seqNums.add(readSeqNum);
if (seqNum + i >= maxSeqNum()) {
break;
}
}
return new SequencedList<>(elements, seqNums);
}
// recover will overwrite/update/set this object minSeqNum, capacity and elementCount attributes
@ -39,27 +136,35 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
@Override
public void recover() throws IOException {
mapFile();
super.recover();
}
buffer.position(0);
this.version = buffer.get();
validateVersion(this.version);
this.head = 1;
// memory map data file to this.buffer and read initial version byte
private void mapFile() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");
// force minSeqNum to actual first element seqNum
this.minSeqNum = buffer.getLong();
// reset back position to first seqNum
buffer.position(this.head);
if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file);
// reset elementCount to 0 and increment to octal number of valid elements found
this.elementCount = 0;
for (int i = 0; ; i++) {
try {
// verify that seqNum must be of strict + 1 increasing order
readNextElement(this.minSeqNum + i, VERIFY_CHECKSUM);
this.elementCount += 1;
} catch (MmapPageIO.PageIOInvalidElementException e) {
// simply stop at first invalid element
LOGGER.debug("PageIO recovery element index:{}, readNextElement exception: {}", i, e.getMessage());
break;
}
}
int pageFileCapacity = (int)raf.length();
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;
if (this.capacity < MIN_CAPACITY) { throw new IOException(String.format("Page file size is too small to hold elements")); }
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
this.buffer.load();
// if we were not able to read any element just reset minSeqNum to zero
if (this.elementCount <= 0) {
this.minSeqNum = 0;
}
}
@Override
@ -68,8 +173,11 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
super.create();
buffer.position(0);
buffer.put(VERSION_ONE);
this.head = 1;
this.minSeqNum = 0L;
this.elementCount = 0;
}
@Override
@ -100,6 +208,11 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
Files.delete(this.file.toPath());
}
@Override
public void write(byte[] bytes, long seqNum) {
write(bytes, seqNum, bytes.length, checksum(bytes));
}
@Override
public void close() throws IOException {
if (this.buffer != null) {
@ -108,7 +221,9 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
}
if (this.channel != null) {
if (this.channel.isOpen()) { this.channel.force(false); }
if (this.channel.isOpen()) {
this.channel.force(false);
}
this.channel.close(); // close can be called multiple times
}
this.channel = null;
@ -116,7 +231,175 @@ public class MmapPageIO extends AbstractByteBufferPageIO {
}
@Override
protected MappedByteBuffer getBuffer() {
return this.buffer;
public int getCapacity() {
return this.capacity;
}
@Override
public long getMinSeqNum() {
return this.minSeqNum;
}
@Override
public int getElementCount() {
return this.elementCount;
}
@Override
public boolean hasSpace(int bytes) {
int bytesLeft = this.capacity - this.head;
return persistedByteCount(bytes) <= bytesLeft;
}
@Override
public int persistedByteCount(int byteCount) {
return SEQNUM_SIZE + LENGTH_SIZE + byteCount + CHECKSUM_SIZE;
}
@Override
public int getHead() {
return this.head;
}
private int checksum(byte[] bytes) {
checkSummer.reset();
checkSummer.update(bytes, 0, bytes.length);
return (int) checkSummer.getValue();
}
private long maxSeqNum() {
return this.minSeqNum + this.elementCount - 1;
}
// memory map data file to this.buffer and read initial version byte
private void mapFile() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");
if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file);
}
int pageFileCapacity = (int) raf.length();
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;
if (this.capacity < MIN_CAPACITY) {
throw new IOException(String.format("Page file size is too small to hold elements"));
}
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
this.buffer.load();
}
// read and validate next element at page head
// @param verifyChecksum if true the actual element data will be read + checksumed and compared to written checksum
private void readNextElement(long expectedSeqNum, boolean verifyChecksum) throws MmapPageIO.PageIOInvalidElementException {
// if there is no room for the seqNum and length bytes stop here
// TODO: I know this isn't a great exception message but at the time of writing I couldn't come up with anything better :P
if (this.head + SEQNUM_SIZE + LENGTH_SIZE > capacity) {
throw new MmapPageIO.PageIOInvalidElementException(
"cannot read seqNum and length bytes past buffer capacity");
}
int elementOffset = this.head;
int newHead = this.head;
long seqNum = buffer.getLong();
newHead += SEQNUM_SIZE;
if (seqNum != expectedSeqNum) {
throw new MmapPageIO.PageIOInvalidElementException(
String.format("Element seqNum %d is expected to be %d", seqNum, expectedSeqNum));
}
int length = buffer.getInt();
newHead += LENGTH_SIZE;
// length must be > 0
if (length <= 0) {
throw new MmapPageIO.PageIOInvalidElementException("Element invalid length");
}
// if there is no room for the proposed data length and checksum just stop here
if (newHead + length + CHECKSUM_SIZE > capacity) {
throw new MmapPageIO.PageIOInvalidElementException(
"cannot read element payload and checksum past buffer capacity");
}
if (verifyChecksum) {
// read data and compute checksum;
this.checkSummer.reset();
final int prevLimit = buffer.limit();
buffer.limit(buffer.position() + length);
this.checkSummer.update(buffer);
buffer.limit(prevLimit);
int checksum = buffer.getInt();
int computedChecksum = (int) this.checkSummer.getValue();
if (computedChecksum != checksum) {
throw new MmapPageIO.PageIOInvalidElementException(
"Element invalid checksum");
}
}
// at this point we recovered a valid element
this.offsetMap.add(elementOffset);
this.head = newHead + length + CHECKSUM_SIZE;
buffer.position(this.head);
}
private int write(byte[] bytes, long seqNum, int length, int checksum) {
// since writes always happen at head, we can just append head to the offsetMap
assert this.offsetMap.size() == this.elementCount :
String.format("offsetMap size=%d != elementCount=%d", this.offsetMap.size(), this.elementCount);
int initialHead = this.head;
buffer.position(this.head);
buffer.putLong(seqNum);
buffer.putInt(length);
buffer.put(bytes);
buffer.putInt(checksum);
this.head += persistedByteCount(bytes.length);
assert this.head == buffer.position() :
String.format("head=%d != buffer position=%d", this.head, buffer.position());
if (this.elementCount <= 0) {
this.minSeqNum = seqNum;
}
this.offsetMap.add(initialHead);
this.elementCount++;
return initialHead;
}
// we don't have different versions yet so simply check if the version is VERSION_ONE for basic integrity check
// and if an unexpected version byte is read throw PageIOInvalidVersionException
private static void validateVersion(byte version)
throws MmapPageIO.PageIOInvalidVersionException {
if (version != VERSION_ONE) {
throw new MmapPageIO.PageIOInvalidVersionException(String
.format("Expected page version=%d but found version=%d", VERSION_ONE, version));
}
}
public static final class PageIOInvalidElementException extends IOException {
private static final long serialVersionUID = 1L;
public PageIOInvalidElementException(String message) {
super(message);
}
}
public static final class PageIOInvalidVersionException extends IOException {
private static final long serialVersionUID = 1L;
public PageIOInvalidVersionException(String message) {
super(message);
}
}
}

View file

@ -23,8 +23,8 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
import org.logstash.ackedqueue.io.LongVector;
import org.logstash.ackedqueue.io.MmapPageIO;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@ -91,7 +91,7 @@ public class QueueTest {
@Test(timeout = 5000)
public void writeToFullyAckedHeadpage() throws IOException {
final Queueable element = new StringElement("foobarbaz");
final int page = element.serialize().length * 2 + AbstractByteBufferPageIO.MIN_CAPACITY;
final int page = element.serialize().length * 2 + MmapPageIO.MIN_CAPACITY;
// Queue that can only hold one element per page.
try (Queue q = new Queue(
TestSettings.persistedQueueSettings(page, page * 2 - 1, dataPath))) {

View file

@ -1,7 +1,7 @@
package org.logstash.ackedqueue;
import java.io.IOException;
import org.logstash.ackedqueue.io.AbstractByteBufferPageIO;
import org.logstash.ackedqueue.io.MmapPageIO;
/**
* Class containing common methods to help DRY up acked queue tests.
@ -15,6 +15,6 @@ public class QueueTestHelpers {
* @throws IOException Throws if a serialization error occurs
*/
public static int singleElementCapacityForByteBufferPageIO(final Queueable element) throws IOException {
return AbstractByteBufferPageIO.WRAPPER_SIZE + element.serialize().length;
return MmapPageIO.WRAPPER_SIZE + element.serialize().length;
}
}