From 69ce6ebdb854e888c4de919bcd3f225452fa55b4 Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Wed, 16 Nov 2022 09:21:00 -0500 Subject: [PATCH] Backport PR#14605 to 7.17: Fix DLQ fails to start due to read 1 byte file (#14752) Note this is a manual cherry-pick backport, as it did not backport cleanly. This backport includes some changed/additional code to the original PR: * Added additional null check in seekToNextEvent that was previously not present in 7.17, but is required for this PR * Filter is added, but surrounding code is slightly different, but the intent is the same. **Backport PR #14605 to 8.5 branch, original message:** --- Fix DLQ fails to start due to read 1 byte file This commit ignores DLQ files that contain only the version number. These files have no content and should be skipped. Mapping 1 byte DLQ files to buffer causes java.lang.IllegalArgumentException: newPosition < 0: (-1 < 0) User is unable to start the pipeline using dead_letter_queue input - [ ] My code follows the style guidelines of this project - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] I have made corresponding change to the default configuration files (and/or docker env variables) - [ ] I have added tests that prove my fix is effective or that my feature works - [x] manually test the 1 byte file and can start Logstash follow the reproducer of #14599 - Fixed: #14599 --- .../common/io/DeadLetterQueueReader.java | 10 +++++++--- .../common/io/DeadLetterQueueReaderTest.java | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java index 75110249a..452c4703b 100644 --- a/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java +++ b/logstash-core/src/main/java/org/logstash/common/io/DeadLetterQueueReader.java @@ -78,7 +78,9 @@ public final class DeadLetterQueueReader implements Closeable { return id.apply(p1).compareTo(id.apply(p2)); }); - segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList())); + segments.addAll(getSegmentPaths(queuePath) + .filter(p -> p.toFile().length() > 1) // take the files that have content to process + .collect(Collectors.toList())); } public void seekToNextEvent(Timestamp timestamp) throws IOException { @@ -95,8 +97,10 @@ public final class DeadLetterQueueReader implements Closeable { return; } } - currentReader.close(); - currentReader = null; + if (currentReader != null) { + currentReader.close(); + currentReader = null; + } } private long pollNewSegments(long timeout) throws IOException, InterruptedException { diff --git a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java index e6b85c962..fa24402e6 100644 --- a/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java +++ b/logstash-core/src/test/java/org/logstash/common/io/DeadLetterQueueReaderTest.java @@ -34,6 +34,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; @@ -51,6 +52,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE; import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE; import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE; @@ -420,6 +422,22 @@ public class DeadLetterQueueReaderTest { } } + @Test + public void testSeekByTimestampWhenSegmentIs1Byte() throws IOException, InterruptedException { + final long startTime = System.currentTimeMillis(); + Files.write(dir.resolve("1.log"), "1".getBytes()); + + try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) { + + //Exercise + final Timestamp seekTarget = new Timestamp(startTime); + reader.seekToNextEvent(seekTarget); + + // Verify, no entry is available, reader should seek without exception + DLQEntry readEntry = reader.pollEntry(100); + assertNull("No entry is available after all segments are deleted", readEntry); + } + } @Test public void testWriteReadRandomEventSize() throws Exception { Event event = new Event(Collections.emptyMap());