diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index 4b9b10c74..a26b21534 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -14,7 +14,6 @@ require "set" # from files into a single event. For example - joining java exception and # stacktrace messages into a single event. # -# TODO(sissel): Document any issues? # The config looks like this: # # filter { @@ -67,6 +66,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # The regular expression to match config :pattern, :validate => :string, :required => true + # The field to use for matching + config :source, :validate => :string, :default => "message" + # If the pattern matched, does event belong to the next or previous event? config :what, :validate => ["previous", "next"], :required => true @@ -84,8 +86,8 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # reconnects (due to error or client restart), then logstash will identify # the new connection as a new stream and break any multiline goodness that # may have occurred between the old and new connection. To solve this use - # case, you can use "%{@source_host}.%{@type}" instead. - config :stream_identity , :validate => :string, :default => "%{@source}.%{@type}" + # case, you can use "%{host}.%{type}" instead. + config :stream_identity , :validate => :string, :default => "%{host}-%{path}-%{type}" # logstash ships by default with a bunch of patterns, so you don't # necessarily need to define this yourself unless you are adding additional @@ -100,6 +102,13 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # NUMBER \d+ config :patterns_dir, :validate => :array, :default => [] + # Flush inactive multiline streams older than the given number of + # seconds. + # + # This is useful when your event stream is slow and you do not want to wait + # for the next event before seeing the current event. + #config :flush_age, :validate => :number, :default => 5 + # Detect if we are running from a jarfile, pick the right path. @@patterns_path = Set.new if __FILE__ =~ /file:\/.*\.jar!.*/ @@ -152,15 +161,15 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base def filter(event) return unless filter?(event) - if event.message.is_a?(Array) - match = @grok.match(event.message.first) + if event[@source].is_a?(Array) + match = @grok.match(event[@source].first) else - match = @grok.match(event.message) + match = @grok.match(event[@source]) end key = event.sprintf(@stream_identity) pending = @pending[key] - @logger.debug("Multiline", :pattern => @pattern, :message => event.message, + @logger.debug("Multiline", :pattern => @pattern, :message => event["message"], :match => match, :negate => @negate) # Add negate option @@ -169,7 +178,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base case @what when "previous" if match - event.tags |= ["multiline"] + event.tag("multiline") # previous previous line is part of this event. # append it to the event and cancel it if pending @@ -183,17 +192,16 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # if we have a pending event, it's done, send it. # put the current event into pending if pending - tmp = event.to_hash - event.overwrite(pending) - @pending[key] = LogStash::Event.new(tmp) - else - @pending[key] = event - event.cancel - end # if/else pending + pending.uncancel + pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array) + yield pending + end + @pending[key] = event + event.cancel end # if/else match when "next" if match - event.tags |= ["multiline"] + event.tag("multiline") # this line is part of a multiline event, the next # line will be part, too, put it into pending. if pending @@ -208,8 +216,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # multiline, send it. if pending pending.append(event) - event.overwrite(pending.to_hash) @pending.delete(key) + pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array) + yield pending end end # if/else match else @@ -218,18 +227,23 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base end # case @what if !event.cancelled? + event[@source] = event[@source].join("\n") if event[@source].is_a?(Array) filter_matched(event) end end # def filter - # Flush any pending messages. This is generally used for unit testing only. public def flush events = [] - @pending.each do |key, value| - value.uncancel - events << value + #flushed = [] + @pending.each do |key, event| + #next unless @flush_age.nil? || (Time.now - event.timestamp) > @flush_age + event.uncancel + event[@source] = event[@source].join("\n") if event[@source].is_a?(Array) + events << event + #flushed << key end + #flushed.each { |k| @pending.delete(k) } @pending.clear return events end # def flush diff --git a/spec/filters/multiline.rb b/spec/filters/multiline.rb index 4781c1cf4..aa74c0851 100644 --- a/spec/filters/multiline.rb +++ b/spec/filters/multiline.rb @@ -16,8 +16,8 @@ describe LogStash::Filters::Multiline do sample [ "hello world", " second line", "another first line" ] do insist { subject.length } == 2 - insist { subject[0].message } == "hello world\n second line" - insist { subject[1].message } == "another first line" + insist { subject[0]["message"] } == "hello world\n second line" + insist { subject[1]["message"] } == "another first line" end end @@ -33,8 +33,8 @@ describe LogStash::Filters::Multiline do CONFIG sample [ "120913 12:04:33 first line", "second line", "third line" ] do - reject { subject}.is_a? Array - insist { subject.message } == "120913 12:04:33 first line\nsecond line\nthird line" + reject { subject }.is_a? Array + insist { subject["message"] } == "120913 12:04:33 first line\nsecond line\nthird line" end end @@ -61,9 +61,7 @@ describe LogStash::Filters::Multiline do [ "hello world #{stream}" ] \ + rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" } ) .collect do |line| - LogStash::Event.new("@message" => line, - "@source" => stream, "@type" => stream, - "@fields" => { "event" => i }) + { "message" => line, "source" => stream, "type" => stream, "event" => i } end end @@ -77,7 +75,7 @@ describe LogStash::Filters::Multiline do index = rand(eventstream.count) event = eventstream[index].shift eventstream.delete_at(index) if eventstream[index].empty? - event + next event end sample concurrent_stream do @@ -86,7 +84,7 @@ describe LogStash::Filters::Multiline do #puts "#{i}/#{event["event"]}: #{event.to_json}" #insist { event.type } == stream #insist { event.source } == stream - insist { event.message.split("\n").first } =~ /hello world / + insist { event["message"].split("\n").first } =~ /hello world / end end end diff --git a/spec/test_utils.rb b/spec/test_utils.rb index b4352dd0c..220bf2a94 100644 --- a/spec/test_utils.rb +++ b/spec/test_utils.rb @@ -69,11 +69,12 @@ module LogStash end results = [] + count = 0 event.each do |e| filters.each do |filter| next if e.cancelled? filter.filter(e) do |newevent| - event << newevent + results << newevent unless e.cancelled? end end results << e unless e.cancelled?