From d67b06da6bd0008bfaa5493b509943b9d3c31c76 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Mon, 24 Jul 2017 14:14:56 -0400 Subject: [PATCH] Update based on code review comments Test cleanup Fixes #7789 --- .../logstash/common/io/RecordIOReader.java | 2 +- .../common/io/DeadLetterQueueReaderTest.java | 98 ++++++++----------- 2 files changed, 41 insertions(+), 59 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index e4a073af6..c867dfdf1 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -79,7 +79,7 @@ public final class RecordIOReader implements Closeable { int matchingBlock = UNSET; int lowBlock = 0; int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE; - + while (lowBlock < highBlock) { int middle = (int) Math.ceil((highBlock + lowBlock) / 2.0); seekToBlock(middle); diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index d9e6387c6..3f477d825 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -29,6 +29,7 @@ import org.logstash.Event; import org.logstash.Timestamp; import org.logstash.ackedqueue.StringElement; +import java.io.IOException; import java.nio.file.Path; import java.util.Collections; @@ -113,84 +114,65 @@ public class DeadLetterQueueReaderTest { @Test public void testSeek() throws Exception { - DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); Event event = new Event(Collections.emptyMap()); - Timestamp target = null; long currentEpoch = System.currentTimeMillis(); - for (int i = 0; i < 1000; i++){ - DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); - writeManager.writeEntry(entry); - if (i == 543) { - target = entry.getEntryTime(); - } + int TARGET_EVENT = 543; - } - writeManager.close(); - - DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir); - readManager.seekToNextEvent(target); - DLQEntry entry = readManager.pollEntry(100); - assertThat(entry.getEntryTime().toIso8601(), equalTo(target.toIso8601())); - assertThat(entry.getReason(), equalTo("543")); + writeEntries(event, 0, 1000, currentEpoch); + seekReadAndVerify(new Timestamp(currentEpoch + TARGET_EVENT), + String.valueOf(TARGET_EVENT)); } @Test public void testWriteStopSmallWriteSeekByTimestamp() throws Exception { - DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); + int FIRST_WRITE_EVENT_COUNT = 100; + int SECOND_WRITE_EVENT_COUNT = 100; + int OFFSET = 200; + Event event = new Event(Collections.emptyMap()); - Timestamp target = null; - long currentEpoch = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); - for (int i = 0; i < 100; i++) { - DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); - writeManager.writeEntry(entry); - target = entry.getEntryTime(); - } - writeManager.close(); + writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime); + writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000); - writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); - - for (int i = 200; i < 300; i++){ - DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); - writeManager.writeEntry(entry); - } - writeManager.close(); - - DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir); - readManager.seekToNextEvent(new Timestamp(target)); - DLQEntry readEntry = readManager.pollEntry(100); - assertThat(readEntry.getReason(), equalTo("99")); - assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601())); + seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT), + String.valueOf(FIRST_WRITE_EVENT_COUNT)); } @Test public void testWriteStopBigWriteSeekByTimestamp() throws Exception { - DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); + int FIRST_WRITE_EVENT_COUNT = 100; + int SECOND_WRITE_EVENT_COUNT = 2000; + int OFFSET = 200; + Event event = new Event(Collections.emptyMap()); - Timestamp target = null; - long currentEpoch = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); - for (int i = 0; i < 100; i++) { - DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); - writeManager.writeEntry(entry); - target = entry.getEntryTime(); + writeEntries(event, 0, FIRST_WRITE_EVENT_COUNT, startTime); + writeEntries(event, OFFSET, SECOND_WRITE_EVENT_COUNT, startTime + 1_000); + + seekReadAndVerify(new Timestamp(startTime + FIRST_WRITE_EVENT_COUNT), + String.valueOf(FIRST_WRITE_EVENT_COUNT)); + } + + + private void seekReadAndVerify(final Timestamp seekTarget, final String expectedValue) throws Exception { + try(DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir)) { + readManager.seekToNextEvent(new Timestamp(seekTarget)); + DLQEntry readEntry = readManager.pollEntry(100); + assertThat(readEntry.getReason(), equalTo(expectedValue)); + assertThat(readEntry.getEntryTime().toIso8601(), equalTo(seekTarget.toIso8601())); } - writeManager.close(); + } - writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000); - - for (int i = 200; i < 3000; i++){ - DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(currentEpoch++)); - writeManager.writeEntry(entry); + private void writeEntries(final Event event, int offset, final int numberOfEvents, long startTime) throws IOException { + try(DeadLetterQueueWriter writeManager = new DeadLetterQueueWriter(dir, 10000000, 10000000)) { + for (int i = offset; i <= offset + numberOfEvents; i++) { + DLQEntry entry = new DLQEntry(event, "foo", "bar", String.valueOf(i), new Timestamp(startTime++)); + writeManager.writeEntry(entry); + } } - writeManager.close(); - - DeadLetterQueueReader readManager = new DeadLetterQueueReader(dir); - readManager.seekToNextEvent(new Timestamp(target)); - DLQEntry readEntry = readManager.pollEntry(100); - assertThat(readEntry.getReason(), equalTo("99")); - assertThat(readEntry.getEntryTime().toIso8601(), equalTo(target.toIso8601())); } }