From 240048b88d2bfca385384ff8c9897eea1caa2364 Mon Sep 17 00:00:00 2001 From: Armin Date: Mon, 14 Aug 2017 16:50:20 +0200 Subject: [PATCH] #7990 bring back batch acking for PQ Fixes #7991 --- logstash-core/lib/logstash/pipeline.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index e23b40742..1d0ec7f25 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -477,7 +477,10 @@ module LogStash; class Pipeline < BasePipeline filter_batch(batch) end flush_filters_to_batch(batch, :final => false) if signal.flush? - output_batch(batch) if batch.size > 0 + if batch.size > 0 + output_batch(batch) + @filter_queue_client.close_batch(batch) + end # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown. break if (shutdown_requested && !draining_queue?) end