mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
commit
1216c67bed
48 changed files with 659 additions and 657 deletions
|
@ -21,7 +21,9 @@ Contributors:
|
|||
* Joel Merrick (joelio)
|
||||
* Michael Leinartas (mleinart)
|
||||
* Jesse Newland (jnewland)
|
||||
* Yuval Oren (yuvaloren)
|
||||
|
||||
Note: If you've sent me patches, bug reports, or other stuff for logstash, and
|
||||
you aren't on the list above, please let me know and I'll make sure you're here.
|
||||
Contributions from folks like you are what make open source the bomb.
|
||||
you aren't on the list above, and want to be, please let me know and I'll make
|
||||
sure you're here. Contributions from folks like you are what make open source
|
||||
the bomb.
|
||||
|
|
3
Gemfile
3
Gemfile
|
@ -4,11 +4,12 @@ def jruby?
|
|||
return RUBY_ENGINE == "jruby"
|
||||
end
|
||||
|
||||
gem "cabin", "0.1.3" # for logging. apache 2 license
|
||||
gem "bunny" # for amqp support, MIT-style license
|
||||
gem "uuidtools" # for naming amqp queues, License ???
|
||||
gem "filewatch", "~> 0.3.0" # for file tailing, BSD License
|
||||
gem "jls-grok", "0.9.0" # for grok filter, BSD License
|
||||
jruby? and gem "jruby-elasticsearch", "~> 0.0.10" # BSD License
|
||||
jruby? and gem "jruby-elasticsearch", "~> 0.0.11" # BSD License
|
||||
gem "stomp" # for stomp protocol, Apache 2.0 License
|
||||
gem "json" # Ruby license
|
||||
gem "awesome_print" # MIT License
|
||||
|
|
23
Gemfile.lock
23
Gemfile.lock
|
@ -3,9 +3,12 @@ GEM
|
|||
specs:
|
||||
awesome_print (0.4.0)
|
||||
bouncy-castle-java (1.5.0146.1)
|
||||
bson (1.4.0)
|
||||
bson (1.4.0-java)
|
||||
bunny (0.7.6)
|
||||
ffi-rzmq (0.8.2)
|
||||
cabin (0.1.3)
|
||||
json
|
||||
filewatch (0.3.0)
|
||||
gelf (1.1.3)
|
||||
json
|
||||
|
@ -13,22 +16,26 @@ GEM
|
|||
gmetric (0.1.3)
|
||||
haml (3.1.3)
|
||||
jls-grok (0.9.0)
|
||||
jruby-elasticsearch (0.0.10)
|
||||
jruby-elasticsearch (0.0.12)
|
||||
jruby-openssl (0.7.4)
|
||||
bouncy-castle-java
|
||||
json (1.6.1)
|
||||
json (1.6.1-java)
|
||||
minitest (2.6.1)
|
||||
mizuno (0.4.0)
|
||||
rack (>= 1.0.0)
|
||||
mongo (1.4.0)
|
||||
bson (= 1.4.0)
|
||||
rack (1.3.3)
|
||||
rack (1.3.4)
|
||||
rack-protection (1.1.4)
|
||||
rack
|
||||
rake (0.9.2)
|
||||
redis (2.2.2)
|
||||
sass (3.1.7)
|
||||
sinatra (1.2.6)
|
||||
rack (~> 1.1)
|
||||
tilt (>= 1.2.2, < 2.0)
|
||||
sass (3.1.10)
|
||||
sinatra (1.3.1)
|
||||
rack (~> 1.3, >= 1.3.4)
|
||||
rack-protection (~> 1.1, >= 1.1.2)
|
||||
tilt (~> 1.3, >= 1.3.3)
|
||||
statsd-ruby (0.3.0)
|
||||
stomp (1.1.9)
|
||||
tilt (1.3.3)
|
||||
|
@ -37,18 +44,20 @@ GEM
|
|||
|
||||
PLATFORMS
|
||||
java
|
||||
ruby
|
||||
|
||||
DEPENDENCIES
|
||||
awesome_print
|
||||
bunny
|
||||
ffi-rzmq
|
||||
cabin (= 0.1.3)
|
||||
filewatch (~> 0.3.0)
|
||||
gelf
|
||||
gelfd (~> 0.1.0)
|
||||
gmetric (~> 0.1.3)
|
||||
haml
|
||||
jls-grok (= 0.9.0)
|
||||
jruby-elasticsearch (~> 0.0.10)
|
||||
jruby-elasticsearch (~> 0.0.11)
|
||||
jruby-openssl
|
||||
json
|
||||
minitest
|
||||
|
|
|
@ -135,7 +135,7 @@ class LogStash::Agent
|
|||
# These are 'unknown' flags that begin --<plugin>-flag
|
||||
# Put any plugin paths into the ruby library path for requiring later.
|
||||
@plugin_paths.each do |p|
|
||||
@logger.debug("Adding #{p.inspect} to ruby load path")
|
||||
@logger.debug("Adding to ruby load path", :path => p)
|
||||
$:.unshift p
|
||||
end
|
||||
|
||||
|
@ -158,16 +158,17 @@ class LogStash::Agent
|
|||
%w{inputs outputs filters}.each do |component|
|
||||
@plugin_paths.each do |path|
|
||||
plugin = File.join(path, component, name) + ".rb"
|
||||
@logger.debug("Flag #{arg} found; trying to load #{plugin}")
|
||||
@logger.debug("Plugin flag found; trying to load it",
|
||||
:flag => arg, :plugin => plugin)
|
||||
if File.file?(plugin)
|
||||
@logger.info("Loading plugin #{plugin}")
|
||||
@logger.info("Loading plugin", :plugin => plugin)
|
||||
require plugin
|
||||
[LogStash::Inputs, LogStash::Filters, LogStash::Outputs].each do |c|
|
||||
# If we get flag --foo-bar, check for LogStash::Inputs::Foo
|
||||
# and add any options to our option parser.
|
||||
klass_name = name.capitalize
|
||||
if c.const_defined?(klass_name)
|
||||
@logger.debug("Found plugin class #{c}::#{klass_name})")
|
||||
@logger.debug("Found plugin class", :class => "#{c}::#{klass_name})")
|
||||
klass = c.const_get(klass_name)
|
||||
# See LogStash::Config::Mixin::DSL#options
|
||||
klass.options(@opts)
|
||||
|
@ -187,7 +188,7 @@ class LogStash::Agent
|
|||
begin
|
||||
remainder = @opts.parse(args)
|
||||
rescue OptionParser::InvalidOption => e
|
||||
@logger.info e
|
||||
@logger.info("Invalid option", :exception => e)
|
||||
raise e
|
||||
end
|
||||
|
||||
|
@ -197,22 +198,22 @@ class LogStash::Agent
|
|||
private
|
||||
def configure
|
||||
if @config_path && @config_string
|
||||
@logger.fatal "Can't use -f and -e at the same time"
|
||||
@logger.fatal("Can't use -f and -e at the same time")
|
||||
raise "Configuration problem"
|
||||
elsif (@config_path.nil? || @config_path.empty?) && @config_string.nil?
|
||||
@logger.fatal "No config file given. (missing -f or --config flag?)"
|
||||
@logger.fatal @opts.help
|
||||
@logger.fatal("No config file given. (missing -f or --config flag?)")
|
||||
@logger.fatal(@opts.help)
|
||||
raise "Configuration problem"
|
||||
end
|
||||
|
||||
#if @config_path and !File.exist?(@config_path)
|
||||
if @config_path and Dir.glob(@config_path).length == 0
|
||||
@logger.fatal "Config file '#{@config_path}' does not exist."
|
||||
@logger.fatal("Config file does not exist.", :path => @config_path)
|
||||
raise "Configuration problem"
|
||||
end
|
||||
|
||||
if @daemonize
|
||||
@logger.fatal "Can't daemonize, no support yet in JRuby."
|
||||
@logger.fatal("Can't daemonize, no support yet in JRuby.")
|
||||
raise "Can't daemonize, no fork in JRuby."
|
||||
end
|
||||
|
||||
|
@ -227,14 +228,14 @@ class LogStash::Agent
|
|||
end
|
||||
|
||||
if @verbose >= 3 # Uber debugging.
|
||||
@logger.level = Logger::DEBUG
|
||||
@logger.level = :debug
|
||||
$DEBUG = true
|
||||
elsif @verbose == 2 # logstash debug logs
|
||||
@logger.level = Logger::DEBUG
|
||||
@logger.level = :debug
|
||||
elsif @verbose == 1 # logstash info logs
|
||||
@logger.level = Logger::INFO
|
||||
@logger.level = :info
|
||||
else # Default log level
|
||||
@logger.level = Logger::WARN
|
||||
@logger.level = :warn
|
||||
end
|
||||
end # def configure
|
||||
|
||||
|
@ -243,7 +244,8 @@ class LogStash::Agent
|
|||
# Support directory of config files.
|
||||
# https://logstash.jira.com/browse/LOGSTASH-106
|
||||
if File.directory?(@config_path)
|
||||
@logger.debug("Loading '#{@config_path}' as directory")
|
||||
@logger.debug("Config path is a directory, scanning files",
|
||||
:path => @config_path)
|
||||
paths = Dir.glob(File.join(@config_path, "*")).sort
|
||||
else
|
||||
# Get a list of files matching a glob. If the user specified a single
|
||||
|
@ -330,7 +332,7 @@ class LogStash::Agent
|
|||
|
||||
private
|
||||
def start_input(input)
|
||||
@logger.debug(["Starting input", input])
|
||||
@logger.debug("Starting input", :plugin => input)
|
||||
# inputs should write directly to output queue if there are no filters.
|
||||
input_target = @filters.length > 0 ? @filter_queue : @output_queue
|
||||
@plugins[input] = Thread.new(input, input_target) do |*args|
|
||||
|
@ -340,7 +342,7 @@ class LogStash::Agent
|
|||
|
||||
private
|
||||
def start_output(output)
|
||||
@logger.debug(["Starting output", output])
|
||||
@logger.debug("Starting output", :plugin => output)
|
||||
queue = SizedQueue.new(10)
|
||||
@output_queue.add_queue(queue)
|
||||
@output_plugin_queues[output] = queue
|
||||
|
@ -462,9 +464,9 @@ class LogStash::Agent
|
|||
|
||||
finished_queue = Queue.new
|
||||
# Tell everything to shutdown.
|
||||
@logger.debug(plugins.keys.collect(&:to_s))
|
||||
@logger.debug("Plugins to shutdown", :plugins => plugins.keys.collect(&:to_s))
|
||||
plugins.each do |p, thread|
|
||||
@logger.debug("Telling to shutdown: #{p.to_s}")
|
||||
@logger.debug("Sending shutdown to: #{p.to_s}", :plugin => p)
|
||||
p.shutdown(finished_queue)
|
||||
end
|
||||
|
||||
|
@ -474,7 +476,7 @@ class LogStash::Agent
|
|||
while remaining.size > 0
|
||||
if (Time.now > force_shutdown_time)
|
||||
@logger.warn("Time to quit, even if some plugins aren't finished yet.")
|
||||
@logger.warn("Stuck plugins? #{remaining.map(&:first).join(", ")}")
|
||||
@logger.warn("Stuck plugins?", :remaining => remaining.map(&:first))
|
||||
break
|
||||
end
|
||||
|
||||
|
@ -485,9 +487,9 @@ class LogStash::Agent
|
|||
sleep(1)
|
||||
else
|
||||
remaining = plugins.select { |p, thread| plugin.running? }
|
||||
@logger.debug("#{p.to_s} finished, waiting on " \
|
||||
"#{remaining.size} plugins; " \
|
||||
"#{remaining.map(&:first).join(", ")}")
|
||||
@logger.debug("Plugin #{p.to_s} finished, waiting on the rest.",
|
||||
:count => remaining.size,
|
||||
:remaining => remaining.map(&:first))
|
||||
end
|
||||
end # while remaining.size > 0
|
||||
end
|
||||
|
@ -506,7 +508,7 @@ class LogStash::Agent
|
|||
config = read_config
|
||||
reloaded_inputs, reloaded_filters, reloaded_outputs = parse_config(config)
|
||||
rescue Exception => e
|
||||
@logger.error "Aborting reload due to bad configuration: #{e}"
|
||||
@logger.error("Aborting reload due to bad configuration", :exception => e)
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -526,7 +528,7 @@ class LogStash::Agent
|
|||
obsolete_plugins[p] = @plugins[p]
|
||||
@plugins.delete(p)
|
||||
else
|
||||
@logger.warn("Couldn't find input plugin to stop: #{p}")
|
||||
@logger.warn("Couldn't find input plugin to stop", :plugin => p)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -536,7 +538,7 @@ class LogStash::Agent
|
|||
@plugins.delete(p)
|
||||
@output_queue.remove_queue(@output_plugin_queues[p])
|
||||
else
|
||||
@logger.warn("Couldn't find output plugin to stop: #{p}")
|
||||
@logger.warn("Couldn't find output plugin to stop", :plugin => p)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -548,7 +550,7 @@ class LogStash::Agent
|
|||
deleted_filters.each {|f| obsolete_plugins[f] = nil}
|
||||
|
||||
if obsolete_plugins.size > 0
|
||||
@logger.info("Stopping removed plugins:\n\t" + obsolete_plugins.keys.join("\n\t"))
|
||||
@logger.info("Stopping removed plugins:", :plugins => obsolete_plugins.keys)
|
||||
shutdown_plugins(obsolete_plugins)
|
||||
end
|
||||
# require 'pry'; binding.pry()
|
||||
|
@ -556,7 +558,7 @@ class LogStash::Agent
|
|||
# Start up filters
|
||||
if new_filters.size > 0 || deleted_filters.size > 0
|
||||
if new_filters.size > 0
|
||||
@logger.info("Starting new filters: #{new_filters.join(', ')}")
|
||||
@logger.info("Starting new filters", :plugins => new_filters)
|
||||
new_filters.each do |f|
|
||||
f.logger = @logger
|
||||
f.register
|
||||
|
@ -569,13 +571,13 @@ class LogStash::Agent
|
|||
end
|
||||
|
||||
if new_inputs.size > 0
|
||||
@logger.info("Starting new inputs: #{new_inputs.join(', ')}")
|
||||
@logger.info("Starting new inputs", :plugins => new_inputs)
|
||||
new_inputs.each do |p|
|
||||
start_input(p)
|
||||
end
|
||||
end
|
||||
if new_outputs.size > 0
|
||||
@logger.info("Starting new outputs: #{new_outputs.join(', ')}")
|
||||
@logger.info("Starting new outputs", :plugins => new_outputs)
|
||||
new_inputs.each do |p|
|
||||
start_output(p)
|
||||
end
|
||||
|
@ -637,7 +639,7 @@ class LogStash::Agent
|
|||
LogStash::Util::set_thread_name("input|#{input.to_s}")
|
||||
input.logger = @logger
|
||||
input.register
|
||||
@logger.info("Input #{input.to_s} registered")
|
||||
@logger.info("Input registered", :plugin => input)
|
||||
@ready_queue << input
|
||||
done = false
|
||||
|
||||
|
@ -646,12 +648,11 @@ class LogStash::Agent
|
|||
input.run(queue)
|
||||
done = true
|
||||
rescue => e
|
||||
@logger.warn(["Input #{input.to_s} thread exception", e])
|
||||
@logger.debug(["Input #{input.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
@logger.error("Restarting input #{input.to_s} due to exception")
|
||||
@logger.warn("Input thread exception", :plugin => input,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
@logger.error("Restarting input due to exception", :plugin => input)
|
||||
sleep(1)
|
||||
retry # This jumps to the top of this proc (to the start of 'do')
|
||||
retry # This jumps to the top of the 'begin'
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -668,7 +669,7 @@ class LogStash::Agent
|
|||
def run_filter(filterworker, index, output_queue)
|
||||
LogStash::Util::set_thread_name("filter|worker|#{index}")
|
||||
filterworker.run
|
||||
@logger.warn("Filter worker ##{index} shutting down")
|
||||
@logger.warn("Filter worker shutting down", :index => index)
|
||||
|
||||
# If we get here, the plugin finished, check if we need to shutdown.
|
||||
shutdown_if_none_running(LogStash::FilterWorker, output_queue) unless @reloading
|
||||
|
@ -679,26 +680,25 @@ class LogStash::Agent
|
|||
LogStash::Util::set_thread_name("output|#{output.to_s}")
|
||||
output.logger = @logger
|
||||
output.register
|
||||
@logger.info("Output #{output.to_s} registered")
|
||||
@logger.info("Output registered", :plugin => output)
|
||||
@ready_queue << output
|
||||
|
||||
# TODO(sissel): We need a 'reset' or 'restart' method to call on errors
|
||||
|
||||
begin
|
||||
while event = queue.pop do
|
||||
@logger.debug("Sending event to #{output.to_s}")
|
||||
@logger.debug("Sending event", :target => output)
|
||||
output.handle(event)
|
||||
end
|
||||
rescue Exception => e
|
||||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
@logger.debug(["Output #{output.to_s} thread exception backtrace",
|
||||
e.backtrace])
|
||||
@logger.warn("Output thread exception", :plugin => output,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
# TODO(sissel): should we abort after too many failures?
|
||||
sleep(1)
|
||||
retry
|
||||
end # begin/rescue
|
||||
|
||||
@logger.warn("Output #{input.to_s} shutting down")
|
||||
@logger.warn("Output shutting down", :plugin => output)
|
||||
|
||||
# If we get here, the plugin finished, check if we need to shutdown.
|
||||
shutdown_if_none_running(LogStash::Outputs::Base) unless @reloading
|
||||
|
@ -714,7 +714,8 @@ class LogStash::Agent
|
|||
remaining = @plugins.count do |plugin, thread|
|
||||
plugin.is_a?(pluginclass) and plugin.running?
|
||||
end
|
||||
@logger.debug("#{pluginclass} still running: #{remaining}")
|
||||
@logger.debug("Plugins still running", :type => pluginclass,
|
||||
:remaining => remaining)
|
||||
|
||||
if remaining == 0
|
||||
@logger.debug("All #{pluginclass} finished. Shutting down.")
|
||||
|
|
|
@ -11,7 +11,7 @@ class LogStash::Config::File
|
|||
def initialize(path=nil, string=nil)
|
||||
@path = path
|
||||
@string = string
|
||||
@logger = Logger.new(STDERR)
|
||||
@logger = LogStash::Logger.new(STDERR)
|
||||
|
||||
if (path.nil? and string.nil?) or (!path.nil? and !string.nil?)
|
||||
raise "Must give path or string, not both or neither"
|
||||
|
|
|
@ -44,7 +44,7 @@ module LogStash::Config::Mixin
|
|||
# Validation will modify the values inside params if necessary.
|
||||
# For example: converting a string to a number, etc.
|
||||
if !self.class.validate(params)
|
||||
@logger.error "Config validation failed."
|
||||
@logger.error("Config validation failed.")
|
||||
exit 1
|
||||
end
|
||||
|
||||
|
@ -53,7 +53,7 @@ module LogStash::Config::Mixin
|
|||
opts = self.class.get_config[name]
|
||||
if opts && opts[:deprecated]
|
||||
@logger.warn("Deprecated config item #{name.inspect} set " +
|
||||
"in #{self.class.name}")
|
||||
"in #{self.class.name}", :name => name, :plugin => self)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -149,7 +149,14 @@ class LogStash::Event
|
|||
# Append all fields
|
||||
event.fields.each do |name, value|
|
||||
if self.fields.include?(name)
|
||||
self.fields[name] |= value
|
||||
if !self.fields[name].is_a?(Array)
|
||||
self.fields[name] = [self.fields[name]]
|
||||
end
|
||||
if value.is_a?(Array)
|
||||
self.fields[name] |= value
|
||||
else
|
||||
self.fields[name] << value
|
||||
end
|
||||
else
|
||||
self.fields[name] = value
|
||||
end
|
||||
|
|
|
@ -63,11 +63,12 @@ class LogStash::Filters::Base < LogStash::Plugin
|
|||
(@add_field or {}).each do |field, value|
|
||||
event[field] ||= []
|
||||
event[field] << event.sprintf(value)
|
||||
@logger.debug("filters/#{self.class.name}: adding #{value} to field #{field}")
|
||||
@logger.debug("filters/#{self.class.name}: adding value to field",
|
||||
:field => field, :value => value)
|
||||
end
|
||||
|
||||
(@add_tag or []).each do |tag|
|
||||
@logger.debug("filters/#{self.class.name}: adding tag #{tag}")
|
||||
@logger.debug("filters/#{self.class.name}: adding tag", :tag => tag)
|
||||
event.tags << event.sprintf(tag)
|
||||
#event.tags |= [ event.sprintf(tag) ]
|
||||
end
|
||||
|
|
|
@ -94,7 +94,8 @@ class LogStash::Filters::Date < LogStash::Filters::Base
|
|||
missing = DATEPATTERNS.reject { |p| format.include?(p) }
|
||||
end
|
||||
|
||||
@logger.debug "Adding type #{@type} with date config: #{field} => #{format}"
|
||||
@logger.debug("Adding type with date config", :type => @type,
|
||||
:field => field, :format => format)
|
||||
@parsers[field] << {
|
||||
:parser => parser.withOffsetParsed,
|
||||
:missing => missing
|
||||
|
@ -105,13 +106,13 @@ class LogStash::Filters::Date < LogStash::Filters::Base
|
|||
|
||||
public
|
||||
def filter(event)
|
||||
@logger.debug "DATE FILTER: received event of type #{event.type}"
|
||||
@logger.debug("Date filter: received event", :type => event.type)
|
||||
return unless event.type == @type
|
||||
now = Time.now
|
||||
|
||||
@parsers.each do |field, fieldparsers|
|
||||
|
||||
@logger.debug "DATE FILTER: type #{event.type}, looking for field #{field.inspect}"
|
||||
@logger.debug("Date filter: type #{event.type}, looking for field #{field.inspect}",
|
||||
:type => event.type, :field => field)
|
||||
# TODO(sissel): check event.message, too.
|
||||
next unless event.fields.member?(field)
|
||||
|
||||
|
@ -165,12 +166,14 @@ class LogStash::Filters::Date < LogStash::Filters::Base
|
|||
time = time.withZone(org.joda.time.DateTimeZone.forID("UTC"))
|
||||
event.timestamp = time.to_s
|
||||
#event.timestamp = LogStash::Time.to_iso8601(time)
|
||||
@logger.debug "Parsed #{value.inspect} as #{event.timestamp}"
|
||||
@logger.debug("Date parsing done", :value => value, :timestamp => event.timestamp)
|
||||
rescue => e
|
||||
@logger.warn "Failed parsing date #{value.inspect} from field #{field}: #{e}"
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("Failed parsing date from field", :field => field,
|
||||
:value => value, :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
# Raising here will bubble all the way up and cause an exit.
|
||||
# TODO(sissel): Maybe we shouldn't raise?
|
||||
# TODO(sissel): What do we do on a failure? Tag it like grok does?
|
||||
#raise e
|
||||
end # begin
|
||||
end # fieldvalue.each
|
||||
|
|
|
@ -43,23 +43,25 @@ class LogStash::Filters::Grep < LogStash::Filters::Base
|
|||
|
||||
re = Regexp.new(pattern)
|
||||
@patterns[field] << re
|
||||
@logger.debug(["grep: #{@type}/#{field}", pattern, re])
|
||||
@logger.debug("Registered grep", :type => @type, :field => field,
|
||||
:pattern => pattern, :regexp => re)
|
||||
end # @match.merge.each
|
||||
end # def register
|
||||
|
||||
public
|
||||
def filter(event)
|
||||
if event.type != @type
|
||||
@logger.debug("grep: skipping type #{event.type} from #{event.source}")
|
||||
@logger.debug("grep: skipping non-matching event type", :type =>
|
||||
event.type, :wanted_type => @type, :event => event)
|
||||
return
|
||||
end
|
||||
|
||||
@logger.debug(["Running grep filter", event.to_hash, config])
|
||||
@logger.debug("Running grep filter", :event => event, :config => config)
|
||||
matches = 0
|
||||
@patterns.each do |field, regexes|
|
||||
if !event[field]
|
||||
@logger.debug(["Skipping match object, field not present", field,
|
||||
event, event[field]])
|
||||
@logger.debug("Skipping match object, field not present",
|
||||
:field => field, :event => event)
|
||||
next
|
||||
end
|
||||
|
||||
|
@ -77,13 +79,13 @@ class LogStash::Filters::Grep < LogStash::Filters::Base
|
|||
|
||||
(event[field].is_a?(Array) ? event[field] : [event[field]]).each do |value|
|
||||
if @negate
|
||||
@logger.debug(["want negate match", re, value])
|
||||
@logger.debug("negate match", :regexp => re, :value => value)
|
||||
next if re.match(value)
|
||||
@logger.debug(["grep not-matched (negate requsted)", { field => value }])
|
||||
@logger.debug("grep not-matched (negate requested)", field => value)
|
||||
else
|
||||
@logger.debug(["want match", re, value])
|
||||
@logger.debug("want match", :regexp => re, :value => value)
|
||||
next unless re.match(value)
|
||||
@logger.debug(["grep matched", { field => value }])
|
||||
@logger.debug("grep matched", field => value)
|
||||
end
|
||||
match_count += 1
|
||||
break
|
||||
|
@ -92,10 +94,9 @@ class LogStash::Filters::Grep < LogStash::Filters::Base
|
|||
|
||||
if match_count == match_want
|
||||
matches += 1
|
||||
@logger.debug("matched all fields (#{match_count})")
|
||||
@logger.debug("matched all fields", :count => match_count)
|
||||
else
|
||||
@logger.debug("match block failed " \
|
||||
"(#{match_count}/#{match_want} matches)")
|
||||
@logger.debug("match failed", :count => match_count, :wanted => match_want)
|
||||
end # match["match"].each
|
||||
end # @patterns.each
|
||||
|
||||
|
@ -111,6 +112,6 @@ class LogStash::Filters::Grep < LogStash::Filters::Base
|
|||
return
|
||||
end
|
||||
|
||||
@logger.debug(["Event after grep filter", event.to_hash])
|
||||
@logger.debug("Event after grep filter", :event => event)
|
||||
end # def filter
|
||||
end # class LogStash::Filters::Grep
|
||||
|
|
|
@ -79,6 +79,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
# If true, only store named captures from grok.
|
||||
config :named_captures_only, :validate => :boolean, :default => false
|
||||
|
||||
# TODO(sissel): Add this feature?
|
||||
# When disabled, any pattern that matches the entire string will not be set.
|
||||
# This is useful if you have named patterns like COMBINEDAPACHELOG that will
|
||||
# match entire events and you really don't want to add a field
|
||||
# 'COMBINEDAPACHELOG' that is set to the whole event line.
|
||||
#config :capture_full_match_patterns, :validate => :boolean, :default => false
|
||||
|
||||
|
||||
# Detect if we are running from a jarfile, pick the right path.
|
||||
@@patterns_path ||= Set.new
|
||||
if __FILE__ =~ /file:\/.*\.jar!.*/
|
||||
|
@ -102,18 +110,17 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
|
||||
public
|
||||
def register
|
||||
gem "jls-grok", ">=0.4.3"
|
||||
require "grok-pure" # rubygem 'jls-grok'
|
||||
|
||||
@patternfiles = []
|
||||
@patterns_dir += @@patterns_path.to_a
|
||||
@logger.info("Grok patterns path: #{@patterns_dir.join(":")}")
|
||||
@logger.info("Grok patterns path", :patterns_dir => @patterns_dir)
|
||||
@patterns_dir.each do |path|
|
||||
# Can't read relative paths from jars, try to normalize away '../'
|
||||
while path =~ /file:\/.*\.jar!.*\/\.\.\//
|
||||
# replace /foo/bar/../baz => /foo/baz
|
||||
path = path.gsub(/[^\/]+\/\.\.\//, "")
|
||||
@logger.debug "In-jar path to read: #{path}"
|
||||
@logger.debug("In-jar path to read", :path => path)
|
||||
end
|
||||
|
||||
if File.directory?(path)
|
||||
|
@ -121,14 +128,14 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
end
|
||||
|
||||
Dir.glob(path).each do |file|
|
||||
@logger.info("Grok loading patterns from #{file}")
|
||||
@logger.info("Grok loading patterns from file", :path => file)
|
||||
@patternfiles << file
|
||||
end
|
||||
end
|
||||
|
||||
@patterns = Hash.new { |h,k| h[k] = [] }
|
||||
|
||||
@logger.info(:match => @match)
|
||||
@logger.info("Match data", :match => @match)
|
||||
|
||||
# TODO(sissel): Hash.merge actually overrides, not merges arrays.
|
||||
# Work around it by implementing our own?
|
||||
|
@ -142,11 +149,13 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
|
||||
if !@patterns.include?(field)
|
||||
@patterns[field] = Grok::Pile.new
|
||||
@patterns[field].logger = @logger
|
||||
|
||||
add_patterns_from_files(@patternfiles, @patterns[field])
|
||||
end
|
||||
@logger.info(["Grok compile", { :field => field, :patterns => patterns }])
|
||||
@logger.info("Grok compile", :field => field, :patterns => patterns)
|
||||
patterns.each do |pattern|
|
||||
@logger.debug(["regexp: #{@type}/#{field}", pattern])
|
||||
@logger.debug("regexp: #{@type}/#{field}", :pattern => pattern)
|
||||
@patterns[field].compile(pattern)
|
||||
end
|
||||
end # @config.each
|
||||
|
@ -158,26 +167,23 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
matched = false
|
||||
|
||||
# Only filter events we are configured for
|
||||
if event.type != @type
|
||||
return
|
||||
end
|
||||
|
||||
if @type != event.type
|
||||
@logger.debug("Skipping grok for event type=#{event.type} (wanted '#{@type}')")
|
||||
@logger.debug("Skipping grok for event with wrong type",
|
||||
:type => event.type, :wanted_type => @type)
|
||||
return
|
||||
end
|
||||
|
||||
@logger.debug(["Running grok filter", event])
|
||||
@logger.debug("Running grok filter", :event => event);
|
||||
done = false
|
||||
@patterns.each do |field, pile|
|
||||
break if done
|
||||
if !event[field]
|
||||
@logger.debug(["Skipping match object, field not present", field,
|
||||
event, event[field]])
|
||||
@logger.debug("Skipping match object, field not present",
|
||||
:field => field, :event => event)
|
||||
next
|
||||
end
|
||||
|
||||
@logger.debug(["Trying pattern for type #{event.type}", { :pile => pile, :field => field }])
|
||||
@logger.debug("Trying pattern", :pile => pile, :field => field )
|
||||
(event[field].is_a?(Array) ? event[field] : [event[field]]).each do |fieldvalue|
|
||||
grok, match = pile.match(fieldvalue)
|
||||
next unless match
|
||||
|
@ -205,15 +211,16 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
value = value.to_f
|
||||
end
|
||||
|
||||
# Special casing to skip captures that represent the entire log message.
|
||||
if fieldvalue == value and field == "@message"
|
||||
# Skip patterns that match the entire message
|
||||
@logger.debug("Skipping capture '#{key}' since it matches the whole line.")
|
||||
@logger.debug("Skipping capture since it matches the whole line.", :field => key)
|
||||
next
|
||||
end
|
||||
|
||||
if @named_captures_only && key =~ /^[A-Z]+/
|
||||
@logger.debug("Skipping capture '#{key}' since it is not a named " \
|
||||
"capture and named_captures_only is true.")
|
||||
@logger.debug("Skipping capture since it is not a named " \
|
||||
"capture and named_captures_only is true.", :field => key)
|
||||
next
|
||||
end
|
||||
|
||||
|
@ -240,7 +247,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
event.tags << "_grokparsefailure"
|
||||
end
|
||||
|
||||
@logger.debug(["Event now: ", event.to_hash])
|
||||
@logger.debug("Event now: ", :event => event)
|
||||
end # def filter
|
||||
|
||||
private
|
||||
|
@ -259,8 +266,8 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
|
|||
# the end. I don't know if this is a bug or intentional, but we need
|
||||
# to chomp it.
|
||||
name, pattern = line.chomp.split(/\s+/, 2)
|
||||
@logger.debug "Adding pattern '#{name}' from file #{path}"
|
||||
@logger.debug name => pattern
|
||||
@logger.debug("Adding pattern from file", :name => name,
|
||||
:pattern => pattern, :path => path)
|
||||
pile.add_pattern(name, pattern)
|
||||
end
|
||||
else
|
||||
|
|
|
@ -70,6 +70,20 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
|
|||
|
||||
# Negate the regexp pattern ('if not matched')
|
||||
config :negate, :validate => :boolean, :default => false
|
||||
|
||||
# The stream identity is how the multiline filter determines which stream an
|
||||
# event belongs. This is generally used for differentiating, say, events
|
||||
# coming from multiple files in the same file input, or multiple connections
|
||||
# coming from a tcp input.
|
||||
#
|
||||
# The default value here is usually what you want, but there are some cases
|
||||
# where you want to change it. One such example is if you are using a tcp
|
||||
# input with only one client connecting at any time. If that client
|
||||
# reconnects (due to error or client restart), then logstash will identify
|
||||
# the new connection as a new stream and break any multiline goodness that
|
||||
# may have occurred between the old and new connection. To solve this use
|
||||
# case, you can use "%{@source_host}.%{@type}" instead.
|
||||
config :stream_identity , :validate => :string, :default => "%{@source}.%{@type}"
|
||||
|
||||
public
|
||||
def initialize(config = {})
|
||||
|
@ -82,14 +96,13 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
|
|||
|
||||
public
|
||||
def register
|
||||
@logger.debug "Setting type #{@type.inspect} to the config #{@config.inspect}"
|
||||
|
||||
begin
|
||||
@pattern = Regexp.new(@pattern)
|
||||
rescue RegexpError => e
|
||||
@logger.fatal(["Invalid pattern for multiline filter on type '#{@type}'",
|
||||
@pattern, e])
|
||||
@logger.fatal("Invalid pattern for multiline filter",
|
||||
:pattern => @pattern, :exception => e, :backtrace => e.backtrace)
|
||||
end
|
||||
@logger.debug("Registered multiline plugin", :type => @type, :config => @config)
|
||||
end # def register
|
||||
|
||||
public
|
||||
|
@ -97,10 +110,11 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
|
|||
return unless event.type == @type
|
||||
|
||||
match = @pattern.match(event.message)
|
||||
key = [event.source, event.type]
|
||||
key = event.sprintf(@stream_identity)
|
||||
pending = @pending[key]
|
||||
|
||||
@logger.debug(["Reg: ", @pattern, event.message, { :match => match, :negate => @negate }])
|
||||
@logger.debug("Multiline", :pattern => @pattern, :message => event.message,
|
||||
:match => match, :negate => @negate)
|
||||
|
||||
# Add negate option
|
||||
match = (match and !@negate) || (!match and @negate)
|
||||
|
@ -152,7 +166,8 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
|
|||
end
|
||||
end # if/else match
|
||||
else
|
||||
@logger.warn(["Unknown multiline 'what' value.", { :what => @what }])
|
||||
# TODO(sissel): Make this part of the 'register' method.
|
||||
@logger.warn("Unknown multiline 'what' value.", :what => @what)
|
||||
end # case @what
|
||||
|
||||
if !event.cancelled?
|
||||
|
@ -161,14 +176,14 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
|
|||
filter_matched(event) if !event.cancelled?
|
||||
end # def filter
|
||||
|
||||
# flush any pending messages
|
||||
# Flush any pending messages. This is generally used for unit testing only.
|
||||
public
|
||||
def flush(source, type)
|
||||
key = [source, type]
|
||||
def flush(key)
|
||||
if @pending[key]
|
||||
event = @pending[key]
|
||||
@pending.delete(key)
|
||||
end
|
||||
return event
|
||||
end # def flush
|
||||
|
||||
end # class LogStash::Filters::Date
|
||||
|
|
|
@ -30,8 +30,8 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
|
|||
# TODO(sissel): Validate conversion requests if provided.
|
||||
@convert.nil? or @convert.each do |field, type|
|
||||
if !valid_conversions.include?(type)
|
||||
@logger.error(["Invalid conversion type",
|
||||
{ "type" => type, "expected one of" => valid_types }])
|
||||
@logger.error("Invalid conversion type",
|
||||
"type" => type, "expected one of" => valid_types)
|
||||
# TODO(sissel): It's 2011, man, let's actually make like.. a proper
|
||||
# 'configuration broken' exception
|
||||
raise "Bad configuration, aborting."
|
||||
|
@ -84,8 +84,8 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
|
|||
# calls convert_{string,integer,float} depending on type requested.
|
||||
converter = method("convert_" + type)
|
||||
if original.is_a?(Hash)
|
||||
@logger.debug(["I don't know how to type convert a hash, skipping",
|
||||
{ "field" => field, "value" => original }])
|
||||
@logger.debug("I don't know how to type convert a hash, skipping",
|
||||
:field => field, :value => original)
|
||||
next
|
||||
elsif original.is_a?(Array)
|
||||
value = original.map { |v| converter.call(v) }
|
||||
|
|
|
@ -48,7 +48,7 @@ class LogStash::Filters::Split < LogStash::Filters::Base
|
|||
next if value.empty?
|
||||
|
||||
event_split = event.clone
|
||||
@logger.debug(["Split event", { :value => value, :field => @field }])
|
||||
@logger.debug("Split event", :value => value, :field => @field)
|
||||
event_split[@field] = value
|
||||
filter_matched(event_split)
|
||||
|
||||
|
|
|
@ -58,15 +58,14 @@ class LogStash::FilterWorker < LogStash::Plugin
|
|||
events << newevent
|
||||
end
|
||||
if event.cancelled?
|
||||
@logger.debug({:message => "Event cancelled",
|
||||
:event => event,
|
||||
:filter => filter.class,
|
||||
})
|
||||
@logger.debug("Event cancelled", :event => event,
|
||||
:filter => filter.class)
|
||||
break
|
||||
end
|
||||
end # @filters.each
|
||||
|
||||
@logger.debug(["Event finished filtering", { :event => event, :thread => Thread.current[:name] }])
|
||||
@logger.debug("Event finished filtering", :event => event,
|
||||
:thread => Thread.current[:name])
|
||||
@output_queue.push(event) unless event.cancelled?
|
||||
end # events.each
|
||||
end # def filter
|
||||
|
|
|
@ -81,11 +81,11 @@ class LogStash::Inputs::Base < LogStash::Plugin
|
|||
fields = JSON.parse(raw)
|
||||
fields.each { |k, v| event[k] = v }
|
||||
rescue => e
|
||||
@logger.warn({:message => "Trouble parsing json input",
|
||||
:input => raw,
|
||||
:source => source,
|
||||
})
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
# TODO(sissel): Instead of dropping the event, should we treat it as
|
||||
# plain text and try to do the best we can with it?
|
||||
@logger.warn("Trouble parsing json input", :input => raw,
|
||||
:source => source, :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
return nil
|
||||
end
|
||||
|
||||
|
@ -98,11 +98,11 @@ class LogStash::Inputs::Base < LogStash::Plugin
|
|||
begin
|
||||
event = LogStash::Event.from_json(raw)
|
||||
rescue => e
|
||||
@logger.warn({:message => "Trouble parsing json_event input",
|
||||
:input => raw,
|
||||
:source => source,
|
||||
})
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
# TODO(sissel): Instead of dropping the event, should we treat it as
|
||||
# plain text and try to do the best we can with it?
|
||||
@logger.warn("Trouble parsing json input", :input => raw,
|
||||
:source => source, :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
return nil
|
||||
end
|
||||
else
|
||||
|
|
|
@ -27,15 +27,15 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base
|
|||
|
||||
public
|
||||
def register
|
||||
@logger.info(["Registering Exec Input", {:type => @type,
|
||||
:command => @command, :interval => @interval}])
|
||||
@logger.info("Registering Exec Input", :type => @type,
|
||||
:command => @command, :interval => @interval)
|
||||
end # def register
|
||||
|
||||
public
|
||||
def run(queue)
|
||||
loop do
|
||||
start = Time.now
|
||||
@logger.info(["Running exec", { :command => @command }]) if @debug
|
||||
@logger.info("Running exec", :command => @command) if @debug
|
||||
out = IO.popen(@command)
|
||||
# out.read will block until the process finishes.
|
||||
e = to_event(out.read, "exec://#{Socket.gethostname}/")
|
||||
|
@ -44,17 +44,17 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base
|
|||
|
||||
duration = Time.now - start
|
||||
if @debug
|
||||
@logger.info(["Command completed",
|
||||
{ :command => @command, :duration => duration } ])
|
||||
@logger.info("Command completed", :command => @command,
|
||||
:duration => duration)
|
||||
end
|
||||
|
||||
# Sleep for the remainder of the interval, or 0 if the duration ran
|
||||
# longer than the interval.
|
||||
sleeptime = [0, @interval - duration].max
|
||||
if sleeptime == 0
|
||||
@logger.warn(["Execution ran longer than the interval. Skipping sleep...",
|
||||
{ :command => @command, :duration => duration,
|
||||
:interval => @interval }])
|
||||
@logger.warn("Execution ran longer than the interval. Skipping sleep.",
|
||||
:command => @command, :duration => duration,
|
||||
:interval => @interval)
|
||||
else
|
||||
sleep(sleeptime)
|
||||
end
|
||||
|
|
|
@ -47,7 +47,7 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
|
|||
def register
|
||||
require "filewatch/tail"
|
||||
LogStash::Util::set_thread_name("input|file|#{path.join(":")}")
|
||||
@logger.info("Registering file input for #{path.join(":")}")
|
||||
@logger.info("Registering file input", :path => @path)
|
||||
end # def register
|
||||
|
||||
public
|
||||
|
@ -67,7 +67,7 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
|
|||
|
||||
tail.subscribe do |path, line|
|
||||
source = "file://#{hostname}/#{path}"
|
||||
@logger.debug({:path => path, :line => line})
|
||||
@logger.debug("Received line", :path => path, :line => line)
|
||||
e = to_event(line, source)
|
||||
if e
|
||||
queue << e
|
||||
|
|
|
@ -56,8 +56,7 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
begin
|
||||
udp_listener(output_queue)
|
||||
rescue => e
|
||||
@logger.warn("gelf listener died: #{$!}")
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("gelf listener died", :exception => e, :backtrace => e.backtrace)
|
||||
sleep(5)
|
||||
retry
|
||||
end # begin
|
||||
|
@ -66,7 +65,7 @@ class LogStash::Inputs::Gelf < LogStash::Inputs::Base
|
|||
|
||||
private
|
||||
def udp_listener(output_queue)
|
||||
@logger.info("Starting gelf listener on #{@host}:#{@port}")
|
||||
@logger.info("Starting gelf listener", :address => "#{@host}:#{@port}")
|
||||
|
||||
if @udp
|
||||
@udp.close_read
|
||||
|
|
|
@ -1,72 +0,0 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require 'pp'
|
||||
|
||||
class LogStash::Inputs::Onstomp < LogStash::Inputs::Base
|
||||
config_name "onstomp"
|
||||
|
||||
# The address of the STOMP server.
|
||||
config :host, :validate => :string, :default => "localhost", :required => true
|
||||
|
||||
# The port to connet to on your STOMP server.
|
||||
config :port, :validate => :number, :default => 61613
|
||||
|
||||
# The username to authenticate with.
|
||||
config :user, :validate => :string, :default => ""
|
||||
|
||||
# The password to authenticate with.
|
||||
config :password, :validate => :password, :default => ""
|
||||
|
||||
# The destination to read events from.
|
||||
#
|
||||
# Example: "/topic/logstash"
|
||||
config :destination, :validate => :string, :required => true
|
||||
|
||||
# Enable debugging output?
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
private
|
||||
def connect
|
||||
begin
|
||||
@client.connect
|
||||
@logger.info("Connected to stomp server") if @client.connected?
|
||||
rescue => e
|
||||
@logger.info("Failed to connect to stomp server: #{e}")
|
||||
sleep 2
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
require "onstomp"
|
||||
@client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value)
|
||||
@stomp_url = "stomp://#{@user}:#{@password}@#{@host}:#{@port}/#{@destination}"
|
||||
|
||||
# Handle disconnects
|
||||
@client.on_connection_closed {
|
||||
connect
|
||||
subscription_handler # is required for re-subscribing to the destination
|
||||
}
|
||||
connect
|
||||
end # def register
|
||||
|
||||
private
|
||||
def subscription_handler
|
||||
@client.subscribe(@destination) do |msg|
|
||||
e = to_event(msg.body, @stomp_url)
|
||||
@output_queue << e if e
|
||||
end
|
||||
|
||||
while @client.connected?
|
||||
# stay subscribed
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
@output_queue = output_queue
|
||||
subscription_handler
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Onstomp
|
||||
|
|
@ -73,7 +73,7 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
end
|
||||
# end TODO
|
||||
|
||||
@logger.info "Registering redis #{identity}"
|
||||
@logger.info("Registering redis", :identity => identity)
|
||||
end # def register
|
||||
|
||||
# A string used to identify a redis instance in log messages
|
||||
|
@ -101,8 +101,8 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
event = to_event msg, identity
|
||||
output_queue << event if event
|
||||
rescue => e # parse or event creation error
|
||||
@logger.error(["Failed to create event with '#{msg}'", e])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.error("Failed to create event", :message => msg, exception => e,
|
||||
:backtrace => e.backtrace);
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -115,16 +115,16 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
private
|
||||
def channel_listener(redis, output_queue)
|
||||
redis.subscribe @key do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.info "Subscribed to #{ch} (#{count})"
|
||||
on.subscribe do |channel, count|
|
||||
@logger.info("Subscribed", :channel => channel, :count => count)
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
on.message do |channel, message|
|
||||
queue_event message, output_queue
|
||||
end
|
||||
|
||||
on.unsubscribe do |ch, count|
|
||||
@logger.info "Unsubscribed from #{ch} (#{count})"
|
||||
on.unsubscribe do |channel, count|
|
||||
@logger.info("Unsubscribed", :channel => channel, :count => count)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -132,16 +132,16 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
private
|
||||
def pattern_channel_listener(redis, output_queue)
|
||||
redis.psubscribe @key do |on|
|
||||
on.psubscribe do |ch, count|
|
||||
@logger.info "Subscribed to #{ch} (#{count})"
|
||||
on.psubscribe do |channel, count|
|
||||
@logger.info("Subscribed", :channel => channel, :count => count)
|
||||
end
|
||||
|
||||
on.pmessage do |ch, event, message|
|
||||
queue_event message, output_queue
|
||||
end
|
||||
|
||||
on.punsubscribe do |ch, count|
|
||||
@logger.info "Unsubscribed from #{ch} (#{count})"
|
||||
on.punsubscribe do |channel, count|
|
||||
@logger.info("Unsubscribed", :channel => channel, :count => count)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -155,7 +155,8 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
@redis ||= connect
|
||||
self.send listener, @redis, output_queue
|
||||
rescue => e # redis error
|
||||
@logger.warn(["Failed to get event from redis #{@name}. ", e])
|
||||
@logger.warn("Failed to get event from redis", :name => @name,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
raise e
|
||||
end
|
||||
end # loop
|
||||
|
|
|
@ -1,17 +1,12 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require 'pp'
|
||||
|
||||
# TODO(sissel): This class doesn't work yet in JRuby.
|
||||
# http://jira.codehaus.org/browse/JRUBY-4941
|
||||
|
||||
# Stream events from a STOMP broker.
|
||||
#
|
||||
# http://stomp.codehaus.org/
|
||||
class LogStash::Inputs::Stomp < LogStash::Inputs::Base
|
||||
config_name "stomp"
|
||||
class LogStash::Inputs::Onstomp < LogStash::Inputs::Base
|
||||
config_name "onstomp"
|
||||
|
||||
# The address of the STOMP server.
|
||||
config :host, :validate => :string, :default => "localhost"
|
||||
config :host, :validate => :string, :default => "localhost", :required => true
|
||||
|
||||
# The port to connet to on your STOMP server.
|
||||
config :port, :validate => :number, :default => 61613
|
||||
|
@ -30,32 +25,48 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
|
|||
# Enable debugging output?
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@format ||= "json_event"
|
||||
raise "Stomp input currently not supported. See " +
|
||||
"http://jira.codehaus.org/browse/JRUBY-4941 and " +
|
||||
"https://logstash.jira.com/browse/LOGSTASH-8"
|
||||
private
|
||||
def connect
|
||||
begin
|
||||
@client.connect
|
||||
@logger.info("Connected to stomp server") if @client.connected?
|
||||
rescue => e
|
||||
@logger.info("Failed to connect to stomp server: #{e}")
|
||||
sleep 2
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
require "stomp"
|
||||
|
||||
@client = Stomp::Client.new(@user, @password.value, @host, @port)
|
||||
require "onstomp"
|
||||
@client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value)
|
||||
@stomp_url = "stomp://#{@user}:#{@password}@#{@host}:#{@port}/#{@destination}"
|
||||
|
||||
# Handle disconnects
|
||||
@client.on_connection_closed {
|
||||
connect
|
||||
subscription_handler # is required for re-subscribing to the destination
|
||||
}
|
||||
connect
|
||||
end # def register
|
||||
|
||||
def run(output_queue)
|
||||
private
|
||||
def subscription_handler
|
||||
@client.subscribe(@destination) do |msg|
|
||||
e = to_event(message.body, @stomp_url)
|
||||
if e
|
||||
output_queue << e
|
||||
end
|
||||
e = to_event(msg.body, @stomp_url)
|
||||
@output_queue << e if e
|
||||
end
|
||||
|
||||
raise "disconnected from stomp server"
|
||||
while @client.connected?
|
||||
# stay subscribed
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
@output_queue = output_queue
|
||||
subscription_handler
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Stomp
|
||||
end # class LogStash::Inputs::Onstomp
|
||||
|
||||
|
|
|
@ -61,8 +61,9 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
udp_listener(output_queue)
|
||||
rescue => e
|
||||
break if @shutdown_requested
|
||||
@logger.warn("syslog udp listener died: #{$!}")
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("syslog udp listener died",
|
||||
:address => "#{@host}:#{@port}", :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
sleep(5)
|
||||
retry
|
||||
end # begin
|
||||
|
@ -75,8 +76,9 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
tcp_listener(output_queue)
|
||||
rescue => e
|
||||
break if @shutdown_requested
|
||||
@logger.warn("syslog tcp listener died: #{$!}")
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("syslog tcp listener died",
|
||||
:address => "#{@host}:#{@port}", :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
sleep(5)
|
||||
retry
|
||||
end # begin
|
||||
|
@ -85,7 +87,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
|
||||
private
|
||||
def udp_listener(output_queue)
|
||||
@logger.info("Starting syslog udp listener on #{@host}:#{@port}")
|
||||
@logger.info("Starting syslog udp listener", :address => "#{@host}:#{@port}")
|
||||
|
||||
if @udp
|
||||
@udp.close_read
|
||||
|
@ -111,7 +113,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
|
||||
private
|
||||
def tcp_listener(output_queue)
|
||||
@logger.info("Starting syslog tcp listener on #{@host}:#{@port}")
|
||||
@logger.info("Starting syslog tcp listener", :address => "#{@host}:#{@port}")
|
||||
@tcp = TCPServer.new(@host, @port)
|
||||
@tcp_clients = []
|
||||
|
||||
|
@ -120,7 +122,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
@tcp_clients << client
|
||||
Thread.new(client) do |client|
|
||||
ip, port = client.peeraddr[3], client.peeraddr[1]
|
||||
@logger.warn("got connection from #{ip}:#{port}")
|
||||
@logger.info("new connection", :client => "#{ip}:#{port}")
|
||||
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
|
||||
if ip.include?(":") # ipv6
|
||||
source = "syslog://[#{ip}]/"
|
||||
|
@ -192,7 +194,7 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
|
|||
|
||||
@date_filter.filter(event)
|
||||
else
|
||||
@logger.info(["NOT SYSLOG", event.message])
|
||||
@logger.info("NOT SYSLOG", :message => event.message)
|
||||
url = "syslog://#{Socket.gethostname}/" if url == "syslog://127.0.0.1/"
|
||||
|
||||
# RFC3164 says unknown messages get pri=13
|
||||
|
|
|
@ -41,7 +41,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
public
|
||||
def register
|
||||
if server?
|
||||
@logger.info("Starting tcp input listener on #{@host}:#{@port}")
|
||||
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
|
||||
@server_socket = TCPServer.new(@host, @port)
|
||||
end
|
||||
end # def register
|
||||
|
@ -67,10 +67,11 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
end
|
||||
end # loop do
|
||||
rescue => e
|
||||
@logger.debug(["Closing connection with #{socket.peer}", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.debug("Closing connection", :client => socket.peer,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
rescue Timeout::Error
|
||||
@logger.debug("Closing connection with #{socket.peer} after read timeout")
|
||||
@logger.debug("Closing connection after read timeout",
|
||||
:client => socket.peer)
|
||||
end # begin
|
||||
|
||||
begin
|
||||
|
@ -95,7 +96,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
|
||||
# monkeypatch a 'peer' method onto the socket.
|
||||
s.instance_eval { class << self; include SocketPeer end }
|
||||
@logger.debug("Accepted connection from #{s.peer} on #{@host}:#{@port}")
|
||||
@logger.debug("Accepted connection", :client => s.peer,
|
||||
:server => "#{@host}:#{@port}")
|
||||
handle_socket(s, output_queue, "tcp://#{@host}:#{@port}/client/#{s.peer}")
|
||||
end # Thread.start
|
||||
end # loop
|
||||
|
@ -103,7 +105,7 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
|
|||
loop do
|
||||
client_socket = TCPSocket.new(@host, @port)
|
||||
client_socket.instance_eval { class << self; include SocketPeer end }
|
||||
@logger.debug("Opened connection to #{client_socket.peer}")
|
||||
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
|
||||
handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server")
|
||||
end # loop
|
||||
end
|
||||
|
|
|
@ -41,9 +41,9 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
|
|||
#stream = TweetStream::Client.new(@user, @password.value)
|
||||
#stream.track(*@keywords) do |status|
|
||||
track(*@keywords) do |status|
|
||||
@logger.debug :status => status
|
||||
@logger.debug("twitter keyword track status", :status => status)
|
||||
#@logger.debug("Got twitter status from @#{status[:user][:screen_name]}")
|
||||
@logger.info("Got twitter status from @#{status["user"]["screen_name"]}")
|
||||
@logger.info("Got twitter status", :user => status["user"]["screen_name"])
|
||||
e = to_event(status["text"], "http://twitter.com/#{status["user"]["screen_name"]}/status/#{status["id"]}")
|
||||
next unless e
|
||||
|
||||
|
@ -92,13 +92,13 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
|
|||
response.read_body do |chunk|
|
||||
#@logger.info("Twitter: #{chunk.inspect}")
|
||||
buffer.extract(chunk).each do |line|
|
||||
@logger.info("Twitter line: #{line.inspect}")
|
||||
@logger.info("Twitter line", :line => line)
|
||||
begin
|
||||
status = JSON.parse(line)
|
||||
yield status
|
||||
rescue => e
|
||||
@logger.error e
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.error("Error parsing json from twitter", :exception => e,
|
||||
:backtrace => e.backtrace);
|
||||
end
|
||||
end # buffer.extract
|
||||
end # response.read_body
|
||||
|
|
|
@ -1,91 +1,19 @@
|
|||
require "logstash/namespace"
|
||||
require "cabin"
|
||||
require "logger"
|
||||
|
||||
class LogStash::Logger < Logger
|
||||
# Try to load awesome_print, if it fails, log it later
|
||||
# but otherwise we will continue to operate as normal.
|
||||
begin
|
||||
require "ap"
|
||||
@@have_awesome_print = true
|
||||
rescue LoadError => e
|
||||
@@have_awesome_print = false
|
||||
@@notify_awesome_print_load_failed = e
|
||||
end
|
||||
|
||||
class LogStash::Logger < Cabin::Channel
|
||||
public
|
||||
def initialize(*args)
|
||||
super(*args)
|
||||
@formatter = LogStash::Logger::Formatter.new
|
||||
super()
|
||||
|
||||
# Set default loglevel to WARN unless $DEBUG is set (run with 'ruby -d')
|
||||
self.level = $DEBUG ? Logger::DEBUG: Logger::INFO
|
||||
@level = $DEBUG ? :debug : :info
|
||||
if ENV["LOGSTASH_DEBUG"]
|
||||
self.level = Logger::DEBUG
|
||||
self.level = :debug
|
||||
end
|
||||
|
||||
@formatter.progname = self.progname = File.basename($0)
|
||||
|
||||
# Conditional support for awesome_print
|
||||
if !@@have_awesome_print && @@notify_awesome_print_load_failed
|
||||
debug [ "awesome_print not found, falling back to Object#inspect." \
|
||||
"If you want prettier log output, run 'gem install "\
|
||||
"awesome_print'",
|
||||
{ :exception => @@notify_awesome_print_load_failed }]
|
||||
|
||||
# Only show this once.
|
||||
@@notify_awesome_print_load_failed = nil
|
||||
end
|
||||
#self[:program] = File.basename($0)
|
||||
subscribe(::Logger.new(*args))
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def level=(level)
|
||||
super(level)
|
||||
@formatter.level = level
|
||||
end # def level=
|
||||
end # class LogStash::Logger
|
||||
|
||||
# Implement a custom Logger::Formatter that uses awesome_inspect on non-strings.
|
||||
class LogStash::Logger::Formatter < Logger::Formatter
|
||||
attr_accessor :level
|
||||
attr_accessor :progname
|
||||
|
||||
public
|
||||
def call(severity, timestamp, who, object)
|
||||
# override progname to be the caller if the log level threshold is DEBUG
|
||||
# We only do this if the logger level is DEBUG because inspecting the
|
||||
# stack and doing extra string manipulation can have performance impacts
|
||||
# under high logging rates.
|
||||
if @level == Logger::DEBUG
|
||||
# callstack inspection, include our caller
|
||||
# turn this: "/usr/lib/ruby/1.8/irb/workspace.rb:52:in `irb_binding'"
|
||||
# into this: ["/usr/lib/ruby/1.8/irb/workspace.rb", "52", "irb_binding"]
|
||||
#
|
||||
# caller[3] is actually who invoked the Logger#<type>
|
||||
# This only works if you use the severity methods
|
||||
path, line, method = caller[3].split(/(?::in `|:|')/)
|
||||
# Trim RUBYLIB path from 'file' if we can
|
||||
#whence = $:.select { |p| path.start_with?(p) }[0]
|
||||
whence = $:.detect { |p| path.start_with?(p) }
|
||||
if !whence
|
||||
# We get here if the path is not in $:
|
||||
file = path
|
||||
else
|
||||
file = path[whence.length + 1..-1]
|
||||
end
|
||||
who = "#{file}:#{line}##{method}"
|
||||
end
|
||||
|
||||
# Log like normal if we got a string.
|
||||
if object.is_a?(String)
|
||||
super(severity, timestamp, who, object)
|
||||
else
|
||||
# If we logged an object, use .awesome_inspect (or just .inspect)
|
||||
# to stringify it for higher sanity logging.
|
||||
if object.respond_to?(:awesome_inspect)
|
||||
super(severity, timestamp, who, object.awesome_inspect)
|
||||
else
|
||||
super(severity, timestamp, who, object.inspect)
|
||||
end
|
||||
end
|
||||
end # def call
|
||||
end # class LogStash::Logger::Formatter
|
||||
|
|
|
@ -29,15 +29,22 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
# The name of the exchange
|
||||
config :name, :validate => :string, :required => true
|
||||
|
||||
# Key to route to
|
||||
# Key to route to by default. Defaults to queue name
|
||||
config :key, :validate => :string
|
||||
|
||||
# The name of the queue to bind to the default key. Defaults to exchange name
|
||||
config :queue_name, :validate => :string
|
||||
|
||||
# The vhost to use
|
||||
config :vhost, :validate => :string, :default => "/"
|
||||
|
||||
# Is this exchange durable? (aka; Should it survive a broker restart?)
|
||||
config :durable, :validate => :boolean, :default => true
|
||||
|
||||
# Is this queue durable? (aka; Should it survive a broker restart?).
|
||||
# If you omit this setting, the 'durable' property will be used as default.
|
||||
config :queue_durable, :validate => :boolean
|
||||
|
||||
# Should messages persist to disk on the AMQP broker until they are read by a
|
||||
# consumer?
|
||||
config :persistent, :validate => :boolean, :default => true
|
||||
|
@ -58,7 +65,11 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
raise "Invalid exchange_type, #{@exchange_type.inspect}, must be one of #{MQTYPES.join(", ")}"
|
||||
end
|
||||
|
||||
@logger.info("Registering output #{to_s}")
|
||||
@queue_name ||= @name
|
||||
@queue_durable ||= @durable
|
||||
@key ||= @queue_name
|
||||
|
||||
@logger.info("Registering output", :plugin => self)
|
||||
connect
|
||||
end # def register
|
||||
|
||||
|
@ -76,34 +87,53 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
amqpsettings[:verify_ssl] = @verify_ssl if @verify_ssl
|
||||
|
||||
begin
|
||||
@logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
|
||||
@logger.debug("Connecting to AMQP", :settings => amqpsettings,
|
||||
:exchange_type => @exchange_type, :name => @name)
|
||||
@bunny = Bunny.new(amqpsettings)
|
||||
@bunny.start
|
||||
rescue => e
|
||||
if terminating?
|
||||
return
|
||||
else
|
||||
@logger.error("AMQP connection error (during connect), will reconnect: #{e}")
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.error("AMQP connection error (during connect), will reconnect",
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
sleep(1)
|
||||
retry
|
||||
end
|
||||
end
|
||||
@target = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
|
||||
|
||||
@logger.debug("Declaring queue", :queue_name => @queue_name,
|
||||
:durable => @queue_durable)
|
||||
queue = @bunny.queue(@queue_name, :durable => @queue_durable)
|
||||
|
||||
@logger.debug("Declaring exchange", :name => @name, :type => @exchange_type,
|
||||
:durable => @durable)
|
||||
@exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
|
||||
|
||||
@logger.debug("Binding exchange", :name => @name, :key => @key)
|
||||
queue.bind(@exchange, :key => @key)
|
||||
end # def connect
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
@logger.debug("Sending event", :destination => to_s, :event => event,
|
||||
:key => key)
|
||||
key = event.sprintf(@key) if @key
|
||||
@logger.debug(["Sending event", { :destination => to_s, :event => event, :key => key }])
|
||||
begin
|
||||
if @target
|
||||
begin
|
||||
@target.publish(event.to_json, :persistent => @persistent, :key => key)
|
||||
rescue JSON::GeneratorError
|
||||
@logger.warn(["Trouble converting event to JSON", $!, event.to_hash])
|
||||
return
|
||||
end
|
||||
receive_raw(event.to_json, key)
|
||||
rescue JSON::GeneratorError => e
|
||||
@logger.warn("Trouble converting event to JSON", :exception => e,
|
||||
:event => event)
|
||||
return
|
||||
end
|
||||
end # def receive
|
||||
|
||||
public
|
||||
def receive_raw(message, key=@key)
|
||||
begin
|
||||
if @exchange
|
||||
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key }])
|
||||
@exchange.publish(message, :persistent => @persistent, :key => key, :mandatory => true)
|
||||
else
|
||||
@logger.warn("Tried to send message, but not connected to amqp yet.")
|
||||
end
|
||||
|
@ -112,24 +142,18 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
connect
|
||||
retry
|
||||
end
|
||||
end # def receive
|
||||
|
||||
# This is used by the ElasticSearch AMQP/River output.
|
||||
public
|
||||
def receive_raw(raw)
|
||||
@target.publish(raw)
|
||||
end # def receive_raw
|
||||
end
|
||||
|
||||
public
|
||||
def to_s
|
||||
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
|
||||
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}\##{@queue_name}"
|
||||
end
|
||||
|
||||
public
|
||||
def teardown
|
||||
@bunny.close rescue nil
|
||||
@bunny = nil
|
||||
@target = nil
|
||||
@exchange = nil
|
||||
finished
|
||||
end # def teardown
|
||||
end # class LogStash::Outputs::Amqp
|
||||
|
|
|
@ -52,7 +52,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
|
|||
# default.
|
||||
config :embedded_http_port, :validate => :string, :default => "9200-9300"
|
||||
|
||||
# TODO(sissel): Config for river?
|
||||
# Configure the maximum number of in-flight requests to ElasticSearch.
|
||||
#
|
||||
# Note: This setting may be removed in the future.
|
||||
config :max_inflight_requests, :validate => :number, :default => 50
|
||||
|
||||
public
|
||||
def register
|
||||
|
@ -77,13 +80,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
|
|||
start_local_elasticsearch
|
||||
end
|
||||
|
||||
gem "jruby-elasticsearch", ">= 0.0.3"
|
||||
require "jruby-elasticsearch"
|
||||
|
||||
@logger.info(:message => "New ElasticSearch output", :cluster => @cluster,
|
||||
@logger.info("New ElasticSearch output", :cluster => @cluster,
|
||||
:host => @host, :port => @port, :embedded => @embedded)
|
||||
@pending = []
|
||||
@callback = self.method(:receive_native)
|
||||
options = {
|
||||
:cluster => @cluster,
|
||||
:host => @host,
|
||||
|
@ -99,6 +100,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
|
|||
end
|
||||
|
||||
@client = ElasticSearch::Client.new(options)
|
||||
@inflight_requests = 0
|
||||
@inflight_mutex = Mutex.new
|
||||
@inflight_cv = ConditionVariable.new
|
||||
|
||||
# TODO(sissel): Set up the bulkstream.
|
||||
end # def register
|
||||
|
||||
protected
|
||||
|
@ -112,156 +118,63 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
|
|||
@embedded_elasticsearch.start
|
||||
end # def start_local_elasticsearch
|
||||
|
||||
# TODO(sissel): Needs migration to jrubyland
|
||||
public
|
||||
def ready(params)
|
||||
case params["method"]
|
||||
when "http"
|
||||
@logger.debug "ElasticSearch using http with URL #{@url.to_s}"
|
||||
#@http = EventMachine::HttpRequest.new(@url.to_s)
|
||||
@callback = self.method(:receive_http)
|
||||
when "river"
|
||||
require "logstash/outputs/amqp"
|
||||
params["port"] ||= 5672
|
||||
auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}"
|
||||
mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1")
|
||||
@mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
|
||||
@mq.register
|
||||
@callback = self.method(:receive_river)
|
||||
em_url = URI.parse("http://#{@url.host}:#{@url.port}/_river/logstash#{@url.path.tr("/", "_")}/_meta")
|
||||
unused, @es_index, @es_type = @url.path.split("/", 3)
|
||||
|
||||
river_config = {"type" => params["type"],
|
||||
params["type"] => {"host" => params["host"],
|
||||
"user" => params["user"],
|
||||
"port" => params["port"],
|
||||
"pass" => params["pass"],
|
||||
"vhost" => params["vhost"],
|
||||
"queue" => params["queue"],
|
||||
"exchange" => params["queue"],
|
||||
},
|
||||
"index" => {"bulk_size" => 100,
|
||||
"bulk_timeout" => "10ms",
|
||||
},
|
||||
}
|
||||
@logger.debug(["ElasticSearch using river", river_config])
|
||||
#http_setup = EventMachine::HttpRequest.new(em_url.to_s)
|
||||
req = http_setup.put :body => river_config.to_json
|
||||
req.errback do
|
||||
@logger.warn "Error setting up river: #{req.response}"
|
||||
end
|
||||
@callback = self.method(:receive_river)
|
||||
else raise "unknown elasticsearch method #{params["method"].inspect}"
|
||||
end
|
||||
|
||||
#receive(LogStash::Event.new({
|
||||
#"@source" => "@logstashinit",
|
||||
#"@type" => "@none",
|
||||
#"@message" => "Starting logstash output to elasticsearch",
|
||||
#"@fields" => {
|
||||
#"HOSTNAME" => Socket.gethostname
|
||||
#},
|
||||
#}))
|
||||
|
||||
pending = @pending
|
||||
@pending = []
|
||||
@logger.info("Flushing #{pending.size} events")
|
||||
pending.each do |event|
|
||||
receive(event)
|
||||
end
|
||||
end # def ready
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
if @callback
|
||||
@callback.call(event)
|
||||
else
|
||||
@pending << event
|
||||
end
|
||||
end # def receive
|
||||
|
||||
public
|
||||
def receive_http(event, tries=5)
|
||||
req = @http.post :body => event.to_json
|
||||
req.errback do
|
||||
@logger.warn("Request to index to #{@url.to_s} failed (will retry, #{tries} tries left). Event was #{event.to_s}")
|
||||
EventMachine::add_timer(2) do
|
||||
# TODO(sissel): Actually abort if we retry too many times.
|
||||
receive_http(event, tries - 1)
|
||||
end
|
||||
end
|
||||
end # def receive_http
|
||||
|
||||
public
|
||||
def receive_native(event)
|
||||
index = event.sprintf(@index)
|
||||
type = event.sprintf(@type)
|
||||
# TODO(sissel): allow specifying the ID?
|
||||
# The document ID is how elasticsearch determines sharding hash, so it can
|
||||
# help performance if we allow folks to specify a specific ID.
|
||||
req = @client.index(index, type, event.to_hash)
|
||||
req.on(:success) do |response|
|
||||
@logger.debug(["Successfully indexed", event.to_hash])
|
||||
end.on(:failure) do |exception|
|
||||
@logger.debug(["Failed to index an event", exception, event.to_hash])
|
||||
end
|
||||
req.execute
|
||||
end # def receive_native
|
||||
# TODO(sissel): Use the bulk index api, but to do this I need to figure out
|
||||
# how to handle indexing errors especially related to part of the full bulk
|
||||
# request. In the mean-time, keep track of the number of outstanding requests
|
||||
# and block if we reach that maximum.
|
||||
|
||||
public
|
||||
def receive_river(event)
|
||||
# bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/
|
||||
index_message = {"index" => {"_index" => @es_index, "_type" => @es_type}}.to_json + "\n"
|
||||
#index_message += {@es_type => event.to_hash}.to_json + "\n"
|
||||
index_message += event.to_hash.to_json + "\n"
|
||||
@mq.receive_raw(index_message)
|
||||
end # def receive_river
|
||||
# If current in-flight requests exceeds max_inflight_requests, block until
|
||||
# it doesn't.
|
||||
@inflight_mutex.synchronize do
|
||||
# Keep blocking until it's safe to send new requests.
|
||||
while @inflight_requests >= @max_inflight_requests
|
||||
@logger.info("Too many active ES requests, blocking now.",
|
||||
:inflight_requests => @inflight_requests,
|
||||
:max_inflight_requests => @max_inflight_requests);
|
||||
@inflight_cv.wait(@inflight_mutex)
|
||||
end
|
||||
end
|
||||
|
||||
req = @client.index(index, type, event.to_hash)
|
||||
increment_inflight_request_count
|
||||
#timer = @logger.time("elasticsearch write")
|
||||
req.on(:success) do |response|
|
||||
@logger.debug("Successfully indexed", :event => event.to_hash)
|
||||
#timer.stop
|
||||
decrement_inflight_request_count
|
||||
end.on(:failure) do |exception|
|
||||
@logger.debug("Failed to index an event", :exception => exception,
|
||||
:event => event.to_hash)
|
||||
#timer.stop
|
||||
decrement_inflight_request_count
|
||||
end
|
||||
|
||||
# Execute this request asynchronously.
|
||||
req.execute
|
||||
end # def receive
|
||||
|
||||
# Ruby doesn't appear to have a semaphor implementation, so this is a
|
||||
# hack until I write one.
|
||||
private
|
||||
def increment_inflight_request_count
|
||||
@inflight_mutex.synchronize do
|
||||
@inflight_requests += 1
|
||||
@logger.info("ElasticSearch in-flight requests", :count => @inflight_requests)
|
||||
end
|
||||
end # def increment_inflight_request_count
|
||||
|
||||
private
|
||||
def old_create_index
|
||||
# TODO(sissel): this is leftover from the old eventmachine days
|
||||
# make sure we don't need it, or, convert it.
|
||||
|
||||
# Describe this index to elasticsearch
|
||||
indexmap = {
|
||||
# The name of the index
|
||||
"settings" => {
|
||||
@url.path.split("/")[-1] => {
|
||||
"mappings" => {
|
||||
"@source" => { "type" => "string" },
|
||||
"@source_host" => { "type" => "string" },
|
||||
"@source_path" => { "type" => "string" },
|
||||
"@timestamp" => { "type" => "date" },
|
||||
"@tags" => { "type" => "string" },
|
||||
"@message" => { "type" => "string" },
|
||||
|
||||
# TODO(sissel): Hack for now until this bug is resolved:
|
||||
# https://github.com/elasticsearch/elasticsearch/issues/issue/604
|
||||
"@fields" => {
|
||||
"type" => "object",
|
||||
"properties" => {
|
||||
"HOSTNAME" => { "type" => "string" },
|
||||
},
|
||||
}, # "@fields"
|
||||
}, # "properties"
|
||||
}, # index map for this index type.
|
||||
}, # "settings"
|
||||
} # ES Index
|
||||
|
||||
#indexurl = @esurl.to_s
|
||||
#indexmap_http = EventMachine::HttpRequest.new(indexurl)
|
||||
#indexmap_req = indexmap_http.put :body => indexmap.to_json
|
||||
#indexmap_req.callback do
|
||||
#@logger.info(["Done configuring index", indexurl, indexmap])
|
||||
#ready(params)
|
||||
#end
|
||||
#indexmap_req.errback do
|
||||
#@logger.warn(["Failure configuring index (http failed to connect?)",
|
||||
#@esurl.to_s, indexmap])
|
||||
#@logger.warn([indexmap_req])
|
||||
##sleep 30
|
||||
#raise "Failure configuring index: #{@esurl.to_s}"
|
||||
#
|
||||
#end
|
||||
end # def old_create_index
|
||||
def decrement_inflight_request_count
|
||||
@inflight_mutex.synchronize do
|
||||
@inflight_requests -= 1
|
||||
@inflight_cv.signal
|
||||
end
|
||||
end # def decrement_inflight_request_count
|
||||
end # class LogStash::Outputs::Elasticsearch
|
||||
|
|
153
lib/logstash/outputs/elasticsearch_river.rb
Normal file
153
lib/logstash/outputs/elasticsearch_river.rb
Normal file
|
@ -0,0 +1,153 @@
|
|||
require "logstash/namespace"
|
||||
require "logstash/outputs/base"
|
||||
require "json"
|
||||
require "uri"
|
||||
require "net/http"
|
||||
|
||||
# This output lets you store logs in elasticsearch. It's similar to the
|
||||
# 'elasticsearch' output but improves performance by using an AMQP server,
|
||||
# such as rabbitmq, to send data to elasticsearch.
|
||||
#
|
||||
# Upon startup, this output will automatically contact an elasticsearch cluster
|
||||
# and configure it to read from the queue to which we write.
|
||||
#
|
||||
# You can learn more about elasticseasrch at <http://elasticsearch.org>
|
||||
|
||||
class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
|
||||
|
||||
config_name "elasticsearch_river"
|
||||
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
# The index to write events to. This can be dynamic using the %{foo} syntax.
|
||||
# The default value will partition your indeces by day so you can more easily
|
||||
# delete old data or only search specific date ranges.
|
||||
config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}"
|
||||
|
||||
# The type to write events to. Generally you should try to write only similar
|
||||
# events to the same 'type'. String expansion '%{foo}' works here.
|
||||
config :type, :validate => :string, :default => "%{@type}"
|
||||
|
||||
# The name/address of an ElasticSearch host to use for river creation
|
||||
config :es_host, :validate => :string, :required => true
|
||||
|
||||
# ElasticSearch API port
|
||||
config :es_port, :validate => :number, :default => 9200
|
||||
|
||||
# ElasticSearch river configuration: bulk fetch size
|
||||
config :es_bulk_size, :validate => :number, :default => 1000
|
||||
|
||||
# ElasticSearch river configuration: bulk timeout in milliseconds
|
||||
config :es_bulk_timeout_ms, :validate => :number, :default => 100
|
||||
|
||||
# Hostname of AMQP server
|
||||
config :amqp_host, :validate => :string, :required => true
|
||||
|
||||
# Port of AMQP server
|
||||
config :amqp_port, :validate => :number, :default => 5672
|
||||
|
||||
# AMQP user
|
||||
config :user, :validate => :string, :default => "guest"
|
||||
|
||||
# AMQP password
|
||||
config :password, :validate => :string, :default => "guest"
|
||||
|
||||
# AMQP vhost
|
||||
config :vhost, :validate => :string, :default => "/"
|
||||
|
||||
# AMQWP queue name
|
||||
config :queue, :validate => :string, :default => "elasticsearch"
|
||||
|
||||
# AMQP exchange name
|
||||
config :exchange, :validate => :string, :default => "elasticsearch"
|
||||
|
||||
# AMQP routing key
|
||||
config :key, :validate => :string, :default => "elasticsearch"
|
||||
|
||||
# AMQP durability setting. Also used for ElasticSearch setting
|
||||
config :durable, :validate => :boolean, :default => true
|
||||
|
||||
# AMQP persistence setting
|
||||
config :persistent, :validate => :boolean, :default => true
|
||||
|
||||
public
|
||||
def register
|
||||
# TODO(sissel): find a better way of declaring where the elasticsearch
|
||||
# libraries are
|
||||
# TODO(sissel): can skip this step if we're running from a jar.
|
||||
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/**/*.jar")
|
||||
Dir[jarpath].each do |jar|
|
||||
require jar
|
||||
end
|
||||
prepare_river
|
||||
end
|
||||
|
||||
protected
|
||||
def prepare_river
|
||||
require "logstash/outputs/amqp"
|
||||
|
||||
# Configure the message plugin
|
||||
params = {
|
||||
"host" => [@amqp_host],
|
||||
"port" => [@amqp_port],
|
||||
"user" => [@user],
|
||||
"password" => [@password],
|
||||
"exchange_type" => ["direct"],
|
||||
"queue_name" => [@queue],
|
||||
"name" => [@exchange],
|
||||
"key" => [@key],
|
||||
"vhost" => [@vhost],
|
||||
"durable" => [@durable.to_s],
|
||||
"persistent" => [@persistent.to_s],
|
||||
"debug" => [@debug.to_s],
|
||||
}.reject {|k,v| v.nil?}
|
||||
@mq = LogStash::Outputs::Amqp.new(params)
|
||||
@mq.register
|
||||
|
||||
# Set up the river
|
||||
begin
|
||||
auth = "#{@user}:#{@password}"
|
||||
|
||||
# Name the river by our hostname
|
||||
require "socket"
|
||||
hostname = Socket.gethostname
|
||||
api_path = "/_river/logstash-#{hostname.gsub('.','_')}/_meta"
|
||||
|
||||
river_config = {"type" => "rabbitmq",
|
||||
"rabbitmq" => {
|
||||
"host" => @amqp_host=="localhost" ? hostname : @amqp_host,
|
||||
"port" => @amqp_port,
|
||||
"user" => @user,
|
||||
"pass" => @password,
|
||||
"vhost" => @vhost,
|
||||
"queue" => @queue,
|
||||
"exchange" => @exchange,
|
||||
"routing_key" => @key,
|
||||
"exchange_durable" => @durable,
|
||||
"queue_durable" => @durable,
|
||||
},
|
||||
"index" => {"bulk_size" => @es_bulk_size,
|
||||
"bulk_timeout" => "#{@es_bulk_timeout_ms}ms",
|
||||
},
|
||||
}
|
||||
@logger.info(["ElasticSearch using river", river_config])
|
||||
Net::HTTP.start(@es_host, @es_port) do |http|
|
||||
req = Net::HTTP::Put.new(api_path)
|
||||
req.body = river_config.to_json
|
||||
response = http.request(req)
|
||||
response.value() # raise an exception if error
|
||||
@logger.info("River created: #{response.body}")
|
||||
end
|
||||
rescue Exception => e
|
||||
@logger.warn "Couldn't set up river: #{e.inspect}. You'll have to set it up manually (or restart)"
|
||||
end
|
||||
end
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
index_message = {"index" => {"_index" => event.sprintf(@index), "_type" => event.sprintf(@type)}}.to_json + "\n"
|
||||
index_message += event.to_hash.to_json + "\n"
|
||||
@mq.receive_raw(index_message)
|
||||
end
|
||||
end
|
||||
|
|
@ -55,11 +55,11 @@ class LogStash::Outputs::File < LogStash::Outputs::Base
|
|||
def open(path)
|
||||
return @files[path] if @files.include?(path)
|
||||
|
||||
@logger.info(["Opening", { :path => path }])
|
||||
@logger.info("Opening file", :path => path)
|
||||
|
||||
dir = File.dirname(path)
|
||||
if !Dir.exists?(dir)
|
||||
@logger.info(["Creating directory", { :directory => dir }])
|
||||
@logger.info("Creating directory", :directory => dir)
|
||||
FileUtils.mkdir_p(dir)
|
||||
end
|
||||
|
||||
|
|
|
@ -101,7 +101,7 @@ class LogStash::Outputs::Gelf < LogStash::Outputs::Base
|
|||
m["facility"] = event.sprintf(@facility)
|
||||
m["timestamp"] = event.unix_timestamp.to_i
|
||||
|
||||
@logger.debug(["Sending GELF event", m])
|
||||
@logger.debug("Sending GELF event", :event => m)
|
||||
@gelf.notify!(m)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Gelf
|
||||
|
|
|
@ -46,8 +46,8 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
|||
begin
|
||||
@socket = TCPSocket.new(@host, @port)
|
||||
rescue Errno::ECONNREFUSED => e
|
||||
@logger.warn(["Connection refused to graphite server, sleeping...",
|
||||
{ :host => @host, :port => @port }])
|
||||
@logger.warn("Connection refused to graphite server, sleeping...",
|
||||
:host => @host, :port => @port)
|
||||
sleep(2)
|
||||
retry
|
||||
end
|
||||
|
@ -68,8 +68,8 @@ class LogStash::Outputs::Graphite < LogStash::Outputs::Base
|
|||
begin
|
||||
@socket.puts(message)
|
||||
rescue Errno::EPIPE, Errno::ECONNRESET => e
|
||||
@logger.warn(["Connection to graphite server died",
|
||||
{ :exception => e, :host => @host, :port => @port }])
|
||||
@logger.warn("Connection to graphite server died",
|
||||
:exception => e, :host => @host, :port => @port)
|
||||
sleep(2)
|
||||
connect
|
||||
end
|
||||
|
|
|
@ -44,14 +44,14 @@ class LogStash::Outputs::Loggly < LogStash::Outputs::Base
|
|||
|
||||
# Send the event over http.
|
||||
url = URI.parse("http://#{@host}/inputs/#{event.sprintf(@key)}")
|
||||
@logger.info("Loggly URL: #{url}")
|
||||
@logger.info("Loggly URL", :url => url)
|
||||
request = Net::HTTP::Post.new(url.path)
|
||||
request.body = event.to_json
|
||||
response = Net::HTTP.new(url.host, url.port).start {|http| http.request(request) }
|
||||
if response == Net::HTTPSuccess
|
||||
@logger.info("Event send to Loggly OK!")
|
||||
else
|
||||
@logger.info response.error!
|
||||
@logger.info("HTTP error", :error => response.error!)
|
||||
end
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Loggly
|
||||
|
|
|
@ -62,8 +62,8 @@ class LogStash::Outputs::Nagios < LogStash::Outputs::Base
|
|||
end
|
||||
|
||||
if !File.exists?(@commandfile)
|
||||
@logger.warn(["Skipping nagios output; command file is missing",
|
||||
{"commandfile" => @commandfile, "missed_event" => event}])
|
||||
@logger.warn("Skipping nagios output; command file is missing",
|
||||
:commandfile => @commandfile, :missed_event => event)
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -74,15 +74,15 @@ class LogStash::Outputs::Nagios < LogStash::Outputs::Base
|
|||
|
||||
host = event.fields["nagios_host"]
|
||||
if !host
|
||||
@logger.warn(["Skipping nagios output; nagios_host field is missing",
|
||||
{"missed_event" => event}])
|
||||
@logger.warn("Skipping nagios output; nagios_host field is missing",
|
||||
:missed_event => event)
|
||||
return
|
||||
end
|
||||
|
||||
service = event.fields["nagios_service"]
|
||||
if !service
|
||||
@logger.warn(["Skipping nagios output; nagios_service field is missing",
|
||||
{"missed_event" => event}])
|
||||
@logger.warn("Skipping nagios output; nagios_service field is missing",
|
||||
"missed_event" => event)
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -100,17 +100,17 @@ class LogStash::Outputs::Nagios < LogStash::Outputs::Base
|
|||
# In the multi-line case, escape the newlines for the nagios command file
|
||||
cmd += event.message.gsub("\n", "\\n")
|
||||
|
||||
@logger.debug({"commandfile" => @commandfile, "nagios_command" => cmd})
|
||||
@logger.debug("Opening nagios command file", :commandfile => @commandfile,
|
||||
:nagios_command => cmd)
|
||||
begin
|
||||
File.open(@commandfile, "r+") do |f|
|
||||
f.puts(cmd)
|
||||
f.flush # TODO(sissel): probably don't need this.
|
||||
end
|
||||
rescue => e
|
||||
@logger.warn(["Skipping nagios output; error writing to command file",
|
||||
{"error" => $!, "commandfile" => @commandfile,
|
||||
"missed_event" => event}])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("Skipping nagios output; error writing to command file",
|
||||
:commandfile => @commandfile, :missed_event => event,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
end
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Nagios
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
|
||||
class LogStash::Outputs::Onstomp < LogStash::Outputs::Base
|
||||
config_name "onstomp"
|
||||
|
||||
|
||||
# The address of the STOMP server.
|
||||
config :host, :validate => :string, :required => true
|
||||
|
||||
# The port to connect to on your STOMP server.
|
||||
config :port, :validate => :number, :default => 61613
|
||||
|
||||
# The username to authenticate with.
|
||||
config :user, :validate => :string, :default => ""
|
||||
|
||||
# The password to authenticate with.
|
||||
config :password, :validate => :password, :default => ""
|
||||
|
||||
# The destination to read events from. Supports string expansion, meaning
|
||||
# %{foo} values will expand to the field value.
|
||||
#
|
||||
# Example: "/topic/logstash"
|
||||
config :destination, :validate => :string, :required => true
|
||||
|
||||
# Enable debugging output?
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
private
|
||||
def connect
|
||||
begin
|
||||
@client.connect
|
||||
@logger.debug("Connected to stomp server") if @client.connected?
|
||||
rescue => e
|
||||
@logger.debug("Failed to connect to stomp server : #{e}")
|
||||
sleep 2
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
public
|
||||
def register
|
||||
require "onstomp"
|
||||
@client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value)
|
||||
|
||||
# Handle disconnects
|
||||
@client.on_connection_closed {
|
||||
connect
|
||||
}
|
||||
|
||||
connect
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
@logger.debug(["stomp sending event", { :host => @host, :event => event }])
|
||||
@client.send(event.sprintf(@destination), event.to_json)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Onstomp
|
||||
|
|
@ -96,7 +96,9 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
@redis.publish event.sprintf(@key), event.to_json
|
||||
end
|
||||
rescue => e
|
||||
@logger.warn(["Failed to log #{event.to_s} to #{identity}.", e])
|
||||
@logger.warn("Failed to send event to redis", :event => event,
|
||||
:identity => identiy, :exception => e,
|
||||
:backtrace => e.backtrace)
|
||||
raise e
|
||||
end
|
||||
end # def receive
|
||||
|
|
|
@ -97,8 +97,7 @@ class LogStash::Outputs::Statsd < LogStash::Outputs::Base
|
|||
def build_stat(metric, sender=@sender)
|
||||
sender = sender.gsub('::','.').gsub(RESERVED_CHARACTERS_REGEX, '_').gsub(".", "_")
|
||||
metric = metric.gsub('::','.').gsub(RESERVED_CHARACTERS_REGEX, '_')
|
||||
@logger.debug("Formatted sender: #{sender}")
|
||||
@logger.debug("Formatted metric: #{metric}")
|
||||
@logger.debug("Formatted value", :sender => sender, :metric => metric)
|
||||
return "#{sender}.#{metric}"
|
||||
end
|
||||
end # class LogStash::Outputs::Statsd
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
|
||||
class LogStash::Outputs::Stomp < LogStash::Outputs::Base
|
||||
config_name "stomp"
|
||||
class LogStash::Outputs::Onstomp < LogStash::Outputs::Base
|
||||
config_name "onstomp"
|
||||
|
||||
|
||||
|
||||
# The address of the STOMP server.
|
||||
config :host, :validate => :string
|
||||
config :host, :validate => :string, :required => true
|
||||
|
||||
# The port to connect to on your STOMP server.
|
||||
config :port, :validate => :number, :default => 61613
|
||||
|
@ -21,20 +21,41 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base
|
|||
# %{foo} values will expand to the field value.
|
||||
#
|
||||
# Example: "/topic/logstash"
|
||||
config :destination, :validate => :string
|
||||
config :destination, :validate => :string, :required => true
|
||||
|
||||
# Enable debugging output?
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
public
|
||||
def register
|
||||
require "stomp"
|
||||
@client = Stomp::Client.new(@user, @password.value, @host, @port)
|
||||
end # def register
|
||||
private
|
||||
def connect
|
||||
begin
|
||||
@client.connect
|
||||
@logger.debug("Connected to stomp server") if @client.connected?
|
||||
rescue => e
|
||||
@logger.debug("Failed to connect to stomp server, will retry",
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
sleep 2
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
public
|
||||
def register
|
||||
require "onstomp"
|
||||
@client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value)
|
||||
|
||||
# Handle disconnects
|
||||
@client.on_connection_closed {
|
||||
connect
|
||||
}
|
||||
|
||||
connect
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
@logger.debug(["stomp sending event", { :host => @host, :event => event }])
|
||||
@client.publish(event.sprintf(@destination), event.to_json)
|
||||
@logger.debug(["stomp sending event", { :host => @host, :event => event }])
|
||||
@client.send(event.sprintf(@destination), event.to_json)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Stomp
|
||||
end # class LogStash::Outputs::Onstomp
|
||||
|
||||
|
|
|
@ -39,8 +39,8 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
begin
|
||||
@socket.write(@queue.pop)
|
||||
rescue => e
|
||||
@logger.warn(["tcp output exception", @socket, $!])
|
||||
@logger.debug(["backtrace", e.backtrace])
|
||||
@logger.warn("tcp output exception", :socket => @socket,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
break
|
||||
end
|
||||
end
|
||||
|
@ -55,7 +55,7 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
public
|
||||
def register
|
||||
if server?
|
||||
@logger.info("Starting tcp output listener on #{@host}:#{@port}")
|
||||
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
|
||||
@server_socket = TCPServer.new(@host, @port)
|
||||
@client_threads = []
|
||||
|
||||
|
@ -100,8 +100,8 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
|
|||
@client_socket.write(event.to_hash.to_json)
|
||||
@client_socket.write("\n")
|
||||
rescue => e
|
||||
@logger.warn(["tcp output exception", @host, @port, $!])
|
||||
@logger.debug(["backtrace", e.backtrace])
|
||||
@logger.warn("tcp output exception", :host => @host, :port => @port,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
@client_socket = nil
|
||||
end
|
||||
end
|
||||
|
|
|
@ -74,22 +74,22 @@ class LogStash::Outputs::Zabbix < LogStash::Outputs::Base
|
|||
end
|
||||
|
||||
if !File.exists?(@zabbix_sender)
|
||||
@logger.warn(["Skipping zabbix output; zabbix_sender file is missing",
|
||||
{"zabbix_sender" => @zabbix_sender, "missed_event" => event}])
|
||||
@logger.warn("Skipping zabbix output; zabbix_sender file is missing",
|
||||
:zabbix_sender => @zabbix_sender, :missed_event => event)
|
||||
return
|
||||
end
|
||||
|
||||
host = event.fields["zabbix_host"]
|
||||
if !host
|
||||
@logger.warn(["Skipping zabbix output; zabbix_host field is missing",
|
||||
{"missed_event" => event}])
|
||||
@logger.warn("Skipping zabbix output; zabbix_host field is missing",
|
||||
:missed_event => event)
|
||||
return
|
||||
end
|
||||
|
||||
item = event.fields["zabbix_item"]
|
||||
if !item
|
||||
@logger.warn(["Skipping zabbix output; zabbix_item field is missing",
|
||||
{"missed_event" => event}])
|
||||
@logger.warn("Skipping zabbix output; zabbix_item field is missing",
|
||||
:missed_event => event)
|
||||
return
|
||||
end
|
||||
|
||||
|
@ -99,14 +99,13 @@ class LogStash::Outputs::Zabbix < LogStash::Outputs::Base
|
|||
|
||||
cmd = "#{@zabbix_sender} -z #{@host} -p #{@port} -s #{host} -k #{item} -o \"#{zmsg}\" 2>/dev/null >/dev/null"
|
||||
|
||||
@logger.debug({"zabbix_sender_command" => cmd})
|
||||
@logger.debug("Running zabbix command", :command => cmd)
|
||||
begin
|
||||
system cmd
|
||||
system(cmd)
|
||||
rescue => e
|
||||
@logger.warn(["Skipping zabbix output; error calling zabbix_sender",
|
||||
{"error" => $!, "zabbix_sender_command" => cmd,
|
||||
"missed_event" => event}])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
@logger.warn("Skipping zabbix output; error calling zabbix_sender",
|
||||
:command => cmd, :missed_event => event,
|
||||
:exception => e, :backtrace => e.backtrace)
|
||||
end
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Nagios
|
||||
|
|
|
@ -32,7 +32,7 @@ class LogStash::Plugin
|
|||
# If you need to take special efforts to shutdown (like waiting for
|
||||
# an operation to complete, etc)
|
||||
teardown
|
||||
@logger.info("Got shutdown signal for #{self}")
|
||||
@logger.info("Received shutdown signal", :plugin => self)
|
||||
|
||||
@shutdown_queue = queue
|
||||
if @plugin_state == :finished
|
||||
|
@ -47,12 +47,12 @@ class LogStash::Plugin
|
|||
public
|
||||
def finished
|
||||
if @shutdown_queue
|
||||
@logger.info("Sending shutdown event to agent queue. (plugin #{to_s})")
|
||||
@logger.info("Sending shutdown event to agent queue", :plugin => self)
|
||||
@shutdown_queue << self
|
||||
end
|
||||
|
||||
if @plugin_state != :finished
|
||||
@logger.info("Plugin #{to_s} is finished")
|
||||
@logger.info("Plugin is finished", :plugin => self)
|
||||
@plugin_state = :finished
|
||||
end
|
||||
end # def finished
|
||||
|
|
|
@ -95,7 +95,7 @@ class LogStash::Runner
|
|||
return args
|
||||
end # def run
|
||||
|
||||
def emit_version
|
||||
def emit_version(args)
|
||||
require "logstash/version"
|
||||
puts "logstash #{LOGSTASH_VERSION}"
|
||||
|
||||
|
|
|
@ -27,13 +27,13 @@ class LogStash::Test
|
|||
def check_libraries
|
||||
results = [
|
||||
# main agent
|
||||
check_lib("grok", "jls-grok", :optional, "needed for the grok filter."),
|
||||
check_lib("grok-pure", "jls-grok", :optional, "needed for the grok filter."),
|
||||
check_lib("bunny", "bunny", :optional, "needed for AMQP input and output"),
|
||||
check_lib("uuidtools", "uuidtools", :required,
|
||||
"needed for AMQP input and output"),
|
||||
check_lib("ap", "awesome_print", :optional, "improve debug logging output"),
|
||||
check_lib("json", "json", :required, "required for logstash to function"),
|
||||
check_lib("filewatch/tailglob", "filewatch", :optional,
|
||||
check_lib("filewatch/tail", "filewatch", :optional,
|
||||
"required for file input"),
|
||||
check_lib("jruby-elasticsearch", "jruby-elasticsearch", :optional,
|
||||
"required for elasticsearch output and for logstash web"),
|
||||
|
|
|
@ -1 +1 @@
|
|||
LOGSTASH_VERSION = "1.1.0beta5"
|
||||
LOGSTASH_VERSION = "1.1.0beta6"
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
require File.join(File.dirname(__FILE__), "VERSION") # For LOGSTASH_VERSION
|
||||
require File.join(File.dirname(__FILE__), "lib", "logstash", "version") # For LOGSTASH_VERSION
|
||||
|
||||
Gem::Specification.new do |spec|
|
||||
files = []
|
||||
|
@ -27,13 +27,11 @@ Gem::Specification.new do |spec|
|
|||
spec.add_dependency("json")
|
||||
|
||||
# New for our JRuby stuff
|
||||
spec.add_dependency("file-tail")
|
||||
spec.add_dependency("jruby-elasticsearch", ">= 0.0.2")
|
||||
spec.add_dependency "bunny" # for amqp support
|
||||
spec.add_dependency "uuidtools" # for naming amqp queues
|
||||
spec.add_dependency "filewatch", "~> 0.2.3" # for file tailing
|
||||
spec.add_dependency "jls-grok", "~> 0.5.2" # for grok filter
|
||||
spec.add_dependency "jruby-elasticsearch", "~> 0.0.7"
|
||||
spec.add_dependency "jls-grok", "~> 0.9.0" # for grok filter
|
||||
spec.add_dependency "jruby-elasticsearch", "~> 0.0.11"
|
||||
spec.add_dependency "stomp" # for stomp protocol
|
||||
spec.add_dependency "json"
|
||||
spec.add_dependency "awesome_print"
|
||||
|
@ -49,9 +47,6 @@ Gem::Specification.new do |spec|
|
|||
spec.add_dependency "gmetric" # outputs/ganglia
|
||||
spec.add_dependency "xmpp4r" # outputs/xmpp
|
||||
|
||||
# For the 'grok' filter
|
||||
spec.add_dependency("jls-grok", "~> 0.4.7")
|
||||
|
||||
spec.add_dependency("bunny")
|
||||
spec.add_dependency("uuidtools")
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
|
|||
# Months: January, Feb, 3, 03, 12, December
|
||||
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
|
||||
MONTHNUM (?:0?[1-9]|1[0-2])
|
||||
MONTHDAY (?:3[01]|[1-2]?[0-9]|0?[1-9])
|
||||
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
|
||||
|
||||
# Days: Monday, Tue, Thu, etc...
|
||||
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
|
||||
|
|
|
@ -92,7 +92,7 @@ describe LogStash::Filters::Multiline do
|
|||
outputs << event.message
|
||||
end
|
||||
end
|
||||
last = @filter.flush("unknown", @typename)
|
||||
last = @filter.flush("unknown.#{@typename}")
|
||||
if last
|
||||
outputs << last.message
|
||||
end
|
||||
|
@ -137,7 +137,7 @@ describe LogStash::Filters::Multiline do
|
|||
outputs << event.message
|
||||
end
|
||||
end
|
||||
last = @filter.flush("unknown", @typename)
|
||||
last = @filter.flush("unknown.#{@typename}")
|
||||
if last
|
||||
outputs << last.message
|
||||
end
|
||||
|
@ -181,7 +181,7 @@ describe LogStash::Filters::Multiline do
|
|||
outputs << event.message
|
||||
end
|
||||
end
|
||||
last = @filter.flush("unknown", @typename)
|
||||
last = @filter.flush("unknown.#{@typename}")
|
||||
if last
|
||||
outputs << last.message
|
||||
end
|
||||
|
@ -191,4 +191,43 @@ describe LogStash::Filters::Multiline do
|
|||
assert_equal(expected, actual)
|
||||
end
|
||||
end # negate false
|
||||
|
||||
test "with custom stream identity" do
|
||||
config "pattern" => ".*", "what" => "next",
|
||||
"stream_identity" => "%{key}"
|
||||
|
||||
inputs = [
|
||||
"one",
|
||||
"two",
|
||||
"one",
|
||||
"two",
|
||||
"one",
|
||||
]
|
||||
|
||||
expected_outputs = [
|
||||
"one\none\none",
|
||||
"two\ntwo",
|
||||
]
|
||||
|
||||
outputs = []
|
||||
|
||||
inputs.each_with_index do |input, i|
|
||||
event = LogStash::Event.new
|
||||
event.type = @typename
|
||||
# even/odd keying to fake multiple streams.
|
||||
event["key"] = ["odd", "even"][i % 2]
|
||||
event.message = input
|
||||
@filter.filter(event)
|
||||
if !event.cancelled?
|
||||
outputs << event.message
|
||||
end
|
||||
end
|
||||
outputs << @filter.flush("odd").message
|
||||
outputs << @filter.flush("even").message
|
||||
assert_equal(expected_outputs.length, outputs.length,
|
||||
"Incorrect number of output events")
|
||||
expected_outputs.zip(outputs).each do |expected, actual|
|
||||
assert_equal(expected, actual)
|
||||
end
|
||||
end
|
||||
end # tests for LogStash::Filters::Multiline
|
||||
|
|
|
@ -10,7 +10,7 @@ require "logstash"
|
|||
parent = caller.collect {
|
||||
|c| c.gsub(/:[0-9]+(:.*)$/, "")
|
||||
}.find { |c| c != __FILE__ }
|
||||
require "minitest/autorun" if parent == $0
|
||||
require "minitest/autorun" if parent == $0 or ENV["AUTORUN"]
|
||||
|
||||
# I don't really like monkeypatching, but whatever, this is probably better
|
||||
# than overriding the 'describe' method.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue