From 53e3cb3be5b9a369ed5bd440502f978ed6d61457 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Fri, 4 Aug 2017 17:29:44 -0400 Subject: [PATCH] DLQ: Multiple bug fixes related to event seeks The fixes in this commit combine to fix #7855 Fixes 3 bugs: seekToNextEventPosition: handles the case where a null event is encountered during the search for a matching event handle restoration of buffer position when the next event consumed goes across block boundaries seekToStartOfEventInBlock: handles the case where the only event in a block is an 'end' event Also: Adds new RecordIOReaderTest, and moves tests over from RecordIOWriterTest that seem to fit that better Fixes #7948 --- .../common/io/DeadLetterQueueReader.java | 3 + .../logstash/common/io/RecordIOReader.java | 57 ++++++- .../common/io/DeadLetterQueueReaderTest.java | 42 +++-- .../common/io/RecordIOReaderTest.java | 160 ++++++++++++++++++ .../common/io/RecordIOWriterTest.java | 99 +---------- 5 files changed, 241 insertions(+), 120 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java index c228fd363..3d4baeded 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java @@ -65,6 +65,9 @@ public final class DeadLetterQueueReader implements Closeable { currentReader = new RecordIOReader(segment); byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> { try { + if (b == null){ + return null; + } return DLQEntry.deserialize(b).getEntryTime(); } catch (IOException e) { return null; diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index d20618e68..1fd5fc794 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -25,6 +25,7 @@ import java.nio.channels.ClosedByInterruptException; import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.Arrays; import java.util.Comparator; import java.util.function.Function; import java.util.zip.CRC32; @@ -40,7 +41,7 @@ import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; public final class RecordIOReader implements Closeable { private final FileChannel channel; - private final ByteBuffer currentBlock; + private ByteBuffer currentBlock; private int currentBlockSizeReadFromChannel; private final Path path; private long channelPosition; @@ -83,7 +84,13 @@ public final class RecordIOReader implements Closeable { while (lowBlock < highBlock) { int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0); seekToBlock(middle); - T found = keyExtractor.apply(readEvent()); + byte[] readEvent = readEvent(); + // If the event is null, scan from the low block upwards + if (readEvent == null){ + matchingBlock = lowBlock; + break; + } + T found = keyExtractor.apply(readEvent); int compare = keyComparator.compare(found, target); if (compare > 0) { highBlock = middle - 1; @@ -100,18 +107,21 @@ public final class RecordIOReader implements Closeable { // now sequential scan to event seekToBlock(matchingBlock); - int currentPosition = 0; int compare = -1; byte[] event = null; + BufferState restorePoint = null; while (compare < 0) { - currentPosition = currentBlock.position(); + // Save the buffer state when reading the next event, to restore to if a matching event is found. + restorePoint = saveBufferState(); event = readEvent(); if (event == null) { return null; } compare = keyComparator.compare(keyExtractor.apply(event), target); } - currentBlock.position(currentPosition); + if (restorePoint != null) { + restoreFrom(restorePoint); + } return event; } @@ -146,7 +156,7 @@ public final class RecordIOReader implements Closeable { */ int seekToStartOfEventInBlock() { // Already consumed all the bytes in this block. - if (currentBlock.position() >= currentBlockSizeReadFromChannel){ + if (currentBlock.position() >= currentBlockSizeReadFromChannel){ return -1; } while (true) { @@ -156,6 +166,10 @@ public final class RecordIOReader implements Closeable { } else if (RecordType.END.equals(type)) { RecordHeader header = RecordHeader.get(currentBlock); currentBlock.position(currentBlock.position() + header.getSize()); + // If this is the end of stream, then cannot seek to start of block + if (this.isEndOfStream()){ + return -1; + } } else { return -1; } @@ -233,4 +247,35 @@ public final class RecordIOReader implements Closeable { public void close() throws IOException { channel.close(); } + + + private BufferState saveBufferState() throws IOException { + BufferState bufferState = new BufferState(); + bufferState.blockContents = Arrays.copyOf(this.currentBlock.array(), this.currentBlock.array().length); + bufferState.channelPosition = channel.position(); + bufferState.currentBlockPosition = currentBlock.position(); + bufferState.currentBlockSizeReadFromChannel = this.currentBlockSizeReadFromChannel; + return bufferState; + } + + private void restoreFrom(BufferState bufferState) throws IOException { + this.currentBlock = ByteBuffer.wrap(bufferState.blockContents); + this.currentBlock.position(bufferState.currentBlockPosition); + this.channel.position(bufferState.channelPosition); + this.channelPosition = channel.position(); + this.currentBlockSizeReadFromChannel = bufferState.currentBlockSizeReadFromChannel; + + } + + final static class BufferState { + int currentBlockPosition; + int currentBlockSizeReadFromChannel; + long channelPosition; + byte[] blockContents; + + public String toString() { + return String.format("CurrentBlockPosition:%d, currentBlockSizeReadFromChannel: %d, channelPosition: %d", + currentBlockPosition, currentBlockSizeReadFromChannel, channelPosition); + } + } } diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index c69a1f36a..305bae556 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -123,9 +123,7 @@ public class DeadLetterQueueReaderTest { // Fill event with not quite enough characters to fill block. Fill event with valid RecordType characters - this // was the cause of https://github.com/elastic/logstash/issues/7868 - char[] field = new char[32500]; - Arrays.fill(field, 's'); - event.setField("message", new String(field)); + event.setField("message", generateMessageContent(32500)); long startTime = System.currentTimeMillis(); int messageSize = 0; try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) { @@ -216,9 +214,7 @@ public class DeadLetterQueueReaderTest { public void testBlockAndSegmentBoundary() throws Exception { final int PAD_FOR_BLOCK_SIZE_EVENT = 32616; Event event = new Event(); - char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT]; - Arrays.fill(field, 'e'); - event.setField("T", new String(field)); + event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT)); Timestamp timestamp = new Timestamp(); try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, 1_000_000_000)) { @@ -245,9 +241,7 @@ public class DeadLetterQueueReaderTest { try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000L)) { for (int i = 0; i < eventCount; i++) { - char[] field = new char[(int)(Math.random() * (maxEventSize))]; - Arrays.fill(field, randomFillItem()); - event.setField("message", new String(field)); + event.setField("message", generateMessageContent((int)(Math.random() * (maxEventSize)))); DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), new Timestamp(startTime++)); writeManager.writeEntry(entry); } @@ -260,19 +254,6 @@ public class DeadLetterQueueReaderTest { } } - // Select a random char to fill the list with. - // Randomly selects a valid value for RecordType, or a non-valid value. - private char randomFillItem() { - char[] valid = new char[RecordType.values().length + 1]; - int j = 0; - valid[j] = 'x'; - for (RecordType type : RecordType.values()){ - valid[j++] = (char)type.toByte(); - } - Random random = new Random(); - return valid[random.nextInt(valid.length)]; - } - @Test public void testWriteStopSmallWriteSeekByTimestamp() throws Exception { int FIRST_WRITE_EVENT_COUNT = 100; @@ -305,6 +286,21 @@ public class DeadLetterQueueReaderTest { String.valueOf(FIRST_WRITE_EVENT_COUNT)); } + private String generateMessageContent(int size) { + char[] valid = new char[RecordType.values().length + 1]; + int j = 0; + valid[j] = 'x'; + for (RecordType type : RecordType.values()){ + valid[j++] = (char)type.toByte(); + } + Random random = new Random(); + char fillWith = valid[random.nextInt(valid.length)]; + + char[] fillArray = new char[size]; + Arrays.fill(fillArray, fillWith); + return new String(fillArray); + } + private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception { try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { readManager.seekToNextEvent(seekTarget); @@ -315,7 +311,7 @@ public class DeadLetterQueueReaderTest { } private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { - try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) { + try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) { for (int i = offset; i <= offset + numberOfEvents; i++) { DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); writeManager.writeEntry(entry); diff --git a/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java new file mode 100644 index 000000000..46a069364 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/io/RecordIOReaderTest.java @@ -0,0 +1,160 @@ +package org.logstash.common.io; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.logstash.ackedqueue.StringElement; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Random; +import java.util.function.Function; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; +import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; + +public class RecordIOReaderTest { + private Path file; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Before + public void setUp() throws Exception { + file = temporaryFolder.newFile("test").toPath(); + } + + @Test + public void testReadEmptyBlock() throws Exception { + RecordIOWriter writer = new RecordIOWriter(file); + RecordIOReader reader = new RecordIOReader(file); + assertThat(reader.readEvent(), is(nullValue())); + writer.close(); + reader.close(); + } + + @Test + public void testSeekToStartFromEndWithoutNextRecord() throws Exception { + char[] tooBig = new char[BLOCK_SIZE + 1000]; + Arrays.fill(tooBig, 'c'); + StringElement input = new StringElement(new String(tooBig)); + RecordIOWriter writer = new RecordIOWriter(file); + writer.writeEvent(input.serialize()); + + RecordIOReader reader = new RecordIOReader(file); + reader.seekToBlock(1); + reader.consumeBlock(true); + assertThat(reader.seekToStartOfEventInBlock(), equalTo(-1)); + + reader.close(); + writer.close(); + } + + @Test + public void testSeekToStartFromEndWithNextRecordPresent() throws Exception { + char[] tooBig = new char[BLOCK_SIZE + 1000]; + Arrays.fill(tooBig, 'c'); + StringElement input = new StringElement(new String(tooBig)); + RecordIOWriter writer = new RecordIOWriter(file); + writer.writeEvent(input.serialize()); + writer.writeEvent(input.serialize()); + + RecordIOReader reader = new RecordIOReader(file); + reader.seekToBlock(1); + reader.consumeBlock(true); + assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026)); + + reader.close(); + writer.close(); + } + + + @Test + public void testReadMiddle() throws Exception { + char[] tooBig = fillArray(3 * BLOCK_SIZE + 1000); + StringElement input = new StringElement(new String(tooBig)); + RecordIOWriter writer = new RecordIOWriter(file); + RecordIOReader reader = new RecordIOReader(file); + byte[] inputSerialized = input.serialize(); + + writer.writeEvent(inputSerialized); + reader.seekToBlock(1); + assertThat(reader.readEvent(), is(nullValue())); + writer.writeEvent(inputSerialized); + reader.seekToBlock(1); + assertThat(reader.readEvent(), is(not(nullValue()))); + + writer.close(); + reader.close(); + } + + @Test + public void testFind() throws Exception { + + RecordIOWriter writer = new RecordIOWriter(file); + RecordIOReader reader = new RecordIOReader(file); + ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]); + for (int i = 0; i < 20000; i++) { + intBuffer.rewind(); + intBuffer.putInt(i); + writer.writeEvent(intBuffer.array()); + } + + Function toInt = (b) -> ByteBuffer.wrap(b).getInt(); + reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2)); + int nextVal = (int) toInt.apply(reader.readEvent()); + assertThat(nextVal, equalTo(34)); + } + + @Test + public void testSeekBlockSizeEvents() throws Exception { + writeSeekAndVerify(10, BLOCK_SIZE); + } + + @Test + public void testSeekHalfBlockSizeEvents() throws Exception { + writeSeekAndVerify(10, BLOCK_SIZE/2); + } + + @Test + public void testSeekDoubleBlockSizeEvents() throws Exception { + writeSeekAndVerify(10, BLOCK_SIZE * 2); + } + + private void writeSeekAndVerify(final int eventCount, final int expectedSize) throws IOException { + int blocks = (int)Math.ceil(expectedSize / (double)BLOCK_SIZE); + int fillSize = (int) (expectedSize - (blocks * RECORD_HEADER_SIZE)); + + try(RecordIOWriter writer = new RecordIOWriter(file)){ + for (char i = 0; i < eventCount; i++) { + char[] blockSize = fillArray(fillSize); + blockSize[0] = i; + assertThat(writer.writeEvent(new StringElement(new String(blockSize)).serialize()), is((long)expectedSize)); + } + } + + try(RecordIOReader reader = new RecordIOReader(file)) { + Function toChar = (b) -> (char) ByteBuffer.wrap(b).get(0); + + for (char i = 0; i < eventCount; i++) { + reader.seekToNextEventPosition(i, toChar, Comparator.comparing(o -> ((Character) o))); + assertThat(toChar.apply(reader.readEvent()), equalTo(i)); + } + } + } + + private char[] fillArray(final int fillSize) { + char[] blockSize= new char[fillSize]; + Arrays.fill(blockSize, 'e'); + return blockSize; + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/io/RecordIOWriterTest.java b/logstash-core/src/test/java/org/logstash/common/io/RecordIOWriterTest.java index d3a55ac65..2edb06893 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/RecordIOWriterTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/RecordIOWriterTest.java @@ -6,15 +6,12 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.logstash.ackedqueue.StringElement; -import java.nio.ByteBuffer; + import java.nio.file.Path; import java.util.Arrays; -import java.util.Comparator; -import java.util.function.Function; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; @@ -30,15 +27,6 @@ public class RecordIOWriterTest { file = temporaryFolder.newFile("test").toPath(); } - @Test - public void testReadEmptyBlock() throws Exception { - RecordIOWriter writer = new RecordIOWriter(file); - RecordIOReader reader = new RecordIOReader(file); - assertThat(reader.readEvent(), is(nullValue())); - writer.close(); - reader.close(); - } - @Test public void testSingleComplete() throws Exception { StringElement input = new StringElement("element"); @@ -51,46 +39,9 @@ public class RecordIOWriterTest { writer.close(); } - @Test - public void testSeekToStartFromEndWithoutNextRecord() throws Exception { - char[] tooBig = new char[BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'c'); - StringElement input = new StringElement(new String(tooBig)); - RecordIOWriter writer = new RecordIOWriter(file); - writer.writeEvent(input.serialize()); - - RecordIOReader reader = new RecordIOReader(file); - reader.seekToBlock(1); - reader.consumeBlock(true); - assertThat(reader.seekToStartOfEventInBlock(), equalTo(-1)); - - reader.close(); - writer.close(); - } - - @Test - public void testSeekToStartFromEndWithNextRecordPresent() throws Exception { - char[] tooBig = new char[BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'c'); - StringElement input = new StringElement(new String(tooBig)); - RecordIOWriter writer = new RecordIOWriter(file); - writer.writeEvent(input.serialize()); - writer.writeEvent(input.serialize()); - - RecordIOReader reader = new RecordIOReader(file); - reader.seekToBlock(1); - reader.consumeBlock(true); - assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026)); - - reader.close(); - writer.close(); - } - - @Test public void testFitsInTwoBlocks() throws Exception { - char[] tooBig = new char[BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'c'); + char[] tooBig = fillArray(BLOCK_SIZE + 1000); StringElement input = new StringElement(new String(tooBig)); RecordIOWriter writer = new RecordIOWriter(file); writer.writeEvent(input.serialize()); @@ -99,8 +50,7 @@ public class RecordIOWriterTest { @Test public void testFitsInThreeBlocks() throws Exception { - char[] tooBig = new char[2 * BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'r'); + char[] tooBig = fillArray(2 * BLOCK_SIZE + 1000); StringElement input = new StringElement(new String(tooBig)); RecordIOWriter writer = new RecordIOWriter(file); writer.writeEvent(input.serialize()); @@ -116,8 +66,7 @@ public class RecordIOWriterTest { @Test public void testReadWhileWrite() throws Exception { - char[] tooBig = new char[2 * BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'r'); + char[] tooBig = fillArray(2 * BLOCK_SIZE + 1000); StringElement input = new StringElement(new String(tooBig)); RecordIOWriter writer = new RecordIOWriter(file); RecordIOReader reader = new RecordIOReader(file); @@ -150,41 +99,9 @@ public class RecordIOWriterTest { reader.close(); } - @Test - public void testReadMiddle() throws Exception { - char[] tooBig = new char[3 * BLOCK_SIZE + 1000]; - Arrays.fill(tooBig, 'r'); - StringElement input = new StringElement(new String(tooBig)); - RecordIOWriter writer = new RecordIOWriter(file); - RecordIOReader reader = new RecordIOReader(file); - byte[] inputSerialized = input.serialize(); - - writer.writeEvent(inputSerialized); - reader.seekToBlock(1); - assertThat(reader.readEvent(), is(nullValue())); - writer.writeEvent(inputSerialized); - reader.seekToBlock(1); - assertThat(reader.readEvent(), is(not(nullValue()))); - - writer.close(); - reader.close(); - } - - @Test - public void testFind() throws Exception { - - RecordIOWriter writer = new RecordIOWriter(file); - RecordIOReader reader = new RecordIOReader(file); - ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]); - for (int i = 0; i < 20000; i++) { - intBuffer.rewind(); - intBuffer.putInt(i); - writer.writeEvent(intBuffer.array()); - } - - Function toInt = (b) -> ByteBuffer.wrap(b).getInt(); - reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2)); - int nextVal = (int) toInt.apply(reader.readEvent()); - assertThat(nextVal, equalTo(34)); + private char[] fillArray(final int fillSize) { + char[] blockSize= new char[fillSize]; + Arrays.fill(blockSize, 'e'); + return blockSize; } } \ No newline at end of file