From b55684d3fe6136ceb279a0e3896caf0018f9a26b Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Fri, 14 Jun 2013 23:06:24 -0700 Subject: [PATCH] - Fix filter execution for filters that emit multiple events (clone, split) - fix codec initialization - remove unreleased+deprecated features from filters/base - deprecate the multiline filter (see the new multiline codec) - skip date filter tests when we aren't on jruby --- lib/logstash/config/config_ast.rb | 39 +++-- lib/logstash/config/mixin.rb | 2 - lib/logstash/filters/base.rb | 19 +-- lib/logstash/filters/multiline.rb | 235 +----------------------------- lib/logstash/pipeline.rb | 25 +++- spec/filters/clone.rb | 29 ++-- spec/filters/date.rb | 3 +- spec/filters/date_performance.rb | 3 +- spec/filters/json.rb | 1 - spec/test_utils.rb | 12 +- 10 files changed, 88 insertions(+), 280 deletions(-) diff --git a/lib/logstash/config/config_ast.rb b/lib/logstash/config/config_ast.rb index 77a5a8fe7..d96a75b51 100644 --- a/lib/logstash/config/config_ast.rb +++ b/lib/logstash/config/config_ast.rb @@ -59,13 +59,17 @@ module LogStash; module Config; module AST ["filter", "output"].each do |type| definitions << "def #{type}(event)" if type == "filter" - definitions << " events = [event]" + definitions << " extra_events = []" end definitions << " @logger.info(\"#{type} received\", :event => event)" sections.select { |s| s.plugin_type.text_value == type }.each do |s| definitions << s.compile.split("\n").map { |e| " #{e}" }.join("\n") end + + if type == "filter" + definitions << " extra_events.each { |e| yield e }" + end definitions << "end" end @@ -114,7 +118,11 @@ module LogStash; module Config; module AST class Plugins < Node; end class Plugin < Node def plugin_type - return recursive_select_parent(PluginSection).first.plugin_type.text_value + if recursive_select_parent(Plugin).any? + return "codec" + else + return recursive_select_parent(PluginSection).first.plugin_type.text_value + end end def plugin_name @@ -127,19 +135,14 @@ module LogStash; module Config; module AST def compile_initializer # If any parent is a Plugin, this must be a codec. - if recursive_select_parent(Plugin).any? - type = "codec" - else - type = plugin_type - end if attributes.elements.nil? - return "plugin(#{type.inspect}, #{plugin_name.inspect})" << (type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect})" << (plugin_type == "codec" ? "" : "\n") else settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" - return "plugin(#{type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (type == "codec" ? "" : "\n") + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n") end end @@ -150,17 +153,27 @@ module LogStash; module Config; module AST when "filter" return [ "newevents = []", - "events.each do |event|", + "extra_events.each do |event|", " #{variable_name}.filter(event) do |newevent|", " newevents << newevent", " end", "end", - "events += newevents", - "events = events.reject(&:cancelled?)", - "return if events.empty?", + "extra_events += newevents", + + "#{variable_name}.filter(event) do |newevent|", + " extra_events << newevent", + "end", + "if event.cancelled?", + " extra_events.each { |e| yield e }", + " return", + "end", ].map { |l| "#{l}\n" }.join("") when "output" return "#{variable_name}.receive(event)" + when "codec" + settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?) + attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})" + return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})" end end end diff --git a/lib/logstash/config/mixin.rb b/lib/logstash/config/mixin.rb index c87bc06c3..77dbe69a8 100644 --- a/lib/logstash/config/mixin.rb +++ b/lib/logstash/config/mixin.rb @@ -291,7 +291,6 @@ module LogStash::Config::Mixin elsif validator.is_a?(Symbol) # TODO(sissel): Factor this out into a coersion method? # TODO(sissel): Document this stuff. - p :value => value value = hash_or_array(value) case validator @@ -304,7 +303,6 @@ module LogStash::Config::Mixin return true, value end when :hash - p :hash? => value if value.is_a?(Hash) return true, value end diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 55ea74a8b..0522ccef5 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -91,9 +91,6 @@ class LogStash::Filters::Base < LogStash::Plugin super config_init(params) @threadsafe = true - - @include_method = @include_any ? :any? : :all? - @exclude_method = @exclude_any ? :any? : :all? end # def initialize public @@ -165,16 +162,20 @@ class LogStash::Filters::Base < LogStash::Plugin end if !@tags.empty? - return false if event["tags"].nil? - if !@tags.send(@include_method) { |tag| event.tags.include?(tag) } - @logger.debug? and @logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event]) + # this filter has only works on events with certain tags, + # and this event has no tags. + return false if !event["tags"] + + # Is @tags a subset of the event's tags? If not, skip it. + if (event["tags"] & @tags).size != @tags.size + @logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event]) return false end end - if !@exclude_tags.empty? && !event["tags"].nil? - if @exclude_tags.send(@exclude_method) {|tag| event.tags.include?(tag)} - @logger.debug? and @logger.debug(["Skipping event because tags contains excluded tags: #{exclude_tags.inspect}", event]) + if !@exclude_tags.empty? && event["tags"] + if (diff_tags = (event["tags"] & @exclude_tags)).size != 0 + @logger.debug(["Skipping event because tags contains excluded tags: #{diff_tags.inspect}", event]) return false end end diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index 6db43befe..e880621e5 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -1,251 +1,30 @@ -# multiline filter -# -# This filter will collapse multiline messages into a single event. -# - require "logstash/filters/base" require "logstash/namespace" -require "set" +require "logstash/errors" -# The multiline filter is for combining multiple events from a single source -# into the same event. +# ## This filter was replaced by a codec. # -# The original goal of this filter was to allow joining of multi-line messages -# from files into a single event. For example - joining java exception and -# stacktrace messages into a single event. -# -# The config looks like this: -# -# filter { -# multiline { -# type => "type" -# pattern => "pattern, a regexp" -# negate => boolean -# what => "previous" or "next" -# } -# } -# -# The 'regexp' should match what you believe to be an indicator that -# the field is part of a multi-line event -# -# The 'what' must be "previous" or "next" and indicates the relation -# to the multi-line event. -# -# The 'negate' can be "true" or "false" (defaults false). If true, a -# message not matching the pattern will constitute a match of the multiline -# filter and the what will be applied. (vice-versa is also true) -# -# For example, java stack traces are multiline and usually have the message -# starting at the far-left, then each subsequent line indented. Do this: -# -# filter { -# multiline { -# type => "somefiletype" -# pattern => "^\s" -# what => "previous" -# } -# } -# -# This says that any line starting with whitespace belongs to the previous line. -# -# Another example is C line continuations (backslash). Here's how to do that: -# -# filter { -# multiline { -# type => "somefiletype " -# pattern => "\\$" -# what => "next" -# } -# } -# +# See the multiline codec instead. class LogStash::Filters::Multiline < LogStash::Filters::Base - config_name "multiline" plugin_status "stable" - # The regular expression to match + # Leave these config settings until we remove this filter entirely. + # THe idea is that we want the register method to cause an abort + # giving the user a clue to use the codec instead of the filter. config :pattern, :validate => :string, :required => true - - # The field to use for matching a multiline event. config :source, :validate => :string, :default => "message" - - # If the pattern matched, does event belong to the next or previous event? config :what, :validate => ["previous", "next"], :required => true - - # Negate the regexp pattern ('if not matched') config :negate, :validate => :boolean, :default => false - - # The stream identity is how the multiline filter determines which stream an - # event belongs. This is generally used for differentiating, say, events - # coming from multiple files in the same file input, or multiple connections - # coming from a tcp input. - # - # The default value here is usually what you want, but there are some cases - # where you want to change it. One such example is if you are using a tcp - # input with only one client connecting at any time. If that client - # reconnects (due to error or client restart), then logstash will identify - # the new connection as a new stream and break any multiline goodness that - # may have occurred between the old and new connection. To solve this use - # case, you can use "%{host}.%{type}" instead. config :stream_identity , :validate => :string, :default => "%{host}-%{path}-%{type}" - - # logstash ships by default with a bunch of patterns, so you don't - # necessarily need to define this yourself unless you are adding additional - # patterns. - # - # Pattern files are plain text with format: - # - # NAME PATTERN - # - # For example: - # - # NUMBER \d+ config :patterns_dir, :validate => :array, :default => [] - # Flush inactive multiline streams older than the given number of - # seconds. - # - # This is useful when your event stream is slow and you do not want to wait - # for the next event before seeing the current event. - #config :flush_age, :validate => :number, :default => 5 - - # Detect if we are running from a jarfile, pick the right path. - @@patterns_path = Set.new - if __FILE__ =~ /file:\/.*\.jar!.*/ - @@patterns_path += ["#{File.dirname(__FILE__)}/../../patterns/*"] - else - @@patterns_path += ["#{File.dirname(__FILE__)}/../../../patterns/*"] - end - - public - def initialize(config = {}) - super - - @threadsafe = false - - # This filter needs to keep state. - @types = Hash.new { |h,k| h[k] = [] } - @pending = Hash.new - end # def initialize - public def register - require "grok-pure" # rubygem 'jls-grok' - - @grok = Grok.new - - @patterns_dir = @@patterns_path.to_a + @patterns_dir - @patterns_dir.each do |path| - # Can't read relative paths from jars, try to normalize away '../' - while path =~ /file:\/.*\.jar!.*\/\.\.\// - # replace /foo/bar/../baz => /foo/baz - path = path.gsub(/[^\/]+\/\.\.\//, "") - end - - if File.directory?(path) - path = File.join(path, "*") - end - - Dir.glob(path).each do |file| - @logger.info("Grok loading patterns from file", :path => file) - @grok.add_patterns_from_file(file) - end - end - - @grok.compile(@pattern) - - @logger.debug("Registered multiline plugin", :type => @type, :config => @config) + raise LogStash::ConfigurationError, "The multiline filter has been replaced by the multiline codec. Please see http://logstash.net/docs/%VERSION%/codecs/multiline.\n" end # def register public def filter(event) - return unless filter?(event) - return unless event.include?(@source) - - if event[@source].is_a?(Array) - match = @grok.match(event[@source].first) - else - match = @grok.match(event[@source]) - end - key = event.sprintf(@stream_identity) - pending = @pending[key] - - @logger.debug("Multiline", :pattern => @pattern, :message => event["message"], - :match => match, :negate => @negate) - - # Add negate option - match = (match and !@negate) || (!match and @negate) - - case @what - when "previous" - if match - event.tag("multiline") - # previous previous line is part of this event. - # append it to the event and cancel it - if pending - pending.append(event) - else - @pending[key] = event - end - event.cancel - else - # this line is not part of the previous event - # if we have a pending event, it's done, send it. - # put the current event into pending - if pending - pending.uncancel - pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array) - yield pending - end - @pending[key] = event - event.cancel - end # if/else match - when "next" - if match - event.tag("multiline") - # this line is part of a multiline event, the next - # line will be part, too, put it into pending. - if pending - pending.append(event) - else - @pending[key] = event - end - event.cancel - else - # if we have something in pending, join it with this message - # and send it. otherwise, this is a new message and not part of - # multiline, send it. - if pending - pending.append(event) - @pending.delete(key) - pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array) - yield pending - end - end # if/else match - else - # TODO(sissel): Make this part of the 'register' method. - @logger.warn("Unknown multiline 'what' value.", :what => @what) - end # case @what - - if !event.cancelled? - event[@source] = event[@source].join("\n") if event[@source].is_a?(Array) - filter_matched(event) - end end # def filter - - public - def flush - events = [] - #flushed = [] - @pending.each do |key, event| - #next unless @flush_age.nil? || (Time.now - event.timestamp) > @flush_age - event.uncancel - event[@source] = event[@source].join("\n") if event[@source].is_a?(Array) - events << event - #flushed << key - end - #flushed.each { |k| @pending.delete(k) } - @pending.clear - return events - end # def flush end # class LogStash::Filters::Multiline diff --git a/lib/logstash/pipeline.rb b/lib/logstash/pipeline.rb index 48b416b1b..e7c0c3b70 100644 --- a/lib/logstash/pipeline.rb +++ b/lib/logstash/pipeline.rb @@ -10,14 +10,21 @@ class LogStash::Pipeline class ShutdownSignal < StandardError; end def initialize(configstr) + @logger = Cabin::Channel.get(LogStash) grammar = LogStashConfigParser.new @config = grammar.parse(configstr) if @config.nil? raise LogStash::ConfigurationError, grammar.failure_reason end - #puts (@config.compile) - eval(@config.compile) + # This will compile the config to ruby and evaluate the resulting code. + # The code will initialize all the plugins and define the + # filter and output methods. + code = @config.compile + # The config code is hard to represent as a log message... + # So just print it. + puts code if @logger.debug? + eval(code) @input_to_filter = SizedQueue.new(20) @@ -28,8 +35,6 @@ class LogStash::Pipeline @filter_to_output = SizedQueue.new(20) end - @logger = Cabin::Channel.get(LogStash) - end # def initialize def filters? @@ -139,9 +144,15 @@ class LogStash::Pipeline while true event = @input_to_filter.pop break if event == ShutdownSignal - filter(event) - next if event.cancelled? - @filter_to_output.push(event) + + events = [] + filter(event) do |newevent| + events << newevent + end + events.each do |event| + next if event.cancelled? + @filter_to_output.push(event) + end end rescue => e @logger.error("Exception in plugin #{plugin.class}", diff --git a/spec/filters/clone.rb b/spec/filters/clone.rb index e257e46d5..d50253436 100644 --- a/spec/filters/clone.rb +++ b/spec/filters/clone.rb @@ -15,11 +15,11 @@ describe LogStash::Filters::Clone do } CONFIG - sample "hello world" do - insist { subject}.is_a? Array + sample("message" => "hello world", "type" => "original") do + insist { subject }.is_a? Array insist { subject.length } == 4 subject.each_with_index do |s,i| - if i == 3 # last one should be 'original' + if i == 0 # last one should be 'original' insist { s["type"] } == "original" else insist { s["type"]} == "clone" @@ -45,22 +45,23 @@ describe LogStash::Filters::Clone do sample("type" => "nginx-access", "tags" => ["TESTLOG"], "message" => "hello world") do insist { subject }.is_a? Array insist { subject.length } == 3 - #All clones go through filter_matched - insist { subject[0].type } == "nginx-access-clone1" - reject { subject[0].tags }.include? "TESTLOG" - insist { subject[0].tags }.include? "RABBIT" - insist { subject[0].tags }.include? "NO_ES" - insist { subject[1].type } == "nginx-access-clone2" + insist { subject[0].type } == "nginx-access" + #Initial event remains unchanged + insist { subject[0].tags }.include? "TESTLOG" + reject { subject[0].tags }.include? "RABBIT" + reject { subject[0].tags }.include? "NO_ES" + #All clones go through filter_matched + insist { subject[1].type } == "nginx-access-clone1" reject { subject[1].tags }.include? "TESTLOG" insist { subject[1].tags }.include? "RABBIT" insist { subject[1].tags }.include? "NO_ES" - insist { subject[2].type } == "nginx-access" - #Initial event remains unchanged - insist { subject[2].tags }.include? "TESTLOG" - reject { subject[2].tags }.include? "RABBIT" - reject { subject[2].tags }.include? "NO_ES" + insist { subject[2].type } == "nginx-access-clone2" + reject { subject[2].tags }.include? "TESTLOG" + insist { subject[2].tags }.include? "RABBIT" + insist { subject[2].tags }.include? "NO_ES" + end end end diff --git a/spec/filters/date.rb b/spec/filters/date.rb index ea9d605ac..4ad48eff8 100644 --- a/spec/filters/date.rb +++ b/spec/filters/date.rb @@ -1,7 +1,8 @@ require "test_utils" require "logstash/filters/date" -describe LogStash::Filters::Date do +puts "Skipping date performance tests because this ruby is not jruby" if RUBY_ENGINE != "jruby" +RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do extend LogStash::RSpec describe "parsing with ISO8601" do diff --git a/spec/filters/date_performance.rb b/spec/filters/date_performance.rb index ce2ae1061..cff675ec8 100644 --- a/spec/filters/date_performance.rb +++ b/spec/filters/date_performance.rb @@ -1,7 +1,8 @@ require "test_utils" require "logstash/filters/date" -describe LogStash::Filters::Date do +puts "Skipping date tests because this ruby is not jruby" if RUBY_ENGINE != "jruby" +RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do extend LogStash::RSpec describe "performance test of java syntax parsing" do diff --git a/spec/filters/json.rb b/spec/filters/json.rb index d83e09d8b..72ead31f3 100644 --- a/spec/filters/json.rb +++ b/spec/filters/json.rb @@ -33,7 +33,6 @@ describe LogStash::Filters::Json do CONFIG sample '{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }' do - puts subject.to_json insist { subject["data"]["hello"] } == "world" insist { subject["data"]["list" ] } == [1,2,3] insist { subject["data"]["hash"] } == { "k" => "v" } diff --git a/spec/test_utils.rb b/spec/test_utils.rb index b4e6b68f3..e0f156d3e 100644 --- a/spec/test_utils.rb +++ b/spec/test_utils.rb @@ -56,13 +56,17 @@ module LogStash e = { "message" => e } if e.is_a?(String) next LogStash::Event.new(e) end - + results = [] count = 0 pipeline.instance_eval { @filters.each(&:register) } event.each do |e| - pipeline.filter(e) - results << e unless e.cancelled? + extra = [] + pipeline.filter(e) do |new_event| + extra << new_event + end + results << e if !e.cancelled? + results += extra.reject(&:cancelled?) end # TODO(sissel): pipeline flush needs to be implemented. @@ -87,7 +91,7 @@ module LogStash def agent(&block) @agent_count ||= 0 - require "logstash/agent" + require "logstash/pipeline" # scoping is hard, let's go shopping! config_str = @config_str