Fix handling of last event in block

The last event in a block was being incorrectly handled in such a way that caused unexpected
behavior depending on the contents of the final event, it could cause plugins to fail, or
Logstash to fail with an OutOfMemoryError

Fixes #7871
This commit is contained in:
Rob Bavey 2017-08-01 17:36:32 -04:00
parent 73f00842fa
commit b5b1ca3c15
2 changed files with 127 additions and 4 deletions

View file

@ -145,6 +145,10 @@ public final class RecordIOReader implements Closeable {
*
*/
int seekToStartOfEventInBlock() {
// Already consumed all the bytes in this block.
if (currentBlock.position() == currentBlockSizeReadFromChannel){
return -1;
}
while (true) {
RecordType type = RecordType.fromByte(currentBlock.array()[currentBlock.arrayOffset() + currentBlock.position()]);
if (RecordType.COMPLETE.equals(type) || RecordType.START.equals(type)) {
@ -156,7 +160,7 @@ public final class RecordIOReader implements Closeable {
return -1;
}
}
}
}
/**
*

View file

@ -31,12 +31,15 @@ import org.logstash.ackedqueue.StringElement;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Random;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
public class DeadLetterQueueReaderTest {
private Path dir;
@ -124,6 +127,124 @@ public class DeadLetterQueueReaderTest {
}
// Notes on these tests:
// These tests are designed to test specific edge cases where events end at block boundaries, hence the specific
// sizes of the char arrays being used to pad the events
// This test tests for a single event that ends on a block boundary
@Test
public void testBlockBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32616;
Event event = new Event();
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
Arrays.fill(field, 'e');
event.setField("T", new String(field));
Timestamp timestamp = new Timestamp();
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
for (int i = 0; i < 2; i++) {
DLQEntry entry = new DLQEntry(event, "", "", "", timestamp);
assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE));
writeManager.writeEntry(entry);
}
}
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
for (int i = 0; i < 2;i++) {
readManager.pollEntry(100);
}
}
}
// This test has multiple messages, with a message ending on a block boundary
@Test
public void testBlockBoundaryMultiple() throws Exception {
Event event = new Event(Collections.emptyMap());
char[] field = new char[8053];
Arrays.fill(field, 'x');
event.setField("message", new String(field));
long startTime = System.currentTimeMillis();
int messageSize = 0;
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
for (int i = 1; i <= 5; i++) {
DLQEntry entry = new DLQEntry(event, "", "", "", new Timestamp(startTime++));
messageSize += entry.serialize().length;
writeManager.writeEntry(entry);
if (i == 4){
assertThat(messageSize + (RecordIOWriter.RECORD_HEADER_SIZE * 4), is(BLOCK_SIZE));
}
}
}
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
for (int i = 0; i < 5;i++) {
readManager.pollEntry(100);
}
}
}
// This test tests for a single event that ends on a block and segment boundary
@Test
public void testBlockAndSegmentBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32616;
Event event = new Event();
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
Arrays.fill(field, 'e');
event.setField("T", new String(field));
Timestamp timestamp = new Timestamp();
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, 1_000_000_000)) {
for (int i = 0; i < 2; i++) {
DLQEntry entry = new DLQEntry(event, "", "", "", timestamp);
assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE));
writeManager.writeEntry(entry);
}
}
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
for (int i = 0; i < 2;i++) {
readManager.pollEntry(100);
}
}
}
@Test
public void testWriteReadRandomEventSize() throws Exception {
Event event = new Event(Collections.emptyMap());
int eventCount = 3000;
int maxEventSize = BLOCK_SIZE * 2;
long startTime = System.currentTimeMillis();
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000L)) {
for (int i = 0; i < eventCount; i++) {
char[] field = new char[(int)(Math.random() * (maxEventSize))];
Arrays.fill(field, randomFillItem());
event.setField("message", new String(field));
DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), new Timestamp(startTime++));
writeManager.writeEntry(entry);
}
}
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
for (int i = 0; i < eventCount;i++) {
DLQEntry entry = readManager.pollEntry(100);
assertThat(entry.getReason(), is(String.valueOf(i)));
}
}
}
// Select a random char to fill the list with.
// Randomly selects a valid value for RecordType, or a non-valid value.
private char randomFillItem() {
char[] valid = new char[RecordType.values().length + 1];
int j = 0;
valid[j] = 'x';
for (RecordType type : RecordType.values()){
valid[j++] = (char)type.toByte();
}
Random random = new Random();
return valid[random.nextInt(valid.length)];
}
@Test
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
int FIRST_WRITE_EVENT_COUNT = 100;
@ -156,7 +277,6 @@ public class DeadLetterQueueReaderTest {
String.valueOf(FIRST_WRITE_EVENT_COUNT));
}
private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception {
try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
readManager.seekToNextEvent(new Timestamp(seekTarget));
@ -167,12 +287,11 @@ public class DeadLetterQueueReaderTest {
}
private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException {
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) {
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) {
for (int i = offset; i <= offset + numberOfEvents; i++) {
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++));
writeManager.writeEntry(entry);
}
}
}
}