- add a new 'split' filter for splitting like ruby String#split

Splits into multiple events.
This commit is contained in:
Jordan Sissel 2011-08-17 00:50:42 -07:00
parent 6a69dcf231
commit 46f2fa2a0d
2 changed files with 90 additions and 4 deletions

View file

@ -0,0 +1,65 @@
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/time"
# The split filter is for splitting multiline messages into separate events.
#
# An example use case of this filter is for taking output from the 'exec' input
# which emits one event for the whole output of a command and splitting that
# output by newline - making each line an event.
#
# The end result of each split is a complete copy of the event
# with only the current split section of the given field changed.
class LogStash::Filters::Split < LogStash::Filters::Base
config_name "split"
# [dateformats]: http://download.oracle.com/javase/1.4.2/docs/api/java/text/SimpleDateFormat.html
config /[A-Za-z0-9_-]+/, :validate => :array
# The string to split on. This is usually a line terminator, but can be any
# string.
config :terminator, :validate => :string, :default => "\n"
# The field which value is split by the terminator
config :field, :validate => :string, :default => "@message"
public
def register
# Nothing to do
end # def register
public
def filter(event)
return unless event.type == @type or @type.nil?
events = []
original_value = event[@field]
# If for some reason the field is an array of values, take the first only.
original_value = original_value.first if original_value.is_a?(Array)
# Using -1 for 'limit' on String#split makes ruby not drop trailing empty
# splits.
splits = original_value.split(@terminator, -1)
# Skip filtering if splitting this event resulted in only one thing found.
return if splits.length == 1
#or splits[1].empty?
splits.each do |value|
next if value.empty?
event_split = event.clone
@logger.debug(["Split event", { :value => value, :field => @field }])
event_split[@field] = value
filter_matched(event_split)
# Push this new event onto the stack at the LogStash::FilterWorker
yield event_split
end
# Cancel this event, we'll use the newly generated ones above.
event.cancel
end # def filter
end # class LogStash::Filters::Date

View file

@ -26,9 +26,30 @@ class LogStash::FilterWorker < LogStash::Plugin
break
end
# TODO(sissel): Handle exceptions? Retry? Drop it?
filter(event)
end # while @input_queue.pop
end
def filter(original_event)
# TODO(sissel): Handle exceptions? Retry? Drop it?
# Make an 'events' array that filters can push onto if they
# need to generate additional events based on the current event.
# The 'split' filter does this, for example.
events = [original_event]
events.each do |event|
@filters.each do |filter|
filter.filter(event)
# Filter can emit multiple events, like the 'split' event, so
# give the input queue to dump generated events into.
# TODO(sissel): This may require some refactoring later, I am not sure
# this is the best approach. The goal is to allow filters to modify
# the current event, but if necessary, create new events based on
# this event.
filter.filter(event) do |newevent|
events << newevent
end
if event.cancelled?
@logger.debug({:message => "Event cancelled",
:event => event,
@ -40,6 +61,6 @@ class LogStash::FilterWorker < LogStash::Plugin
@logger.debug(["Event finished filtering", event])
@output_queue.push(event) unless event.cancelled?
end # while @input_queue.pop
end
end # events.each
end # def filter
end # class LogStash::FilterWorker