diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index c867dfdf1..22701cf65 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -145,6 +145,10 @@ public final class RecordIOReader implements Closeable { * */ int seekToStartOfEventInBlock() { + // Already consumed all the bytes in this block. + if (currentBlock.position() == currentBlockSizeReadFromChannel){ + return -1; + } while (true) { RecordType type = RecordType.fromByte(currentBlock.array()[currentBlock.arrayOffset() + currentBlock.position()]); if (RecordType.COMPLETE.equals(type) || RecordType.START.equals(type)) { @@ -156,7 +160,7 @@ public final class RecordIOReader implements Closeable { return -1; } } - } + } /** * diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index 3f477d825..f20dc28c3 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -31,12 +31,15 @@ import org.logstash.ackedqueue.StringElement; import java.io.IOException; import java.nio.file.Path; +import java.util.Arrays; import java.util.Collections; +import java.util.Random; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; +import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; public class DeadLetterQueueReaderTest { private Path dir; @@ -124,6 +127,124 @@ public class DeadLetterQueueReaderTest { } + // Notes on these tests: + // These tests are designed to test specific edge cases where events end at block boundaries, hence the specific + // sizes of the char arrays being used to pad the events + + // This test tests for a single event that ends on a block boundary + @Test + public void testBlockBoundary() throws Exception { + + final int PAD_FOR_BLOCK_SIZE_EVENT = 32616; + Event event = new Event(); + char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT]; + Arrays.fill(field, 'e'); + event.setField("T", new String(field)); + Timestamp timestamp = new Timestamp(); + + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) { + for (int i = 0; i < 2; i++) { + DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); + assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE)); + writeManager.writeEntry(entry); + } + } + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + for (int i = 0; i < 2;i++) { + readManager.pollEntry(100); + } + } + } + + // This test has multiple messages, with a message ending on a block boundary + @Test + public void testBlockBoundaryMultiple() throws Exception { + Event event = new Event(Collections.emptyMap()); + char[] field = new char[8053]; + Arrays.fill(field, 'x'); + event.setField("message", new String(field)); + long startTime = System.currentTimeMillis(); + int messageSize = 0; + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) { + for (int i = 1; i <= 5; i++) { + DLQEntry entry = new DLQEntry(event, "", "", "", new Timestamp(startTime++)); + messageSize += entry.serialize().length; + writeManager.writeEntry(entry); + if (i == 4){ + assertThat(messageSize + (RecordIOWriter.RECORD_HEADER_SIZE * 4), is(BLOCK_SIZE)); + } + } + } + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + for (int i = 0; i < 5;i++) { + readManager.pollEntry(100); + } + } + } + + + // This test tests for a single event that ends on a block and segment boundary + @Test + public void testBlockAndSegmentBoundary() throws Exception { + final int PAD_FOR_BLOCK_SIZE_EVENT = 32616; + Event event = new Event(); + char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT]; + Arrays.fill(field, 'e'); + event.setField("T", new String(field)); + Timestamp timestamp = new Timestamp(); + + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, 1_000_000_000)) { + for (int i = 0; i < 2; i++) { + DLQEntry entry = new DLQEntry(event, "", "", "", timestamp); + assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE)); + writeManager.writeEntry(entry); + } + } + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + for (int i = 0; i < 2;i++) { + readManager.pollEntry(100); + } + } + } + + + @Test + public void testWriteReadRandomEventSize() throws Exception { + Event event = new Event(Collections.emptyMap()); + int eventCount = 3000; + int maxEventSize = BLOCK_SIZE * 2; + long startTime = System.currentTimeMillis(); + + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000L)) { + for (int i = 0; i < eventCount; i++) { + char[] field = new char[(int)(Math.random() * (maxEventSize))]; + Arrays.fill(field, randomFillItem()); + event.setField("message", new String(field)); + DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), new Timestamp(startTime++)); + writeManager.writeEntry(entry); + } + } + try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + for (int i = 0; i < eventCount;i++) { + DLQEntry entry = readManager.pollEntry(100); + assertThat(entry.getReason(), is(String.valueOf(i))); + } + } + } + + // Select a random char to fill the list with. + // Randomly selects a valid value for RecordType, or a non-valid value. + private char randomFillItem() { + char[] valid = new char[RecordType.values().length + 1]; + int j = 0; + valid[j] = 'x'; + for (RecordType type : RecordType.values()){ + valid[j++] = (char)type.toByte(); + } + Random random = new Random(); + return valid[random.nextInt(valid.length)]; + } + @Test public void testWriteStopSmallWriteSeekByTimestamp() throws Exception { int FIRST_WRITE_EVENT_COUNT = 100; @@ -156,7 +277,6 @@ public class DeadLetterQueueReaderTest { String.valueOf(FIRST_WRITE_EVENT_COUNT)); } - private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception { try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { readManager.seekToNextEvent(new Timestamp(seekTarget)); @@ -167,12 +287,11 @@ public class DeadLetterQueueReaderTest { } private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { - try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) { + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) { for (int i = offset; i <= offset + numberOfEvents; i++) { DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); writeManager.writeEntry(entry); } } } - }