DeadLetterQueueReader should handle missing segment files at startup

Change DeadLetterQueueReader, so that if a missing segment file is
encountered at startup, the next valid entry will be used instead

Fixes #7433

Fixes #7457

Fixes #8396
This commit is contained in:
Rob Bavey 2017-06-05 16:40:58 -04:00
parent 63f8f86bde
commit ac6feed21d
3 changed files with 63 additions and 4 deletions

View file

@ -25,6 +25,7 @@ import org.logstash.DLQEntry;
import org.logstash.Timestamp;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
@ -48,7 +49,7 @@ public final class DeadLetterQueueReader implements Closeable {
private final ConcurrentSkipListSet<Path> segments;
private final WatchService watchService;
public DeadLetterQueueReader(Path queuePath) throws Exception {
public DeadLetterQueueReader(Path queuePath) throws IOException {
this.queuePath = queuePath;
this.watchService = FileSystems.getDefault().newWatchService();
this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
@ -131,8 +132,18 @@ public final class DeadLetterQueueReader implements Closeable {
}
public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException {
currentReader = new RecordIOReader(segmentPath);
currentReader.seekToOffset(position);
// If the provided segment Path exist, then set the reader to start from the supplied position
if (Files.exists(segmentPath)) {
currentReader = new RecordIOReader(segmentPath);
currentReader.seekToOffset(position);
}else{
// Otherwise, set the current reader to be at the beginning of the next
// segment.
Path next = segments.higher(segmentPath);
if (next != null){
currentReader = new RecordIOReader(next);
}
}
}
public Path getCurrentSegment() {

View file

@ -40,6 +40,7 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
public class DeadLetterQueueReaderTest {
private Path dir;
@ -153,6 +154,53 @@ public class DeadLetterQueueReaderTest {
}
@Test
public void testSeekToStartOfRemovedLog() throws Exception {
writeSegmentSizeEntries(3);
Path startLog = dir.resolve("1.log");
validateEntries(startLog, 1, 3, 1);
startLog.toFile().delete();
validateEntries(startLog, 2, 3, 1);
}
@Test
public void testSeekToMiddleOfRemovedLog() throws Exception {
writeSegmentSizeEntries(3);
Path startLog = dir.resolve("1.log");
startLog.toFile().delete();
validateEntries(startLog, 2, 3, 32);
}
private void writeSegmentSizeEntries(int count) throws IOException {
Event event = new Event(Collections.emptyMap());
DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0));
int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE;
DeadLetterQueueWriter writeManager = null;
try {
writeManager = new DeadLetterQueueWriter(dir, size, 10000000);
for (int i = 1; i <= count; i++) {
writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i)));
}
} finally {
writeManager.close();
}
}
private void validateEntries(Path firstLog, int startEntry, int endEntry, int startPosition) throws IOException, InterruptedException {
DeadLetterQueueReader readManager = null;
try {
readManager = new DeadLetterQueueReader(dir);
readManager.setCurrentReaderAndPosition(firstLog, startPosition);
for (int i = startEntry; i <= endEntry; i++) {
DLQEntry readEntry = readManager.pollEntry(100);
assertThat(readEntry.getReason(), equalTo(String.valueOf(i)));
}
} finally {
readManager.close();
}
}
// Notes on these tests:
// These tests are designed to test specific edge cases where events end at block boundaries, hence the specific
// sizes of the char arrays being used to pad the events

View file

@ -116,7 +116,7 @@ describe "Test Monitoring API" do
logging_put_assert logstash_service.monitoring_api.logging_put({"logger." => "INFO"})
logging_get_assert logstash_service, "INFO", "TRACE"
#package logger
#package logger
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "DEBUG"})
expect(logstash_service.monitoring_api.logging_get["loggers"]["logstash.agent"]).to eq ("DEBUG")
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "INFO"})