Merge branch 'master' of github.com:logstash/logstash

commit 809690fc5a
14 changed files with 208 additions and 108 deletions

@@ -6,6 +6,11 @@
     with a "_") and sets timestamp to seconds-since-epoch (millisecond
     precision and time zone information is lost, but this is the format GELF
     asks for).
+  - Inputs support specifying the format of input data (see "format" and
+    "message_format" input config parameters).
+  - Grok filter no longer incorrectly tags _grokparsefailure when more than
+    one grok filter is enabled (for multiple types) or when an event has
+    no grok configuration for its type.

 1.0.9 (May 18, 2011)
   - Fix crash bug caused by refactoring that left 'break' calls in code

etc/examples/jsoninput.conf (new file, 15 lines)
@@ -0,0 +1,15 @@
+# Example config demonstrating the use of message_format
+
+input {
+  stdin {
+    type => test
+    format => json
+    message_format => "%{date} | %{user} | %{action} | %{reason}"
+  }
+}
+
+output {
+  stdout {
+    debug => true
+  }
+}

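With the config above, each JSON line read from stdin is parsed into event fields, and message_format is rendered against those fields with sprintf-style %{fieldname} references (see the to_event helper added to LogStash::Inputs::Base further down). The standalone Ruby sketch below only approximates that substitution for one sample line; it is an illustration, not the commit's implementation (the real work is LogStash::Event#sprintf), and the sample values are invented:

    # Hedged sketch: roughly what message_format does to one parsed JSON line.
    require "json"

    raw = '{"date":"2011-05-18","user":"alice","action":"login","reason":"ok"}'
    fields = JSON.parse(raw)

    message_format = "%{date} | %{user} | %{action} | %{reason}"
    message = message_format.gsub(/%\{(\w+)\}/) { fields[$1].to_s }

    puts message   # => "2011-05-18 | alice | login | ok"
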
@@ -14,6 +14,7 @@ class LogStash::File::Manager
   def initialize(output_queue)
     @tail = FileWatch::TailGlob.new
     @watching = Hash.new
+    @to_event = Hash.new
     @watching_lock = Mutex.new
     @file_threads = {}
     @main_thread = nil
@@ -36,11 +37,11 @@ class LogStash::File::Manager
   end

   public
-  def watch(paths, config)
+  def watch(paths, config, to_event)
     @watching_lock.synchronize do
       paths.each do |path|
         if @watching[path]
-          raise ValueError, "cannot watch the same path #{path} more than once"
+          raise "cannot watch the same path #{path} more than once"
         end
         @logger.debug(["watching file", {:path => path, :config => config}])

@@ -61,6 +62,7 @@ class LogStash::File::Manager
       @tail.tail(path, tailconf) do |fullpath|
         @logger.info("New file found: #{fullpath}")
         @watching[fullpath] = config
+        @to_event[fullpath] = to_event
       end
     # TODO(sissel): Make FileWatch emit real exceptions
     rescue RuntimeError
@@ -83,16 +85,13 @@ class LogStash::File::Manager
       # Maybe extend @tail.tail to accept a extra args that it will
       # pass to subscribe's callback?
       config = @watching[path]
+      to_event = @to_event[path]
       @logger.debug(["Event from tail", { :path => path, :config => config }])
       @buffers[path].extract(data).each do |line|
-        e = LogStash::Event.new({
-          "@message" => line,
-          "@type" => config["type"],
-          "@tags" => config["tag"].dup,
-        })
-        e.source = "file://#{@hostname}#{path}"
-        @logger.debug(["New event from file input", path, e])
-        @output_queue << e
+        e = to_event.call(line, "file://#{@hostname}#{path}")
+        if e
+          @output_queue << e
+        end
       end
     end
   rescue Exception => e

@@ -109,6 +109,16 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
     # parse it with grok
     match = false

+    # Only filter events we are configured for
+    if event.type != @type
+      return
+    end
+
+    if @@grokpiles[event.type].length == 0
+      @logger.debug("Skipping grok for event type=#{event.type} (no grokpiles defined)")
+      return
+    end
+
     if !event.message.is_a?(Array)
       messages = [event.message]
     else
@@ -119,7 +129,7 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
       @logger.debug(["Running grok filter", event])

       @@grokpiles[event.type].each do |pile|
-        @logger.debug(["Trying pattern", pile])
+        @logger.debug(["Trying pattern for type #{event.type}", pile])
         grok, match = @pile.match(message)
         @logger.debug(["Result", { :grok => grok, :match => match }])
         break if match

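The _grokparsefailure item in the CHANGELOG above refers to setups like the following hedged illustration (not part of this commit; the pattern names are only examples): two grok filters scoped to different types. With the type guard added above, an "apache" event is no longer tagged _grokparsefailure merely because the "syslog" grok pile does not apply to it.

    filter {
      grok {
        type => "apache"
        pattern => "%{COMBINEDAPACHELOG}"
      }
      grok {
        type => "syslog"
        pattern => "%{SYSLOGLINE}"
      }
    }
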
@@ -41,6 +41,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
   def initialize(params)
     super

+    @format ||= ["json_event"]
+
     if !MQTYPES.include?(@exchange_type)
       raise "Invalid type '#{@exchange_type}' must be one of #{MQTYPES.join(", ")}"
     end
@@ -50,14 +52,21 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
   def register
     @logger.info("Registering input #{@url}")
     require "bunny" # rubygem 'bunny'
+    @vhost ||= "/"
+    @port ||= 5672
     @amqpsettings = {
-      :vhost => (@vhost or "/"),
+      :vhost => @vhost,
       :host => @host,
-      :port => (@port or 5672),
+      :port => @port,
     }
     @amqpsettings[:user] = @user if @user
     @amqpsettings[:pass] = @password.value if @password
     @amqpsettings[:logging] = @debug
+    @amqpurl = "amqp://"
+    if @user or @password
+      @amqpurl += "#{@user}:#{@password}@"
+    end
+    @amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"
   end # def register

   def run(queue)
@@ -73,14 +82,10 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
       @queue.bind(exchange)

       @queue.subscribe do |data|
-        begin
-          obj = JSON.parse(data[:payload])
-        rescue => e
-          @logger.error(["json parse error", { :exception => e }])
-          raise e
-        end
-
-        queue << LogStash::Event.new(obj)
+        e = to_event(data[:payload], @amqpurl)
+        if e
+          queue << e
+        end
       end # @queue.subscribe
     rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
       @logger.error("AMQP connection error, will reconnect: #{e}")

@@ -16,6 +16,21 @@ class LogStash::Inputs::Base < LogStash::Plugin
   # Set this to true to enable debugging on an input.
   config :debug, :validate => :boolean, :default => false

+  # The format of input data (plain, json, json_event)
+  config :format, :validate => (lambda do |value|
+    valid_formats = ["plain", "json", "json_event"]
+    if value.length != 1
+      false
+    else
+      valid_formats.member?(value.first)
+    end
+  end) # config :format
+
+  # If format is "json", an event sprintf string to build what
+  # the display @message should be (defaults to the raw JSON).
+  # sprintf format strings look like %{fieldname} or %{@metadata}.
+  config :message_format, :validate => :string
+
   # Add any number of arbitrary tags to your event.
   #
   # This can help with processing later.
@@ -48,4 +63,51 @@ class LogStash::Inputs::Base < LogStash::Plugin
   def tag(newtag)
     @tags << newtag
   end # def tag
+
+  protected
+  def to_event(raw, source)
+    @format ||= ["plain"]
+
+    event = LogStash::Event.new
+    event.type = @type
+    event.tags = @tags.clone rescue []
+    event.source = source
+
+    case @format.first
+    when "plain":
+      event.message = raw
+    when "json":
+      begin
+        fields = JSON.parse(raw)
+        fields.each { |k, v| event[k] = v }
+      rescue
+        @logger.warn({:message => "Trouble parsing json input",
+                      :input => raw,
+                      :source => source,
+                    })
+        return nil
+      end
+
+      if @message_format
+        event.message = event.sprintf(@message_format)
+      else
+        event.message = raw
+      end
+    when "json_event":
+      begin
+        event = LogStash::Event.from_json(raw)
+      rescue
+        @logger.warn({:message => "Trouble parsing json_event input",
+                      :input => raw,
+                      :source => source,
+                    })
+        return nil
+      end
+    else
+      raise "unknown event format #{@format.first}, this should never happen"
+    end
+
+    logger.debug(["Received new event", {:source => source, :event => event}])
+    return event
+  end
 end # class LogStash::Inputs::Base

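The inputs converted in this commit all follow the same shape against this new helper: read raw data, hand it to to_event along with a source URL, and enqueue the result only when parsing succeeded, since to_event returns nil for unparseable input. A hedged sketch of that loop shape (not a real plugin; raw_lines and source_url are hypothetical stand-ins):

    # Sketch only -- the run-loop pattern each input is converted to.
    def run(queue)
      raw_lines.each do |line|              # hypothetical source of raw strings
        event = to_event(line, source_url)  # nil when the line could not be parsed
        queue << event if event             # drop unparseable input instead of raising
      end
    end
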
@@ -46,6 +46,6 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
       end
     end

-    @@filemanager.watch(@path, @config)
+    @@filemanager.watch(@path, @config, method(:to_event))
   end # def run
 end # class LogStash::Inputs::File

@@ -32,11 +32,21 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
   # Maximum number of retries on a read before we give up.
   config :retries, :validate => :number, :default => 5

+  public
+  def initialize(params)
+    super
+
+    @format ||= ["json_event"]
+  end # def initialize
+
   public
   def register
     require 'redis'
     @redis = nil
-  end
+    @redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"
+  end # def register

   private
   def connect
     Redis.new(
       :host => @host,
@@ -45,8 +55,9 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
       :db => @db,
       :password => @password
     )
-  end
+  end # def connect

+  public
   def run(output_queue)
     retries = @retries
     loop do
@@ -54,20 +65,19 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
         @redis ||= connect
         response = @redis.blpop @queue, 0
         retries = @retries
-        begin
-          output_queue << LogStash::Event.new(JSON.parse(response[1]))
-        rescue # parse or event creation error
-          @logger.error "failed to create event with '#{response[1]}'"
-          @logger.error $!
+        e = to_event(response[1], @redis_url)
+        if e
+          output_queue << e
         end
       rescue # redis error
-        raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
+        if retries <= 0
+          raise RuntimeError, "Redis connection failed too many times"
+        end
         @redis = nil
-        @logger.warn "Failed to get event from redis #{@name}. "+
-          "Will retry #{retries} times."
-        @logger.warn $!
+        @logger.warn(["Failed to get event from redis #{@name}. " +
+                      "Will retry #{retries} times.", $!])
         retries -= 1
-        sleep 1
+        sleep(1)
       end
     end # loop
   end # def run

@@ -17,19 +17,10 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base

   def run(queue)
     loop do
-      event = LogStash::Event.new
-      begin
-        event.message = $stdin.readline.chomp
-      rescue *[EOFError, IOError] => e
-        @logger.info("Got EOF from stdin input. Ending")
-        finished
-        return
-      end
-      event.type = @type
-      event.tags = @tags.clone rescue []
-      event.source = "stdin://#{@host}/"
-      @logger.debug(["Got event", event])
-      queue << event
+      e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
+      if e
+        queue << e
+      end
     end # loop
   end # def run

@@ -25,33 +25,37 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
   # The destination to read events from.
   #
   # Example: "/topic/logstash"
-  config :destination, :validate => :string
+  config :destination, :validate => :string, :required => true

   # Enable debugging output?
   config :debug, :validate => :boolean, :default => false

+  public
+  def initialize(params)
+    super
+
+    @format ||= "json_event"
+  end
+
   public
   def register
     require "stomp"

-    if @destination == "" or @destination.nil?
-      @logger.error("No destination path given for stomp")
-      return
-    end
-
     begin
       @client = Stomp::Client.new(@user, @password.value, @host, @port)
-    rescue Errno::ECONNREFUSED
+      @stomp_url = "stomp://#{@user}:#{@password}@#{@host}:#{@port}/#{@destination}"
+    rescue Errno::ECONNREFUSED => e
       @logger.error("Connection refused to #{@host}:#{@port}...")
       # TODO(sissel): Retry?
+      raise e
     end
   end # def register

   def run(queue)
     @client.subscribe(@destination) do |msg|
       @logger.debug(["Got message from stomp", { :msg => msg }])
       #event = LogStash::Event.from_json(message.body)
       #queue << event
+      e = to_event(message.body, @stomp_url)
+      if e
+        queue << e
+      end
     end
   end # def run
 end # class LogStash::Inputs::Stomp

@@ -19,6 +19,14 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
   # The port to listen on
   config :port, :validate => :number, :default => 514

+  public
+  def initialize(params)
+    super
+
+    # force "plain" format. others don't make sense here.
+    @format = ["plain"]
+  end # def initialize
+
   public
   def register
     # This comes from RFC3164, mostly.
@@ -64,20 +72,13 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base

     loop do
       line, client = server.recvfrom(9000)
-      p :client => client
-      p :line => line
-      begin
-        event = LogStash::Event.new({
-          "@message" => line.chomp,
-          "@type" => @type,
-          "@tags" => @tags.clone,
-        })
-        source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil, nil, nil, nil)
-        syslog_relay(event, source)
-        output_queue << event
-      rescue => e
-        p :exception => e
-      end
+      source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil,
+                                nil, nil, nil)
+      e = to_event(line.chomp, source.to_s)
+      if e
+        syslog_relay(e, source)
+        output_queue << e
+      end
     end
   ensure
     if server
@@ -96,19 +97,18 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
       ip, port = client.peeraddr[3], client.peeraddr[1]
       @logger.warn("got connection from #{ip}:#{port}")
       LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
-      source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil, nil, nil, nil)
+      source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil,
+                                     nil, nil, nil)
       client.each do |line|
-        event = LogStash::Event.new({
-          "@message" => line.chomp,
-          "@type" => @type,
-          "@tags" => @tags.clone,
-        })
-        source = source_base.dup
-        syslog_relay(event, source)
-        output_queue << event
-      end
-    end
-  end
+        e = to_event(line.chomp, source_base.to_s)
+        if e
+          source = source_base.dup
+          syslog_relay(e, source)
+          output_queue << e
+        end # e
+      end # client.each
+    end # Thread.new
+  end # loop do
 ensure
   server.close if server
 end # def tcp_listener

@@ -44,14 +44,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
         Timeout::timeout(@data_timeout) do
           buf = s.readline
         end
-        e = LogStash::Event.new({
-          "@message" => buf,
-          "@type" => @type,
-          "@tags" => @tags.clone,
-        })
-        e.source = "tcp://#{@host}:#{@port}/client/#{peer}"
-        @logger.debug(["Received message from #{peer}", e])
-        output_queue << e
+        e = self.to_event(buf, "tcp://#{@host}:#{@port}/client/#{peer}")
+        if e
+          output_queue << e
+        end
       end # loop do
     rescue
       @logger.debug(["Closing connection with #{peer}", $!])

@@ -17,6 +17,15 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
   # Any keywords to track in the twitter stream
   config :keywords, :validate => :array, :required => true

+  public
+  def initialize(params)
+    super
+
+    # Force format to plain. Other values don't make any sense here.
+    @format = ["plain"]
+  end # def initialize
+
+  public
   def register
     # TODO(sissel): put buftok in logstash, too
     require "filewatch/buftok"
@@ -32,34 +41,23 @@ class LogStash::Inputs::Twitter < LogStash::Inputs::Base
         @logger.debug :status => status
         #@logger.debug("Got twitter status from @#{status[:user][:screen_name]}")
         @logger.info("Got twitter status from @#{status["user"]["screen_name"]}")
-        event = LogStash::Event.new(
-          #"@message" => status[:text],
-          "@message" => status["text"],
-          "@type" => @type,
-          "@tags" => @tags.clone
-        )
+        e = to_event(status["text"], "http://twitter.com/#{status["user"]["screen_name"]}/status/#{status["id"]}")
+        next unless e

-        event.fields.merge!(
-          #"user" => (status[:user][:screen_name] rescue nil),
-          "user" => (status["user"]["screen_name"] rescue nil),
-          #"client" => (status[:source] rescue nil),
+        e.fields.merge!(
+          "user" => (status["user"]["screen_name"] rescue nil),
           "client" => (status["source"] rescue nil),
-          #"retweeted" => (status[:retweeted] rescue nil)
           "retweeted" => (status["retweeted"] rescue nil)
         )

-        #event.fields["in-reply-to"] = status[:in_reply_to_status_id] if status[:in_reply_to_status_id]
-        event.fields["in-reply-to"] = status["in_reply_to_status_id"] if status["in_reply_to_status_id"]
+        e.fields["in-reply-to"] = status["in_reply_to_status_id"] if status["in_reply_to_status_id"]

-        #urls = status[:entities][:urls] rescue []
         urls = status["entities"]["urls"] rescue []
         if urls.size > 0
-          event.fields["urls"] = urls.collect { |u| u["url"] }
+          e.fields["urls"] = urls.collect { |u| u["url"] }
         end

-        event.source = "http://twitter.com/#{event.fields["user"]}/status/#{status["id"]}"
-        @logger.debug(["Got event", event])
-        queue << event
+        queue << e
       end # stream.track

       # Some closure or error occured, sleep and try again.

@@ -80,7 +80,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
     @logger.debug(["Sending event", { :destination => to_s, :event => event }])
     begin
       if @target
-        @target.publish(event.to_json, :persistent => @persistent)
+        begin
+          @target.publish(event.to_json, :persistent => @persistent)
+        rescue JSON::GeneratorError
+          @logger.warn(["Trouble converting event to JSON", $!, event.to_hash])
+          return
+        end
       else
         @logger.warn("Tried to send message, but not connected to amqp yet.")
       end