Remove closed DeadLetterQueueWriters from 'Factory'

Fix bug where reloading a pipeline would close the DLQWriter, but
leave the closed version cached in the factory object, stopping
the reloaded pipeline from being able to write to the dead letter
queue.

Fixes #7840
This commit is contained in:
Rob Bavey 2017-07-28 17:22:18 -04:00
parent cb2f01b4b6
commit 860fd2c5e6
3 changed files with 51 additions and 16 deletions

View file

@ -93,6 +93,13 @@ module LogStash; class BasePipeline
end
end
def close_dlq_writer
@dlq_writer.close
if settings.get_value("dead_letter_queue.enable")
DeadLetterQueueFactory.release(pipeline_id)
end
end
def compile_lir
sources_with_metadata = [
SourceWithMetadata.new("str", "pipeline", 0, 0, self.config_str)
@ -339,7 +346,7 @@ module LogStash; class Pipeline < BasePipeline
def close
@filter_queue_client.close
@queue.close
@dlq_writer.close
close_dlq_writer
end
def transition_to_running

View file

@ -56,13 +56,19 @@ public class DeadLetterQueueFactory {
* @return The write manager for the specific id's dead-letter-queue context
*/
public static DeadLetterQueueWriter getWriter(String id, String dlqPath, long maxQueueSize) {
return REGISTRY.computeIfAbsent(id, k -> {
try {
return new DeadLetterQueueWriter(Paths.get(dlqPath, k), MAX_SEGMENT_SIZE_BYTES, maxQueueSize);
} catch (IOException e) {
logger.error("unable to create dead letter queue writer", e);
}
return null;
});
return REGISTRY.computeIfAbsent(id, key -> newWriter(key, dlqPath, maxQueueSize));
}
public static DeadLetterQueueWriter release(String id) {
return REGISTRY.remove(id);
}
private static DeadLetterQueueWriter newWriter(final String id, final String dlqPath, final long maxQueueSize) {
try {
return new DeadLetterQueueWriter(Paths.get(dlqPath, id), MAX_SEGMENT_SIZE_BYTES, maxQueueSize);
} catch (IOException e) {
logger.error("unable to create dead letter queue writer", e);
}
return null;
}
}

View file

@ -32,6 +32,7 @@ import static junit.framework.TestCase.assertSame;
import static org.junit.Assert.assertTrue;
public class DeadLetterQueueFactoryTest {
public static final String PIPELINE_NAME = "pipelineA";
private Path dir;
@Rule
@ -43,12 +44,33 @@ public class DeadLetterQueueFactoryTest {
}
@Test
public void test() throws IOException {
Path pipelineA = dir.resolve("pipelineA");
DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter("pipelineA", pipelineA.toString(), 10000);
assertSame(writer, writer2);
writer.close();
public void testSameBeforeRelease() throws IOException {
try {
Path pipelineA = dir.resolve(PIPELINE_NAME);
DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
DeadLetterQueueWriter writer2 = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertSame(writer, writer2);
writer.close();
} finally {
DeadLetterQueueFactory.release(PIPELINE_NAME);
}
}
@Test
public void testOpenableAfterRelease() throws IOException {
try {
Path pipelineA = dir.resolve(PIPELINE_NAME);
DeadLetterQueueWriter writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
writer.close();
DeadLetterQueueFactory.release(PIPELINE_NAME);
writer = DeadLetterQueueFactory.getWriter(PIPELINE_NAME, pipelineA.toString(), 10000);
assertTrue(writer.isOpen());
writer.close();
}finally{
DeadLetterQueueFactory.release(PIPELINE_NAME);
}
}
}