- clean up some log messages

- make SIGINT (^C) and SIGTERM start the shutdown sequence.
- Wrap LogStash::Outputs::Base#recieve with #handle which checks
  for the shutdown event and automatically handles it.
This commit is contained in:
Jordan Sissel 2011-05-15 00:40:17 -07:00
parent 28ca682b3f
commit a61da6a8f3
5 changed files with 60 additions and 25 deletions

View file

@ -117,7 +117,7 @@ class LogStash::Agent
# These are 'unknown' flags that begin --<plugin>-flag
# Put any plugin paths into the ruby library path for requiring later.
@plugin_paths.each do |p|
@logger.info "Adding #{p.inspect} to ruby load path"
@logger.debug("Adding #{p.inspect} to ruby load path")
$:.unshift p
end
@ -149,7 +149,7 @@ class LogStash::Agent
# and add any options to our option parser.
klass_name = name.capitalize
if c.const_defined?(klass_name)
@logger.info("Found plugin class #{c}::#{klass_name})")
@logger.debug("Found plugin class #{c}::#{klass_name})")
klass = c.const_get(klass_name)
# See LogStash::Config::Mixin::DSL#options
klass.options(@opts)
@ -219,6 +219,7 @@ class LogStash::Agent
public
def run(&block)
LogStash::Util::set_thread_name(self.class.name)
register_signal_handlers
ok = parse_options
if !ok
@ -274,7 +275,7 @@ class LogStash::Agent
input_target = @filters.length > 0 ? filter_queue : output_queue
# Start inputs
@inputs.each do |input|
@logger.info(["Starting input", input])
@logger.debug(["Starting input", input])
@plugins[input] = Thread.new(input, input_target) do |*args|
run_input(*args)
end
@ -327,6 +328,7 @@ class LogStash::Agent
# then stop the event loop
end # def stop
# TODO(sissel): Is this method even used anymore?
protected
def filter(event)
@filters.each do |f|
@ -335,14 +337,16 @@ class LogStash::Agent
end
end # def filter
# TODO(sissel): Is this method even used anymore?
protected
def output(event)
# TODO(sissel): write to a multiqueue and do 1 thread per output?
@outputs.each do |o|
o.receive(event)
o.handle(event)
end # each output
end # def output
# TODO(sissel): Is this method even used anymore?
protected
# Process a message
def receive(event)
@ -369,14 +373,14 @@ class LogStash::Agent
end
# Now wait until the queues we were given are empty.
#@logger.info(@plugins)
#@logger.debug(@plugins)
loop do
@logger.info("Waiting for plugins to finish.")
@logger.debug("Waiting for plugins to finish.")
remaining = @plugins.select { |plugin, thread| plugin.running? }
break if remaining.size == 0
plugin = finished_queue.pop
@logger.info("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
@logger.debug("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
end # loop
# When we get here, all inputs have finished, all messages are done
@ -386,27 +390,35 @@ class LogStash::Agent
end # def shutdown
public
def register_signal_handler
def register_signal_handlers
# TODO(sissel): This doesn't work well in jruby since ObjectSpace is disabled
# by default.
Signal.trap("USR2") do
#Signal.trap("USR2") do
# TODO(sissel): Make this a function.
#counts = Hash.new { |h,k| h[k] = 0 }
#ObjectSpace.each_object do |obj|
#counts[obj.class] += 1
#end
@logger.info("SIGUSR1 received. Dumping state")
@logger.info("#{self.class.name} config")
@logger.info([" Inputs:", @inputs])
@logger.info([" Filters:", @filters])
@logger.info([" Outputs:", @outputs])
#@logger.info("SIGUSR1 received. Dumping state")
#@logger.info("#{self.class.name} config")
#@logger.info([" Inputs:", @inputs])
#@logger.info([" Filters:", @filters])
##@logger.info([" Outputs:", @outputs])
#@logger.info("Dumping counts of objects by class")
#counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value|
#@logger.info("Class: [#{value}] #{key}")
#end
end # SIGUSR1
##end
#end # SIGUSR1
Signal.trap("INT") do
shutdown
end
Signal.trap("TERM") do
shutdown
end
end # def register_signal_handler
private
@ -457,7 +469,7 @@ class LogStash::Agent
begin
while event = queue.pop do
@logger.debug("Sending event to #{output.to_s}")
output.receive(event)
output.handle(event)
end
rescue Exception => e
@logger.warn(["Output #{output.to_s} thread exception", e])
@ -481,10 +493,10 @@ class LogStash::Agent
remaining = @plugins.count do |plugin, thread|
plugin.is_a?(pluginclass) and plugin.running?
end
@logger.info("#{pluginclass} still running: #{remaining}")
@logger.debug("#{pluginclass} still running: #{remaining}")
if remaining == 0
@logger.info("All #{pluginclass} finished. Shutting down.")
@logger.debug("All #{pluginclass} finished. Shutting down.")
# Send 'shutdown' to the filters.
queue << LogStash::SHUTDOWN if !queue.nil?

View file

@ -20,7 +20,7 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
event = LogStash::Event.new
begin
event.message = $stdin.readline.chomp
rescue EOFError => e
rescue *[EOFError, IOError] => e
@logger.info("Got EOF from stdin input. Ending")
finished
return
@ -32,4 +32,9 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
queue << event
end # loop
end # def run
public
def teardown
$stdin.close
end # def teardown
end # class LogStash::Inputs::Stdin

View file

@ -74,11 +74,6 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
public
def receive(event)
if event == LogStash::SHUTDOWN
finished
return
end
loop do
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
@ -102,4 +97,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
def to_s
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
end
public
def teardown
@bunny.close_connection
end # def teardown
end # class LogStash::Outputs::Amqp

View file

@ -28,4 +28,14 @@ class LogStash::Outputs::Base < LogStash::Plugin
def receive(event)
raise "#{self.class}#receive must be overidden"
end # def receive
public
def handle(event)
if event == LogStash::SHUTDOWN
finished
return
end
receive(event)
end # def handle
end # class LogStash::Outputs::Base

View file

@ -12,6 +12,7 @@ class LogStash::Plugin
# By default, shutdown is assumed a no-op for all plugins.
# If you need to take special efforts to shutdown (like waiting for
# an operation to complete, etc)
teardown
@logger.info("Got shutdown signal for #{self}")
@shutdown_queue = queue
@ -37,6 +38,13 @@ class LogStash::Plugin
end
end # def finished
# Subclasses should implement this teardown method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
public
def teardown
# nothing by default
end
public
def finished?
return @plugin_state == :finished