Add check to stop messages looping through DLQ

Add check to stop events from being written to the DLQ that
have previously been written to the DLQ

Fixes #7838
This commit is contained in:
Rob Bavey 2017-07-28 12:36:56 -04:00
parent caef6fdd6f
commit 526b28159b
2 changed files with 50 additions and 16 deletions

View file

@ -45,6 +45,7 @@ public final class DeadLetterQueueWriter implements Closeable {
static final String SEGMENT_FILE_PATTERN = "%d.log";
static final String LOCK_FILE = ".lock";
public static final String DEAD_LETTER_QUEUE_METADATA_KEY = "dead_letter_queue";
private final long maxSegmentSize;
private final long maxQueueSize;
private LongAdder currentQueueSize;
@ -132,6 +133,12 @@ public final class DeadLetterQueueWriter implements Closeable {
}
private void innerWriteEntry(DLQEntry entry) throws IOException {
Event event = entry.getEvent();
if (alreadyProcessed(event)) {
logger.warn("Event previously submitted to dead letter queue. Skipping...");
return;
}
byte[] record = entry.serialize();
int eventPayloadSize = RECORD_HEADER_SIZE + record.length;
if (currentQueueSize.longValue() + eventPayloadSize > maxQueueSize) {
@ -144,6 +151,18 @@ public final class DeadLetterQueueWriter implements Closeable {
currentQueueSize.add(currentWriter.writeEvent(record));
}
/**
* Method to determine whether the event has already been processed by the DLQ - currently this
* just checks the metadata to see if metadata has been added to the event that indicates that
* it has already gone through the DLQ.
* TODO: Add metadata around 'depth' to enable >1 iteration through the DLQ if required.
* @param event Logstash Event
* @return boolean indicating whether the event is eligible to be added to the DLQ
*/
private boolean alreadyProcessed(final Event event) {
return event.getMetadata() != null && event.getMetadata().containsKey(DEAD_LETTER_QUEUE_METADATA_KEY);
}
@Override
public synchronized void close() throws IOException {
if (currentWriter != null) {

View file

@ -32,11 +32,11 @@ import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
@ -53,6 +53,8 @@ public class DeadLetterQueueWriterTest {
dir = temporaryFolder.newFolder().toPath();
}
private static long EMPTY_DLQ = VERSION_SIZE; // Only the version field has been written
@Test
public void testLockFileManagement() throws Exception {
Path lockFile = dir.resolve(".lock");
@ -99,38 +101,51 @@ public class DeadLetterQueueWriterTest {
writer.close();
}
@Test
public void testDoesNotWriteMessagesAlreadyRoutedThroughDLQ() throws Exception {
Event dlqEvent = new Event();
dlqEvent.setField("[@metadata][dead_letter_queue][plugin_type]", "dead_letter_queue");
DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason");
DLQEntry dlqEntry = new DLQEntry(dlqEvent, "type", "id", "reason");
try (DeadLetterQueueWriter writer = new DeadLetterQueueWriter(dir, 1000, 1000000);) {
writer.writeEntry(entry);
long dlqLengthAfterEvent = dlqLength();
assertThat(dlqLengthAfterEvent, is(not(EMPTY_DLQ)));
writer.writeEntry(dlqEntry);
assertThat(dlqLength(), is(dlqLengthAfterEvent));
}
}
@Test
public void testDoesNotWriteBeyondLimit() throws Exception {
DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason");
int payloadLength = RECORD_HEADER_SIZE + VERSION_SIZE + entry.serialize().length;
final int MESSAGE_COUNT= 5;
long queueLength = payloadLength * MESSAGE_COUNT;
long MAX_QUEUE_LENGTH = payloadLength * MESSAGE_COUNT;
DeadLetterQueueWriter writer = null;
try{
writer = new DeadLetterQueueWriter(dir, payloadLength, queueLength);
writer = new DeadLetterQueueWriter(dir, payloadLength, MAX_QUEUE_LENGTH);
for (int i = 0; i < MESSAGE_COUNT; i++)
writer.writeEntry(entry);
long size = Files.list(dir)
.filter(p -> p.toString().endsWith(".log"))
.mapToLong(p -> p.toFile().length())
.sum();
assertThat(size, is(queueLength));
assertThat(dlqLength(), is(MAX_QUEUE_LENGTH));
writer.writeEntry(entry);
size = Files.list(dir)
.filter(p -> p.toString().endsWith(".log"))
.mapToLong(p -> p.toFile().length())
.sum();
assertThat(size, is(queueLength));
assertThat(dlqLength(), is(MAX_QUEUE_LENGTH));
} finally {
if (writer != null) {
writer.close();
}
}
}
private long dlqLength() throws IOException {
return Files.list(dir)
.filter(p -> p.toString().endsWith(".log"))
.mapToLong(p -> p.toFile().length())
.sum();
}
}