MINOR: Fix incorrect use of blocking queue for the pipeline's signal queue

Fixes #7675
This commit is contained in:
Armin 2017-07-13 15:33:20 +02:00 committed by Armin Braun
parent 33945337ba
commit c741c836a7

View file

@ -205,7 +205,7 @@ module LogStash; class Pipeline < BasePipeline
@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
@signal_queue = Queue.new
@signal_queue = java.util.concurrent.LinkedBlockingQueue.new
# Note that @inflight_batches as a central mechanism for tracking inflight
# batches will fail if we have multiple read clients here.
@filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
@ -449,7 +449,7 @@ module LogStash; class Pipeline < BasePipeline
@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
while true
signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop
signal = @signal_queue.poll || NO_SIGNAL
shutdown_requested |= signal.shutdown? # latch on shutdown signal
batch = @filter_queue_client.read_batch # metrics are started in read_batch
@ -630,7 +630,7 @@ module LogStash; class Pipeline < BasePipeline
# Each worker thread will receive this exactly once!
@worker_threads.each do |t|
@logger.debug("Pushing shutdown", default_logging_keys(:thread => t.inspect))
@signal_queue.push(SHUTDOWN)
@signal_queue.put(SHUTDOWN)
end
@worker_threads.each do |t|
@ -681,7 +681,7 @@ module LogStash; class Pipeline < BasePipeline
def flush
if @flushing.compare_and_set(false, true)
@logger.debug? && @logger.debug("Pushing flush onto pipeline", default_logging_keys)
@signal_queue.push(FLUSH)
@signal_queue.put(FLUSH)
end
end