Remove closed DeadLetterQueueWriters from 'Factory'

Fix bug where reloading a pipeline would close the DLQWriter, but
leave the closed version cached in the factory object, stopping
the reloaded pipeline from being able to write to the dead letter
queue.

Fixes #7840
This commit is contained in:
Rob Bavey 2017-07-28 17:22:18 -04:00
parent 21503af882
commit 2a235a5f04
3 changed files with 51 additions and 16 deletions

View file

@ -93,6 +93,13 @@ module LogStash; class BasePipeline
end end
end end
def close_dlq_writer
@dlq_writer.close
if settings.get_value("dead_letter_queue.enable")
DeadLetterQueueFactory.release(pipeline_id)
end
end
def compile_lir def compile_lir
sources_with_metadata = [ sources_with_metadata = [
SourceWithMetadata.new("str", "pipeline", 0, 0, self.config_str) SourceWithMetadata.new("str", "pipeline", 0, 0, self.config_str)
@ -340,7 +347,7 @@ module LogStash; class Pipeline < BasePipeline
def close def close
@filter_queue_client.close @filter_queue_client.close
@queue.close @queue.close
@dlq_writer.close close_dlq_writer
end end
def transition_to_running def transition_to_running

View file

@ -56,13 +56,19 @@ public class DeadLetterQueueFactory {
* @return The write manager for the specific id's dead-letter-queue context * @return The write manager for the specific id's dead-letter-queue context
*/ */
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize) { public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize) {
return REGISTRY.computeIfAbsent(id, k -> { return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize));
try { }
return new DeadLetterQueueWriter(Paths.get(dlqPath, k), MAX_SEGMENT_SIZE_BYTES, maxQueueSize);
} catch (IOException e) { public static DeadLetterQueueWriter release(String id) {
logger.error("unable to create dead letter queue writer", e); return REGISTRY.remove(id);
} }
return null;
}); private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize) {
try {
return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize);
} catch (IOException e) {
logger.error("unable to create dead letter queue writer", e);
}
return null;
} }
} }

View file

@ -32,6 +32,7 @@ import static junit.framework.TestCase.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class DeadLetterQueueFactoryTest { public class DeadLetterQueueFactoryTest {
public static final String PIPELINE_NAME = "pipelineA";
private Path dir; private Path dir;
@Rule @Rule
@ -43,12 +44,33 @@ public class DeadLetterQueueFactoryTest {
} }
@Test @Test
public void test() throws IOException { public void testSameBeforeRelease() throws IOException {
Path pipelineA = dir.resolve("pipelineA"); try {
DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000); Path pipelineA = dir.resolve(PIPELINE_NAME);
assertTrue(writer.isOpen()); DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000); assertTrue(writer.isOpen());
assertSame(writer, writer2); DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
writer.close(); assertSame(writer, writer2);
writer.close();
} finally {
DeadLetterQueueFactory.release(PIPELINE_NAME);
}
} }
@Test
public void testOpenableAfterRelease() throws IOException {
try {
Path pipelineA = dir.resolve(PIPELINE_NAME);
DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
writer.close();
DeadLetterQueueFactory.release(PIPELINE_NAME);
writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
writer.close();
}finally{
DeadLetterQueueFactory.release(PIPELINE_NAME);
}
}
} }