- Fix filter execution for filters that emit multiple events (clone, split)
- fix codec initialization
- remove unreleased+deprecated features from filters/base
- deprecate the multiline filter (see the new multiline codec)
- skip date filter tests when we aren't on jruby
Jordan Sissel 2013-06-14 23:06:24 -07:00
parent aa45cc2417
commit b55684d3fe
10 changed files with 88 additions and 280 deletions

View file

@@ -59,13 +59,17 @@ module LogStash; module Config; module AST
["filter", "output"].each do |type|
definitions << "def #{type}(event)"
if type == "filter"
definitions << " events = [event]"
definitions << " extra_events = []"
end
definitions << " @logger.info(\"#{type} received\", :event => event)"
sections.select { |s| s.plugin_type.text_value == type }.each do |s|
definitions << s.compile.split("\n").map { |e| " #{e}" }.join("\n")
end
if type == "filter"
definitions << " extra_events.each { |e| yield e }"
end
definitions << "end"
end
@@ -114,7 +118,11 @@ module LogStash; module Config; module AST
class Plugins < Node; end
class Plugin < Node
def plugin_type
return recursive_select_parent(PluginSection).first.plugin_type.text_value
if recursive_select_parent(Plugin).any?
return "codec"
else
return recursive_select_parent(PluginSection).first.plugin_type.text_value
end
end
def plugin_name
@@ -127,19 +135,14 @@ module LogStash; module Config; module AST
def compile_initializer
# If any parent is a Plugin, this must be a codec.
if recursive_select_parent(Plugin).any?
type = "codec"
else
type = plugin_type
end
if attributes.elements.nil?
return "plugin(#{type.inspect}, #{plugin_name.inspect})" << (type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect})" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (type == "codec" ? "" : "\n")
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
end
end
@@ -150,17 +153,27 @@ module LogStash; module Config; module AST
when "filter"
return [
"newevents = []",
"events.each do |event|",
"extra_events.each do |event|",
" #{variable_name}.filter(event) do |newevent|",
" newevents << newevent",
" end",
"end",
"events += newevents",
"events = events.reject(&:cancelled?)",
"return if events.empty?",
"extra_events += newevents",
"#{variable_name}.filter(event) do |newevent|",
" extra_events << newevent",
"end",
"if event.cancelled?",
" extra_events.each { |e| yield e }",
" return",
"end",
].map { |l| "#{l}\n" }.join("")
when "output"
return "#{variable_name}.receive(event)"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
end
end
end
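Taken together, the compiled filter method now buffers any events produced by clone/split style filters in extra_events and yields them back to the caller once the primary event has passed through every filter. A simplified sketch of the generated code for a single configured filter (the @filter_clone variable name is hypothetical; the real names come from variable_name above):

    # Rough shape of the code emitted by the compiler, assuming one filter.
    def filter(event)
      extra_events = []
      @logger.info("filter received", :event => event)

      # emitted by Plugin#compile for each configured filter:
      newevents = []
      extra_events.each do |event|
        @filter_clone.filter(event) { |newevent| newevents << newevent }
      end
      extra_events += newevents

      @filter_clone.filter(event) { |newevent| extra_events << newevent }
      if event.cancelled?
        extra_events.each { |e| yield e }
        return
      end

      # emitted once, after all filters have run:
      extra_events.each { |e| yield e }
    end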

View file

@@ -291,7 +291,6 @@ module LogStash::Config::Mixin
elsif validator.is_a?(Symbol)
# TODO(sissel): Factor this out into a coercion method?
# TODO(sissel): Document this stuff.
p :value => value
value = hash_or_array(value)
case validator
@@ -304,7 +303,6 @@ module LogStash::Config::Mixin
return true, value
end
when :hash
p :hash? => value
if value.is_a?(Hash)
return true, value
end

View file

@@ -91,9 +91,6 @@ class LogStash::Filters::Base < LogStash::Plugin
super
config_init(params)
@threadsafe = true
@include_method = @include_any ? :any? : :all?
@exclude_method = @exclude_any ? :any? : :all?
end # def initialize
public
@@ -165,16 +162,20 @@ class LogStash::Filters::Base < LogStash::Plugin
end
if !@tags.empty?
return false if event["tags"].nil?
if !@tags.send(@include_method) { |tag| event.tags.include?(tag) }
@logger.debug? and @logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event])
# this filter only works on events with certain tags,
# and this event has no tags.
return false if !event["tags"]
# Is @tags a subset of the event's tags? If not, skip it.
if (event["tags"] & @tags).size != @tags.size
@logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event])
return false
end
end
if !@exclude_tags.empty? && !event["tags"].nil?
if @exclude_tags.send(@exclude_method) {|tag| event.tags.include?(tag)}
@logger.debug? and @logger.debug(["Skipping event because tags contains excluded tags: #{exclude_tags.inspect}", event])
if !@exclude_tags.empty? && event["tags"]
if (diff_tags = (event["tags"] & @exclude_tags)).size != 0
@logger.debug(["Skipping event because tags contains excluded tags: #{diff_tags.inspect}", event])
return false
end
end
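The include/exclude checks above now use plain array intersection instead of the unreleased any?/all? machinery. A quick illustration with hypothetical tag values:

    event_tags   = ["TESTLOG", "RABBIT"]
    tags         = ["TESTLOG"]     # @tags: the event must carry all of these
    exclude_tags = ["NO_ES"]       # @exclude_tags: none of these may be present

    (event_tags & tags).size == tags.size   # => true, @tags is a subset of the event's tags
    (event_tags & exclude_tags).size != 0   # => false, no excluded tag is present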

View file

@@ -1,251 +1,30 @@
# multiline filter
#
# This filter will collapse multiline messages into a single event.
#
require "logstash/filters/base"
require "logstash/namespace"
require "set"
require "logstash/errors"
# The multiline filter is for combining multiple events from a single source
# into the same event.
# ## This filter was replaced by a codec.
#
# The original goal of this filter was to allow joining of multi-line messages
# from files into a single event. For example - joining java exception and
# stacktrace messages into a single event.
#
# The config looks like this:
#
# filter {
# multiline {
# type => "type"
# pattern => "pattern, a regexp"
# negate => boolean
# what => "previous" or "next"
# }
# }
#
# The 'regexp' should match what you believe to be an indicator that
# the field is part of a multi-line event
#
# The 'what' must be "previous" or "next" and indicates the relation
# to the multi-line event.
#
# The 'negate' can be "true" or "false" (defaults false). If true, a
# message not matching the pattern will constitute a match of the multiline
# filter and the what will be applied. (vice-versa is also true)
#
# For example, java stack traces are multiline and usually have the message
# starting at the far-left, then each subsequent line indented. Do this:
#
# filter {
# multiline {
# type => "somefiletype"
# pattern => "^\s"
# what => "previous"
# }
# }
#
# This says that any line starting with whitespace belongs to the previous line.
#
# Another example is C line continuations (backslash). Here's how to do that:
#
# filter {
# multiline {
# type => "somefiletype "
# pattern => "\\$"
# what => "next"
# }
# }
#
# See the multiline codec instead.
class LogStash::Filters::Multiline < LogStash::Filters::Base
config_name "multiline"
plugin_status "stable"
# The regular expression to match
# Leave these config settings until we remove this filter entirely.
# The idea is that we want the register method to cause an abort
# giving the user a clue to use the codec instead of the filter.
config :pattern, :validate => :string, :required => true
# The field to use for matching a multiline event.
config :source, :validate => :string, :default => "message"
# If the pattern matched, does event belong to the next or previous event?
config :what, :validate => ["previous", "next"], :required => true
# Negate the regexp pattern ('if not matched')
config :negate, :validate => :boolean, :default => false
# The stream identity is how the multiline filter determines which stream an
# event belongs. This is generally used for differentiating, say, events
# coming from multiple files in the same file input, or multiple connections
# coming from a tcp input.
#
# The default value here is usually what you want, but there are some cases
# where you want to change it. One such example is if you are using a tcp
# input with only one client connecting at any time. If that client
# reconnects (due to error or client restart), then logstash will identify
# the new connection as a new stream and break any multiline goodness that
# may have occurred between the old and new connection. To solve this use
# case, you can use "%{host}.%{type}" instead.
config :stream_identity , :validate => :string, :default => "%{host}-%{path}-%{type}"
# logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional
# patterns.
#
# Pattern files are plain text with format:
#
# NAME PATTERN
#
# For example:
#
# NUMBER \d+
config :patterns_dir, :validate => :array, :default => []
# Flush inactive multiline streams older than the given number of
# seconds.
#
# This is useful when your event stream is slow and you do not want to wait
# for the next event before seeing the current event.
#config :flush_age, :validate => :number, :default => 5
# Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/
@@patterns_path += ["#{File.dirname(__FILE__)}/../../patterns/*"]
else
@@patterns_path += ["#{File.dirname(__FILE__)}/../../../patterns/*"]
end
public
def initialize(config = {})
super
@threadsafe = false
# This filter needs to keep state.
@types = Hash.new { |h,k| h[k] = [] }
@pending = Hash.new
end # def initialize
public
def register
require "grok-pure" # rubygem 'jls-grok'
@grok = Grok.new
@patterns_dir = @@patterns_path.to_a + @patterns_dir
@patterns_dir.each do |path|
# Can't read relative paths from jars, try to normalize away '../'
while path =~ /file:\/.*\.jar!.*\/\.\.\//
# replace /foo/bar/../baz => /foo/baz
path = path.gsub(/[^\/]+\/\.\.\//, "")
end
if File.directory?(path)
path = File.join(path, "*")
end
Dir.glob(path).each do |file|
@logger.info("Grok loading patterns from file", :path => file)
@grok.add_patterns_from_file(file)
end
end
@grok.compile(@pattern)
@logger.debug("Registered multiline plugin", :type => @type, :config => @config)
raise LogStash::ConfigurationError, "The multiline filter has been replaced by the multiline codec. Please see http://logstash.net/docs/%VERSION%/codecs/multiline.\n"
end # def register
public
def filter(event)
return unless filter?(event)
return unless event.include?(@source)
if event[@source].is_a?(Array)
match = @grok.match(event[@source].first)
else
match = @grok.match(event[@source])
end
key = event.sprintf(@stream_identity)
pending = @pending[key]
@logger.debug("Multiline", :pattern => @pattern, :message => event["message"],
:match => match, :negate => @negate)
# Add negate option
match = (match and !@negate) || (!match and @negate)
case @what
when "previous"
if match
event.tag("multiline")
# the previous line is part of this event.
# append it to the event and cancel it
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# this line is not part of the previous event
# if we have a pending event, it's done, send it.
# put the current event into pending
if pending
pending.uncancel
pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
yield pending
end
@pending[key] = event
event.cancel
end # if/else match
when "next"
if match
event.tag("multiline")
# this line is part of a multiline event, the next
# line will be part, too, put it into pending.
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# if we have something in pending, join it with this message
# and send it. otherwise, this is a new message and not part of
# multiline, send it.
if pending
pending.append(event)
@pending.delete(key)
pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
yield pending
end
end # if/else match
else
# TODO(sissel): Make this part of the 'register' method.
@logger.warn("Unknown multiline 'what' value.", :what => @what)
end # case @what
if !event.cancelled?
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
filter_matched(event)
end
end # def filter
public
def flush
events = []
#flushed = []
@pending.each do |key, event|
#next unless @flush_age.nil? || (Time.now - event.timestamp) > @flush_age
event.uncancel
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
events << event
#flushed << key
end
#flushed.each { |k| @pending.delete(k) }
@pending.clear
return events
end # def flush
end # class LogStash::Filters::Multiline
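With register now raising a ConfigurationError, the joining of lines has to happen in the multiline codec on the input side. A rough equivalent of the java stacktrace example from the old filter docs (the file path is hypothetical):

    input {
      file {
        path => "/var/log/someapp/app.log"   # hypothetical path
        codec => multiline {
          pattern => "^\s"
          what => "previous"
        }
      }
    }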

View file

@@ -10,14 +10,21 @@ class LogStash::Pipeline
class ShutdownSignal < StandardError; end
def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)
if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
#puts (@config.compile)
eval(@config.compile)
# This will compile the config to ruby and evaluate the resulting code.
# The code will initialize all the plugins and define the
# filter and output methods.
code = @config.compile
# The config code is hard to represent as a log message...
# So just print it.
puts code if @logger.debug?
eval(code)
@input_to_filter = SizedQueue.new(20)
@@ -28,8 +35,6 @@ class LogStash::Pipeline
@filter_to_output = SizedQueue.new(20)
end
@logger = Cabin::Channel.get(LogStash)
end # def initialize
def filters?
@@ -139,9 +144,15 @@ class LogStash::Pipeline
while true
event = @input_to_filter.pop
break if event == ShutdownSignal
filter(event)
next if event.cancelled?
@filter_to_output.push(event)
events = []
filter(event) do |newevent|
events << newevent
end
events.each do |event|
next if event.cancelled?
@filter_to_output.push(event)
end
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",

View file

@@ -15,11 +15,11 @@ describe LogStash::Filters::Clone do
}
CONFIG
sample "hello world" do
insist { subject}.is_a? Array
sample("message" => "hello world", "type" => "original") do
insist { subject }.is_a? Array
insist { subject.length } == 4
subject.each_with_index do |s,i|
if i == 3 # last one should be 'original'
if i == 0 # first one should be 'original'
insist { s["type"] } == "original"
else
insist { s["type"]} == "clone"
@@ -45,22 +45,23 @@ describe LogStash::Filters::Clone do
sample("type" => "nginx-access", "tags" => ["TESTLOG"], "message" => "hello world") do
insist { subject }.is_a? Array
insist { subject.length } == 3
#All clones go through filter_matched
insist { subject[0].type } == "nginx-access-clone1"
reject { subject[0].tags }.include? "TESTLOG"
insist { subject[0].tags }.include? "RABBIT"
insist { subject[0].tags }.include? "NO_ES"
insist { subject[1].type } == "nginx-access-clone2"
insist { subject[0].type } == "nginx-access"
#Initial event remains unchanged
insist { subject[0].tags }.include? "TESTLOG"
reject { subject[0].tags }.include? "RABBIT"
reject { subject[0].tags }.include? "NO_ES"
#All clones go through filter_matched
insist { subject[1].type } == "nginx-access-clone1"
reject { subject[1].tags }.include? "TESTLOG"
insist { subject[1].tags }.include? "RABBIT"
insist { subject[1].tags }.include? "NO_ES"
insist { subject[2].type } == "nginx-access"
#Initial event remains unchanged
insist { subject[2].tags }.include? "TESTLOG"
reject { subject[2].tags }.include? "RABBIT"
reject { subject[2].tags }.include? "NO_ES"
insist { subject[2].type } == "nginx-access-clone2"
reject { subject[2].tags }.include? "TESTLOG"
insist { subject[2].tags }.include? "RABBIT"
insist { subject[2].tags }.include? "NO_ES"
end
end
end

View file

@@ -1,7 +1,8 @@
require "test_utils"
require "logstash/filters/date"
describe LogStash::Filters::Date do
puts "Skipping date performance tests because this ruby is not jruby" if RUBY_ENGINE != "jruby"
RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do
extend LogStash::RSpec
describe "parsing with ISO8601" do

View file

@@ -1,7 +1,8 @@
require "test_utils"
require "logstash/filters/date"
describe LogStash::Filters::Date do
puts "Skipping date tests because this ruby is not jruby" if RUBY_ENGINE != "jruby"
RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do
extend LogStash::RSpec
describe "performance test of java syntax parsing" do

View file

@@ -33,7 +33,6 @@ describe LogStash::Filters::Json do
CONFIG
sample '{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }' do
puts subject.to_json
insist { subject["data"]["hello"] } == "world"
insist { subject["data"]["list" ] } == [1,2,3]
insist { subject["data"]["hash"] } == { "k" => "v" }

View file

@@ -56,13 +56,17 @@ module LogStash
e = { "message" => e } if e.is_a?(String)
next LogStash::Event.new(e)
end
results = []
count = 0
pipeline.instance_eval { @filters.each(&:register) }
event.each do |e|
pipeline.filter(e)
results << e unless e.cancelled?
extra = []
pipeline.filter(e) do |new_event|
extra << new_event
end
results << e if !e.cancelled?
results += extra.reject(&:cancelled?)
end
# TODO(sissel): pipeline flush needs to be implemented.
@@ -87,7 +91,7 @@ module LogStash
def agent(&block)
@agent_count ||= 0
require "logstash/agent"
require "logstash/pipeline"
# scoping is hard, let's go shopping!
config_str = @config_str