mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
DeadLetterQueueReader should handle missing segment files at startup
Change DeadLetterQueueReader, so that if a missing segment file is encountered at startup, the next valid entry will be used instead Fixes #7433 Fixes #7457
This commit is contained in:
parent
240048b88d
commit
8da0737252
3 changed files with 63 additions and 4 deletions
|
@ -25,6 +25,7 @@ import org.logstash.DLQEntry;
|
||||||
import org.logstash.Timestamp;
|
import org.logstash.Timestamp;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.nio.file.FileSystems;
|
import java.nio.file.FileSystems;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.StandardWatchEventKinds;
|
import java.nio.file.StandardWatchEventKinds;
|
||||||
|
@ -48,7 +49,7 @@ public final class DeadLetterQueueReader implements Closeable {
|
||||||
private final ConcurrentSkipListSet<Path> segments;
|
private final ConcurrentSkipListSet<Path> segments;
|
||||||
private final WatchService watchService;
|
private final WatchService watchService;
|
||||||
|
|
||||||
public DeadLetterQueueReader(Path queuePath) throws Exception {
|
public DeadLetterQueueReader(Path queuePath) throws IOException {
|
||||||
this.queuePath = queuePath;
|
this.queuePath = queuePath;
|
||||||
this.watchService = FileSystems.getDefault().newWatchService();
|
this.watchService = FileSystems.getDefault().newWatchService();
|
||||||
this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
|
this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
|
||||||
|
@ -131,8 +132,18 @@ public final class DeadLetterQueueReader implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException {
|
public void setCurrentReaderAndPosition(Path segmentPath, long position) throws IOException {
|
||||||
currentReader = new RecordIOReader(segmentPath);
|
// If the provided segment Path exist, then set the reader to start from the supplied position
|
||||||
currentReader.seekToOffset(position);
|
if (Files.exists(segmentPath)) {
|
||||||
|
currentReader = new RecordIOReader(segmentPath);
|
||||||
|
currentReader.seekToOffset(position);
|
||||||
|
}else{
|
||||||
|
// Otherwise, set the current reader to be at the beginning of the next
|
||||||
|
// segment.
|
||||||
|
Path next = segments.higher(segmentPath);
|
||||||
|
if (next != null){
|
||||||
|
currentReader = new RecordIOReader(next);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Path getCurrentSegment() {
|
public Path getCurrentSegment() {
|
||||||
|
|
|
@ -40,6 +40,7 @@ import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
|
import static org.logstash.common.io.RecordIOWriter.BLOCK_SIZE;
|
||||||
|
import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
|
||||||
|
|
||||||
public class DeadLetterQueueReaderTest {
|
public class DeadLetterQueueReaderTest {
|
||||||
private Path dir;
|
private Path dir;
|
||||||
|
@ -153,6 +154,53 @@ public class DeadLetterQueueReaderTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSeekToStartOfRemovedLog() throws Exception {
|
||||||
|
writeSegmentSizeEntries(3);
|
||||||
|
Path startLog = dir.resolve("1.log");
|
||||||
|
validateEntries(startLog, 1, 3, 1);
|
||||||
|
startLog.toFile().delete();
|
||||||
|
validateEntries(startLog, 2, 3, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSeekToMiddleOfRemovedLog() throws Exception {
|
||||||
|
writeSegmentSizeEntries(3);
|
||||||
|
Path startLog = dir.resolve("1.log");
|
||||||
|
startLog.toFile().delete();
|
||||||
|
validateEntries(startLog, 2, 3, 32);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeSegmentSizeEntries(int count) throws IOException {
|
||||||
|
Event event = new Event(Collections.emptyMap());
|
||||||
|
DLQEntry templateEntry = new DLQEntry(event, "1", "1", String.valueOf(0));
|
||||||
|
int size = templateEntry.serialize().length + RecordIOWriter.RECORD_HEADER_SIZE + VERSION_SIZE;
|
||||||
|
DeadLetterQueueWriter writeManager = null;
|
||||||
|
try {
|
||||||
|
writeManager = new DeadLetterQueueWriter(dir, size, 10000000);
|
||||||
|
for (int i = 1; i <= count; i++) {
|
||||||
|
writeManager.writeEntry(new DLQEntry(event, "1", "1", String.valueOf(i)));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
writeManager.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void validateEntries(Path firstLog, int startEntry, int endEntry, int startPosition) throws IOException, InterruptedException {
|
||||||
|
DeadLetterQueueReader readManager = null;
|
||||||
|
try {
|
||||||
|
readManager = new DeadLetterQueueReader(dir);
|
||||||
|
readManager.setCurrentReaderAndPosition(firstLog, startPosition);
|
||||||
|
for (int i = startEntry; i <= endEntry; i++) {
|
||||||
|
DLQEntry readEntry = readManager.pollEntry(100);
|
||||||
|
assertThat(readEntry.getReason(), equalTo(String.valueOf(i)));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
readManager.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Notes on these tests:
|
// Notes on these tests:
|
||||||
// These tests are designed to test specific edge cases where events end at block boundaries, hence the specific
|
// These tests are designed to test specific edge cases where events end at block boundaries, hence the specific
|
||||||
// sizes of the char arrays being used to pad the events
|
// sizes of the char arrays being used to pad the events
|
||||||
|
|
|
@ -116,7 +116,7 @@ describe "Test Monitoring API" do
|
||||||
logging_put_assert logstash_service.monitoring_api.logging_put({"logger." => "INFO"})
|
logging_put_assert logstash_service.monitoring_api.logging_put({"logger." => "INFO"})
|
||||||
logging_get_assert logstash_service, "INFO", "TRACE"
|
logging_get_assert logstash_service, "INFO", "TRACE"
|
||||||
|
|
||||||
#package logger
|
#package logger
|
||||||
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "DEBUG"})
|
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "DEBUG"})
|
||||||
expect(logstash_service.monitoring_api.logging_get["loggers"]["logstash.agent"]).to eq ("DEBUG")
|
expect(logstash_service.monitoring_api.logging_get["loggers"]["logstash.agent"]).to eq ("DEBUG")
|
||||||
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "INFO"})
|
logging_put_assert logstash_service.monitoring_api.logging_put({"logger.logstash.agent" => "INFO"})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue