mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
028576ba6f
commit
3a1b0f200e
2 changed files with 16 additions and 28 deletions
|
@ -14,7 +14,6 @@ require "logstash/util/defaults_printer"
|
|||
require "logstash/shutdown_controller"
|
||||
require "logstash/util/wrapped_synchronous_queue"
|
||||
require "logstash/pipeline_reporter"
|
||||
require "concurrent/timer_task"
|
||||
require "logstash/output_delegator"
|
||||
|
||||
module LogStash; class Pipeline
|
||||
|
@ -69,6 +68,7 @@ module LogStash; class Pipeline
|
|||
@settings = DEFAULT_SETTINGS.clone
|
||||
# @ready requires thread safety since it is typically polled from outside the pipeline thread
|
||||
@ready = Concurrent::AtomicBoolean.new(false)
|
||||
@running = Concurrent::AtomicBoolean.new(false)
|
||||
@flushing = Concurrent::AtomicReference.new(false)
|
||||
settings.each {|setting, value| configure(setting, value) }
|
||||
|
||||
|
@ -128,7 +128,11 @@ module LogStash; class Pipeline
|
|||
|
||||
# Block until all inputs have stopped
|
||||
# Generally this happens if SIGINT is sent and `shutdown` is called from an external thread
|
||||
|
||||
@running.make_true
|
||||
wait_inputs
|
||||
@running.make_false
|
||||
|
||||
@logger.info("Input plugins stopped! Will shutdown filter/output workers.")
|
||||
|
||||
shutdown_flusher
|
||||
|
@ -398,36 +402,17 @@ module LogStash; class Pipeline
|
|||
end
|
||||
end
|
||||
|
||||
class FlusherObserver
|
||||
def initialize(logger, flushing)
|
||||
@logger = logger
|
||||
@flushing = flushing
|
||||
end
|
||||
|
||||
def update(time, result, exception)
|
||||
# This is a safeguard in the event that the timer task somehow times out
|
||||
# We still need to call it in the original (in case someone decides to call it directly)
|
||||
# but this is the safeguard for timer related issues causing @flushing not to be reset to false
|
||||
@flushing.set(false)
|
||||
|
||||
return unless exception
|
||||
@logger.warn("Error during flush!",
|
||||
:message => exception.message,
|
||||
:class => exception.class.name,
|
||||
:backtrace => exception.backtrace)
|
||||
end
|
||||
end
|
||||
|
||||
def start_flusher
|
||||
@flusher_task = Concurrent::TimerTask.new { flush }
|
||||
@flusher_task.execution_interval = @settings[:flush_interval]
|
||||
@flusher_task.timeout_interval = @settings[:flush_timeout_interval]
|
||||
@flusher_task.add_observer(FlusherObserver.new(@logger, @flushing))
|
||||
@flusher_task.execute
|
||||
@flusher_thread = Thread.new do
|
||||
while Stud.stoppable_sleep(5, 0.1) { @running.false? }
|
||||
flush
|
||||
break if @running.false?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown_flusher
|
||||
@flusher_task.shutdown
|
||||
@flusher_thread.join
|
||||
end
|
||||
|
||||
def flush
|
||||
|
|
|
@ -345,7 +345,10 @@ describe LogStash::Pipeline do
|
|||
Thread.new { pipeline.run }
|
||||
sleep 0.1 while !pipeline.ready?
|
||||
# give us a bit of time to flush the events
|
||||
wait(5).for { output.events.first["message"].split("\n").count }.to eq(number_of_events)
|
||||
wait(5).for do
|
||||
next unless output && output.events && output.events.first
|
||||
output.events.first["message"].split("\n").count
|
||||
end.to eq(number_of_events)
|
||||
pipeline.shutdown
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue