mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Merge remote branch 'wilkenfeld/master'
This commit is contained in:
commit
147f7385fd
16 changed files with 430 additions and 112 deletions
17
CHANGELOG
17
CHANGELOG
|
@ -1,7 +1,18 @@
|
|||
1.0.7 ( ????? )
|
||||
@ TODO: logstash-web needs to support elasticsearch cluster name
|
||||
@ TODO: GELF 'dynamic' level/facility? (LOGSTASH-83)
|
||||
@ TODO: Catch input exceptions and handle them sanely (LOGSTASH-84)
|
||||
- logstash 'web' now allows you to specify the elasticsearch clustername;
|
||||
--backend elasticsearch://[host[:port]]/[clustername]
|
||||
- GELF output now supports dynamic strings for level and facility
|
||||
https://logstash.jira.com/browse/LOGSTASH-83
|
||||
- 'amqp' output supports persistent messages over AMQP, now. Tunable.
|
||||
https://logstash.jira.com/browse/LOGSTASH-81
|
||||
- Redis input and output are now supported. (Contributed by dokipen)
|
||||
- Add shutdown processing. Shutdown sequence will start if SIGINT or SIGTERM
|
||||
are received or if all inputs finish (like stdin). The shutdown sequence
|
||||
always starts the inputs. The sequence progresses using the same pipeline
|
||||
as the inputs/filters/outputs, so all in-flight events should finish
|
||||
getting processed before the final shutdown event makes it's way to the
|
||||
outputs.
|
||||
- Add retries to unhandled input exceptions (LOGSTASH-84)
|
||||
|
||||
1.0.6 (May 11, 2011)
|
||||
* Remove 'sigar' from monolithic jar packaging. This removes a boatload of
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
require "java"
|
||||
require "logstash/config/file"
|
||||
require "logstash/filters"
|
||||
require "logstash/filterworker"
|
||||
require "logstash/inputs"
|
||||
require "logstash/logging"
|
||||
require "logstash/multiqueue"
|
||||
|
@ -11,6 +12,7 @@ require "logstash/namespace"
|
|||
require "logstash/outputs"
|
||||
require "logstash/util"
|
||||
require "optparse"
|
||||
require "thread"
|
||||
require "uri"
|
||||
|
||||
# TODO(sissel): only enable this if we are in debug mode.
|
||||
|
@ -38,7 +40,8 @@ class LogStash::Agent
|
|||
@verbose = 0
|
||||
@daemonize = false
|
||||
|
||||
@threads = {}
|
||||
@plugins = {}
|
||||
@plugins_mutex = Mutex.new
|
||||
@outputs = []
|
||||
@inputs = []
|
||||
@filters = []
|
||||
|
@ -51,6 +54,7 @@ class LogStash::Agent
|
|||
# TODO(sissel): Other default plugin paths?
|
||||
|
||||
Thread::abort_on_exception = true
|
||||
@is_shutting_down = false
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
|
@ -113,7 +117,7 @@ class LogStash::Agent
|
|||
# These are 'unknown' flags that begin --<plugin>-flag
|
||||
# Put any plugin paths into the ruby library path for requiring later.
|
||||
@plugin_paths.each do |p|
|
||||
@logger.info "Adding #{p.inspect} to ruby load path"
|
||||
@logger.debug("Adding #{p.inspect} to ruby load path")
|
||||
$:.unshift p
|
||||
end
|
||||
|
||||
|
@ -145,7 +149,7 @@ class LogStash::Agent
|
|||
# and add any options to our option parser.
|
||||
klass_name = name.capitalize
|
||||
if c.const_defined?(klass_name)
|
||||
@logger.info("Found plugin class #{c}::#{klass_name})")
|
||||
@logger.debug("Found plugin class #{c}::#{klass_name})")
|
||||
klass = c.const_get(klass_name)
|
||||
# See LogStash::Config::Mixin::DSL#options
|
||||
klass.options(@opts)
|
||||
|
@ -215,6 +219,7 @@ class LogStash::Agent
|
|||
public
|
||||
def run(&block)
|
||||
LogStash::Util::set_thread_name(self.class.name)
|
||||
register_signal_handlers
|
||||
|
||||
ok = parse_options
|
||||
if !ok
|
||||
|
@ -261,52 +266,34 @@ class LogStash::Agent
|
|||
end
|
||||
|
||||
# NOTE(petef) we should use a SizedQueue here (w/config params for size)
|
||||
#filter_queue = Queue.new
|
||||
filter_queue = SizedQueue.new(10)
|
||||
output_queue = LogStash::MultiQueue.new
|
||||
|
||||
ready_queue = Queue.new
|
||||
@ready_queue = Queue.new
|
||||
|
||||
# inputs should write directly to output queue if there are no filters.
|
||||
input_target = @filters.length > 0 ? filter_queue : output_queue
|
||||
# Start inputs
|
||||
@inputs.each do |input|
|
||||
@logger.info(["Starting input", input])
|
||||
@threads[input] = Thread.new(input_target) do |input_target|
|
||||
LogStash::Util::set_thread_name("input|#{input.inspect}")
|
||||
input.logger = @logger
|
||||
input.register
|
||||
ready_queue << input
|
||||
input.run(input_target)
|
||||
end # new thread for thsi input
|
||||
@logger.debug(["Starting input", input])
|
||||
@plugins[input] = Thread.new(input, input_target) do |*args|
|
||||
run_input(*args)
|
||||
end
|
||||
end # @inputs.each
|
||||
|
||||
# Create N filter-worker threads
|
||||
if @filters.length > 0
|
||||
1.times do |n|
|
||||
@logger.info("Starting filter worker thread #{n}")
|
||||
@threads["filter|worker|#{n}"] = Thread.new do
|
||||
LogStash::Util::set_thread_name("filter|worker|#{n}")
|
||||
@filters.each do |filter|
|
||||
filter.logger = @logger
|
||||
filter.register
|
||||
# TODO(sissel): facter this out into a 'filterworker' that accepts
|
||||
# 'shutdown'
|
||||
# Start a filter worker
|
||||
filterworker = LogStash::FilterWorker.new(@filters, filter_queue,
|
||||
output_queue)
|
||||
filterworker.logger = @logger
|
||||
@plugins[filterworker] = \
|
||||
Thread.new(filterworker, n, output_queue) do |*args|
|
||||
run_filter(*args)
|
||||
end
|
||||
|
||||
while event = filter_queue.pop
|
||||
filters.each do |filter|
|
||||
filter.filter(event)
|
||||
if event.cancelled?
|
||||
@logger.debug({:message => "Event cancelled",
|
||||
:event => event,
|
||||
:filter => filter.class,
|
||||
})
|
||||
break
|
||||
end
|
||||
end # filters.each
|
||||
|
||||
@logger.debug(["Event finished filtering", event])
|
||||
output_queue.push(event) unless event.cancelled?
|
||||
end # event pop
|
||||
end # Thread.new
|
||||
end # N.times
|
||||
end # if @filters.length > 0
|
||||
|
||||
|
@ -315,30 +302,14 @@ class LogStash::Agent
|
|||
@outputs.each do |output|
|
||||
queue = SizedQueue.new(10)
|
||||
output_queue.add_queue(queue)
|
||||
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
||||
output.register
|
||||
ready_queue << output
|
||||
begin
|
||||
LogStash::Util::set_thread_name("output/#{output.to_s}")
|
||||
output.logger = @logger
|
||||
|
||||
while event = queue.pop do
|
||||
@logger.debug("Sending event to #{output.to_s}")
|
||||
output.receive(event)
|
||||
end
|
||||
rescue Exception => e
|
||||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
@logger.debug(["Output #{output.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
# TODO(sissel): should we abort after too many failures?
|
||||
retry
|
||||
end # begin/rescue
|
||||
end # Thread.new
|
||||
@plugins[output] = Thread.new(output, queue) do |*args|
|
||||
run_output(*args)
|
||||
end
|
||||
end # @outputs.each
|
||||
|
||||
# Wait for all inputs and outputs to be registered.
|
||||
wait_count = outputs.size + inputs.size
|
||||
while wait_count > 0 and ready_queue.pop
|
||||
while wait_count > 0 and @ready_queue.pop
|
||||
wait_count -= 1
|
||||
end
|
||||
|
||||
|
@ -357,6 +328,7 @@ class LogStash::Agent
|
|||
# then stop the event loop
|
||||
end # def stop
|
||||
|
||||
# TODO(sissel): Is this method even used anymore?
|
||||
protected
|
||||
def filter(event)
|
||||
@filters.each do |f|
|
||||
|
@ -365,14 +337,16 @@ class LogStash::Agent
|
|||
end
|
||||
end # def filter
|
||||
|
||||
# TODO(sissel): Is this method even used anymore?
|
||||
protected
|
||||
def output(event)
|
||||
# TODO(sissel): write to a multiqueue and do 1 thread per output?
|
||||
@outputs.each do |o|
|
||||
o.receive(event)
|
||||
o.handle(event)
|
||||
end # each output
|
||||
end # def output
|
||||
|
||||
# TODO(sissel): Is this method even used anymore?
|
||||
protected
|
||||
# Process a message
|
||||
def receive(event)
|
||||
|
@ -383,29 +357,153 @@ class LogStash::Agent
|
|||
end
|
||||
end # def input
|
||||
|
||||
# Shutdown the agent.
|
||||
protected
|
||||
def shutdown
|
||||
return if @is_shutting_down
|
||||
|
||||
@is_shutting_down = true
|
||||
Thread.new do
|
||||
LogStash::Util::set_thread_name("logstash shutdown process")
|
||||
|
||||
finished_queue = Queue.new
|
||||
# Tell everything to shutdown.
|
||||
@plugins.each do |plugin, thread|
|
||||
plugin.shutdown(finished_queue)
|
||||
end
|
||||
|
||||
# Now wait until the queues we were given are empty.
|
||||
#@logger.debug(@plugins)
|
||||
loop do
|
||||
@logger.debug("Waiting for plugins to finish.")
|
||||
remaining = @plugins.select { |plugin, thread| plugin.running? }
|
||||
break if remaining.size == 0
|
||||
|
||||
plugin = finished_queue.pop
|
||||
@logger.debug("#{plugin.to_s} finished, waiting on #{remaining.size} plugins")
|
||||
end # loop
|
||||
|
||||
# When we get here, all inputs have finished, all messages are done
|
||||
@logger.info("Shutdown complete")
|
||||
java.lang.System.exit(0)
|
||||
end
|
||||
end # def shutdown
|
||||
|
||||
public
|
||||
def register_signal_handler
|
||||
def register_signal_handlers
|
||||
# TODO(sissel): This doesn't work well in jruby since ObjectSpace is disabled
|
||||
# by default.
|
||||
Signal.trap("USR2") do
|
||||
#Signal.trap("USR2") do
|
||||
# TODO(sissel): Make this a function.
|
||||
#counts = Hash.new { |h,k| h[k] = 0 }
|
||||
#ObjectSpace.each_object do |obj|
|
||||
#counts[obj.class] += 1
|
||||
#end
|
||||
|
||||
@logger.info("SIGUSR1 received. Dumping state")
|
||||
@logger.info("#{self.class.name} config")
|
||||
@logger.info([" Inputs:", @inputs])
|
||||
@logger.info([" Filters:", @filters])
|
||||
@logger.info([" Outputs:", @outputs])
|
||||
#@logger.info("SIGUSR1 received. Dumping state")
|
||||
#@logger.info("#{self.class.name} config")
|
||||
#@logger.info([" Inputs:", @inputs])
|
||||
#@logger.info([" Filters:", @filters])
|
||||
##@logger.info([" Outputs:", @outputs])
|
||||
|
||||
#@logger.info("Dumping counts of objects by class")
|
||||
#counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value|
|
||||
#@logger.info("Class: [#{value}] #{key}")
|
||||
#end
|
||||
end # SIGUSR1
|
||||
##end
|
||||
#end # SIGUSR1
|
||||
|
||||
Signal.trap("INT") do
|
||||
shutdown
|
||||
end
|
||||
|
||||
Signal.trap("TERM") do
|
||||
shutdown
|
||||
end
|
||||
end # def register_signal_handler
|
||||
|
||||
private
|
||||
def run_input(input, queue)
|
||||
LogStash::Util::set_thread_name("input|#{input.to_s}")
|
||||
input.logger = @logger
|
||||
input.register
|
||||
|
||||
@ready_queue << input
|
||||
done = false
|
||||
|
||||
while !done
|
||||
begin
|
||||
input.run(queue)
|
||||
done = true
|
||||
rescue => e
|
||||
@logger.warn(["Input #{input.to_s} thread exception", e])
|
||||
@logger.debug(["Input #{input.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
@logger.error("Restarting input #{input.to_s} due to exception")
|
||||
retry # This jumps to the top of this proc (to the start of 'do'
|
||||
end
|
||||
end
|
||||
|
||||
# If we get here, the plugin finished, check if we need to shutdown.
|
||||
shutdown_if_none_running(LogStash::Inputs::Base, queue)
|
||||
end # def run_input
|
||||
|
||||
# Run a filter thread
|
||||
public
|
||||
def run_filter(filterworker, index, output_queue)
|
||||
LogStash::Util::set_thread_name("filter|worker|#{index}")
|
||||
filterworker.run
|
||||
|
||||
# If we get here, the plugin finished, check if we need to shutdown.
|
||||
shutdown_if_none_running(LogStash::FilterWorker, output_queue)
|
||||
end # def run_filter
|
||||
|
||||
# TODO(sissel): Factor this into an 'outputworker'
|
||||
def run_output(output, queue)
|
||||
LogStash::Util::set_thread_name("output|#{output.to_s}")
|
||||
output.register
|
||||
output.logger = @logger
|
||||
@ready_queue << output
|
||||
|
||||
# TODO(sissel): We need a 'reset' or 'restart' method to call on errors
|
||||
|
||||
begin
|
||||
while event = queue.pop do
|
||||
@logger.debug("Sending event to #{output.to_s}")
|
||||
output.handle(event)
|
||||
end
|
||||
rescue Exception => e
|
||||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
@logger.debug(["Output #{output.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
# TODO(sissel): should we abort after too many failures?
|
||||
retry
|
||||
end # begin/rescue
|
||||
|
||||
# If we get here, the plugin finished, check if we need to shutdown.
|
||||
shutdown_if_none_running(LogStash::Outputs::Base)
|
||||
end # def run_output
|
||||
|
||||
def shutdown_if_none_running(pluginclass, queue=nil)
|
||||
# Send shutdown signal if all inputs are done.
|
||||
@plugins_mutex.synchronize do
|
||||
|
||||
# Look for plugins of type 'pluginclass' (or a subclass)
|
||||
# If none are running, start the shutdown sequence and
|
||||
# send the 'shutdown' event down the pipeline.
|
||||
remaining = @plugins.count do |plugin, thread|
|
||||
plugin.is_a?(pluginclass) and plugin.running?
|
||||
end
|
||||
@logger.debug("#{pluginclass} still running: #{remaining}")
|
||||
|
||||
if remaining == 0
|
||||
@logger.debug("All #{pluginclass} finished. Shutting down.")
|
||||
|
||||
# Send 'shutdown' to the filters.
|
||||
queue << LogStash::SHUTDOWN if !queue.nil?
|
||||
shutdown
|
||||
end # if remaining == 0
|
||||
end # @plugins_mutex.synchronize
|
||||
end # def shutdown_if_none_running
|
||||
end # class LogStash::Agent
|
||||
|
||||
if __FILE__ == $0
|
||||
|
|
|
@ -53,7 +53,7 @@ module LogStash::Config::Mixin
|
|||
next if params.include?(name.to_s)
|
||||
if opts.include?(:default) and (name.is_a?(Symbol) or name.is_a?(String))
|
||||
if opts[:validate] == :password
|
||||
@logger.info("Converting default value in #{self.class.name} (#{name}) to password object")
|
||||
@logger.debug("Converting default value in #{self.class.name} (#{name}) to password object")
|
||||
params[name.to_s] = ::LogStash::Util::Password.new(opts[:default])
|
||||
else
|
||||
params[name.to_s] = opts[:default]
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
require "logstash/namespace"
|
||||
require "logstash/logging"
|
||||
require "logstash/plugin"
|
||||
require "logstash/config/mixin"
|
||||
|
||||
class LogStash::Filters::Base
|
||||
class LogStash::Filters::Base < LogStash::Plugin
|
||||
include LogStash::Config::Mixin
|
||||
|
||||
attr_accessor :logger
|
||||
|
|
|
@ -31,8 +31,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
|||
# The vhost to use
|
||||
config :vhost, :validate => :string, :default => "/"
|
||||
|
||||
# Is this exchange durable?
|
||||
config :durable, :validate => :boolean, :default => false
|
||||
# Is this exchange durable? (aka; Should it survive a broker restart?)
|
||||
config :durable, :validate => :boolean, :default => true
|
||||
|
||||
# Enable or disable debugging
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
require "logstash/namespace"
|
||||
require "logstash/event"
|
||||
require "logstash/plugin"
|
||||
require "logstash/logging"
|
||||
require "logstash/config/mixin"
|
||||
|
||||
class LogStash::Inputs::Base
|
||||
class LogStash::Inputs::Base < LogStash::Plugin
|
||||
include LogStash::Config::Mixin
|
||||
attr_accessor :logger
|
||||
|
||||
|
|
|
@ -11,23 +11,30 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
# Name is used for logging in case there are multiple instances.
|
||||
config :name, :validate => :string, :default => "default"
|
||||
|
||||
# The hostname of your redis server. Default hostname is 127.0.0.1.
|
||||
config :host, :validate => :string
|
||||
# The hostname of your redis server.
|
||||
config :host, :validate => :string, :default => "127.0.0.1"
|
||||
|
||||
# The port to connect on. The default port is 6379.
|
||||
config :port, :validate => :number
|
||||
# The port to connect on.
|
||||
config :port, :validate => :number, :default => 6379
|
||||
|
||||
# The redis database number. Db is 0 by default.
|
||||
config :db, :validate => :number
|
||||
# The redis database number.
|
||||
config :db, :validate => :number, :default => 0
|
||||
|
||||
# Initial connection timeout in seconds. Default timeout is 5 seconds.
|
||||
config :timeout, :validate => :number
|
||||
# Initial connection timeout in seconds.
|
||||
config :timeout, :validate => :number, :default => 5
|
||||
|
||||
# Password to authenticate with. There is no authentication by default.
|
||||
# Password to authenticate with. There is no authentication by default.
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of the redis queue (we'll use BLPOP against this).
|
||||
config :queue, :validate => :string, :required => true
|
||||
# The name of a redis list (we'll use BLPOP against this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :list, :validate => :string
|
||||
|
||||
# The name of a redis channel (we'll use SUBSCRIBE on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :channel, :validate => :string
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
@ -35,6 +42,10 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
|
||||
unless @list or @channel
|
||||
raise "Must specify redis list or channel"
|
||||
end
|
||||
end
|
||||
|
||||
def connect
|
||||
|
@ -48,11 +59,20 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
end
|
||||
|
||||
def run(output_queue)
|
||||
retries = @retries
|
||||
loop do
|
||||
wait = Proc.new do |command, *args|
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @queue, 0
|
||||
response = nil
|
||||
if command == :subscribe
|
||||
@redis.send(:subscribe, *args) do |on|
|
||||
on.message do |c, r|
|
||||
response = r
|
||||
end
|
||||
end
|
||||
else
|
||||
response = @redis.send(command, *args)
|
||||
end
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
|
@ -61,14 +81,88 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
@logger.error $!
|
||||
end
|
||||
rescue # redis error
|
||||
raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end
|
||||
|
||||
if @channel
|
||||
Thread.new do
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.subscribe @channel do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.debug "Subscribed to #{ch} (#{count})"
|
||||
retries = @retries
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(message))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "Failed to create event with '#{message}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
end
|
||||
|
||||
on.unsubscribe do |ch, count|
|
||||
@logger.debug "Unsubscribed from #{ch} (#{count})"
|
||||
end
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @channel
|
||||
|
||||
if @list
|
||||
Thread.new do
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @list, 0
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @list
|
||||
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Redis
|
||||
|
|
|
@ -18,7 +18,13 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
|
|||
def run(queue)
|
||||
loop do
|
||||
event = LogStash::Event.new
|
||||
event.message = $stdin.readline.chomp
|
||||
begin
|
||||
event.message = $stdin.readline.chomp
|
||||
rescue *[EOFError, IOError] => e
|
||||
@logger.info("Got EOF from stdin input. Ending")
|
||||
finished
|
||||
return
|
||||
end
|
||||
event.type = @type
|
||||
event.tags = @tags.clone rescue []
|
||||
event.source = "stdin://#{@host}/"
|
||||
|
@ -26,4 +32,9 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
|
|||
queue << event
|
||||
end # loop
|
||||
end # def run
|
||||
|
||||
public
|
||||
def teardown
|
||||
$stdin.close
|
||||
end # def teardown
|
||||
end # class LogStash::Inputs::Stdin
|
||||
|
|
|
@ -8,4 +8,6 @@ module LogStash
|
|||
module Config; end
|
||||
module File; end
|
||||
module Web; end
|
||||
|
||||
SHUTDOWN = :shutdown
|
||||
end # module LogStash
|
||||
|
|
|
@ -27,8 +27,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
# The vhost to use
|
||||
config :vhost, :validate => :string, :default => "/"
|
||||
|
||||
# Is this exchange durable?
|
||||
config :durable, :validate => :boolean, :default => false
|
||||
# Is this exchange durable? (aka; Should it survive a broker restart?)
|
||||
config :durable, :validate => :boolean, :default => true
|
||||
|
||||
# Should messages persist to disk on the AMQP broker until they are read by a
|
||||
# consumer?
|
||||
config :persistent, :validate => :boolean, :default => true
|
||||
|
||||
# Enable or disable debugging
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
@ -73,7 +77,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
loop do
|
||||
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
|
||||
begin
|
||||
@target.publish(event.to_json)
|
||||
@target.publish(event.to_json, :persistent => @persistent)
|
||||
break;
|
||||
rescue *[Bunny::ServerDownError, Errno::ECONNRESET] => e
|
||||
@logger.error("AMQP connection error, will reconnect: #{e}")
|
||||
|
@ -93,4 +97,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
def to_s
|
||||
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
|
||||
end
|
||||
|
||||
public
|
||||
def teardown
|
||||
@bunny.close_connection
|
||||
end # def teardown
|
||||
end # class LogStash::Outputs::Amqp
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
require "cgi"
|
||||
require "logstash/event"
|
||||
require "logstash/logging"
|
||||
require "logstash/plugin"
|
||||
require "logstash/namespace"
|
||||
require "logstash/config/mixin"
|
||||
require "uri"
|
||||
|
||||
class LogStash::Outputs::Base
|
||||
class LogStash::Outputs::Base < LogStash::Plugin
|
||||
include LogStash::Config::Mixin
|
||||
|
||||
attr_accessor :logger
|
||||
|
@ -27,4 +28,14 @@ class LogStash::Outputs::Base
|
|||
def receive(event)
|
||||
raise "#{self.class}#receive must be overidden"
|
||||
end # def receive
|
||||
|
||||
public
|
||||
def handle(event)
|
||||
if event == LogStash::SHUTDOWN
|
||||
finished
|
||||
return
|
||||
end
|
||||
|
||||
receive(event)
|
||||
end # def handle
|
||||
end # class LogStash::Outputs::Base
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require 'eventmachine'
|
||||
|
||||
# send events to a redis databse using RPUSH
|
||||
#
|
||||
|
@ -12,25 +11,30 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
# Name is used for logging in case there are multiple instances.
|
||||
config :name, :validate => :string, :default => 'default'
|
||||
|
||||
# The hostname of your redis server. Hostname is 127.0.0.1 by default.
|
||||
config :host, :validate => :string
|
||||
# The hostname of your redis server.
|
||||
config :host, :validate => :string, :default => "127.0.0.1"
|
||||
|
||||
# The port to connect on. Port is 6379 by default.
|
||||
config :port, :validate => :number
|
||||
# The port to connect on.
|
||||
config :port, :validate => :number, :default => 6379
|
||||
|
||||
# The redis database number. Db is 0 by default.
|
||||
config :db, :validate => :number
|
||||
# The redis database number.
|
||||
config :db, :validate => :number, :default => 0
|
||||
|
||||
# Redis initial connection timeout in seconds. Timeout is 5 seconds by
|
||||
# default.
|
||||
config :timeout, :validate => :number
|
||||
# Redis initial connection timeout in seconds.
|
||||
config :timeout, :validate => :number, :default => 5
|
||||
|
||||
# Password to authenticate with. There is no authentication by default.
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of the redis queue (we'll use RPUSH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}"
|
||||
config :queue, :validate => :string, :required => true
|
||||
# The name of a redis list (we'll use RPUSH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :list, :validate => :string
|
||||
|
||||
# The name of a redis channel (we'll use PUBLISH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :channel, :validate => :string
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
@ -38,6 +42,10 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
|
||||
unless @list or @channel
|
||||
raise "Must specify redis list or channel"
|
||||
end # unless @list or @channel
|
||||
end # def register
|
||||
|
||||
def connect
|
||||
|
@ -53,12 +61,16 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
def receive(event, tries=@retries)
|
||||
if tries <= 0
|
||||
@logger.error "Fatal error, failed to log #{event.to_s} to redis #{@name}"
|
||||
raise RuntimeError.new "Failed to log to redis #{@name}"
|
||||
raise "Failed to log to redis #{@name}"
|
||||
end
|
||||
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.rpush event.sprintf(@queue), event.to_json
|
||||
tx = @list and @channel
|
||||
@redis.multi if tx
|
||||
@redis.rpush event.sprintf(@list), event.to_json if @list
|
||||
@redis.publish event.sprintf(@channel), event.to_json if @channel
|
||||
@redis.exec if tx
|
||||
rescue
|
||||
# TODO(sissel): Be specific in the exceptions we rescue.
|
||||
# Drop the redis connection to be picked up later during a retry.
|
||||
|
|
|
@ -28,6 +28,11 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def receive(event)
|
||||
if event == LogStash::SHUTDOWN
|
||||
finished
|
||||
return
|
||||
end
|
||||
|
||||
if @debug
|
||||
if HAVE_AWESOME_PRINT
|
||||
ap event.to_hash
|
||||
|
|
58
lib/logstash/plugin.rb
Normal file
58
lib/logstash/plugin.rb
Normal file
|
@ -0,0 +1,58 @@
|
|||
require "logstash/namespace"
|
||||
require "logstash/logging"
|
||||
require "logstash/config/mixin"
|
||||
|
||||
class LogStash::Plugin
|
||||
|
||||
# This method is called when someone or something wants this plugin to shut
|
||||
# down. When you successfully shutdown, you must call 'finished'
|
||||
# You must also call 'super' in any subclasses.
|
||||
public
|
||||
def shutdown(queue)
|
||||
# By default, shutdown is assumed a no-op for all plugins.
|
||||
# If you need to take special efforts to shutdown (like waiting for
|
||||
# an operation to complete, etc)
|
||||
teardown
|
||||
@logger.info("Got shutdown signal for #{self}")
|
||||
|
||||
@shutdown_queue = queue
|
||||
if @plugin_state == :finished
|
||||
finished
|
||||
else
|
||||
@plugin_state = :terminating
|
||||
end
|
||||
end # def shutdown
|
||||
|
||||
# You should call this method when you (the plugin) are done with work
|
||||
# forever.
|
||||
public
|
||||
def finished
|
||||
if @shutdown_queue
|
||||
@logger.info("Sending shutdown event to agent queue. (plugin #{to_s})")
|
||||
@shutdown_queue << self
|
||||
end
|
||||
|
||||
if @plugin_state != :finished
|
||||
@logger.info("Plugin #{to_s} is finished")
|
||||
@plugin_state = :finished
|
||||
end
|
||||
end # def finished
|
||||
|
||||
# Subclasses should implement this teardown method if you need to perform any
|
||||
# special tasks during shutdown (like flushing, etc.)
|
||||
public
|
||||
def teardown
|
||||
# nothing by default
|
||||
end
|
||||
|
||||
public
|
||||
def finished?
|
||||
return @plugin_state == :finished
|
||||
end # def finished?
|
||||
|
||||
public
|
||||
def running?
|
||||
return @plugin_state != :finished
|
||||
end # def finished?
|
||||
|
||||
end # class LogStash::Plugin
|
|
@ -1,6 +1,7 @@
|
|||
|
||||
require "logstash/namespace"
|
||||
require "logstash/logging"
|
||||
require "logstash/plugin"
|
||||
require "logstash/event"
|
||||
|
||||
class LogStash::Search::Base
|
||||
|
|
|
@ -60,9 +60,11 @@ class LogStash::Web::Server < Sinatra::Base
|
|||
when "elasticsearch"
|
||||
# if host is nil, it will
|
||||
# TODO(sissel): Support 'cluster' name?
|
||||
cluster_name = (backend_url.path != "/" ? backend_url.path[1..-1] : nil)
|
||||
@backend = LogStash::Search::ElasticSearch.new(
|
||||
:host => backend_url.host,
|
||||
:port => backend_url.port
|
||||
:port => backend_url.port,
|
||||
:cluster => cluster_name
|
||||
)
|
||||
when "twitter"
|
||||
require "logstash/search/twitter"
|
||||
|
@ -118,7 +120,9 @@ opts = OptionParser.new do |opts|
|
|||
end
|
||||
|
||||
opts.on("-b", "--backend URL",
|
||||
"The backend URL to use. Default is elasticserach:/// (assumes multicast discovery)") do |url|
|
||||
"The backend URL to use. Default is elasticserach:/// (assumes " \
|
||||
"multicast discovery); You can specify " \
|
||||
"elasticsearch://[host][:port]/[clustername]") do |url|
|
||||
settings.backend_url = url
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue