mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Fix seek by Timestamp bugs
Fixes 2 seekByTimestamp failure scenarios that occur when a segment is closed with entries smaller than BLOCK_SIZE If the next segment is > BLOCK_SIZE, then the seek will advance to the next segment. (Skipping past all events in the closed segment) If the next segment < BLOCK_SIZE, then the seek will advance to the start of the small segment. (Processing entries prior to the timestamp) Fixes #7789
This commit is contained in:
parent
c8445ce246
commit
c0047586bb
2 changed files with 58 additions and 8 deletions
|
@ -79,11 +79,7 @@ public final class RecordIOReader implements Closeable {
|
|||
int matchingBlock = UNSET;
|
||||
int lowBlock = 0;
|
||||
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
|
||||
|
||||
if (highBlock == 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
while (lowBlock < highBlock) {
|
||||
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
|
||||
seekToBlock(middle);
|
||||
|
|
|
@ -134,9 +134,63 @@ public class DeadLetterQueueReaderTest {
|
|||
assertThat(entry.getReason(), equalTo("543"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testInvalidDirectory() throws Exception {
|
||||
DeadLetterQueueReader reader = new DeadLetterQueueReader(dir);
|
||||
assertThat(reader.pollEntry(100), is(nullValue()));
|
||||
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
|
||||
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
||||
Event event = new Event(Collections.emptyMap());
|
||||
Timestamp target = null;
|
||||
long currentEpoch = System.currentTimeMillis();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
||||
writeManager.writeEntry(entry);
|
||||
target = entry.getEntryTime();
|
||||
}
|
||||
writeManager.close();
|
||||
|
||||
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
||||
|
||||
for (int i = 200; i < 300; i++){
|
||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
||||
writeManager.writeEntry(entry);
|
||||
}
|
||||
writeManager.close();
|
||||
|
||||
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
|
||||
readManager.seekToNextEvent(new Timestamp(target));
|
||||
DLQEntry readEntry = readManager.pollEntry(100);
|
||||
assertThat(readEntry.getReason(), equalTo("99"));
|
||||
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteStopBigWriteSeekByTimestamp() throws Exception {
|
||||
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
||||
Event event = new Event(Collections.emptyMap());
|
||||
Timestamp target = null;
|
||||
long currentEpoch = System.currentTimeMillis();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
||||
writeManager.writeEntry(entry);
|
||||
target = entry.getEntryTime();
|
||||
}
|
||||
writeManager.close();
|
||||
|
||||
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
||||
|
||||
for (int i = 200; i < 3000; i++){
|
||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
||||
writeManager.writeEntry(entry);
|
||||
}
|
||||
writeManager.close();
|
||||
|
||||
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
|
||||
readManager.seekToNextEvent(new Timestamp(target));
|
||||
DLQEntry readEntry = readManager.pollEntry(100);
|
||||
assertThat(readEntry.getReason(), equalTo("99"));
|
||||
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue