Backport PR#14605 to 7.17: Fix DLQ fails to start due to read 1 byte file (#14752)

Note this is a manual cherry-pick backport, as it did not backport cleanly. This backport includes some changed/additional code to
the original PR:

* Added additional null check in seekToNextEvent that was previously not present in 7.17, but is required for this PR
* Filter is added, but surrounding code is slightly different, but the intent is the same.

**Backport PR #14605 to 8.5 branch, original message:**

---

<!-- Type of change
Please label this PR with the release version and one of the following labels, depending on the scope of your change:
- bug
- enhancement
- breaking change
- doc
-->

<!-- Add content to appear in  [Release Notes](https://www.elastic.co/guide/en/logstash/current/releasenotes.html), or add [rn:skip] to leave this PR out of release notes -->

Fix DLQ fails to start due to read 1 byte file

<!-- Mandatory
Explain here the changes you made on the PR. Please explain the WHAT: patterns used, algorithms implemented, design architecture, message processing, etc.

Example:
  Expose 'xpack.monitoring.elasticsearch.proxy' in the docker environment variables and update logstash.yml to surface this config option.

  This commit exposes the 'xpack.monitoring.elasticsearch.proxy' variable in the docker by adding it in env2yaml.go, which translates from
  being an environment variable to a proper yaml config.

  Additionally, this PR exposes this setting for both xpack monitoring & management to the logstash.yml file.
-->

This commit ignores DLQ files that contain only the version number. These files have no content and should be skipped.

Mapping 1 byte DLQ files to buffer causes java.lang.IllegalArgumentException: newPosition < 0: (-1 < 0)
User is unable to start the pipeline using dead_letter_queue input

<!-- Mandatory
Explain here the WHY or the IMPACT to the user, or the rationale/motivation for the changes.

Example:
  This PR fixes an issue that was preventing the docker image from using the proxy setting when sending xpack monitoring information.
  and/or
  This PR now allows the user to define the xpack monitoring proxy setting in the docker container.
-->

<!-- Mandatory
Add a checklist of things that are required to be reviewed in order to have the PR approved

List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~)
-->

- [ ] My code follows the style guidelines of this project
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
- [ ] I have added tests that prove my fix is effective or that my feature works

<!-- Recommended
Add a checklist of things that are required to be reviewed in order to have the PR approved
-->
- [x] manually test the 1 byte file and can start Logstash

<!-- Recommended
Explain here how this PR will be tested by the reviewer: commands, dependencies, steps, etc.
-->

follow the reproducer of #14599

<!-- Recommended
Link related issues below. Insert the issue link or reference after the word "Closes" if merging this should automatically close it.

- Closes #123
- Relates #123
- Requires #123
- Superseeds #123
-->
- Fixed: #14599

<!-- Recommended
Explain here the different behaviors that this PR introduces or modifies in this project, user roles, environment configuration, etc.

If you are familiar with Gherkin test scenarios, we recommend its usage: https://cucumber.io/docs/gherkin/reference/
-->

<!-- Optional
Add here screenshots about how the project will be changed after the PR is applied. They could be related to web pages, terminal, etc, or any other image you consider important to be shared with the team.
-->

<!-- Recommended
Paste here output logs discovered while creating this PR, such as stack traces or integration logs, or any other output you consider important to be shared with the team.
-->
This commit is contained in:
Rob Bavey 2022-11-16 09:21:00 -05:00 committed by GitHub
parent a6db1b0db5
commit 69ce6ebdb8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 3 deletions

View file

@ -78,7 +78,9 @@ public final class DeadLetterQueueReader implements Closeable {
return id.apply(p1).compareTo(id.apply(p2));
});
segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
segments.addAll(getSegmentPaths(queuePath)
.filter(p -> p.toFile().length() > 1) // take the files that have content to process
.collect(Collectors.toList()));
}
public void seekToNextEvent(Timestamp timestamp) throws IOException {
@ -95,9 +97,11 @@ public final class DeadLetterQueueReader implements Closeable {
return;
}
}
if (currentReader != null) {
currentReader.close();
currentReader = null;
}
}
private long pollNewSegments(long timeout) throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();

View file

@ -34,6 +34,7 @@ import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
@ -51,6 +52,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
@ -420,6 +422,22 @@ public class DeadLetterQueueReaderTest {
}
}
@Test
public void testSeekByTimestampWhenSegmentIs1Byte() throws IOException, InterruptedException {
final long startTime = System.currentTimeMillis();
Files.write(dir.resolve("1.log"), "1".getBytes());
try (DeadLetterQueueReader reader = new DeadLetterQueueReader(dir)) {
//Exercise
final Timestamp seekTarget = new Timestamp(startTime);
reader.seekToNextEvent(seekTarget);
// Verify, no entry is available, reader should seek without exception
DLQEntry readEntry = reader.pollEntry(100);
assertNull("No entry is available after all segments are deleted", readEntry);
}
}
@Test
public void testWriteReadRandomEventSize() throws Exception {
Event event = new Event(Collections.emptyMap());