mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Fix handling of last event in block
The last event in a block was being incorrectly handled in such a way that caused unexpected behavior depending on the contents of the final event, it could cause plugins to fail, or Logstash to fail with an OutOfMemoryError Fixes #7871 Fixes #7891
This commit is contained in:
parent
9d499c4308
commit
0c1b764440
2 changed files with 132 additions and 3 deletions
|
@ -145,6 +145,10 @@ public final class RecordIOReader implements Closeable {
|
|||
*
|
||||
*/
|
||||
int seekToStartOfEventInBlock() {
|
||||
// Already consumed all the bytes in this block.
|
||||
if (currentBlock.position() == currentBlockSizeReadFromChannel){
|
||||
return -1;
|
||||
}
|
||||
while (true) {
|
||||
RecordType type = RecordType.fromByte(currentBlock.array()[currentBlock.arrayOffset() + currentBlock.position()]);
|
||||
if (RecordType.COMPLETE.equals(type) || RecordType.START.equals(type)) {
|
||||
|
@ -156,7 +160,7 @@ public final class RecordIOReader implements Closeable {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -31,12 +31,15 @@ import org.logstash.ackedqueue.StringElement;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.CoreMatchers.nullValue;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
|
||||
|
||||
public class DeadLetterQueueReaderTest {
|
||||
private Path dir;
|
||||
|
@ -124,6 +127,124 @@ public class DeadLetterQueueReaderTest {
|
|||
}
|
||||
|
||||
|
||||
// Notes on these tests:
|
||||
// These tests are designed to test specific edge cases where events end at block boundaries, hence the specific
|
||||
// sizes of the char arrays being used to pad the events
|
||||
|
||||
// This test tests for a single event that ends on a block boundary
|
||||
@Test
|
||||
public void testBlockBoundary() throws Exception {
|
||||
|
||||
final int PAD_FOR_BLOCK_SIZE_EVENT = 32616;
|
||||
Event event = new Event();
|
||||
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
|
||||
Arrays.fill(field, 'e');
|
||||
event.setField("T", new String(field));
|
||||
Timestamp timestamp = new Timestamp();
|
||||
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "", "", "", timestamp);
|
||||
assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE));
|
||||
writeManager.writeEntry(entry);
|
||||
}
|
||||
}
|
||||
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||
for (int i = 0; i < 2;i++) {
|
||||
readManager.pollEntry(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This test has multiple messages, with a message ending on a block boundary
|
||||
@Test
|
||||
public void testBlockBoundaryMultiple() throws Exception {
|
||||
Event event = new Event(Collections.emptyMap());
|
||||
char[] field = new char[8053];
|
||||
Arrays.fill(field, 'x');
|
||||
event.setField("message", new String(field));
|
||||
long startTime = System.currentTimeMillis();
|
||||
int messageSize = 0;
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "", "", "", new Timestamp(startTime++));
|
||||
messageSize += entry.serialize().length;
|
||||
writeManager.writeEntry(entry);
|
||||
if (i == 4){
|
||||
assertThat(messageSize + (RecordIOWriter.RECORD_HEADER_SIZE * 4), is(BLOCK_SIZE));
|
||||
}
|
||||
}
|
||||
}
|
||||
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||
for (int i = 0; i < 5;i++) {
|
||||
readManager.pollEntry(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// This test tests for a single event that ends on a block and segment boundary
|
||||
@Test
|
||||
public void testBlockAndSegmentBoundary() throws Exception {
|
||||
final int PAD_FOR_BLOCK_SIZE_EVENT = 32616;
|
||||
Event event = new Event();
|
||||
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
|
||||
Arrays.fill(field, 'e');
|
||||
event.setField("T", new String(field));
|
||||
Timestamp timestamp = new Timestamp();
|
||||
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, BLOCK_SIZE, 1_000_000_000)) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "", "", "", timestamp);
|
||||
assertThat(entry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE, is(BLOCK_SIZE));
|
||||
writeManager.writeEntry(entry);
|
||||
}
|
||||
}
|
||||
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||
for (int i = 0; i < 2;i++) {
|
||||
readManager.pollEntry(100);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testWriteReadRandomEventSize() throws Exception {
|
||||
Event event = new Event(Collections.emptyMap());
|
||||
int eventCount = 3000;
|
||||
int maxEventSize = BLOCK_SIZE * 2;
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000L)) {
|
||||
for (int i = 0; i < eventCount; i++) {
|
||||
char[] field = new char[(int)(Math.random() * (maxEventSize))];
|
||||
Arrays.fill(field, randomFillItem());
|
||||
event.setField("message", new String(field));
|
||||
DLQEntry entry = new DLQEntry(event, "", "", String.valueOf(i), new Timestamp(startTime++));
|
||||
writeManager.writeEntry(entry);
|
||||
}
|
||||
}
|
||||
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||
for (int i = 0; i < eventCount;i++) {
|
||||
DLQEntry entry = readManager.pollEntry(100);
|
||||
assertThat(entry.getReason(), is(String.valueOf(i)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Select a random char to fill the list with.
|
||||
// Randomly selects a valid value for RecordType, or a non-valid value.
|
||||
private char randomFillItem() {
|
||||
char[] valid = new char[RecordType.values().length + 1];
|
||||
int j = 0;
|
||||
valid[j] = 'x';
|
||||
for (RecordType type : RecordType.values()){
|
||||
valid[j++] = (char)type.toByte();
|
||||
}
|
||||
Random random = new Random();
|
||||
return valid[random.nextInt(valid.length)];
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
|
||||
int FIRST_WRITE_EVENT_COUNT = 100;
|
||||
|
@ -156,7 +277,6 @@ public class DeadLetterQueueReaderTest {
|
|||
String.valueOf(FIRST_WRITE_EVENT_COUNT));
|
||||
}
|
||||
|
||||
|
||||
private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception {
|
||||
try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||
readManager.seekToNextEvent(new Timestamp(seekTarget));
|
||||
|
@ -167,7 +287,7 @@ public class DeadLetterQueueReaderTest {
|
|||
}
|
||||
|
||||
private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException {
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) {
|
||||
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 10_000_000)) {
|
||||
for (int i = offset; i <= offset + numberOfEvents; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++));
|
||||
writeManager.writeEntry(entry);
|
||||
|
@ -175,4 +295,9 @@ public class DeadLetterQueueReaderTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInvalidDirectory() throws Exception {
|
||||
DeadLetterQueueReader reader = new DeadLetterQueueReader(dir);
|
||||
assertThat(reader.pollEntry(100), is(nullValue()));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue