diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index a728c5b07..72f818946 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -205,7 +205,7 @@ module LogStash; class Pipeline < BasePipeline @input_queue_client = @queue.write_client @filter_queue_client = @queue.read_client - @signal_queue = Queue.new + @signal_queue = java.util.concurrent.LinkedBlockingQueue.new # Note that @inflight_batches as a central mechanism for tracking inflight # batches will fail if we have multiple read clients here. @filter_queue_client.set_events_metric(metric.namespace([:stats, :events])) @@ -449,7 +449,7 @@ module LogStash; class Pipeline < BasePipeline @filter_queue_client.set_batch_dimensions(batch_size, batch_delay) while true - signal = @signal_queue.empty? ? NO_SIGNAL : @signal_queue.pop + signal = @signal_queue.poll || NO_SIGNAL shutdown_requested |= signal.shutdown? # latch on shutdown signal batch = @filter_queue_client.read_batch # metrics are started in read_batch @@ -630,7 +630,7 @@ module LogStash; class Pipeline < BasePipeline # Each worker thread will receive this exactly once! @worker_threads.each do |t| @logger.debug("Pushing shutdown", default_logging_keys(:thread => t.inspect)) - @signal_queue.push(SHUTDOWN) + @signal_queue.put(SHUTDOWN) end @worker_threads.each do |t| @@ -681,7 +681,7 @@ module LogStash; class Pipeline < BasePipeline def flush if @flushing.compare_and_set(false, true) @logger.debug? && @logger.debug("Pushing flush onto pipeline", default_logging_keys) - @signal_queue.push(FLUSH) + @signal_queue.put(FLUSH) end end