flush support in multiline filter

This commit is contained in:
Colin Surprenant 2014-04-11 01:34:54 -04:00
parent 31cb21efa2
commit cbb7f47b94

View file

@ -4,7 +4,7 @@ require "logstash/namespace"
require "set"
#
# This filter will collapse multiline messages from a single source into one Logstash event.
#
#
# The original goal of this filter was to allow joining of multi-line messages
# from files into a single event. For example - joining java exception and
# stacktrace messages into a single event.
@ -19,20 +19,20 @@ require "set"
# what => "previous" or "next"
# }
# }
#
#
# The `pattern` should be a regexp which matches what you believe to be an indicator
# that the field is part of an event consisting of multiple lines of log data.
#
# The `what` must be "previous" or "next" and indicates the relation
# to the multi-line event.
#
# The `negate` can be "true" or "false" (defaults to false). If "true", a
# The `negate` can be "true" or "false" (defaults to false). If "true", a
# message not matching the pattern will constitute a match of the multiline
# filter and the `what` will be applied. (vice-versa is also true)
#
# For example, Java stack traces are multiline and usually have the message
# starting at the far-left, with each subsequent line indented. Do this:
#
#
# filter {
# multiline {
# type => "somefiletype"
@ -52,7 +52,7 @@ require "set"
# what => "next"
# }
# }
#
#
# This says that any line ending with a backslash should be combined with the
# following line.
#
@ -69,7 +69,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# Negate the regexp pattern ('if not matched')
config :negate, :validate => :boolean, :default => false
# The stream identity is how the multiline filter determines which stream an
# event belongs to. This is generally used for differentiating, say, events
# coming from multiple files in the same file input, or multiple connections
@ -83,7 +83,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# may have occurred between the old and new connection. To solve this use
# case, you can use "%{@source_host}.%{@type}" instead.
config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}"
# Logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional
# patterns.
@ -97,6 +97,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# NUMBER \d+
config :patterns_dir, :validate => :array, :default => []
# for debugging & testing purposes, do not use in production. allows periodic flushing of pending events
config :enable_flush, :validate => :boolean, :default => false
# Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/
@ -215,8 +218,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
end # case @what
if !event.cancelled?
event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array)
collapse_event!(event)
filter_matched(event) if match
end
end # def filter
@ -225,13 +227,23 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
#
# Note: flush is disabled now; it is preferable to use the multiline codec.
public
def __flush
def flush
return [] unless @enable_flush
events = []
@pending.each do |key, value|
value.uncancel
events << value
events << collapse_event!(value)
end
@pending.clear
return events
end # def flush
private
def collapse_event!(event)
event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array)
event
end
end # class LogStash::Filters::Multiline