mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
3de8506057
commit
84be8d34a6
2 changed files with 19 additions and 1 deletions
|
@ -33,7 +33,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
|
|||
# Note that this setting may not be useful for all outputs.
|
||||
config :workers, :validate => :number, :default => 1
|
||||
|
||||
attr_reader :worker_plugins
|
||||
attr_reader :worker_plugins, :worker_queue
|
||||
|
||||
public
|
||||
def workers_not_supported(message=nil)
|
||||
|
|
|
@ -252,6 +252,7 @@ class LogStash::Pipeline
|
|||
#
|
||||
# This method is intended to be called from another thread
|
||||
def shutdown
|
||||
Thread.new { report_inflight_events(@input_to_filter, @filter_to_output, @outputs) }
|
||||
@input_threads.each do |thread|
|
||||
# Interrupt all inputs
|
||||
@logger.info("Sending shutdown signal to input thread", :thread => thread)
|
||||
|
@ -321,4 +322,21 @@ class LogStash::Pipeline
|
|||
end
|
||||
end # flush_filters_to_output!
|
||||
|
||||
private
|
||||
def report_inflight_events(input_to_filter, filter_to_output, outputs)
|
||||
loop do
|
||||
sleep 5
|
||||
report = []
|
||||
report << "*** BEGIN INFLIGHT EVENTS REPORT ****"
|
||||
report << "input_to_filter => #{input_to_filter.size} events" if input_to_filter.size > 0
|
||||
report << "filter_to_output => #{filter_to_output.size} events" if filter_to_output.size > 0
|
||||
outputs.each do |output|
|
||||
next unless output.worker_queue
|
||||
report << "#{output} => #{output.worker_queue.size} events" if output.worker_queue.size > 0
|
||||
end
|
||||
report << "*** END INFLIGHT EVENTS REPORT ****"
|
||||
@logger.warn report.join("\n")
|
||||
end
|
||||
end
|
||||
|
||||
end # class Pipeline
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue