diff --git a/logstash-core/src/main/java/org/logstash/DLQEntry.java b/logstash-core/src/main/java/org/logstash/DLQEntry.java index 786249015..fd44dc534 100644 --- a/logstash-core/src/main/java/org/logstash/DLQEntry.java +++ b/logstash-core/src/main/java/org/logstash/DLQEntry.java @@ -112,4 +112,15 @@ public class DLQEntry implements Cloneable, Serializable, Queueable { public Timestamp getEntryTime() { return entryTime; } + + @Override + public String toString() { + return "DLQEntry{" + + "event=" + event + + ", pluginType='" + pluginType + '\'' + + ", pluginId='" + pluginId + '\'' + + ", reason='" + reason + '\'' + + ", entryTime=" + entryTime + + '}'; + } } diff --git a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java index 9df8271c6..a21f60a32 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/RecordIOReader.java @@ -43,6 +43,7 @@ public class RecordIOReader { private int currentBlockSizeReadFromChannel; private final Path path; private long channelPosition; + private static final int UNSET = -1; public RecordIOReader(Path path) throws IOException { this.path = path; @@ -74,7 +75,7 @@ public class RecordIOReader { } public byte[] seekToNextEventPosition(T target, Function keyExtractor, Comparator keyComparator) throws IOException { - int matchingBlock; + int matchingBlock = UNSET; int lowBlock = 0; int highBlock = (int) (channel.size() - VERSION_SIZE) / BLOCK_SIZE; @@ -96,7 +97,9 @@ public class RecordIOReader { break; } } - matchingBlock = lowBlock; + if (matchingBlock == UNSET) { + matchingBlock = lowBlock; + } // now sequential scan to event seekToBlock(matchingBlock);