Merge remote-tracking branch 'logstash/master'

This commit is contained in:
Bob Corsaro 2011-05-16 11:55:47 -04:00
commit 39fa1a8615
9 changed files with 165 additions and 78 deletions

View file

@ -6,12 +6,11 @@
- 'amqp' output supports persistent messages over AMQP, now. Tunable.
https://logstash.jira.com/browse/LOGSTASH-81
- Redis input and output are now supported. (Contributed by dokipen)
- Add shutdown processing. Shutdown sequence will start if SIGINT or SIGTERM
are received or if all inputs finish (like stdin). The shutdown sequence
always starts the inputs. The sequence progresses using the same pipeline
as the inputs/filters/outputs, so all in-flight events should finish
getting processed before the final shutdown event makes it's way to the
outputs.
- Add shutdown processing. Shutdown starts when all inputs finish (like
stdin). The shutdown sequence always starts the inputs. The sequence
progresses using the same pipeline as the inputs/filters/outputs, so all
in-flight events should finish getting processed before the final shutdown
event makes it's way to the outputs.
- Add retries to unhandled input exceptions (LOGSTASH-84)
1.0.6 (May 11, 2011)

32
Gemfile
View file

@ -1,24 +1,24 @@
source :rubygems
gem "bunny" # for amqp support
gem "uuidtools" # for naming amqp queues
gem "filewatch", "~> 0.2.3" # for file tailing
gem "jls-grok", "~> 0.4.7" # for grok filter
gem "jruby-elasticsearch", "~> 0.0.7"
gem "stomp" # for stomp protocol
gem "json"
gem "awesome_print"
gem "bunny" # for amqp support, MIT-style license
gem "uuidtools" # for naming amqp queues, License ???
gem "filewatch", "~> 0.2.3" # for file tailing, BSD License
gem "jls-grok", "~> 0.4.7" # for grok filter, BSD License
gem "jruby-elasticsearch", "~> 0.0.7", BSD License
gem "stomp" # for stomp protocol, Apache 2.0 License
gem "json" # Ruby license
gem "awesome_print" # MIT License
gem "rack"
gem "mizuno"
gem "sinatra"
gem "haml"
gem "rack" # License: MIT
gem "mizuno" # License: Apache 2.0
gem "sinatra" # License: MIT-style
gem "haml" # License: MIT
# TODO(sissel): Put this into a group that's only used for monolith packaging
gem "mongo" # outputs/mongodb
gem "redis" # outputs/redis
gem "gelf" # outputs/gelf
gem "mongo" # outputs/mongodb, License: Apache 2.0
gem "redis" # outputs/redis, License: MIT-style
gem "beanstalk-client" # for beanstalk support, License: GPL3
gem "gelf" # outputs/gelf, # License: MIT-style
# For testing/dev
group :development do

View file

@ -2,6 +2,7 @@ GEM
remote: http://rubygems.org/
specs:
awesome_print (0.3.2)
beanstalk-client (1.1.0)
bson (1.3.0-java)
bunny (0.6.0)
daemons (1.1.2)
@ -43,6 +44,7 @@ PLATFORMS
DEPENDENCIES
awesome_print
beanstalk-client
bunny
filewatch (~> 0.2.3)
gelf

View file

