DLQ: Multiple bug fixes related to event seeks

The fixes in this commit combine to fix #7855

Fixes 3 bugs:
  seekToNextEventPosition:
    handles the case where a null event is encountered during the search for a matching event
    handle restoration of buffer position when the next event consumed goes across block
     boundaries

  seekToStartOfEventInBlock:
    handles the case where the only event in a block is an 'end' event

Also:
 Adds new RecordIOReaderTest, and moves tests over from RecordIOWriterTest that seem to fit
that better

Fixes #7948
This commit is contained in:
Rob Bavey 2017-08-04 17:29:44 -04:00
parent 2226b9cd3c
commit 53e3cb3be5
5 changed files with 241 additions and 120 deletions

View file

@ -65,6 +65,9 @@ public final class DeadLetterQueueReader implements Closeable {
currentReader = new RecordIOReader(segment);
byte[] event = currentReader.seekToNextEventPosition(timestamp, (b) -> {
try {
if (b == null){
return null;
}
return DLQEntry.deserialize(b).getEntryTime();
} catch (IOException e) {
return null;

View file

@ -25,6 +25,7 @@ import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Comparator;
import java.util.function.Function;
import java.util.zip.CRC32;
@ -40,7 +41,7 @@ import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
public final class RecordIOReader implements Closeable {
private final FileChannel channel;
private final ByteBuffer currentBlock;
private ByteBuffer currentBlock;
private int currentBlockSizeReadFromChannel;
private final Path path;
private long channelPosition;
@ -83,7 +84,13 @@ public final class RecordIOReader implements Closeable {
while (lowBlock < highBlock) {
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
seekToBlock(middle);
T found = keyExtractor.apply(readEvent());
byte[] readEvent = readEvent();
// If the event is null, scan from the low block upwards
if (readEvent == null){
matchingBlock = lowBlock;
break;
}
T found = keyExtractor.apply(readEvent);
int compare = keyComparator.compare(found, target);
if (compare > 0) {
highBlock = middle - 1;
@ -100,18 +107,21 @@ public final class RecordIOReader implements Closeable {
// now sequential scan to event
seekToBlock(matchingBlock);
int currentPosition = 0;
int compare = -1;
byte[] event = null;
BufferState restorePoint = null;
while (compare < 0) {
currentPosition = currentBlock.position();
// Save the buffer state when reading the next event, to restore to if a matching event is found.
restorePoint = saveBufferState();
event = readEvent();
if (event == null) {
return null;
}
compare = keyComparator.compare(keyExtractor.apply(event), target);
}
currentBlock.position(currentPosition);
if (restorePoint != null) {
restoreFrom(restorePoint);
}
return event;
}
@ -146,7 +156,7 @@ public final class RecordIOReader implements Closeable {
*/
int seekToStartOfEventInBlock() {
// Already consumed all the bytes in this block.
if (currentBlock.position() >= currentBlockSizeReadFromChannel){
if (currentBlock.position() >= currentBlockSizeReadFromChannel){
return -1;
}
while (true) {
@ -156,6 +166,10 @@ public final class RecordIOReader implements Closeable {
} else if (RecordType.END.equals(type)) {
RecordHeader header = RecordHeader.get(currentBlock);
currentBlock.position(currentBlock.position() + header.getSize());
// If this is the end of stream, then cannot seek to start of block
if (this.isEndOfStream()){
return -1;
}
} else {
return -1;
}
@ -233,4 +247,35 @@ public final class RecordIOReader implements Closeable {
public void close() throws IOException {
channel.close();
}
private BufferState saveBufferState() throws IOException {
BufferState bufferState = new BufferState();
bufferState.blockContents = Arrays.copyOf(this.currentBlock.array(), this.currentBlock.array().length);
bufferState.channelPosition = channel.position();
bufferState.currentBlockPosition = currentBlock.position();
bufferState.currentBlockSizeReadFromChannel = this.currentBlockSizeReadFromChannel;
return bufferState;
}
private void restoreFrom(BufferState bufferState) throws IOException {
this.currentBlock = ByteBuffer.wrap(bufferState.blockContents);
this.currentBlock.position(bufferState.currentBlockPosition);
this.channel.position(bufferState.channelPosition);
this.channelPosition = channel.position();
this.currentBlockSizeReadFromChannel = bufferState.currentBlockSizeReadFromChannel;
}
final static class BufferState {
int currentBlockPosition;
int currentBlockSizeReadFromChannel;
long channelPosition;
byte[] blockContents;
public String toString() {
return String.format("CurrentBlockPosition:%d, currentBlockSizeReadFromChannel: %d, channelPosition: %d",
currentBlockPosition, currentBlockSizeReadFromChannel, channelPosition);
}
}
}

View file

@ -123,9 +123,7 @@ public class DeadLetterQueueReaderTest {
// Fill event with not quite enough characters to fill block. Fill event with valid RecordType characters - this
// was the cause of https://github.com/elastic/logstash/issues/7868
char[] field = new char[32500];
Arrays.fill(field, 's');
event.setField("message", new String(field));
event.setField("message", generateMessageContent(32500));
long startTime = System.currentTimeMillis();
int messageSize = 0;
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
@ -216,9 +214,7 @@ public class DeadLetterQueueReaderTest {
public void testBlockAndSegmentBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32616;
Event event = new Event();
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
Arrays.fill(field, 'e');
event.setField("T", new String(field));
event.setField("T", generateMessageContent(PAD_FOR_BLOCK_SIZE_EVENT));
Timestamp timestamp = new Timestamp();
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, 1_000_000_000)) {
@ -245,9 +241,7 @@ public class DeadLetterQueueReaderTest {
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000L)) {
for (int i = 0; i < eventCount; i++) {
char[] field = new char[(int)(Math.random() * (maxEventSize))];
Arrays.fill(field, randomFillItem());
event.setField("message", new String(field));
event.setField("message", generateMessageContent((int)(Math.random() * (maxEventSize))));
DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), new Timestamp(startTime++));
writeManager.writeEntry(entry);
}
@ -260,19 +254,6 @@ public class DeadLetterQueueReaderTest {
}
}
// Select a random char to fill the list with.
// Randomly selects a valid value for RecordType, or a non-valid value.
private char randomFillItem() {
char[] valid = new char[RecordType.values().length + 1];
int j = 0;
valid[j] = 'x';
for (RecordType type : RecordType.values()){
valid[j++] = (char)type.toByte();
}
Random random = new Random();
return valid[random.nextInt(valid.length)];
}
@Test
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
int FIRST_WRITE_EVENT_COUNT = 100;
@ -305,6 +286,21 @@ public class DeadLetterQueueReaderTest {
String.valueOf(FIRST_WRITE_EVENT_COUNT));
}
private String generateMessageContent(int size) {
char[] valid = new char[RecordType.values().length + 1];
int j = 0;
valid[j] = 'x';
for (RecordType type : RecordType.values()){
valid[j++] = (char)type.toByte();
}
Random random = new Random();
char fillWith = valid[random.nextInt(valid.length)];
char[] fillArray = new char[size];
Arrays.fill(fillArray, fillWith);
return new String(fillArray);
}
private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception {
try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
readManager.seekToNextEvent(seekTarget);
@ -315,7 +311,7 @@ public class DeadLetterQueueReaderTest {
}
private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException {
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) {
try (DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) {
for (int i = offset; i <= offset + numberOfEvents; i++) {
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++));
writeManager.writeEntry(entry);

View file

@ -0,0 +1,160 @@
package org.logstash.common.io;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.StringElement;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
public class RecordIOReaderTest {
private Path file;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Before
public void setUp() throws Exception {
file = temporaryFolder.newFile("test").toPath();
}
@Test
public void testReadEmptyBlock() throws Exception {
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
assertThat(reader.readEvent(), is(nullValue()));
writer.close();
reader.close();
}
@Test
public void testSeekToStartFromEndWithoutNextRecord() throws Exception {
char[] tooBig = new char[BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'c');
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
RecordIOReader reader = new RecordIOReader(file);
reader.seekToBlock(1);
reader.consumeBlock(true);
assertThat(reader.seekToStartOfEventInBlock(), equalTo(-1));
reader.close();
writer.close();
}
@Test
public void testSeekToStartFromEndWithNextRecordPresent() throws Exception {
char[] tooBig = new char[BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'c');
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
writer.writeEvent(input.serialize());
RecordIOReader reader = new RecordIOReader(file);
reader.seekToBlock(1);
reader.consumeBlock(true);
assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026));
reader.close();
writer.close();
}
@Test
public void testReadMiddle() throws Exception {
char[] tooBig = fillArray(3 * BLOCK_SIZE + 1000);
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
byte[] inputSerialized = input.serialize();
writer.writeEvent(inputSerialized);
reader.seekToBlock(1);
assertThat(reader.readEvent(), is(nullValue()));
writer.writeEvent(inputSerialized);
reader.seekToBlock(1);
assertThat(reader.readEvent(), is(not(nullValue())));
writer.close();
reader.close();
}
@Test
public void testFind() throws Exception {
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]);
for (int i = 0; i < 20000; i++) {
intBuffer.rewind();
intBuffer.putInt(i);
writer.writeEvent(intBuffer.array());
}
Function<byte[], Object> toInt = (b) -> ByteBuffer.wrap(b).getInt();
reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2));
int nextVal = (int) toInt.apply(reader.readEvent());
assertThat(nextVal, equalTo(34));
}
@Test
public void testSeekBlockSizeEvents() throws Exception {
writeSeekAndVerify(10, BLOCK_SIZE);
}
@Test
public void testSeekHalfBlockSizeEvents() throws Exception {
writeSeekAndVerify(10, BLOCK_SIZE/2);
}
@Test
public void testSeekDoubleBlockSizeEvents() throws Exception {
writeSeekAndVerify(10, BLOCK_SIZE * 2);
}
private void writeSeekAndVerify(final int eventCount, final int expectedSize) throws IOException {
int blocks = (int)Math.ceil(expectedSize / (double)BLOCK_SIZE);
int fillSize = (int) (expectedSize - (blocks * RECORD_HEADER_SIZE));
try(RecordIOWriter writer = new RecordIOWriter(file)){
for (char i = 0; i < eventCount; i++) {
char[] blockSize = fillArray(fillSize);
blockSize[0] = i;
assertThat(writer.writeEvent(new StringElement(new String(blockSize)).serialize()), is((long)expectedSize));
}
}
try(RecordIOReader reader = new RecordIOReader(file)) {
Function<byte[], Character> toChar = (b) -> (char) ByteBuffer.wrap(b).get(0);
for (char i = 0; i < eventCount; i++) {
reader.seekToNextEventPosition(i, toChar, Comparator.comparing(o -> ((Character) o)));
assertThat(toChar.apply(reader.readEvent()), equalTo(i));
}
}
}
private char[] fillArray(final int fillSize) {
char[] blockSize= new char[fillSize];
Arrays.fill(blockSize, 'e');
return blockSize;
}
}

View file

@ -6,15 +6,12 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.logstash.ackedqueue.StringElement;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
@ -30,15 +27,6 @@ public class RecordIOWriterTest {
file = temporaryFolder.newFile("test").toPath();
}
@Test
public void testReadEmptyBlock() throws Exception {
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
assertThat(reader.readEvent(), is(nullValue()));
writer.close();
reader.close();
}
@Test
public void testSingleComplete() throws Exception {
StringElement input = new StringElement("element");
@ -51,46 +39,9 @@ public class RecordIOWriterTest {
writer.close();
}
@Test
public void testSeekToStartFromEndWithoutNextRecord() throws Exception {
char[] tooBig = new char[BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'c');
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
RecordIOReader reader = new RecordIOReader(file);
reader.seekToBlock(1);
reader.consumeBlock(true);
assertThat(reader.seekToStartOfEventInBlock(), equalTo(-1));
reader.close();
writer.close();
}
@Test
public void testSeekToStartFromEndWithNextRecordPresent() throws Exception {
char[] tooBig = new char[BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'c');
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
writer.writeEvent(input.serialize());
RecordIOReader reader = new RecordIOReader(file);
reader.seekToBlock(1);
reader.consumeBlock(true);
assertThat(reader.seekToStartOfEventInBlock(), equalTo(1026));
reader.close();
writer.close();
}
@Test
public void testFitsInTwoBlocks() throws Exception {
char[] tooBig = new char[BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'c');
char[] tooBig = fillArray(BLOCK_SIZE + 1000);
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
@ -99,8 +50,7 @@ public class RecordIOWriterTest {
@Test
public void testFitsInThreeBlocks() throws Exception {
char[] tooBig = new char[2 * BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'r');
char[] tooBig = fillArray(2 * BLOCK_SIZE + 1000);
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
writer.writeEvent(input.serialize());
@ -116,8 +66,7 @@ public class RecordIOWriterTest {
@Test
public void testReadWhileWrite() throws Exception {
char[] tooBig = new char[2 * BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'r');
char[] tooBig = fillArray(2 * BLOCK_SIZE + 1000);
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
@ -150,41 +99,9 @@ public class RecordIOWriterTest {
reader.close();
}
@Test
public void testReadMiddle() throws Exception {
char[] tooBig = new char[3 * BLOCK_SIZE + 1000];
Arrays.fill(tooBig, 'r');
StringElement input = new StringElement(new String(tooBig));
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
byte[] inputSerialized = input.serialize();
writer.writeEvent(inputSerialized);
reader.seekToBlock(1);
assertThat(reader.readEvent(), is(nullValue()));
writer.writeEvent(inputSerialized);
reader.seekToBlock(1);
assertThat(reader.readEvent(), is(not(nullValue())));
writer.close();
reader.close();
}
@Test
public void testFind() throws Exception {
RecordIOWriter writer = new RecordIOWriter(file);
RecordIOReader reader = new RecordIOReader(file);
ByteBuffer intBuffer = ByteBuffer.wrap(new byte[4]);
for (int i = 0; i < 20000; i++) {
intBuffer.rewind();
intBuffer.putInt(i);
writer.writeEvent(intBuffer.array());
}
Function<byte[], Object> toInt = (b) -> ByteBuffer.wrap(b).getInt();
reader.seekToNextEventPosition(34, toInt, (o1, o2) -> ((Integer) o1).compareTo((Integer) o2));
int nextVal = (int) toInt.apply(reader.readEvent());
assertThat(nextVal, equalTo(34));
private char[] fillArray(final int fillSize) {
char[] blockSize= new char[fillSize];
Arrays.fill(blockSize, 'e');
return blockSize;
}
}