cleanup force shutdown code

Fixes #7955
Joao Duarte 2017-08-09 11:35:33 +01:00 committed by João Duarte
parent d45b15a366
commit 044b356719

@@ -230,7 +230,6 @@ module LogStash; class Pipeline < BasePipeline
     @ready = Concurrent::AtomicBoolean.new(false)
     @running = Concurrent::AtomicBoolean.new(false)
     @flushing = Concurrent::AtomicReference.new(false)
-    @force_shutdown = Concurrent::AtomicBoolean.new(false)
     @outputs_registered = Concurrent::AtomicBoolean.new(false)
   end # def initialize
@@ -467,14 +466,9 @@ module LogStash; class Pipeline < BasePipeline
         filter_batch(batch)
       end
       flush_filters_to_batch(batch, :final => false) if signal.flush?
-      if batch.size > 0
-        output_batch(batch)
-        unless @force_shutdown.true? # ack the current batch
-          @filter_queue_client.close_batch(batch)
-        end
-      end
+      output_batch(batch) if batch.size > 0
       # keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
-      break if (shutdown_requested && !draining_queue?) || @force_shutdown.true?
+      break if (shutdown_requested && !draining_queue?)
     end

     # we are shutting down, queue is drained if it was required, now perform a final flush.
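For context, a minimal self-contained sketch of the loop shape this hunk keeps: read a batch, output it, and only break at the end of the iteration once shutdown has been requested and the queue is no longer draining. Illustrative only (the WorkerLoopSketch class, its helper methods, and the batch size of 3 are made up, not Logstash source):

require "thread"

# Illustrative only: mirrors the retained "break at end of loop" pattern.
class WorkerLoopSketch
  def initialize(queue)
    @queue = queue
    @shutdown_requested = false
  end

  def request_shutdown!
    @shutdown_requested = true
  end

  def draining_queue?
    !@queue.empty?
  end

  def run
    loop do
      batch = read_batch
      output_batch(batch) if batch.size > 0
      # keep break at end of loop so a final read_batch happens before shutdown
      break if @shutdown_requested && !draining_queue?
    end
  end

  def read_batch
    batch = []
    batch << @queue.pop until @queue.empty? || batch.size >= 3
    batch
  end

  def output_batch(batch)
    puts "output: #{batch.inspect}"
  end
end

queue = Queue.new
5.times { |i| queue << i }
worker = WorkerLoopSketch.new(queue)
worker.request_shutdown!
worker.run
# output: [0, 1, 2]
# output: [3, 4]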
@@ -482,15 +476,12 @@ module LogStash; class Pipeline < BasePipeline
     batch = @filter_queue_client.new_batch
     @filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
     flush_filters_to_batch(batch, :final => true)
-    return if @force_shutdown.true? # Do not ack the current batch
     output_batch(batch)
     @filter_queue_client.close_batch(batch)
   end

   def filter_batch(batch)
     batch.each do |event|
-      return if @force_shutdown.true?
       filter_func(event).each do |e|
         #these are both original and generated events
         batch.merge(e) unless e.cancelled?
@@ -528,7 +519,6 @@ module LogStash; class Pipeline < BasePipeline
     # Now that we have our output to event mapping we can just invoke each output
     # once with its list of events
     output_events_map.each do |output, events|
-      return if @force_shutdown.true?
       output.multi_receive(events)
     end
@@ -617,10 +607,6 @@ module LogStash; class Pipeline < BasePipeline
     @logger.info("Pipeline terminated", "pipeline.id" => @pipeline_id)
   end # def shutdown

-  def force_shutdown!
-    @force_shutdown.make_true
-  end
-
   def wait_for_workers
     @logger.debug("Closing inputs", default_logging_keys)
     @worker_threads.map(&:join)
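For context, a minimal self-contained sketch of the cooperative force-shutdown pattern this hunk deletes: one thread flips a Concurrent::AtomicBoolean via force_shutdown!, and the worker polls it between units of work and returns early without acking. Illustrative only (the ForceShutdownSketch class and its process method are made up; the AtomicBoolean calls mirror the removed lines):

require "concurrent"

# Illustrative only: the flag-polling pattern removed by this commit.
class ForceShutdownSketch
  def initialize
    @force_shutdown = Concurrent::AtomicBoolean.new(false)
  end

  # Called from another thread to abort in-flight work.
  def force_shutdown!
    @force_shutdown.make_true
  end

  def process(batches)
    batches.each do |batch|
      return if @force_shutdown.true? # bail out between batches, do not ack
      puts "processed #{batch.inspect}"
    end
  end
end

sketch = ForceShutdownSketch.new
sketch.process([[1, 2]])   # => processed [1, 2]
sketch.force_shutdown!
sketch.process([[3, 4]])   # returns immediately, nothing processed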
@@ -667,7 +653,6 @@ module LogStash; class Pipeline < BasePipeline
     flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

     flushers.each do |flusher|
-      return if @force_shutdown.true?
       flusher.call(options, &block)
     end
   end
@@ -709,8 +694,6 @@ module LogStash; class Pipeline < BasePipeline
   # @param options [Hash]
   def flush_filters_to_batch(batch, options = {})
     flush_filters(options) do |event|
-      return if @force_shutdown.true?
       unless event.cancelled?
         @logger.debug? and @logger.debug("Pushing flushed events", default_logging_keys(:event => event))
         batch.merge(event)