- fix multiline specs

- added 'source' setting to multiline filter now that there's no
required "@message" field (due to event v1 schema)
This commit is contained in:
Jordan Sissel 2013-06-03 00:21:45 -07:00
parent 9b28a20b5c
commit 4a9eafbafb
3 changed files with 44 additions and 31 deletions

View file

@@ -14,7 +14,6 @@ require "set"
# from files into a single event. For example - joining java exception and # from files into a single event. For example - joining java exception and
# stacktrace messages into a single event. # stacktrace messages into a single event.
# #
# TODO(sissel): Document any issues?
# The config looks like this: # The config looks like this:
# #
# filter { # filter {
@@ -67,6 +66,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# The regular expression to match # The regular expression to match
config :pattern, :validate => :string, :required => true config :pattern, :validate => :string, :required => true
# The field to use for matching
config :source, :validate => :string, :default => "message"
# If the pattern matched, does event belong to the next or previous event? # If the pattern matched, does event belong to the next or previous event?
config :what, :validate => ["previous", "next"], :required => true config :what, :validate => ["previous", "next"], :required => true
@@ -84,8 +86,8 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# reconnects (due to error or client restart), then logstash will identify # reconnects (due to error or client restart), then logstash will identify
# the new connection as a new stream and break any multiline goodness that # the new connection as a new stream and break any multiline goodness that
# may have occurred between the old and new connection. To solve this use # may have occurred between the old and new connection. To solve this use
# case, you can use "%{@source_host}.%{@type}" instead. # case, you can use "%{host}.%{type}" instead.
config :stream_identity , :validate => :string, :default => "%{@source}.%{@type}" config :stream_identity , :validate => :string, :default => "%{host}-%{path}-%{type}"
# logstash ships by default with a bunch of patterns, so you don't # logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional # necessarily need to define this yourself unless you are adding additional
@@ -100,6 +102,13 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# NUMBER \d+ # NUMBER \d+
config :patterns_dir, :validate => :array, :default => [] config :patterns_dir, :validate => :array, :default => []
# Flush inactive multiline streams older than the given number of
# seconds.
#
# This is useful when your event stream is slow and you do not want to wait
# for the next event before seeing the current event.
#config :flush_age, :validate => :number, :default => 5
# Detect if we are running from a jarfile, pick the right path. # Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new @@patterns_path = Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/ if __FILE__ =~ /file:\/.*\.jar!.*/
@@ -152,15 +161,15 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
def filter(event) def filter(event)
return unless filter?(event) return unless filter?(event)
if event.message.is_a?(Array) if event[@source].is_a?(Array)
match = @grok.match(event.message.first) match = @grok.match(event[@source].first)
else else
match = @grok.match(event.message) match = @grok.match(event[@source])
end end
key = event.sprintf(@stream_identity) key = event.sprintf(@stream_identity)
pending = @pending[key] pending = @pending[key]
@logger.debug("Multiline", :pattern => @pattern, :message => event.message, @logger.debug("Multiline", :pattern => @pattern, :message => event["message"],
:match => match, :negate => @negate) :match => match, :negate => @negate)
# Add negate option # Add negate option
@@ -169,7 +178,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
case @what case @what
when "previous" when "previous"
if match if match
event.tags |= ["multiline"] event.tag("multiline")
# previous previous line is part of this event. # previous previous line is part of this event.
# append it to the event and cancel it # append it to the event and cancel it
if pending if pending
@@ -183,17 +192,16 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# if we have a pending event, it's done, send it. # if we have a pending event, it's done, send it.
# put the current event into pending # put the current event into pending
if pending if pending
tmp = event.to_hash pending.uncancel
event.overwrite(pending) pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
@pending[key] = LogStash::Event.new(tmp) yield pending
else end
@pending[key] = event @pending[key] = event
event.cancel event.cancel
end # if/else pending
end # if/else match end # if/else match
when "next" when "next"
if match if match
event.tags |= ["multiline"] event.tag("multiline")
# this line is part of a multiline event, the next # this line is part of a multiline event, the next
# line will be part, too, put it into pending. # line will be part, too, put it into pending.
if pending if pending
@@ -208,8 +216,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# multiline, send it. # multiline, send it.
if pending if pending
pending.append(event) pending.append(event)
event.overwrite(pending.to_hash)
@pending.delete(key) @pending.delete(key)
pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
yield pending
end end
end # if/else match end # if/else match
else else
@@ -218,18 +227,23 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
end # case @what end # case @what
if !event.cancelled? if !event.cancelled?
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
filter_matched(event) filter_matched(event)
end end
end # def filter end # def filter
# Flush any pending messages. This is generally used for unit testing only.
public public
def flush def flush
events = [] events = []
@pending.each do |key, value| #flushed = []
value.uncancel @pending.each do |key, event|
events << value #next unless @flush_age.nil? || (Time.now - event.timestamp) > @flush_age
event.uncancel
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
events << event
#flushed << key
end end
#flushed.each { |k| @pending.delete(k) }
@pending.clear @pending.clear
return events return events
end # def flush end # def flush

