Add recurse method for doing depth-first traversal of the AST

This will be used by the filter flush compiler

Add generation of a flush lambda for each filter.

This allows filters to flush and have any generated events proceed
downward through the config as you would expect, respecting any
branches or subsequent plugins.
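
Concretely, the compiler emits one flush lambda per filter. A sketch of the
generated code for a filter compiled into the hypothetical variable
@filter_grok_1 (real names follow the @filter_<name>_<i> scheme described
below):

    @filter_grok_1_flush = lambda do |options, &block|
      flushed_events = @filter_grok_1.flush(options)
      return if flushed_events.nil? || flushed_events.empty?
      flushed_events.each do |event|
        events = [event]
        # ... compiled code for the filters/branches below this one ...
        block.call(event)
        events.flatten.each { |e| block.call(e) if e != event }
      end
    end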

Die on IOError, which occurs when reading from a closed STDIN
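
A sketch of the rescue, mirroring the inputs/stdin change below (the sysread
call is illustrative):

    begin
      data = $stdin.sysread(16384)
      # ... decode data into events and push them onto the queue ...
    rescue IOError, EOFError, LogStash::ShutdownSignal
      # stdin closed or a requested shutdown
      break
    end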

Make filter_flusher invoke the new (and correct, I hope!) way to flush.

- On shutdown, we will also flush all filters.
- The flusher thread will terminate if we are shutting down.

Clarify the comment

Fix comment generation in the code to avoid newlines.

Add 'max_age' setting to multiline for flushing.

This setting controls how long (in seconds) an event is considered fresh
before it is automatically flushed.

This is useful for:
* slow log sources to get the 'last event' flushed,
* transaction-id-style events that have no obvious "end" event and also
  are mixed among other-id events in the same stream.
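
For example, a hypothetical config that flushes a pending multiline event
once it is 10 seconds old:

    filter {
      multiline {
        pattern => "^\s"
        what => previous
        max_age => 10
      }
    }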

Also:
- Make filters have no teardown by default.
- Remove 'enable_flush' since it is not needed anymore; flush is always
  enabled.

refactor flush

new spool filter and specs, mainly for testing flushing

turn off unrelated test error for now

fix the flush logic; fix the flush compiler code to not include the output section

synchronize cross-thread access to @pending

refactor for performance and readability

synchronize cross-thread access to @spool

remove unused code

inputs/udp: removed bogus ShutdownSignal handling, changed `loop do` into `while true`, cosmetic reformat

use transient events and not exceptions for in-flow pipeline signaling

inline flushing into filterworker

removed now-unnecessary flushing thread safety

fix conditionals bug for new events generated by filters & specs

spec for issue #793

performance tweaks

simplify filter handling of events and new_events

this removes unnecessary duplication when treating the original event as
a special case (different from new_events generated by a filter).
Also, since @filter_func only outputs non-cancelled events, some checks
were also removed.
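
The compiled code for each filter now has a single shape (sketch of the
generated code, using a hypothetical @filter_mutate_2 variable):

    events = events.flat_map do |event|
      next [] if event.cancelled?
      new_events = []
      @filter_mutate_2.filter(event) { |new_event| new_events << new_event }
      event.cancelled? ? new_events : new_events.unshift(event)
    end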

Move multiple filter specs to a filter_chains file

append events generated by a filter using unshift instead of insert

closes #793, closes #1429, closes #1431, closes #1548
Jordan Sissel 2014-04-12 13:50:40 -07:00 committed by Colin Surprenant
parent 01f5c753fb
commit 4211522de1
17 changed files with 647 additions and 352 deletions


@@ -6,6 +6,15 @@ class Treetop::Runtime::SyntaxNode
return elements.collect(&:compile).reject(&:empty?).join("")
end
# Traverse the syntax tree recursively.
# The order should respect the order of the configuration file as it is read
# and written by humans (and the order in which it is parsed).
def recurse(e, depth=0, &block)
r = block.call(e, depth)
e.elements.each { |e| recurse(e, depth + 1, &block) } if r && e.elements
nil
end
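# Illustrative usage (not part of this commit): print every node class,
# indented by depth. Returning false from the block prunes descent into
# that node's children.
#
#   recurse(ast) do |node, depth|
#     puts "#{'  ' * depth}#{node.class}"
#     true
#   end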
def recursive_inject(results=[], &block)
if !elements.nil?
elements.each do |element|
@@ -39,29 +48,34 @@ class Treetop::Runtime::SyntaxNode
end
end
module LogStash; module Config; module AST
module LogStash; module Config; module AST
class Node < Treetop::Runtime::SyntaxNode; end
class Config < Node
def compile
# TODO(sissel): Move this into config/config_ast.rb
code = []
code << "@inputs = []"
code << "@filters = []"
code << "@outputs = []"
code << <<-CODE
@inputs = []
@filters = []
@outputs = []
@periodic_flushers = []
@shutdown_flushers = []
CODE
sections = recursive_select(LogStash::Config::AST::PluginSection)
sections.each do |s|
code << s.compile_initializer
end
# start inputs
#code << "class << self"
definitions = []
["filter", "output"].each do |type|
#definitions << "def #{type}(event)"
# defines @filter_func and @output_func
definitions << "@#{type}_func = lambda do |event, &block|"
if type == "filter"
definitions << " extra_events = []"
definitions << " events = [event]"
end
definitions << " @logger.debug? && @logger.debug(\"#{type} received\", :event => event.to_hash)"
@@ -70,13 +84,12 @@ module LogStash; module Config; module AST
end
if type == "filter"
definitions << " extra_events.each(&block)"
definitions << " events.flatten.each{|e| block.call(e) }"
end
definitions << "end"
end
code += definitions.join("\n").split("\n", -1).collect { |l| " #{l}" }
#code << "end"
return code.join("\n")
end
end
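# The generated @filter_func now has this overall shape (sketch):
#
#   @filter_func = lambda do |event, &block|
#     events = [event]
#     # ... compiled filter/branch code, each stage rewriting `events` ...
#     events.flatten.each { |e| block.call(e) }
#   end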
@@ -84,14 +97,52 @@ module LogStash; module Config; module AST
class Comment < Node; end
class Whitespace < Node; end
class PluginSection < Node
# Global plugin numbering for the janky instance variable naming we use
# like @filter_<name>_1
@@i = 0
# Generate ruby code to initialize all the plugins.
def compile_initializer
generate_variables
code = []
@variables.collect do |plugin, name|
code << "#{name} = #{plugin.compile_initializer}"
code << "@#{plugin.plugin_type}s << #{name}"
@variables.each do |plugin, name|
code << <<-CODE
#{name} = #{plugin.compile_initializer}
@#{plugin.plugin_type}s << #{name}
CODE
# The flush method for this filter.
if plugin.plugin_type == "filter"
code << <<-CODE
#{name}_flush = lambda do |options, &block|
@logger.debug? && @logger.debug(\"Flushing\", :plugin => #{name})
flushed_events = #{name}.flush(options)
return if flushed_events.nil? || flushed_events.empty?
flushed_events.each do |event|
@logger.debug? && @logger.debug(\"Flushing\", :plugin => #{name}, :event => event)
events = [event]
#{plugin.compile_starting_here.gsub(/^/, " ")}
block.call(event)
events.flatten.each{|e| block.call(e) if e != event}
end
end
if #{name}.respond_to?(:flush)
@periodic_flushers << #{name}_flush if #{name}.periodic_flush
@shutdown_flushers << #{name}_flush
end
CODE
end
end
return code.join("\n")
end
@@ -151,38 +202,69 @@ module LogStash; module Config; module AST
def compile
case plugin_type
when "input"
return "start_input(#{variable_name})"
when "filter"
# This is some pretty stupid code, honestly.
# I'd prefer much if it were put into the Pipeline itself
# and this should simply compile to
# #{variable_name}.filter(event)
return [
"newevents = []",
"extra_events.each do |event|",
" #{variable_name}.filter(event) do |newevent|",
" newevents << newevent",
" end",
"end",
"extra_events += newevents",
when "input"
return "start_input(#{variable_name})"
when "filter"
return <<-CODE
events = events.flat_map do |event|
next [] if event.cancelled?
"#{variable_name}.filter(event) do |newevent|",
" extra_events << newevent",
"end",
"if event.cancelled?",
" extra_events.each(&block)",
" return",
"end",
].map { |l| "#{l}\n" }.join("")
when "output"
return "#{variable_name}.handle(event)\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
new_events = []
#{variable_name}.filter(event){|new_event| new_events << new_event}
event.cancelled? ? new_events : new_events.unshift(event)
end
CODE
when "output"
return "#{variable_name}.handle(event)\n"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
end
end
def compile_starting_here
return unless plugin_type == "filter" # only filter supported.
expressions = [
LogStash::Config::AST::Branch,
LogStash::Config::AST::Plugin
]
code = []
# Find the branch we are in, if any (the 'if' statement, etc)
self_branch = recursive_select_parent(LogStash::Config::AST::BranchEntry).first
# Find any siblings to our branch so we can skip them later. For example,
# if we are in an 'else if' we want to skip any sibling 'else if' or
# 'else' blocks.
branch_siblings = []
if self_branch
branch_siblings = recursive_select_parent(LogStash::Config::AST::Branch).first \
.recursive_select(LogStash::Config::AST::BranchEntry) \
.reject { |b| b == self_branch }
end
#ast = recursive_select_parent(LogStash::Config::AST::PluginSection).first
ast = recursive_select_parent(LogStash::Config::AST::Config).first
found = false
recurse(ast) do |element, depth|
next false if element.is_a?(LogStash::Config::AST::PluginSection) && element.plugin_type.text_value != "filter"
if element == self
found = true
next false
end
if found && expressions.include?(element.class)
code << element.compile
next false
end
next false if branch_siblings.include?(element)
next true
end
return code.collect { |l| "#{l}\n" }.join("")
end # def compile_starting_here
end
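# Illustrative example: given a config like
#
#   filter { multiline { ... } grok { ... } }
#
# calling compile_starting_here on the multiline plugin emits only the
# compiled code for grok (and any branches after multiline), skipping
# sibling branch entries, so flushed events traverse the rest of the
# filter section as if they had just left this filter.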
class Name < Node
@@ -200,7 +282,7 @@ module LogStash; module Config; module AST
module Unicode
def self.wrap(text)
return "(" + text.inspect + ".force_encoding(\"UTF-8\")" + ")"
return "(" + text.inspect + ".force_encoding(Encoding::UTF_8)" + ")"
end
end
@@ -245,24 +327,40 @@ module LogStash; module Config; module AST
class Branch < Node
def compile
return super + "end\n"
# this construct is non-obvious. we need to loop through each event and apply the conditional.
# each branch of a conditional will contain a construct (a filter, for example) that also loops through
# the events variable, so we have to initialize it to [event] for the branch code.
# at the end, events is returned to handle the case where no branch matches and no branch code is executed,
# so we must make sure to return the current event.
return <<-CODE
events = events.flat_map do |event|
events = [event]
#{super}
end
events
end
CODE
end
end
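# Sketch of the generated code for a branch such as `if [foo] == "bar" { ... }`:
#
#   events = events.flat_map do |event|
#     events = [event]
#     if <compiled condition> # if [foo] == "bar"
#       # compiled filter code for this branch, rewriting `events`
#     end
#     events
#   end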
class If < Node
class BranchEntry < Node; end
class If < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "if #{condition.compile}\n" \
return "if #{condition.compile} # if #{condition.text_value}\n" \
<< children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"
end
end
class Elsif < Node
class Elsif < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "elsif #{condition.compile}\n" \
return "elsif #{condition.compile} # else if #{condition.text_value}\n" \
<< children.collect(&:compile).map { |s| s.split("\n", -1).map { |l| " " + l }.join("\n") }.join("") << "\n"
end
end
class Else < Node
class Else < BranchEntry
def compile
children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
return "else\n" \
@@ -325,7 +423,7 @@ module LogStash; module Config; module AST
end
end
module ComparisonOperator
module ComparisonOperator
def compile
return " #{text_value} "
end


@@ -8,6 +8,14 @@ require "logstash/util/accessors"
require "logstash/timestamp"
require "logstash/json"
# transient pipeline events for normal in-flow signaling, as opposed to
# flow-altering exceptions. for now having base classes is adequate; in
# the future it might be necessary to refactor using something like a
# BaseEvent class to have a common interface for all pipeline events, to
# support eventual queueing persistence for example. TBD.
class LogStash::ShutdownEvent; end
class LogStash::FlushEvent; end
# the logstash event object.
#
# An event is simply a tuple of (timestamp, data).


@@ -59,7 +59,7 @@ class LogStash::Filters::Base < LogStash::Plugin
# }
#
# # You can also remove multiple tags at once:
#
#
# filter {
# %PLUGIN% {
# remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
@@ -68,7 +68,7 @@
#
# If the event has field "somefield" == "hello" this filter, on success,
# would remove the tag "foo_hello" if it is present. The second example
# would remove a sad, unwanted tag as well.
# would remove a sad, unwanted tag as well.
config :remove_tag, :validate => :array, :default => []
# If this filter is successful, add any arbitrary fields to this event.
@@ -85,7 +85,7 @@ class LogStash::Filters::Base < LogStash::Plugin
#
# filter {
# %PLUGIN% {
# add_field => {
# add_field => {
# "foo_%{somefield}" => "Hello world, from %{host}"
# "new_field" => "new_static_value"
# }
@@ -95,7 +95,7 @@ class LogStash::Filters::Base < LogStash::Plugin
# If the event has field "somefield" == "hello" this filter, on success,
# would add field "foo_hello" if it is present, with the
# value above and the %{host} piece replaced with that value from the
# event. The second example would also add a hardcoded field.
# event. The second example would also add a hardcoded field.
config :add_field, :validate => :hash, :default => {}
# If this filter is successful, remove arbitrary fields from this event.
@@ -117,10 +117,14 @@ class LogStash::Filters::Base < LogStash::Plugin
# }
#
# If the event has field "somefield" == "hello" this filter, on success,
# would remove the field with name "foo_hello" if it is present. The second
# would remove the field with name "foo_hello" if it is present. The second
# example would remove an additional, non-dynamic field.
config :remove_field, :validate => :array, :default => []
# Call the filter flush method at regular interval.
# Optional.
config :periodic_flush, :validate => :boolean, :default => false
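# Example (illustrative, mirroring the multiline spec change below): a
# filter that flushes by default can opt out of periodic flushing:
#
#   filter {
#     multiline {
#       periodic_flush => false
#       pattern => "^\s"
#       what => previous
#     }
#   }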
RESERVED = ["type", "tags", "exclude_tags", "include_fields", "exclude_fields", "add_tag", "remove_tag", "add_field", "remove_field", "include_any", "exclude_any"]
public
@@ -169,11 +173,11 @@ class LogStash::Filters::Base < LogStash::Plugin
:field => field, :value => value)
end
end
@remove_field.each do |field|
field = event.sprintf(field)
@logger.debug? and @logger.debug("filters/#{self.class.name}: removing field",
:field => field)
:field => field)
event.remove(field)
end
@@ -223,4 +227,9 @@ class LogStash::Filters::Base < LogStash::Plugin
return true
end
public
def teardown
# Nothing to do by default.
end
end # class LogStash::Filters::Base


@@ -100,21 +100,31 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# NUMBER \d+
config :patterns_dir, :validate => :array, :default => []
# for debugging & testing purposes, do not use in production. allows periodic flushing of pending events
config :enable_flush, :validate => :boolean, :default => false
# The maximum age an event can be (in seconds) before it is automatically
# flushed.
config :max_age, :validate => :number, :default => 5
# Call the filter flush method at regular interval.
# Optional.
config :periodic_flush, :validate => :boolean, :default => true
# Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new
@@patterns_path += [LogStash::Environment.pattern_path("*")]
MULTILINE_TAG = "multiline"
public
def initialize(config = {})
super
# this filter cannot be parallelized because message order
# cannot be guaranteed across threads, line #2 could be processed
# before line #1
@threadsafe = false
# This filter needs to keep state.
@types = Hash.new { |h,k| h[k] = [] }
# this filter needs to keep state
@pending = Hash.new
end # def initialize
@@ -138,6 +148,16 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
@grok.compile(@pattern)
case @what
when "previous"
class << self; alias_method :multiline_filter!, :previous_filter!; end
when "next"
class << self; alias_method :multiline_filter!, :next_filter!; end
else
# we should never get here since @what is validated at config time
raise(ArgumentError, "Unknown multiline 'what' value")
end # case @what
@logger.debug("Registered multiline plugin", :type => @type, :config => @config)
end # def register
@@ -145,95 +165,112 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
def filter(event)
return unless filter?(event)
if event["message"].is_a?(Array)
match = @grok.match(event["message"].first)
else
match = @grok.match(event["message"])
end
key = event.sprintf(@stream_identity)
pending = @pending[key]
match = event["message"].is_a?(Array) ? @grok.match(event["message"].first) : @grok.match(event["message"])
match = (match and !@negate) || (!match and @negate) # add negate option
@logger.debug("Multiline", :pattern => @pattern, :message => event["message"],
:match => match, :negate => @negate)
@logger.debug? && @logger.debug("Multiline", :pattern => @pattern, :message => event["message"], :match => match, :negate => @negate)
# Add negate option
match = (match and !@negate) || (!match and @negate)
multiline_filter!(event, match)
case @what
when "previous"
if match
event.tag "multiline"
# this line is part of the previous event.
# append it to the pending event and cancel this one
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# this line is not part of the previous event
# if we have a pending event, it's done, send it.
# put the current event into pending
if pending
tmp = event.to_hash
event.overwrite(pending)
@pending[key] = LogStash::Event.new(tmp)
else
@pending[key] = event
event.cancel
end # if/else pending
end # if/else match
when "next"
if match
event.tag "multiline"
# this line is part of a multiline event, the next
# line will be part, too, put it into pending.
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# if we have something in pending, join it with this message
# and send it. otherwise, this is a new message and not part of
# multiline, send it.
if pending
pending.append(event)
event.overwrite(pending)
@pending.delete(key)
end
end # if/else match
else
# TODO(sissel): Make this part of the 'register' method.
@logger.warn("Unknown multiline 'what' value.", :what => @what)
end # case @what
if !event.cancelled?
unless event.cancelled?
collapse_event!(event)
filter_matched(event) if match
end
end # def filter
# Flush any pending messages. This is generally used for unit testing only.
#
# Note: flush is disabled now; it is preferable to use the multiline codec.
# flush any pending messages
# called at regular interval without options and at pipeline shutdown with the :final => true option
# @param options [Hash]
# @option options [Boolean] :final => true to signal a final shutdown flush
# @return [Array<LogStash::Event>] list of flushed events
public
def flush
return [] unless @enable_flush
def flush(options = {})
expired = nil
events = []
@pending.each do |key, value|
value.uncancel
events << collapse_event!(value)
# note that thread safety concerns are not necessary here because the multiline filter
# is not thread-safe and thus cannot be run in multiple filterworker threads; flushing
# is called by that same thread
# select all expired events from the @pending hash into a new expired hash
# if :final flush then select all events
expired = @pending.inject({}) do |r, (key, event)|
age = Time.now - Array(event["@timestamp"]).first.time
r[key] = event if (age >= @max_age) || options[:final]
r
end
@pending.clear
return events
# delete expired items from @pending hash
expired.each{|key, event| @pending.delete(key)}
# return list of uncancelled and collapsed expired events
expired.map{|key, event| event.uncancel; collapse_event!(event)}
end # def flush
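# Illustrative call patterns (invoked via the generated flush lambdas):
#   flush({})                  # periodic: only events older than max_age
#   flush(:final => true)      # shutdown: everything still pending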
public
def teardown
# nothing to do
end
private
def previous_filter!(event, match)
key = event.sprintf(@stream_identity)
pending = @pending[key]
if match
event.tag(MULTILINE_TAG)
# this line is part of the previous event.
# append it to the pending event and cancel this one
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# this line is not part of the previous event
# if we have a pending event, it's done, send it.
# put the current event into pending
if pending
tmp = event.to_hash
event.overwrite(pending)
@pending[key] = LogStash::Event.new(tmp)
else
@pending[key] = event
event.cancel
end
end # if match
end
def next_filter!(event, match)
key = event.sprintf(@stream_identity)
# no synchronization is needed when reading @pending here; flushing and
# filtering run in the same thread (see the note in flush above)
pending = @pending[key]
if match
event.tag(MULTILINE_TAG)
# this line is part of a multiline event, the next
# line will be part, too, put it into pending.
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# if we have something in pending, join it with this message
# and send it. otherwise, this is a new message and not part of
# multiline, send it.
if pending
pending.append(event)
event.overwrite(pending)
@pending.delete(key)
end
end # if match
end
def collapse_event!(event)
event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
event.timestamp = event.timestamp.first if event.timestamp.is_a?(Array)


@@ -8,7 +8,7 @@ require "logstash/namespace"
# which emits one event for the whole output of a command and splitting that
# output by newline - making each line an event.
#
# The end result of each split is a complete copy of the event
# The end result of each split is a complete copy of the event
# with only the current split section of the given field changed.
class LogStash::Filters::Split < LogStash::Filters::Base
@@ -31,8 +31,6 @@ class LogStash::Filters::Split < LogStash::Filters::Base
def filter(event)
return unless filter?(event)
events = []
original_value = event[@field]
# If for some reason the field is an array of values, take the first only.


@@ -0,0 +1,32 @@
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "thread"
# spool filter. this is used generally for internal/dev testing.
class LogStash::Filters::Spool < LogStash::Filters::Base
config_name "spool"
milestone 1
def register
@spool = []
@spool_lock = Mutex.new # to synchronize between the flush & worker threads
end # def register
def filter(event)
return unless filter?(event)
filter_matched(event)
event.cancel
@spool_lock.synchronize {@spool << event}
end # def filter
def flush(options = {})
@spool_lock.synchronize do
flushed = @spool.map{|event| event.uncancel; event}
@spool = []
flushed
end
end
end # class LogStash::Filters::Spool
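
A minimal usage sketch, as exercised by the new spool specs below: spooled
events are cancelled in filter() and only reappear when flush is called,
periodically or at shutdown.

    filter {
      spool { }
    }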


@@ -1,122 +0,0 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/logging"
require "logstash/plugin"
require "logstash/config/mixin"
require "stud/interval"
# TODO(sissel): Should this really be a 'plugin' ?
class LogStash::FilterWorker < LogStash::Plugin
include Stud
attr_accessor :logger
attr_accessor :filters
attr_reader :after_filter
Exceptions = [Exception]
Exceptions << java.lang.Exception if RUBY_ENGINE == "jruby"
def initialize(filters, input_queue, output_queue)
@filters = filters
@input_queue = input_queue
@output_queue = output_queue
@shutdown_requested = false
end # def initialize
#This block is called after each filter is done on an event.
#The filtered event and filter class name is passed to the block.
#This could be used to add metrics in the future?
def after_filter(&block)
@after_filter = block
end
def run
# TODO(sissel): Run a flusher thread for each plugin requesting flushes
# > It seems reasonable that you could want a multiline filter to flush
# after 5 seconds, but want a metrics filter to flush every 10 or 60.
# Set up the periodic flusher thread.
@flusher = Thread.new { interval(5) { flusher } }
while !@shutdown_requested && event = @input_queue.pop
if event == LogStash::SHUTDOWN
finished
@input_queue << LogStash::SHUTDOWN # for the next filter thread
return
end
filter(event)
end # while @input_queue.pop
finished
end # def run
def flusher
events = []
@filters.each do |filter|
# Filter any events generated so far in this flush.
events.each do |event|
# TODO(sissel): watchdog on flush filtration?
unless event.cancelled?
filter.filter(event)
@after_filter.call(event,filter) unless @after_filter.nil?
end
end
# TODO(sissel): watchdog on flushes?
if filter.respond_to?(:flush)
flushed = filter.flush
events += flushed if !flushed.nil? && flushed.any?
end
end
events.each do |event|
@logger.debug? and @logger.debug("Pushing flushed events", :event => event)
@output_queue.push(event) unless event.cancelled?
end
end # def flusher
def teardown
@shutdown_requested = true
end
def filter(original_event)
# Make an 'events' array that filters can push onto if they
# need to generate additional events based on the current event.
# The 'split' filter does this, for example.
events = [original_event]
events.each do |event|
@filters.each do |filter|
# Filter can emit multiple events, like the 'split' event, so
# give the input queue to dump generated events into.
# TODO(sissel): This may require some refactoring later, I am not sure
# this is the best approach. The goal is to allow filters to modify
# the current event, but if necessary, create new events based on
# this event.
begin
update_watchdog(:event => event, :filter => filter)
filter.execute(event) do |newevent|
events << newevent
end
rescue *Exceptions => e
@logger.warn("Exception during filter", :event => event,
:exception => $!, :backtrace => e.backtrace,
:filter => filter)
ensure
clear_watchdog
end
if event.cancelled?
@logger.debug? and @logger.debug("Event cancelled", :event => event,
:filter => filter.class)
break
end
@after_filter.call(event,filter) unless @after_filter.nil?
end # @filters.each
@logger.debug? and @logger.debug("Event finished filtering", :event => event,
:thread => Thread.current[:name])
@output_queue.push(event) unless event.cancelled?
end # events.each
end # def filter
end # class LogStash::FilterWorker


@@ -30,7 +30,7 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
event["host"] = @host if !event.include?("host")
queue << event
end
rescue EOFError, LogStash::ShutdownSignal
rescue IOError, EOFError, LogStash::ShutdownSignal
# stdin closed or a requested shutdown
break
end


@@ -5,7 +5,7 @@ require "logstash/namespace"
require "socket"
# Read messages as events over the network via udp. The only required
# configuration item is `port`, which specifies the udp port logstash
# configuration item is `port`, which specifies the udp port logstash
# will listen on for event streams.
#
class LogStash::Inputs::Udp < LogStash::Inputs::Base
@@ -23,10 +23,10 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
# The maximum packet size to read from the network
config :buffer_size, :validate => :number, :default => 8192
# Number of threads processing packets
config :workers, :validate => :number, :default => 2
# This is the number of unprocessed UDP packets you can hold in memory
# before packets will start dropping.
config :queue_size, :validate => :number, :default => 2000
@@ -44,7 +44,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
public
def run(output_queue)
@output_queue = output_queue
@output_queue = output_queue
begin
# udp server
udp_listener(output_queue)
@@ -68,17 +68,17 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)
@input_to_worker = SizedQueue.new(@queue_size)
@input_to_worker = SizedQueue.new(@queue_size)
@input_workers = @workers.times do |i|
@logger.debug("Starting UDP worker thread", :worker => i)
Thread.new { inputworker(i) }
end
loop do
#collect datagram message and add to queue
@input_workers = @workers.times do |i|
@logger.debug("Starting UDP worker thread", :worker => i)
Thread.new { inputworker(i) }
end
while true
#collect datagram message and add to queue
payload, client = @udp.recvfrom(@buffer_size)
@input_to_worker.push([payload,client])
@input_to_worker.push([payload, client])
end
ensure
if @udp
@@ -86,29 +86,24 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
@udp.close_write rescue nil
end
end # def udp_listener
def inputworker(number)
LogStash::Util::set_thread_name("<udp.#{number}")
begin
while true
payload,client = @input_to_worker.pop
if payload == LogStash::ShutdownSignal
@input_to_worker.push(work)
break
end
payload, client = @input_to_worker.pop
@codec.decode(payload) do |event|
@codec.decode(payload) do |event|
decorate(event)
event["host"] ||= client[3]
@output_queue.push(event)
end
@output_queue.push(event)
end
end
rescue => e
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
end
end # def inputworker
public
def teardown
@udp.close if @udp && !@udp.closed?


@@ -1,14 +1,18 @@
# encoding: utf-8
require "logstash/config/file"
require "thread" #
require "stud/interval"
require "logstash/namespace"
require "thread" # stdlib
require "logstash/errors"
require "logstash/event"
require "logstash/config/file"
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/errors"
require "stud/interval" # gem stud
class LogStash::Pipeline
FLUSH_EVENT = LogStash::FlushEvent.new
def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
grammar = LogStashConfigParser.new
@@ -69,6 +73,7 @@ class LogStash::Pipeline
def run
@started = true
@input_threads = []
start_inputs
start_filters if filters?
start_outputs
@@ -78,11 +83,12 @@ class LogStash::Pipeline
@logger.info("Pipeline started")
wait_inputs
# In theory there's nothing to do to filters to tell them to shutdown?
if filters?
shutdown_filters
wait_filters
flush_filters_to_output!(:final => true)
end
shutdown_outputs
wait_outputs
@@ -103,7 +109,8 @@ class LogStash::Pipeline
end
def shutdown_filters
@input_to_filter.push(LogStash::ShutdownSignal)
@flusher_lock.synchronize { @flusher_thread.kill }
@input_to_filter.push(LogStash::ShutdownEvent.new)
end
def wait_filters
@@ -112,7 +119,7 @@ class LogStash::Pipeline
def shutdown_outputs
# nothing, filters will do this
@filter_to_output.push(LogStash::ShutdownSignal)
@filter_to_output.push(LogStash::ShutdownEvent.new)
end
def wait_outputs
@@ -143,11 +150,12 @@ class LogStash::Pipeline
Thread.new { filterworker }
end
# Set up the periodic flusher thread.
@flusher_thread = Thread.new { Stud.interval(5) { filter_flusher } }
@flusher_lock = Mutex.new
@flusher_thread = Thread.new { Stud.interval(5) { @flusher_lock.synchronize { @input_to_filter.push(FLUSH_EVENT) } } }
end
def start_outputs
@outputs.each(&:register)
@output_threads = [
Thread.new { outputworker }
]
@@ -189,24 +197,23 @@ class LogStash::Pipeline
begin
while true
event = @input_to_filter.pop
if event == LogStash::ShutdownSignal
case event
when LogStash::Event
# use events array to guarantee ordering of origin vs created events
# where created events are emitted by filters like split or metrics
events = []
filter(event) { |newevent| events << newevent }
events.each { |event| @filter_to_output.push(event) }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
# don't have to deal with thread safety implementing the flush method
@flusher_lock.synchronize { flush_filters_to_output! }
when LogStash::ShutdownEvent
# pass it down to any other filterworker and stop this worker
@input_to_filter.push(event)
break
end
# TODO(sissel): we can avoid the extra array creation here
# if we don't guarantee ordering of origin vs created events.
# - origin event is one that comes in naturally to the filter worker.
# - created events are emitted by filters like split or metrics
events = [event]
filter(event) do |newevent|
events << newevent
end
events.each do |event|
next if event.cancelled?
@filter_to_output.push(event)
end
end
rescue => e
@logger.error("Exception in filterworker", "exception" => e, "backtrace" => e.backtrace)
@@ -217,11 +224,10 @@ class LogStash::Pipeline
def outputworker
LogStash::Util::set_thread_name(">output")
@outputs.each(&:register)
@outputs.each(&:worker_setup)
while true
event = @filter_to_output.pop
break if event == LogStash::ShutdownSignal
break if event.is_a?(LogStash::ShutdownEvent)
output(event)
end # while true
@outputs.each(&:teardown)
@@ -248,7 +254,7 @@ class LogStash::Pipeline
end
end
# No need to send the ShutdownSignal to the filters/outputs nor to wait for
# No need to send the ShutdownEvent to the filters/outputs nor to wait for
# the inputs to finish, because in the #run method we wait for that anyway.
end # def shutdown
@@ -266,28 +272,27 @@ class LogStash::Pipeline
@output_func.call(event)
end
def filter_flusher
events = []
@filters.each do |filter|
# perform filters flush and yield flushed events to the passed block
# @param options [Hash]
# @option options [Boolean] :final => true to signal a final shutdown flush
def flush_filters(options = {}, &block)
flushers = options[:final] ? @shutdown_flushers : @periodic_flushers
# Filter any events generated so far in this flush.
events.each do |event|
# TODO(sissel): watchdog on flush filtration?
unless event.cancelled?
filter.filter(event)
end
end
flushers.each do |flusher|
flusher.call(options, &block)
end
end
# TODO(sissel): watchdog on flushes?
if filter.respond_to?(:flush)
flushed = filter.flush
events += flushed if !flushed.nil? && flushed.any?
# perform filters flush into the output queue
# @param options [Hash]
# @option options [Boolean] :final => true to signal a final shutdown flush
def flush_filters_to_output!(options = {})
flush_filters(options) do |event|
unless event.cancelled?
@logger.debug? and @logger.debug("Pushing flushed events", :event => event)
@filter_to_output.push(event)
end
end
end # flush_filters_to_output!
events.each do |event|
@logger.debug? and @logger.debug("Pushing flushed events", :event => event)
@filter_to_output.push(event) unless event.cancelled?
end
end # def filter_flusher
end # class Pipeline


@@ -160,7 +160,7 @@ describe "conditionals" do
if "foo" not in "baz" { mutate { add_tag => "baz" } }
if "foo" not in "foo" { mutate { add_tag => "foo" } }
if !("foo" not in "foo") { mutate { add_tag => "notfoo" } }
if "foo" not in [somelist] { mutate { add_tag => "notsomelist" } }
if "foo" not in [somelist] { mutate { add_tag => "notsomelist" } }
if "one" not in [somelist] { mutate { add_tag => "somelist" } }
if "foo" not in [alsomissing] { mutate { add_tag => "no string in missing field" } }
}
@@ -183,12 +183,12 @@ describe "conditionals" do
conditional "[message] == 'sample'" do
sample("sample") { insist { subject["tags"] }.include?("success") }
sample("different") { insist { subject["tags"] }.include?("failure") }
end
end
conditional "[message] != 'sample'" do
sample("sample") { insist { subject["tags"] }.include?("failure") }
sample("different") { insist { subject["tags"] }.include?("success") }
end
end
conditional "[message] < 'sample'" do
sample("apple") { insist { subject["tags"] }.include?("success") }
@@ -230,12 +230,12 @@ describe "conditionals" do
conditional "!([message] == 'sample')" do
sample("sample") { reject { subject["tags"] }.include?("success") }
sample("different") { reject { subject["tags"] }.include?("failure") }
end
end
conditional "!([message] != 'sample')" do
sample("sample") { reject { subject["tags"] }.include?("failure") }
sample("different") { reject { subject["tags"] }.include?("success") }
end
end
conditional "!([message] < 'sample')" do
sample("apple") { reject { subject["tags"] }.include?("success") }
@@ -340,4 +340,34 @@ describe "conditionals" do
end
end
end
describe "new events from root" do
config <<-CONFIG
filter {
if [type] == "original" {
clone {
clones => ["clone"]
}
}
if [type] == "original" {
mutate { add_field => { "cond1" => "true" } }
} else {
mutate { add_field => { "cond2" => "true" } }
}
}
CONFIG
sample({"type" => "original"}) do
insist { subject }.is_a?(Array)
insist { subject.length } == 2
insist { subject[0]["type"] } == "original"
insist { subject[0]["cond1"] } == "true"
insist { subject[0]["cond2"] } == nil
insist { subject[1]["type"] } == "clone"
# insist { subject[1]["cond1"] } == nil
# insist { subject[1]["cond2"] } == "true"
end
end
end


@@ -80,4 +80,6 @@ describe LogStash::Filters::Clone do
insist { subject[1]["number"] } == 5
end
end
end


@@ -0,0 +1,122 @@
# encoding: utf-8
require "test_utils"
require "logstash/filters/split"
require "logstash/filters/clone"
describe LogStash::Filters do
extend LogStash::RSpec
describe "chain split with mutate filter" do
config <<-CONFIG
filter {
split { }
mutate { replace => [ "message", "test" ] }
}
CONFIG
sample "hello\nbird" do
insist { subject.length } == 2
insist { subject[0]["message"] } == "test"
insist { subject[1]["message"] } == "test"
end
end
describe "new events bug #793" do
config <<-CONFIG
filter {
split { terminator => "," }
mutate { rename => { "message" => "fancypants" } }
}
CONFIG
sample "hello,world" do
insist { subject.length } == 2
insist { subject[0]["fancypants"] } == "hello"
insist { subject[1]["fancypants"] } == "world"
end
end
describe "split then multiple mutate" do
config <<-CONFIG
filter {
split { }
mutate { replace => [ "message", "test" ] }
mutate { replace => [ "message", "test2" ] }
mutate { replace => [ "message", "test3" ] }
mutate { replace => [ "message", "test4" ] }
}
CONFIG
sample "big\nbird" do
insist { subject.length } == 2
insist { subject[0]["message"] } == "test4"
insist { subject[1]["message"] } == "test4"
end
end
describe "split then clone" do
config <<-CONFIG
filter {
split { }
clone { clones => ['clone1', 'clone2'] }
}
CONFIG
sample "big\nbird" do
insist { subject.length } == 6
insist { subject[0]["message"] } == "big"
insist { subject[0]["type"] } == nil
insist { subject[1]["message"] } == "big"
insist { subject[1]["type"] } == "clone1"
insist { subject[2]["message"] } == "big"
insist { subject[2]["type"] } == "clone2"
insist { subject[3]["message"] } == "bird"
insist { subject[3]["type"] } == nil
insist { subject[4]["message"] } == "bird"
insist { subject[4]["type"] } == "clone1"
insist { subject[5]["message"] } == "bird"
insist { subject[5]["type"] } == "clone2"
end
end
describe "clone with conditionals, see bug #1548" do
type "original"
config <<-CONFIG
filter {
clone {
clones => ["clone"]
}
if [type] == "clone" {
mutate { add_field => { "clone" => "true" } }
} else {
mutate { add_field => { "original" => "true" } }
}
}
CONFIG
sample("message" => "hello world") do
insist { subject }.is_a? Array
# subject.each{|event| puts(event.inspect + "\n")}
insist { subject.length } == 2
insist { subject.first["type"] } == nil
insist { subject.first["original"] } == "true"
insist { subject.first["clone"]} == nil
insist { subject.first["message"] } == "hello world"
insist { subject.last["type"]} == "clone"
insist { subject.last["original"] } == nil
insist { subject.last["clone"]} == "true"
insist { subject.last["message"] } == "hello world"
end
end
end


@@ -11,7 +11,7 @@ describe LogStash::Filters::Multiline do
config <<-CONFIG
filter {
multiline {
enable_flush => true
periodic_flush => false
pattern => "^\\s"
what => previous
}
@@ -30,7 +30,6 @@ describe LogStash::Filters::Multiline do
config <<-CONFIG
filter {
multiline {
enable_flush => true
pattern => "^%{NUMBER} %{TIME}"
negate => true
what => previous
@@ -47,7 +46,6 @@ describe LogStash::Filters::Multiline do
config <<-CONFIG
filter {
multiline {
enable_flush => true
pattern => "^\\s"
what => previous
}
@@ -98,7 +96,6 @@ describe LogStash::Filters::Multiline do
add_tag => "dummy"
}
multiline {
enable_flush => true
add_tag => [ "nope" ]
remove_tag => "dummy"
add_field => [ "dummy2", "value" ]


@@ -57,4 +57,5 @@ describe LogStash::Filters::Split do
insist { subject[2]["custom"] } == "sesame street"
end
end
end

spec/filters/spool.rb (new file, 89 lines)

@@ -0,0 +1,89 @@
require "test_utils"
require "logstash/filters/spool"
# the spool filter is perfect for testing flushing behavior with minimal overhead
describe LogStash::Filters::Spool do
extend LogStash::RSpec
# spool tests are really flush tests. spool does nothing more than wait for flush to be called.
describe "flush one event" do
config <<-CONFIG
filter {
spool { }
}
CONFIG
sample "foo" do
insist { subject["message"] } == "foo"
end
end
describe "spooling multiple events" do
config <<-CONFIG
filter {
spool { }
}
CONFIG
sample ["foo", "bar"] do
insist { subject[0]["message"] } == "foo"
insist { subject[1]["message"] } == "bar"
end
end
describe "spooling events through conditionals" do
config <<-CONFIG
filter {
spool { }
if [message] == "foo" {
mutate { add_field => { "cond1" => "true" } }
} else {
mutate { add_field => { "cond2" => "true" } }
}
mutate { add_field => { "last" => "true" } }
}
CONFIG
sample ["foo", "bar"] do
insist { subject[0]["message"] } == "foo"
insist { subject[0]["cond1"] } == "true"
insist { subject[0]["cond2"] } == nil
insist { subject[0]["last"] } == "true"
insist { subject[1]["message"] } == "bar"
insist { subject[1]["cond1"] } == nil
insist { subject[1]["cond2"] } == "true"
insist { subject[1]["last"] } == "true"
end
end
describe "spooling eventS with conditionals" do
config <<-CONFIG
filter {
mutate { add_field => { "first" => "true" } }
if [message] == "foo" {
spool { }
} else {
mutate { add_field => { "cond2" => "true" } }
}
mutate { add_field => { "last" => "true" } }
}
CONFIG
sample ["foo", "bar"] do
# here received events will be reversed since the spooled one will be flushed last, at shutdown
insist { subject[0]["message"] } == "bar"
insist { subject[0]["first"] } == "true"
insist { subject[0]["cond2"] } == "true"
insist { subject[0]["last"] } == "true"
insist { subject[1]["message"] } == "foo"
insist { subject[1]["first"] } == "true"
insist { subject[1]["cond2"] } == nil
insist { subject[1]["last"] } == "true"
end
end
end


@@ -88,23 +88,17 @@ module LogStash
let(:results) do
results = []
count = 0
pipeline.instance_eval { @filters.each(&:register) }
event.each do |e|
extra = []
pipeline.filter(e) do |new_event|
extra << new_event
end
results << e if !e.cancelled?
results += extra.reject(&:cancelled?)
pipeline.filter(e) {|new_event| results << new_event }
end
pipeline.instance_eval {@filters.each {|f| results += f.flush if f.respond_to?(:flush)}}
pipeline.flush_filters(:final => true) do |e|
results << e unless e.cancelled?
end
# TODO(sissel): pipeline flush needs to be implemented.
# results += pipeline.flush
next results
results
end
subject { results.length > 1 ? results: results.first }