- force quit if shutdown procses takes longer than 10 seconds.

- use begin/rescue/retry instead of loops
- Make amqp support shutdowns properly
This commit is contained in:
Jordan Sissel 2011-05-16 00:45:44 -07:00
parent cc403795ff
commit 9ba030bedd
4 changed files with 86 additions and 50 deletions

View file

@ -364,24 +364,41 @@ class LogStash::Agent
@is_shutting_down = true
Thread.new do
@logger.info("Starting shutdown sequence")
LogStash::Util::set_thread_name("logstash shutdown process")
# TODO(sissel): Make this a flag
force_shutdown_time = Time.now + 10
finished_queue = Queue.new
# Tell everything to shutdown.
@logger.debug(@plugins.keys.collect(&:to_s))
@plugins.each do |plugin, thread|
@logger.debug("Telling to shutdown: #{plugin.to_s}")
plugin.shutdown(finished_queue)
end
# Now wait until the queues we were given are empty.
#@logger.debug(@plugins)
loop do
@logger.debug("Waiting for plugins to finish.")
remaining = @plugins.select { |plugin, thread| plugin.running? }
break if remaining.size == 0
remaining = @plugins.select { |plugin, thread| plugin.running? }
while remaining.size > 0
if (Time.now > force_shutdown_time)
@logger.warn("Time to quit, even if some plugins aren't finished yet.")
@logger.warn("Stuck plugins? #{remaining.map(&:first).join(", ")}")
break
end
plugin = finished_queue.pop
@logger.debug("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
end # loop
@logger.debug("Waiting for plugins to finish.")
plugin = finished_queue.pop(non_block=true) rescue nil
if plugin.nil?
sleep(1)
else
remaining = @plugins.select { |plugin, thread| plugin.running? }
@logger.debug("#{plugin.to_s} finished, waiting on " \
"#{remaining.size} plugins; " \
"#{remaining.map(&:first).join(", ")}")
end
end # while remaining.size > 0
# When we get here, all inputs have finished, all messages are done
@logger.info("Shutdown complete")
@ -413,13 +430,15 @@ class LogStash::Agent
#end # SIGUSR1
Signal.trap("INT") do
@logger.warn("SIGINT received, shutting down.")
shutdown
end
Signal.trap("TERM") do
@logger.warn("SIGTERM received, shutting down.")
shutdown
end
end # def register_signal_handler
end # def register_signal_handlers
private
def run_input(input, queue)

View file

@ -61,33 +61,37 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
end # def register
def run(queue)
loop do
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
@bunny = Bunny.new(@amqpsettings)
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
@bunny = Bunny.new(@amqpsettings)
begin
@bunny.start
begin
return if terminating?
@bunny.start
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
end
@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
end
queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
end # begin/rescue
end # loop
queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
retry
end # begin/rescue
end # def run
def teardown
@bunny.close if @bunny
end # def teardown
end # class LogStash::Inputs::Amqp

View file

@ -58,33 +58,39 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
amqpsettings[:user] = @user if @user
amqpsettings[:pass] = @password.value if @password
amqpsettings[:logging] = @debug
loop do
begin
@logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
@bunny = Bunny.new(amqpsettings)
begin
@bunny.start
break # success
rescue Bunny::ServerDownError => e
@bunny.start
break # success
rescue Bunny::ServerDownError => e
if terminating?
return
else
@logger.error("AMQP connection error, will reconnect: #{e}")
sleep(1)
retry
end
end # loop
end
@target = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
end # def connect
public
def receive(event)
loop do
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
if @target
@target.publish(event.to_json, :persistent => @persistent)
break;
rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
connect
retry
else
@logger.warn("Tried to send message, but not connected to amqp yet.")
end
end # loop do
rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
connect
retry
end
end # def receive
# This is used by the ElasticSearch AMQP/River output.
@ -98,8 +104,10 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
end
public
def teardown
@bunny.close_connection
end # def teardown
#public
#def teardown
#@bunny.close rescue nil
#@bunny = nil
#@target = nil
#end # def teardown
end # class LogStash::Outputs::Amqp

View file

@ -55,4 +55,9 @@ class LogStash::Plugin
return @plugin_state != :finished
end # def finished?
public
def terminating?
return @plugin_state == :terminating
end # def terminating?
end # class LogStash::Plugin