Add additional unit test and more broad checking

Fixes #7871
This commit is contained in:
Rob Bavey 2017-08-02 17:34:14 -04:00
parent b5b1ca3c15
commit 710ac1505c
2 changed files with 29 additions and 1 deletions

View file

@ -146,7 +146,7 @@ public final class RecordIOReader implements Closeable {
*/
int seekToStartOfEventInBlock() {
// Already consumed all the bytes in this block.
if (currentBlock.position() == currentBlockSizeReadFromChannel){
if (currentBlock.position() >= currentBlockSizeReadFromChannel){
return -1;
}
while (true) {

View file

@ -115,6 +115,34 @@ public class DeadLetterQueueReaderTest {
manager.close();
}
// This test checks that polling after a block has been mostly filled with an event is handled correctly.
@Test
public void testRereadFinalBlock() throws Exception {
Event event = new Event(Collections.emptyMap());
// Fill event with not quite enough characters to fill block. Fill event with valid RecordType characters - this
// was the cause of https://github.com/elastic/logstash/issues/7868
char[] field = new char[32500];
Arrays.fill(field, 's');
event.setField("message", new String(field));
long startTime = System.currentTimeMillis();
int messageSize = 0;
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10 * 1024 * 1024, 1_000_000_000)) {
for (int i = 0; i < 2; i++) {
DLQEntry entry = new DLQEntry(event, "", "", "", new Timestamp(startTime++));
messageSize += entry.serialize().length;
writeManager.writeEntry(entry);
}
}
try (DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
for (int i = 0; i < 3;i++) {
readManager.pollEntry(100);
}
}
}
@Test
public void testSeek() throws Exception {
Event event = new Event(Collections.emptyMap());