mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
c0047586bb
commit
d67b06da6b
2 changed files with 41 additions and 59 deletions
|
@ -79,7 +79,7 @@ public final class RecordIOReader implements Closeable {
|
||||||
int matchingBlock = UNSET;
|
int matchingBlock = UNSET;
|
||||||
int lowBlock = 0;
|
int lowBlock = 0;
|
||||||
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
|
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
|
||||||
|
|
||||||
while (lowBlock < highBlock) {
|
while (lowBlock < highBlock) {
|
||||||
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
|
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
|
||||||
seekToBlock(middle);
|
seekToBlock(middle);
|
||||||
|
|
|
@ -29,6 +29,7 @@ import org.logstash.Event;
|
||||||
import org.logstash.Timestamp;
|
import org.logstash.Timestamp;
|
||||||
import org.logstash.ackedqueue.StringElement;
|
import org.logstash.ackedqueue.StringElement;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
|
@ -113,84 +114,65 @@ public class DeadLetterQueueReaderTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSeek() throws Exception {
|
public void testSeek() throws Exception {
|
||||||
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
|
||||||
Event event = new Event(Collections.emptyMap());
|
Event event = new Event(Collections.emptyMap());
|
||||||
Timestamp target = null;
|
|
||||||
long currentEpoch = System.currentTimeMillis();
|
long currentEpoch = System.currentTimeMillis();
|
||||||
for (int i = 0; i < 1000; i++){
|
int TARGET_EVENT = 543;
|
||||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
|
||||||
writeManager.writeEntry(entry);
|
|
||||||
if (i == 543) {
|
|
||||||
target = entry.getEntryTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
writeEntries(event, 0, 1000, currentEpoch);
|
||||||
writeManager.close();
|
seekReadAndVerify(new Timestamp(currentEpoch + TARGET_EVENT),
|
||||||
|
String.valueOf(TARGET_EVENT));
|
||||||
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
|
|
||||||
readManager.seekToNextEvent(target);
|
|
||||||
DLQEntry entry = readManager.pollEntry(100);
|
|
||||||
assertThat(entry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
|
|
||||||
assertThat(entry.getReason(), equalTo("543"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
|
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
|
||||||
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
int FIRST_WRITE_EVENT_COUNT = 100;
|
||||||
|
int SECOND_WRITE_EVENT_COUNT = 100;
|
||||||
|
int OFFSET = 200;
|
||||||
|
|
||||||
Event event = new Event(Collections.emptyMap());
|
Event event = new Event(Collections.emptyMap());
|
||||||
Timestamp target = null;
|
long startTime = System.currentTimeMillis();
|
||||||
long currentEpoch = System.currentTimeMillis();
|
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime);
|
||||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000);
|
||||||
writeManager.writeEntry(entry);
|
|
||||||
target = entry.getEntryTime();
|
|
||||||
}
|
|
||||||
writeManager.close();
|
|
||||||
|
|
||||||
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT),
|
||||||
|
String.valueOf(FIRST_WRITE_EVENT_COUNT));
|
||||||
for (int i = 200; i < 300; i++){
|
|
||||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
|
||||||
writeManager.writeEntry(entry);
|
|
||||||
}
|
|
||||||
writeManager.close();
|
|
||||||
|
|
||||||
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
|
|
||||||
readManager.seekToNextEvent(new Timestamp(target));
|
|
||||||
DLQEntry readEntry = readManager.pollEntry(100);
|
|
||||||
assertThat(readEntry.getReason(), equalTo("99"));
|
|
||||||
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testWriteStopBigWriteSeekByTimestamp() throws Exception {
|
public void testWriteStopBigWriteSeekByTimestamp() throws Exception {
|
||||||
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
int FIRST_WRITE_EVENT_COUNT = 100;
|
||||||
|
int SECOND_WRITE_EVENT_COUNT = 2000;
|
||||||
|
int OFFSET = 200;
|
||||||
|
|
||||||
Event event = new Event(Collections.emptyMap());
|
Event event = new Event(Collections.emptyMap());
|
||||||
Timestamp target = null;
|
long startTime = System.currentTimeMillis();
|
||||||
long currentEpoch = System.currentTimeMillis();
|
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime);
|
||||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000);
|
||||||
writeManager.writeEntry(entry);
|
|
||||||
target = entry.getEntryTime();
|
seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT),
|
||||||
|
String.valueOf(FIRST_WRITE_EVENT_COUNT));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception {
|
||||||
|
try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
|
||||||
|
readManager.seekToNextEvent(new Timestamp(seekTarget));
|
||||||
|
DLQEntry readEntry = readManager.pollEntry(100);
|
||||||
|
assertThat(readEntry.getReason(), equalTo(expectedValue));
|
||||||
|
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(seekTarget.toIso8601()));
|
||||||
}
|
}
|
||||||
writeManager.close();
|
}
|
||||||
|
|
||||||
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
|
private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException {
|
||||||
|
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) {
|
||||||
for (int i = 200; i < 3000; i++){
|
for (int i = offset; i <= offset + numberOfEvents; i++) {
|
||||||
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
|
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++));
|
||||||
writeManager.writeEntry(entry);
|
writeManager.writeEntry(entry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
writeManager.close();
|
|
||||||
|
|
||||||
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
|
|
||||||
readManager.seekToNextEvent(new Timestamp(target));
|
|
||||||
DLQEntry readEntry = readManager.pollEntry(100);
|
|
||||||
assertThat(readEntry.getReason(), equalTo("99"));
|
|
||||||
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue