Update based on code review comments

Test cleanup

Fixes #7789
This commit is contained in:
Rob Bavey 2017-07-24 14:14:56 -04:00
parent 9f7bed2498
commit c86a6f2923
2 changed files with 41 additions and 59 deletions

View file

@ -79,7 +79,7 @@ public final class RecordIOReader implements Closeable {
int matchingBlock = UNSET; int matchingBlock = UNSET;
int lowBlock = 0; int lowBlock = 0;
int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE; int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE;
while (lowBlock < highBlock) { while (lowBlock < highBlock) {
int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0); int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0);
seekToBlock(middle); seekToBlock(middle);

View file

@ -29,6 +29,7 @@ import org.logstash.Event;
import org.logstash.Timestamp; import org.logstash.Timestamp;
import org.logstash.ackedqueue.StringElement; import org.logstash.ackedqueue.StringElement;
import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Collections; import java.util.Collections;
@ -113,84 +114,65 @@ public class DeadLetterQueueReaderTest {
@Test @Test
public void testSeek() throws Exception { public void testSeek() throws Exception {
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000);
Event event = new Event(Collections.emptyMap()); Event event = new Event(Collections.emptyMap());
Timestamp target = null;
long currentEpoch = System.currentTimeMillis(); long currentEpoch = System.currentTimeMillis();
for (int i = 0; i < 1000; i++){ int TARGET_EVENT = 543;
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
writeManager.writeEntry(entry);
if (i == 543) {
target = entry.getEntryTime();
}
} writeEntries(event, 0, 1000, currentEpoch);
writeManager.close(); seekReadAndVerify(new Timestamp(currentEpoch + TARGET_EVENT),
String.valueOf(TARGET_EVENT));
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
readManager.seekToNextEvent(target);
DLQEntry entry = readManager.pollEntry(100);
assertThat(entry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
assertThat(entry.getReason(), equalTo("543"));
} }
@Test @Test
public void testWriteStopSmallWriteSeekByTimestamp() throws Exception { public void testWriteStopSmallWriteSeekByTimestamp() throws Exception {
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); int FIRST_WRITE_EVENT_COUNT = 100;
int SECOND_WRITE_EVENT_COUNT = 100;
int OFFSET = 200;
Event event = new Event(Collections.emptyMap()); Event event = new Event(Collections.emptyMap());
Timestamp target = null; long startTime = System.currentTimeMillis();
long currentEpoch = System.currentTimeMillis();
for (int i = 0; i < 100; i++) { writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime);
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000);
writeManager.writeEntry(entry);
target = entry.getEntryTime();
}
writeManager.close();
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT),
String.valueOf(FIRST_WRITE_EVENT_COUNT));
for (int i = 200; i < 300; i++){
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++));
writeManager.writeEntry(entry);
}
writeManager.close();
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
readManager.seekToNextEvent(new Timestamp(target));
DLQEntry readEntry = readManager.pollEntry(100);
assertThat(readEntry.getReason(), equalTo("99"));
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
} }
@Test @Test
public void testWriteStopBigWriteSeekByTimestamp() throws Exception { public void testWriteStopBigWriteSeekByTimestamp() throws Exception {
DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); int FIRST_WRITE_EVENT_COUNT = 100;
int SECOND_WRITE_EVENT_COUNT = 2000;
int OFFSET = 200;
Event event = new Event(Collections.emptyMap()); Event event = new Event(Collections.emptyMap());
Timestamp target = null; long startTime = System.currentTimeMillis();
long currentEpoch = System.currentTimeMillis();
for (int i = 0; i < 100; i++) { writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime);
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000);
writeManager.writeEntry(entry);
target = entry.getEntryTime(); seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT),
String.valueOf(FIRST_WRITE_EVENT_COUNT));
}
private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception {
try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) {
readManager.seekToNextEvent(new Timestamp(seekTarget));
DLQEntry readEntry = readManager.pollEntry(100);
assertThat(readEntry.getReason(), equalTo(expectedValue));
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(seekTarget.toIso8601()));
} }
writeManager.close(); }
writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException {
try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) {
for (int i = 200; i < 3000; i++){ for (int i = offset; i <= offset + numberOfEvents; i++) {
DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++));
writeManager.writeEntry(entry); writeManager.writeEntry(entry);
}
} }
writeManager.close();
DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir);
readManager.seekToNextEvent(new Timestamp(target));
DLQEntry readEntry = readManager.pollEntry(100);
assertThat(readEntry.getReason(), equalTo("99"));
assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601()));
} }
} }