diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 084ffbe5e..2211ca86c 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -93,6 +93,13 @@ module LogStash; class BasePipeline end end + def close_dlq_writer + @dlq_writer.close + if settings.get_value("dead_letter_queue.enable") + DeadLetterQueueFactory.release(pipeline_id) + end + end + def compile_lir sources_with_metadata = [ SourceWithMetadata.new("str", "pipeline", 0, 0, self.config_str) @@ -340,7 +347,7 @@ module LogStash; class Pipeline < BasePipeline def close @filter_queue_client.close @queue.close - @dlq_writer.close + close_dlq_writer end def transition_to_running diff --git a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java index 19bbd737a..2a884b668 100644 --- a/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java +++ b/logstash-core/src/main/java/org/logstash/common/DeadLetterQueueFactory.java @@ -56,13 +56,19 @@ public class DeadLetterQueueFactory { * @return The write manager for the specific id's dead-letter-queue context */ public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize) { - return REGISTRY.computeIfAbsent(id, k -> { - try { - return new DeadLetterQueueWriter(Paths.get(dlqPath, k), MAX_SEGMENT_SIZE_BYTES, maxQueueSize); - } catch (IOException e) { - logger.error("unable to create dead letter queue writer", e); - } - return null; - }); + return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize)); + } + + public static DeadLetterQueueWriter release(String id) { + return REGISTRY.remove(id); + } + + private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize) { + try { + return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize); + } catch (IOException e) { + logger.error("unable to create dead letter queue writer", e); + } + return null; } } diff --git a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java index 3b63416fb..bbd2f2490 100644 --- a/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/common/DeadLetterQueueFactoryTest.java @@ -32,6 +32,7 @@ import static junit.framework.TestCase.assertSame; import static org.junit.Assert.assertTrue; public class DeadLetterQueueFactoryTest { + public static final String PIPELINE_NAME = "pipelineA"; private Path dir; @Rule @@ -43,12 +44,33 @@ public class DeadLetterQueueFactoryTest { } @Test - public void test() throws IOException { - Path pipelineA = dir.resolve("pipelineA"); - DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000); - assertTrue(writer.isOpen()); - DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000); - assertSame(writer, writer2); - writer.close(); + public void testSameBeforeRelease() throws IOException { + try { + Path pipelineA = dir.resolve(PIPELINE_NAME); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000); + assertTrue(writer.isOpen()); + DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000); + assertSame(writer, writer2); + writer.close(); + } finally { + DeadLetterQueueFactory.release(PIPELINE_NAME); + } } + + @Test + public void testOpenableAfterRelease() throws IOException { + try { + Path pipelineA = dir.resolve(PIPELINE_NAME); + DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000); + assertTrue(writer.isOpen()); + writer.close(); + DeadLetterQueueFactory.release(PIPELINE_NAME); + writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000); + assertTrue(writer.isOpen()); + writer.close(); + }finally{ + DeadLetterQueueFactory.release(PIPELINE_NAME); + } + } + } \ No newline at end of file