View file

@@ -16,8 +16,8 @@ describe LogStash::Filters::Multiline do
sample [ "hello world", " second line", "another first line" ] do sample [ "hello world", " second line", "another first line" ] do
insist { subject.length } == 2 insist { subject.length } == 2
insist { subject[0].message } == "hello world\n second line" insist { subject[0]["message"] } == "hello world\n second line"
insist { subject[1].message } == "another first line" insist { subject[1]["message"] } == "another first line"
end end
end end
@@ -33,8 +33,8 @@ describe LogStash::Filters::Multiline do
CONFIG CONFIG
sample [ "120913 12:04:33 first line", "second line", "third line" ] do sample [ "120913 12:04:33 first line", "second line", "third line" ] do
reject { subject}.is_a? Array reject { subject }.is_a? Array
insist { subject.message } == "120913 12:04:33 first line\nsecond line\nthird line" insist { subject["message"] } == "120913 12:04:33 first line\nsecond line\nthird line"
end end
end end
@@ -61,9 +61,7 @@ describe LogStash::Filters::Multiline do
[ "hello world #{stream}" ] \ [ "hello world #{stream}" ] \
+ rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" } + rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" }
) .collect do |line| ) .collect do |line|
LogStash::Event.new("@message" => line, { "message" => line, "source" => stream, "type" => stream, "event" => i }
"@source" => stream, "@type" => stream,
"@fields" => { "event" => i })
end end
end end
@@ -77,7 +75,7 @@ describe LogStash::Filters::Multiline do
index = rand(eventstream.count) index = rand(eventstream.count)
event = eventstream[index].shift event = eventstream[index].shift
eventstream.delete_at(index) if eventstream[index].empty? eventstream.delete_at(index) if eventstream[index].empty?
event next event
end end
sample concurrent_stream do sample concurrent_stream do
@@ -86,7 +84,7 @@ describe LogStash::Filters::Multiline do
#puts "#{i}/#{event["event"]}: #{event.to_json}" #puts "#{i}/#{event["event"]}: #{event.to_json}"
#insist { event.type } == stream #insist { event.type } == stream
#insist { event.source } == stream #insist { event.source } == stream
insist { event.message.split("\n").first } =~ /hello world / insist { event["message"].split("\n").first } =~ /hello world /
end end
end end
end end

View file

@@ -69,11 +69,12 @@ module LogStash
end end
results = [] results = []
count = 0
event.each do |e| event.each do |e|
filters.each do |filter| filters.each do |filter|
next if e.cancelled? next if e.cancelled?
filter.filter(e) do |newevent| filter.filter(e) do |newevent|
event << newevent results << newevent unless e.cancelled?
end end
end end
results << e unless e.cancelled? results << e unless e.cancelled?