diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index 583a3d306..d775c12c2 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -63,14 +63,17 @@ import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; */ public final class RecordIOReader implements Closeable { + enum SegmentStatus { EMPTY, VALID, INVALID } + private static final Logger logger = LogManager.getLogger(RecordIOReader.class); + private static final int UNSET = -1; + private final FileChannel channel; + private final Path path; + private ByteBuffer currentBlock; private int currentBlockSizeReadFromChannel; - private final Path path; - private long channelPosition; - private static final int UNSET = -1; - enum SegmentStatus { EMPTY, VALID, INVALID} + private long streamPosition; public RecordIOReader(Path path) throws IOException { this.path = path; @@ -87,7 +90,7 @@ public final class RecordIOReader implements Closeable { "Invalid version on DLQ data file %s. Expected version: %c. Version found on file: %c", path, VERSION, versionInFile)); } - this.channelPosition = this.channel.position(); + this.streamPosition = this.channel.position(); } public Path getPath() { @@ -95,20 +98,28 @@ public final class RecordIOReader implements Closeable { } public void seekToBlock(int bid) throws IOException { - seekToOffset(bid * BLOCK_SIZE + VERSION_SIZE); + seekToOffset(bid * (long) BLOCK_SIZE + VERSION_SIZE); } public void seekToOffset(long channelOffset) throws IOException { currentBlock.rewind(); - currentBlockSizeReadFromChannel = 0; - channel.position(channelOffset); - channelPosition = channel.position(); + + // align to block boundary and move from that with relative positioning + long segmentOffset = channelOffset - VERSION_SIZE; + long blockIndex = segmentOffset / BLOCK_SIZE; + long blockStartOffset = blockIndex * BLOCK_SIZE; + channel.position(blockStartOffset + VERSION_SIZE); + int readBytes = channel.read(currentBlock); + currentBlockSizeReadFromChannel = readBytes; + currentBlock.position((int) segmentOffset % BLOCK_SIZE); + streamPosition = channelOffset; } public byte[] seekToNextEventPosition(T target, Function keyExtractor, Comparator keyComparator) throws IOException { int matchingBlock = UNSET; int lowBlock = 0; - int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE; + // blocks are 0-based so the highest is the number of blocks - 1 + int highBlock = ((int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE) - 1; while (lowBlock < highBlock) { int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0); @@ -155,25 +166,26 @@ public final class RecordIOReader implements Closeable { } public long getChannelPosition() { - return channelPosition; + return streamPosition; } void consumeBlock(boolean rewind) throws IOException { - if (rewind) { - currentBlockSizeReadFromChannel = 0; - currentBlock.rewind(); - } else if (currentBlockSizeReadFromChannel == BLOCK_SIZE) { + if (!rewind && currentBlockSizeReadFromChannel == BLOCK_SIZE) { // already read enough, no need to read more return; } - int processedPosition = currentBlock.position(); + if (rewind) { + currentBlockSizeReadFromChannel = 0; + currentBlock.rewind(); + } + currentBlock.mark(); try { // Move to last written to position currentBlock.position(currentBlockSizeReadFromChannel); channel.read(currentBlock); currentBlockSizeReadFromChannel = currentBlock.position(); } finally { - currentBlock.position(processedPosition); + currentBlock.reset(); } } @@ -190,23 +202,30 @@ public final class RecordIOReader implements Closeable { */ int seekToStartOfEventInBlock() { // Already consumed all the bytes in this block. - if (currentBlock.position() >= currentBlockSizeReadFromChannel){ + if (currentBlock.position() >= currentBlockSizeReadFromChannel) { return -1; } while (true) { RecordType type = RecordType.fromByte(currentBlock.array()[currentBlock.arrayOffset() + currentBlock.position()]); - if (RecordType.COMPLETE.equals(type) || RecordType.START.equals(type)) { - return currentBlock.position(); - } else if (RecordType.END.equals(type)) { - RecordHeader header = RecordHeader.get(currentBlock); - currentBlock.position(currentBlock.position() + header.getSize()); - // If this is the end of stream, then cannot seek to start of block - if (this.isEndOfStream()){ - return -1; - } - } else { + if (type == null) { return -1; } + switch (type) { + case COMPLETE: + case START: + return currentBlock.position(); + case MIDDLE: + return -1; + case END: + // reached END record, move forward to the next record in the block if it's present + RecordHeader header = RecordHeader.get(currentBlock); + currentBlock.position(currentBlock.position() + header.getSize()); + // If this is the end of stream, then cannot seek to start of block + if (this.isEndOfStream()) { + return -1; + } + break; + } } } @@ -234,11 +253,12 @@ public final class RecordIOReader implements Closeable { private void maybeRollToNextBlock() throws IOException { // check block position state if (currentBlock.remaining() < RECORD_HEADER_SIZE + 1) { + streamPosition = this.channel.position(); consumeBlock(true); } } - private void getRecord(ByteBuffer buffer, RecordHeader header) { + private void getRecord(ByteBuffer buffer, RecordHeader header) throws IOException { Checksum computedChecksum = new CRC32(); computedChecksum.update(currentBlock.array(), currentBlock.position(), header.getSize()); @@ -248,6 +268,13 @@ public final class RecordIOReader implements Closeable { buffer.put(currentBlock.array(), currentBlock.position(), header.getSize()); currentBlock.position(currentBlock.position() + header.getSize()); + if (currentBlock.remaining() < RECORD_HEADER_SIZE + 1) { + // if the block buffer doesn't contain enough space for another record + // update position to last channel position. + streamPosition = channel.position(); + } else { + streamPosition += header.getSize(); + } } public byte[] readEvent() throws IOException { @@ -256,6 +283,7 @@ public final class RecordIOReader implements Closeable { return null; } RecordHeader header = RecordHeader.get(currentBlock); + streamPosition += RECORD_HEADER_SIZE; int cumReadSize = 0; int bufferSize = header.getTotalEventSize().orElseGet(header::getSize); ByteBuffer buffer = ByteBuffer.allocate(bufferSize); @@ -264,16 +292,13 @@ public final class RecordIOReader implements Closeable { while (cumReadSize < bufferSize) { maybeRollToNextBlock(); RecordHeader nextHeader = RecordHeader.get(currentBlock); + streamPosition += RECORD_HEADER_SIZE; getRecord(buffer, nextHeader); cumReadSize += nextHeader.getSize(); } return buffer.array(); } catch (ClosedByInterruptException e) { return null; - } finally { - if (channel.isOpen()) { - channelPosition = channel.position(); - } } } @@ -295,7 +320,7 @@ public final class RecordIOReader implements Closeable { this.currentBlock = ByteBuffer.wrap(bufferState.blockContents); this.currentBlock.position(bufferState.currentBlockPosition); this.channel.position(bufferState.channelPosition); - this.channelPosition = channel.position(); + this.streamPosition = channel.position(); this.currentBlockSizeReadFromChannel = bufferState.currentBlockSizeReadFromChannel; } @@ -305,7 +330,7 @@ public final class RecordIOReader implements Closeable { private long channelPosition; private byte[] blockContents; - BufferState(Builder builder){ + BufferState(Builder builder) { this.currentBlockPosition = builder.currentBlockPosition; this.currentBlockSizeReadFromChannel = builder.currentBlockSizeReadFromChannel; this.channelPosition = builder.channelPosition; @@ -317,28 +342,28 @@ public final class RecordIOReader implements Closeable { currentBlockPosition, currentBlockSizeReadFromChannel, channelPosition); } - final static class Builder{ + final static class Builder { private int currentBlockPosition; private int currentBlockSizeReadFromChannel; private long channelPosition; private byte[] blockContents; - Builder currentBlockPosition(final int currentBlockPosition){ + Builder currentBlockPosition(final int currentBlockPosition) { this.currentBlockPosition = currentBlockPosition; return this; } - Builder currentBlockSizeReadFromChannel(final int currentBlockSizeReadFromChannel){ + Builder currentBlockSizeReadFromChannel(final int currentBlockSizeReadFromChannel) { this.currentBlockSizeReadFromChannel = currentBlockSizeReadFromChannel; return this; } - Builder channelPosition(final long channelPosition){ + Builder channelPosition(final long channelPosition) { this.channelPosition = channelPosition; return this; } - Builder blockContents(final byte[] blockContents){ + Builder blockContents(final byte[] blockContents) { this.blockContents = blockContents; return this; } @@ -364,7 +389,7 @@ public final class RecordIOReader implements Closeable { if (moreEvents) segmentStatus = SegmentStatus.VALID; } return segmentStatus; - } catch (IOException | IllegalStateException e){ + } catch (IOException | IllegalStateException e) { logger.warn("Error reading segment file {}", path, e); return SegmentStatus.INVALID; } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index b77c7ff7c..e6b85c962 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -31,7 +31,11 @@ import org.logstash.Timestamp; import org.logstash.ackedqueue.StringElement; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -44,10 +48,15 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; +import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; public class DeadLetterQueueReaderTest { + public static final int INTERNAL_FRAG_PAYLOAD_SIZE = BLOCK_SIZE - RECORD_HEADER_SIZE - 5; private Path dir; private int defaultDlqSize = 100_000_000; // 100mb @@ -516,6 +525,136 @@ public class DeadLetterQueueReaderTest { } } + @Test + public void testStoreReaderPositionAndRestart() throws IOException, InterruptedException { + // write some data into a segment file + Path segmentPath = dir.resolve(segmentFileName(0)); + RecordIOWriter writer = new RecordIOWriter(segmentPath); + for (int j = 0; j < 10; j++) { + writer.writeEvent((new StringElement("" + j)).serialize()); + } + writer.close(); + + // read the first event and save read position + Path currentSegment; + long currentPosition; + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals("0", new String(rawStr, StandardCharsets.UTF_8)); + currentSegment = reader.getCurrentSegment(); + currentPosition = reader.getCurrentPosition(); + } + + // reopen the reader from the last saved position and read next element + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + reader.setCurrentReaderAndPosition(currentSegment, currentPosition); + + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals("1", new String(rawStr, StandardCharsets.UTF_8)); + } + } + + @Test + public void testReaderWithBlockInternalFragmentation() throws IOException, InterruptedException { + writeSegmentWithFirstBlockContainingInternalFragmentation(); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals(stringOf(INTERNAL_FRAG_PAYLOAD_SIZE, 'A'), new String(rawStr, StandardCharsets.UTF_8)); + + rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8)); + } + } + + private static String stringOf(int length, char ch) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) { + sb.append(ch); + } + return sb.toString(); + } + + @Test + public void testStoreReaderPositionWithBlocksWithInternalFragmentation() throws IOException, InterruptedException { + writeSegmentWithFirstBlockContainingInternalFragmentation(); + + // read the first event and save read position + Path currentSegment; + long currentPosition; + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals(stringOf(INTERNAL_FRAG_PAYLOAD_SIZE, 'A'), new String(rawStr, StandardCharsets.UTF_8)); + currentSegment = reader.getCurrentSegment(); + currentPosition = reader.getCurrentPosition(); + } + + // reopen the reader from the last saved position and read next element + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + reader.setCurrentReaderAndPosition(currentSegment, currentPosition); + + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8)); + } + } + + @Test + public void testStoreReaderPositionWithBlocksWithInternalFragmentationOnceMessageIsBiggerThenBlock() throws IOException, InterruptedException { + final int payloadSize = INTERNAL_FRAG_PAYLOAD_SIZE + BLOCK_SIZE; + byte[] almostFullBlockPayload = new byte[payloadSize]; + Arrays.fill(almostFullBlockPayload, (byte) 'A'); + Path segmentPath = dir.resolve(segmentFileName(0)); + RecordIOWriter writer = new RecordIOWriter(segmentPath); + writer.writeEvent(almostFullBlockPayload); + + // write a second segment with small payload + byte[] smallPayload = new byte[10]; + Arrays.fill(smallPayload, (byte) 'B'); + writer.writeEvent(smallPayload); + + writer.close(); + + // read the first event and save read position + Path currentSegment; + long currentPosition; + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals(stringOf(payloadSize, 'A'), new String(rawStr, StandardCharsets.UTF_8)); + currentSegment = reader.getCurrentSegment(); + currentPosition = reader.getCurrentPosition(); + } + + // reopen the reader from the last saved position and read next element + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + reader.setCurrentReaderAndPosition(currentSegment, currentPosition); + + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertEquals("BBBBBBBBBB", new String(rawStr, StandardCharsets.UTF_8)); + } + } + + private void writeSegmentWithFirstBlockContainingInternalFragmentation() throws IOException { + byte[] almostFullBlockPayload = new byte[INTERNAL_FRAG_PAYLOAD_SIZE]; + Arrays.fill(almostFullBlockPayload, (byte) 'A'); + Path segmentPath = dir.resolve(segmentFileName(0)); + RecordIOWriter writer = new RecordIOWriter(segmentPath); + writer.writeEvent(almostFullBlockPayload); + + // write a second segment with small payload + byte[] smallPayload = new byte[10]; + Arrays.fill(smallPayload, (byte) 'B'); + writer.writeEvent(smallPayload); + + writer.close(); + } private int randomBetween(int from, int to){ Random r = new Random(); @@ -554,4 +693,20 @@ public class DeadLetterQueueReaderTest { } } } + + @Test + public void testRestartFromCommitPointRealData() throws IOException, InterruptedException, URISyntaxException { + URL url = this.getClass().getResource("1.log"); + Path path = Paths.get(url.toURI()); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(path.getParent())) { + reader.setCurrentReaderAndPosition(path, 0x3593F0); + + for (int i = 0; i < 10_000 - 3_376; i++) { + byte[] rawStr = reader.pollEntryBytes(); + assertNotNull(rawStr); + assertThat(new String(rawStr, StandardCharsets.UTF_8), containsString("Could not index event to Elasticsearch. status: 400")); + } + } + } } diff --git a/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java index 4dacb14f1..29f216ac7 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java @@ -93,7 +93,6 @@ public class RecordIOReaderTest { RecordIOReader reader = new RecordIOReader(file); reader.seekToBlock(1); - reader.consumeBlock(true); assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026)); reader.close(); @@ -157,24 +156,28 @@ public class RecordIOReaderTest { int blocks = (int)Math.ceil(expectedSize / (double)BLOCK_SIZE); int fillSize = expectedSize - (blocks * RECORD_HEADER_SIZE); - try(RecordIOWriter writer = new RecordIOWriter(file)){ + try (RecordIOWriter writer = new RecordIOWriter(file)) { for (char i = 0; i < eventCount; i++) { char[] blockSize = fillArray(fillSize); blockSize[0] = i; - assertThat(writer.writeEvent(new StringElement(new String(blockSize)).serialize()), is((long)expectedSize)); + byte[] payload = new StringElement(new String(blockSize)).serialize(); + assertThat(writer.writeEvent(payload), is((long)expectedSize)); } } - try(RecordIOReader reader = new RecordIOReader(file)) { - Function toChar = (b) -> (char) ByteBuffer.wrap(b).get(0); - + try (RecordIOReader reader = new RecordIOReader(file)) { + Comparator charComparator = Comparator.comparing(Function.identity()); for (char i = 0; i < eventCount; i++) { - reader.seekToNextEventPosition(i, toChar, Comparator.comparing(o -> o)); - assertThat(toChar.apply(reader.readEvent()), equalTo(i)); + reader.seekToNextEventPosition(i, RecordIOReaderTest::extractFirstChar, charComparator); + assertThat(extractFirstChar(reader.readEvent()), equalTo(i)); } } } + private static Character extractFirstChar(byte[] b) { + return (char) ByteBuffer.wrap(b).get(0); + } + @Test public void testObviouslyInvalidSegment() throws Exception { assertThat(RecordIOReader.getSegmentStatus(file), is(RecordIOReader.SegmentStatus.INVALID)); diff --git a/logstash-core/src/test/resources/org/logstash/common/io/1.log b/logstash-core/src/test/resources/org/logstash/common/io/1.log new file mode 100644 index 000000000..39c7bdd04 Binary files /dev/null and b/logstash-core/src/test/resources/org/logstash/common/io/1.log differ