Add dead_letter_queue.max_bytes setting

Add setting for dead_letter_queue.max_bytes to allow a user
to set the maximum possible size of a dead letter queue on disk.

Resolves #7633

Fixes #7638
This commit is contained in:
Rob Bavey 2017-07-10 23:44:06 -04:00
parent 892a2d90db
commit af96fa36a5
7 changed files with 56 additions and 7 deletions

View file

@ -159,7 +159,12 @@
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false
#
# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#

View file

@ -54,6 +54,7 @@ module LogStash
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Setting::TimeValue.new("slowlog.threshold.info", "-1"),
Setting::TimeValue.new("slowlog.threshold.debug", "-1"),

View file

@ -87,7 +87,7 @@ module LogStash; class BasePipeline
def dlq_writer
if settings.get_value("dead_letter_queue.enable")
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"))
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"), settings.get_value("dead_letter_queue.max_bytes"))
else
@dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
end

View file

@ -48,7 +48,7 @@ module LogStash; module Util
def self.get(pipeline_id)
if LogStash::SETTINGS.get("dead_letter_queue.enable")
return DeadLetterQueueWriter.new(
DeadLetterQueueFactory.getWriter(pipeline_id, LogStash::SETTINGS.get("path.dead_letter_queue")))
DeadLetterQueueFactory.getWriter(pipeline_id, LogStash::SETTINGS.get("path.dead_letter_queue"), LogStash::SETTINGS.get('dead_letter_queue.max_bytes')))
else
return DeadLetterQueueWriter.new(nil)
end

View file

@ -51,12 +51,14 @@ public class DeadLetterQueueFactory {
* @param id The identifier context for this dlq manager
* @param dlqPath The path to use for the queue's backing data directory. contains sub-directories
* for each id
* @param maxQueueSize Maximum size of the dead letter queue (in bytes). No entries will be written
* that would make the size of this dlq greater than this value
* @return The write manager for the specific id's dead-letter-queue context
*/
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize) {
    // One writer per pipeline id; computeIfAbsent guarantees a single
    // instance even under concurrent lookups.
    return REGISTRY.computeIfAbsent(id, k -> {
        try {
            // Per-id sub-directory under dlqPath; segments capped at
            // MAX_SEGMENT_SIZE_BYTES, whole queue capped at maxQueueSize.
            return new DeadLetterQueueWriter(Paths.get(dlqPath, k), MAX_SEGMENT_SIZE_BYTES, maxQueueSize);
        } catch (IOException e) {
            // NOTE(review): on failure the mapping function falls through —
            // presumably returning null below (outside this hunk); confirm
            // callers tolerate a null writer.
            logger.error("unable to create dead letter queue writer", e);
        }

View file

@ -45,9 +45,9 @@ public class DeadLetterQueueFactoryTest {
@Test
public void test() throws IOException {
    Path pipelineA = dir.resolve("pipelineA");
    // Factory caches writers per pipeline id: the first call creates an
    // open writer, a second call with the same id must return the very
    // same instance (assertSame, not just equals).
    DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000);
    assertTrue(writer.isOpen());
    DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000);
    assertSame(writer, writer2);
    writer.close();
}

View file

@ -26,15 +26,21 @@ import org.junit.rules.TemporaryFolder;
import org.logstash.DLQEntry;
import org.logstash.Event;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;
import static org.logstash.common.io.RecordIOWriter.VERSION_SIZE;
public class DeadLetterQueueWriterTest {
private Path dir;
@ -92,4 +98,39 @@ public class DeadLetterQueueWriterTest {
writer.writeEntry(entry);
writer.close();
}
@Test
public void testDoesNotWriteBeyondLimit() throws Exception {
    DLQEntry entry = new DLQEntry(new Event(), "type", "id", "reason");
    // On-disk cost of one entry: serialized payload plus the fixed
    // record header and version overhead.
    int payloadLength = RECORD_HEADER_SIZE + VERSION_SIZE + entry.serialize().length;
    final int MESSAGE_COUNT = 5;
    // Size the queue to hold exactly MESSAGE_COUNT entries.
    long queueLength = payloadLength * MESSAGE_COUNT;
    DeadLetterQueueWriter writer = null;
    try {
        writer = new DeadLetterQueueWriter(dir, payloadLength, queueLength);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            writer.writeEntry(entry);
        }
        assertThat(segmentFileBytes(), is(queueLength));
        // This entry would push the queue past its max size, so it must be
        // dropped: total on-disk size stays unchanged.
        writer.writeEntry(entry);
        assertThat(segmentFileBytes(), is(queueLength));
    } finally {
        if (writer != null) {
            writer.close();
        }
    }
}

// Sums the sizes of all segment (.log) files under the test directory.
// Files.list opens a directory stream that must be closed — use
// try-with-resources to avoid leaking the handle.
private long segmentFileBytes() throws IOException {
    try (java.util.stream.Stream<Path> files = Files.list(dir)) {
        return files.filter(p -> p.toString().endsWith(".log"))
                    .mapToLong(p -> p.toFile().length())
                    .sum();
    }
}
}