@ -364,24 +364,41 @@ class LogStash::Agent
@is_shutting_down = true
Thread.new do
@logger.info("Starting shutdown sequence")
LogStash::Util::set_thread_name("logstash shutdown process")
# TODO(sissel): Make this a flag
force_shutdown_time = Time.now + 10
finished_queue = Queue.new
# Tell everything to shutdown.
@logger.debug(@plugins.keys.collect(&:to_s))
@plugins.each do |plugin, thread|
@logger.debug("Telling to shutdown: #{plugin.to_s}")
plugin.shutdown(finished_queue)
end
# Now wait until the queues we were given are empty.
#@logger.debug(@plugins)
loop do
@logger.debug("Waiting for plugins to finish.")
remaining = @plugins.select { |plugin, thread| plugin.running? }
break if remaining.size == 0
remaining = @plugins.select { |plugin, thread| plugin.running? }
while remaining.size > 0
if (Time.now > force_shutdown_time)
@logger.warn("Time to quit, even if some plugins aren't finished yet.")
@logger.warn("Stuck plugins? #{remaining.map(&:first).join(", ")}")
break
end
plugin = finished_queue.pop
@logger.debug("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
end # loop
@logger.debug("Waiting for plugins to finish.")
plugin = finished_queue.pop(non_block=true) rescue nil
if plugin.nil?
sleep(1)
else
remaining = @plugins.select { |plugin, thread| plugin.running? }
@logger.debug("#{plugin.to_s} finished, waiting on " \
"#{remaining.size} plugins; " \
"#{remaining.map(&:first).join(", ")}")
end
end # while remaining.size > 0
# When we get here, all inputs have finished, all messages are done
@logger.info("Shutdown complete")
@ -412,14 +429,16 @@ class LogStash::Agent
##end
#end # SIGUSR1
Signal.trap("INT") do
shutdown
end
#Signal.trap("INT") do
#@logger.warn("SIGINT received, shutting down.")
#shutdown
#end
Signal.trap("TERM") do
shutdown
end
end # def register_signal_handler
#Signal.trap("TERM") do
#@logger.warn("SIGTERM received, shutting down.")
#shutdown
#end
end # def register_signal_handlers
private
def run_input(input, queue)

View file

@ -0,0 +1,45 @@
require "logstash/namespace"
require "logstash/logging"
require "logstash/plugin"
require "logstash/config/mixin"
# TODO(sissel): Should this really be a 'plugin' ?
class LogStash::FilterWorker < LogStash::Plugin
attr_accessor :logger
def initialize(filters, input_queue, output_queue)
@filters = filters
@input_queue = input_queue
@output_queue = output_queue
end # def initialize
def run
# for each thread.
@filters.each do |filter|
filter.logger = @logger
filter.register
end
while event = @input_queue.pop
if event == LogStash::SHUTDOWN
finished
break
end
# TODO(sissel): Handle exceptions? Retry? Drop it?
@filters.each do |filter|
filter.filter(event)
if event.cancelled?
@logger.debug({:message => "Event cancelled",
:event => event,
:filter => filter.class,
})
break
end
end # @filters.each
@logger.debug(["Event finished filtering", event])
@output_queue.push(event) unless event.cancelled?
end # while @input_queue.pop
end
end # class LogStash::FilterWorker

View file

@ -61,33 +61,37 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
end # def register
def run(queue)
loop do
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
@bunny = Bunny.new(@amqpsettings)
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
@bunny = Bunny.new(@amqpsettings)
begin
@bunny.start
begin
return if terminating?
@bunny.start
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
end
@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
end
queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
end # begin/rescue
end # loop
queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
retry
end # begin/rescue
end # def run
def teardown
@bunny.close if @bunny
end # def teardown
end # class LogStash::Inputs::Amqp

View file

@ -42,4 +42,9 @@ class LogStash::Inputs::Beanstalk < LogStash::Inputs::Base
job.delete
end
end # def run
public
def teardown
@beanstalk.close rescue nil
end # def teardown
end # class LogStash::Inputs::Beanstalk

View file

@ -58,33 +58,39 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
amqpsettings[:user] = @user if @user
amqpsettings[:pass] = @password.value if @password
amqpsettings[:logging] = @debug
loop do
begin
@logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
@bunny = Bunny.new(amqpsettings)
begin
@bunny.start
break # success
rescue Bunny::ServerDownError => e
@bunny.start
break # success
rescue Bunny::ServerDownError => e
if terminating?
return
else
@logger.error("AMQP connection error, will reconnect: #{e}")
sleep(1)
retry
end
end # loop
end
@target = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
end # def connect
public
def receive(event)
loop do
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
begin
if @target
@target.publish(event.to_json, :persistent => @persistent)
break;
rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
connect
retry
else
@logger.warn("Tried to send message, but not connected to amqp yet.")
end
end # loop do
rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
connect
retry
end
end # def receive
# This is used by the ElasticSearch AMQP/River output.
@ -98,8 +104,10 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
end
public
def teardown
@bunny.close_connection
end # def teardown
#public
#def teardown
#@bunny.close rescue nil
#@bunny = nil
#@target = nil
#end # def teardown
end # class LogStash::Outputs::Amqp

View file

@ -55,4 +55,9 @@ class LogStash::Plugin
return @plugin_state != :finished
end # def finished?
public
def terminating?
return @plugin_state == :terminating
end # def terminating?
end # class LogStash::Plugin