Backport PR #14093 to 7.17:Fix of DLQ stream position retrieval

Adds some tests to proof that the offset position retrieved from the `RecordIOReader` effectively point to the start of the next available event and not to the last channel position retrieved.
Adds fixes for the problem, moving the concept of `channelPosition` to `streamPosition` differentiating the position on the stream from the position on the channel. However the already published interface (method getChannelPosition) is not renamed to avoid the introduction of a breaking change.
This commit is contained in:
Andrea Selva 2022-05-17 15:36:26 +02:00 committed by GitHub
parent bd39da7b2c
commit 7de598490d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 232 additions and 49 deletions

View file

@ -63,14 +63,17 @@ import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
*/
public final class RecordIOReader implements Closeable {
enum SegmentStatus { EMPTY, VALID, INVALID }
private static final Logger logger = LogManager.getLogger(RecordIOReader.class);
private static final int UNSET = -1;
private final FileChannel channel;
private final Path path;
private ByteBuffer currentBlock;
private int currentBlockSizeReadFromChannel;
private final Path path;
private long channelPosition;
private static final int UNSET = -1;
enum SegmentStatus { EMPTY, VALID, INVALID}
private long streamPosition;
public RecordIOReader(Path path) throws IOException {
this.path = path;
@ -87,7 +90,7 @@ public final class RecordIOReader implements Closeable {
"Invalid version on DLQ data file %s. Expected version: %c. Version found on file: %c",
path, VERSION, versionInFile));
}
this.channelPosition = this.channel.position();
this.streamPosition = this.channel.position();
}
public Path getPath() {
@ -95,20 +98,28 @@ public final class RecordIOReader implements Closeable {
}
public void seekToBlock(int bid) throws IOException {
seekToOffset(bid * BLOCK_SIZE + VERSION_SIZE);
seekToOffset(bid * (long) BLOCK_SIZE + VERSION_SIZE);
}
public void seekToOffset(long channelOffset) throws IOException {
currentBlock.rewind();
currentBlockSizeReadFromChannel = 0;
channel.position(channelOffset);
channelPosition = channel.position();
// align to block boundary and move from that with relative positioning
long segmentOffset = channelOffset - VERSION_SIZE;
long blockIndex = segmentOffset / BLOCK_SIZE;
long blockStartOffset = blockIndex * BLOCK_SIZE;
channel.position(blockStartOffset + VERSION_SIZE);
int readBytes = channel.read(currentBlock);
currentBlockSizeReadFromChannel = readBytes;
currentBlock.position((int) segmentOffset % BLOCK_SIZE);
streamPosition = channelOffset;
}
public <T> byte[] seekToNextEventPosition(T target, Function<byte[], T> keyExtractor, Comparator<T> keyComparator) throws IOException {
int matchingBlock = UNSET;
int lowBlock = 0;
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
// blocks are 0-based so the highest is the number of blocks - 1
int highBlock = ((int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE) - 1;
while (lowBlock < highBlock) {
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
@ -155,25 +166,26 @@ public final class RecordIOReader implements Closeable {
}
public long getChannelPosition() {
return channelPosition;
return streamPosition;
}
void consumeBlock(boolean rewind) throws IOException {
if (rewind) {
currentBlockSizeReadFromChannel = 0;
currentBlock.rewind();
} else if (currentBlockSizeReadFromChannel == BLOCK_SIZE) {
if (!rewind && currentBlockSizeReadFromChannel == BLOCK_SIZE) {
// already read enough, no need to read more
return;
}
int processedPosition = currentBlock.position();
if (rewind) {
currentBlockSizeReadFromChannel = 0;
currentBlock.rewind();
}
currentBlock.mark();
try {
// Move to last written to position
currentBlock.position(currentBlockSizeReadFromChannel);
channel.read(currentBlock);
currentBlockSizeReadFromChannel = currentBlock.position();
} finally {
currentBlock.position(processedPosition);
currentBlock.reset();
}
}
@ -190,22 +202,29 @@ public final class RecordIOReader implements Closeable {
*/
int seekToStartOfEventInBlock() {
// Already consumed all the bytes in this block.
if (currentBlock.position() >= currentBlockSizeReadFromChannel){
if (currentBlock.position() >= currentBlockSizeReadFromChannel) {
return -1;
}
while (true) {
RecordType type = RecordType.fromByte(currentBlock.array()[currentBlock.arrayOffset() + currentBlock.position()]);
if (RecordType.COMPLETE.equals(type) || RecordType.START.equals(type)) {
if (type == null) {
return -1;
}
switch (type) {
case COMPLETE:
case START:
return currentBlock.position();
} else if (RecordType.END.equals(type)) {
case MIDDLE:
return -1;
case END:
// reached END record, move forward to the next record in the block if it's present
RecordHeader header = RecordHeader.get(currentBlock);
currentBlock.position(currentBlock.position() + header.getSize());
// If this is the end of stream, then cannot seek to start of block
if (this.isEndOfStream()){
if (this.isEndOfStream()) {
return -1;
}
} else {
return -1;
break;
}
}
}
@ -234,11 +253,12 @@ public final class RecordIOReader implements Closeable {
private void maybeRollToNextBlock() throws IOException {
// check block position state
if (currentBlock.remaining() < RECORD_HEADER_SIZE + 1) {
streamPosition = this.channel.position();
consumeBlock(true);
}
}
private void getRecord(ByteBuffer buffer, RecordHeader header) {
private void getRecord(ByteBuffer buffer, RecordHeader header) throws IOException {
Checksum computedChecksum = new CRC32();
computedChecksum.update(currentBlock.array(), currentBlock.position(), header.getSize());
@ -248,6 +268,13 @@ public final class RecordIOReader implements Closeable {
buffer.put(currentBlock.array(), currentBlock.position(), header.getSize());
currentBlock.position(currentBlock.position() + header.getSize());
if (currentBlock.remaining() < RECORD_HEADER_SIZE + 1) {
// if the block buffer doesn't contain enough space for another record
// update position to last channel position.
streamPosition = channel.position();
} else {
streamPosition += header.getSize();
}
}
public byte[] readEvent() throws IOException {
@ -256,6 +283,7 @@ public final class RecordIOReader implements Closeable {
return null;
}
RecordHeader header = RecordHeader.get(currentBlock);
streamPosition += RECORD_HEADER_SIZE;
int cumReadSize = 0;
int bufferSize = header.getTotalEventSize().orElseGet(header::getSize);
ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
@ -264,16 +292,13 @@ public final class RecordIOReader implements Closeable {
while (cumReadSize < bufferSize) {
maybeRollToNextBlock();
RecordHeader nextHeader = RecordHeader.get(currentBlock);
streamPosition += RECORD_HEADER_SIZE;
getRecord(buffer, nextHeader);
cumReadSize += nextHeader.getSize();
}
return buffer.array();
} catch (ClosedByInterruptException e) {
return null;
} finally {
if (channel.isOpen()) {
channelPosition = channel.position();
}
}
}
@ -295,7 +320,7 @@ public final class RecordIOReader implements Closeable {
this.currentBlock = ByteBuffer.wrap(bufferState.blockContents);
this.currentBlock.position(bufferState.currentBlockPosition);
this.channel.position(bufferState.channelPosition);
this.channelPosition = channel.position();
this.streamPosition = channel.position();
this.currentBlockSizeReadFromChannel = bufferState.currentBlockSizeReadFromChannel;
}
@ -305,7 +330,7 @@ public final class RecordIOReader implements Closeable {
private long channelPosition;
private byte[] blockContents;
BufferState(Builder builder){
BufferState(Builder builder) {
this.currentBlockPosition = builder.currentBlockPosition;
this.currentBlockSizeReadFromChannel = builder.currentBlockSizeReadFromChannel;
this.channelPosition = builder.channelPosition;
@ -317,28 +342,28 @@ public final class RecordIOReader implements Closeable {
currentBlockPosition, currentBlockSizeReadFromChannel, channelPosition);
}
final static class Builder{
final static class Builder {
private int currentBlockPosition;
private int currentBlockSizeReadFromChannel;
private long channelPosition;
private byte[] blockContents;
Builder currentBlockPosition(final int currentBlockPosition){
Builder currentBlockPosition(final int currentBlockPosition) {
this.currentBlockPosition = currentBlockPosition;
return this;
}
Builder currentBlockSizeReadFromChannel(final int currentBlockSizeReadFromChannel){
Builder currentBlockSizeReadFromChannel(final int currentBlockSizeReadFromChannel) {
this.currentBlockSizeReadFromChannel = currentBlockSizeReadFromChannel;
return this;
}
Builder channelPosition(final long channelPosition){
Builder channelPosition(final long channelPosition) {
this.channelPosition = channelPosition;
return this;
}
Builder blockContents(final byte[] blockContents){
Builder blockContents(final byte[] blockContents) {
this.blockContents = blockContents;
return this;
}
@ -364,7 +389,7 @@ public final class RecordIOReader implements Closeable {
if (moreEvents) segmentStatus = SegmentStatus.VALID;
}
return segmentStatus;
} catch (IOException | IllegalStateException e){
} catch (IOException | IllegalStateException e) {
logger.warn("Error reading segment file {}", path, e);
return SegmentStatus.INVALID;
}

View file

@ -31,7 +31,11 @@ import org.logstash.Timestamp;
import org.logstash.ackedqueue.StringElement;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@ -44,10 +48,15 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
public class DeadLetterQueueReaderTest {
public static final int INTERNAL_FRAG_PAYLOAD_SIZE = BLOCK_SIZE - RECORD_HEADER_SIZE - 5;
private Path dir;
private int defaultDlqSize = 100_000_000; // 100mb
@ -516,6 +525,136 @@ public class DeadLetterQueueReaderTest {
}
}
@Test
public void testStoreReaderPositionAndRestart() throws IOException, InterruptedException {
// write some data into a segment file
Path segmentPath = dir.resolve(segmentFileName(0));
RecordIOWriter writer = new RecordIOWriter(segmentPath);
for (int j = 0; j < 10; j++) {
writer.writeEvent((new StringElement("" + j)).serialize());
}
writer.close();
// read the first event and save read position
Path currentSegment;
long currentPosition;
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals("0", new String(rawStr, StandardCharsets.UTF_8));
currentSegment = reader.getCurrentSegment();
currentPosition = reader.getCurrentPosition();
}
// reopen the reader from the last saved position and read next element
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
reader.setCurrentReaderAndPosition(currentSegment, currentPosition);
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals("1", new String(rawStr, StandardCharsets.UTF_8));
}
}
@Test
public void testReaderWithBlockInternalFragmentation() throws IOException, InterruptedException {
writeSegmentWithFirstBlockContainingInternalFragmentation();
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals(stringOf(INTERNAL_FRAG_PAYLOAD_SIZE, 'A'), new String(rawStr, StandardCharsets.UTF_8));
rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8));
}
}
private static String stringOf(int length, char ch) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
sb.append(ch);
}
return sb.toString();
}
@Test
public void testStoreReaderPositionWithBlocksWithInternalFragmentation() throws IOException, InterruptedException {
writeSegmentWithFirstBlockContainingInternalFragmentation();
// read the first event and save read position
Path currentSegment;
long currentPosition;
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals(stringOf(INTERNAL_FRAG_PAYLOAD_SIZE, 'A'), new String(rawStr, StandardCharsets.UTF_8));
currentSegment = reader.getCurrentSegment();
currentPosition = reader.getCurrentPosition();
}
// reopen the reader from the last saved position and read next element
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
reader.setCurrentReaderAndPosition(currentSegment, currentPosition);
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8));
}
}
@Test
public void testStoreReaderPositionWithBlocksWithInternalFragmentationOnceMessageIsBiggerThenBlock() throws IOException, InterruptedException {
final int payloadSize = INTERNAL_FRAG_PAYLOAD_SIZE + BLOCK_SIZE;
byte[] almostFullBlockPayload = new byte[payloadSize];
Arrays.fill(almostFullBlockPayload, (byte) 'A');
Path segmentPath = dir.resolve(segmentFileName(0));
RecordIOWriter writer = new RecordIOWriter(segmentPath);
writer.writeEvent(almostFullBlockPayload);
// write a second segment with small payload
byte[] smallPayload = new byte[10];
Arrays.fill(smallPayload, (byte) 'B');
writer.writeEvent(smallPayload);
writer.close();
// read the first event and save read position
Path currentSegment;
long currentPosition;
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals(stringOf(payloadSize, 'A'), new String(rawStr, StandardCharsets.UTF_8));
currentSegment = reader.getCurrentSegment();
currentPosition = reader.getCurrentPosition();
}
// reopen the reader from the last saved position and read next element
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
reader.setCurrentReaderAndPosition(currentSegment, currentPosition);
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8));
}
}
private void writeSegmentWithFirstBlockContainingInternalFragmentation() throws IOException {
byte[] almostFullBlockPayload = new byte[INTERNAL_FRAG_PAYLOAD_SIZE];
Arrays.fill(almostFullBlockPayload, (byte) 'A');
Path segmentPath = dir.resolve(segmentFileName(0));
RecordIOWriter writer = new RecordIOWriter(segmentPath);
writer.writeEvent(almostFullBlockPayload);
// write a second segment with small payload
byte[] smallPayload = new byte[10];
Arrays.fill(smallPayload, (byte) 'B');
writer.writeEvent(smallPayload);
writer.close();
}
private int randomBetween(int from, int to){
Random r = new Random();
@ -554,4 +693,20 @@ public class DeadLetterQueueReaderTest {
}
}
}
@Test
public void testRestartFromCommitPointRealData() throws IOException, InterruptedException, URISyntaxException {
URL url = this.getClass().getResource("1.log");
Path path = Paths.get(url.toURI());
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(path.getParent())) {
reader.setCurrentReaderAndPosition(path, 0x3593F0);
for (int i = 0; i < 10_000 - 3_376; i++) {
byte[] rawStr = reader.pollEntryBytes();
assertNotNull(rawStr);
assertThat(new String(rawStr, StandardCharsets.UTF_8), containsString("Could not index event to Elasticsearch. status: 400"));
}
}
}
}

View file

@ -93,7 +93,6 @@ public class RecordIOReaderTest {
RecordIOReader reader = new RecordIOReader(file);
reader.seekToBlock(1);
reader.consumeBlock(true);
assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026));
reader.close();
@ -157,24 +156,28 @@ public class RecordIOReaderTest {
int blocks = (int)Math.ceil(expectedSize / (double)BLOCK_SIZE);
int fillSize = expectedSize - (blocks * RECORD_HEADER_SIZE);
try(RecordIOWriter writer = new RecordIOWriter(file)){
try (RecordIOWriter writer = new RecordIOWriter(file)) {
for (char i = 0; i < eventCount; i++) {
char[] blockSize = fillArray(fillSize);
blockSize[0] = i;
assertThat(writer.writeEvent(new StringElement(new String(blockSize)).serialize()), is((long)expectedSize));
byte[] payload = new StringElement(new String(blockSize)).serialize();
assertThat(writer.writeEvent(payload), is((long)expectedSize));
}
}
try(RecordIOReader reader = new RecordIOReader(file)) {
Function<byte[], Character> toChar = (b) -> (char) ByteBuffer.wrap(b).get(0);
try (RecordIOReader reader = new RecordIOReader(file)) {
Comparator<Character> charComparator = Comparator.comparing(Function.identity());
for (char i = 0; i < eventCount; i++) {
reader.seekToNextEventPosition(i, toChar, Comparator.comparing(o -> o));
assertThat(toChar.apply(reader.readEvent()), equalTo(i));
reader.seekToNextEventPosition(i, RecordIOReaderTest::extractFirstChar, charComparator);
assertThat(extractFirstChar(reader.readEvent()), equalTo(i));
}
}
}
private static Character extractFirstChar(byte[] b) {
return (char) ByteBuffer.wrap(b).get(0);
}
@Test
public void testObviouslyInvalidSegment() throws Exception {
assertThat(RecordIOReader.getSegmentStatus(file), is(RecordIOReader.SegmentStatus.INVALID));