diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 2211ca86c..d4b1ca2d7 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -230,7 +230,6 @@ module LogStash; class Pipeline < BasePipeline @ready = Concurrent::AtomicBoolean.new(false) @running = Concurrent::AtomicBoolean.new(false) @flushing = Concurrent::AtomicReference.new(false) - @force_shutdown = Concurrent::AtomicBoolean.new(false) @outputs_registered = Concurrent::AtomicBoolean.new(false) end # def initialize @@ -467,14 +466,9 @@ module LogStash; class Pipeline < BasePipeline filter_batch(batch) end flush_filters_to_batch(batch, :final => false) if signal.flush? - if batch.size > 0 - output_batch(batch) - unless @force_shutdown.true? # ack the current batch - @filter_queue_client.close_batch(batch) - end - end + output_batch(batch) if batch.size > 0 # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown. - break if (shutdown_requested && !draining_queue?) || @force_shutdown.true? + break if (shutdown_requested && !draining_queue?) end # we are shutting down, queue is drained if it was required, now perform a final flush. @@ -482,15 +476,12 @@ module LogStash; class Pipeline < BasePipeline batch = @filter_queue_client.new_batch @filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here flush_filters_to_batch(batch, :final => true) - return if @force_shutdown.true? # Do not ack the current batch output_batch(batch) @filter_queue_client.close_batch(batch) end def filter_batch(batch) batch.each do |event| - return if @force_shutdown.true? - filter_func(event).each do |e| #these are both original and generated events batch.merge(e) unless e.cancelled? @@ -528,7 +519,6 @@ module LogStash; class Pipeline < BasePipeline # Now that we have our output to event mapping we can just invoke each output # once with its list of events output_events_map.each do |output, events| - return if @force_shutdown.true? output.multi_receive(events) end @@ -617,10 +607,6 @@ module LogStash; class Pipeline < BasePipeline @logger.info("Pipeline terminated", "pipeline.id" => @pipeline_id) end # def shutdown - def force_shutdown! - @force_shutdown.make_true - end - def wait_for_workers @logger.debug("Closing inputs", default_logging_keys) @worker_threads.map(&:join) @@ -667,7 +653,6 @@ module LogStash; class Pipeline < BasePipeline flushers = options[:final] ? @shutdown_flushers : @periodic_flushers flushers.each do |flusher| - return if @force_shutdown.true? flusher.call(options, &block) end end @@ -709,8 +694,6 @@ module LogStash; class Pipeline < BasePipeline # @param options [Hash] def flush_filters_to_batch(batch, options = {}) flush_filters(options) do |event| - return if @force_shutdown.true? - unless event.cancelled? @logger.debug? and @logger.debug("Pushing flushed events", default_logging_keys(:event => event)) batch.merge(event)