#7990 bring back batch acking for PQ

Fixes #7991
This commit is contained in:
Armin 2017-08-14 16:50:20 +02:00 committed by Armin Braun
parent 858500c02d
commit 8a9a1f28bf

View file

@ -477,7 +477,10 @@ module LogStash; class Pipeline < BasePipeline
filter_batch(batch)
end
flush_filters_to_batch(batch, :final => false) if signal.flush?
output_batch(batch) if batch.size > 0
if batch.size > 0
output_batch(batch)
@filter_queue_client.close_batch(batch)
end
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
break if (shutdown_requested && !draining_queue?)
end