Merge branch 'new-config-parser'

This merge brings many things:

* A new config parser with proper array, hash, and other value support.
* Experimental conditionals support is present.
* The 'agent2' rewrite of the agent is now the only agent.
* Backwards compatibility with "old style hashes" in the config is supported.
* the multiline filter has been replaced by a codec.

Test results on JRuby 1.7.4, rbx-head (2.0.0.rc1), MRI 1.9.3:

  % rvm 1.7.4,rbx-head,1.9.3 do bin/logstash rspec spec/filters/*.rb spec/codecs/*.rb

  Finished in 9.37 seconds
  217 examples, 0 failures

  Finished in 2.75 seconds
  182 examples, 0 failures

  Finished in 1.74 seconds
  182 examples, 0 failures
This commit is contained in:
Jordan Sissel 2013-06-14 23:11:30 -07:00
commit 95bc7dad86
51 changed files with 4572 additions and 3605 deletions

View file

@ -2,16 +2,14 @@
# general
- THE LOGSTASH EVENT SCHEMA HAS CHANGED.
TODO(sissel): Document what this means, etc.
- Kibana is now available for launching:
- Kibana 3 is now available in jar releases:
java -jar logstash.jar kibana
- elasticsearch version 0.90.0 is included.
- grok now defaults 'singles' to true, meaning captured fields are
stored as single values in most cases instead of the old behavior
of being captured as an array of values.
- the multiline filter is replaced by the multiline codec.
- Many deprecated features have been removed.
TODO(sissel): Document what these were.
- 'type' is no longer a required setting on inputs.
- feature: codecs. Used to abstract serialization/encoding out of
- feature: codecs. Used to abstract encoding/decoding out of
transports(inputs/outputs)
TODO(nickethier): Document how to use and how to hack.
@ -26,6 +24,10 @@
of appending to them.
- feature: the useragent filter now defaults to writing results to the top
level of the event instead of "ua"
- feature: grok now defaults 'singles' to true, meaning captured fields are
stored as single values in most cases instead of the old behavior of being
captured as an array of values.
- removed: the multiline filter is gone. See the multiline codec instead.
## outputs
- feature: irc: add messages_per_second tunable (LOGSTASH-962)

View file

@ -56,7 +56,7 @@ endif
# Compile config grammar (ragel -> ruby)
.PHONY: compile-grammar
compile-grammar: lib/logstash/config/grammar.rb
lib/logstash/config/grammar.rb: lib/logstash/config/grammar.rl
lib/logstash/config/grammar.rb: lib/logstash/config/grammar.treetop
$(QUIET)$(MAKE) -C lib/logstash/config grammar.rb
.PHONY: clean

File diff suppressed because it is too large Load diff

View file

@ -1,245 +0,0 @@
require "clamp" # gem 'clamp'
# Command-line runner for logstash (the "agent2" rewrite). Built on the
# Clamp gem: each `option` below declares a CLI flag, and #execute is
# invoked by Clamp after flag parsing. Loads config from a file or
# directory (-f) or an inline string (-e), builds a LogStash::Pipeline,
# and runs it until shutdown.
class LogStash::Agent2 < Clamp::Command
  # Any user-facing configuration problem. #execute rescues this and turns
  # it into a friendly message plus a non-zero exit status.
  class ConfigurationError < StandardError; end

  option ["-f", "--config"], "CONFIG_PATH",
    I18n.t("logstash.agent.flag.config"),
    :attribute_name => :config_path

  option "-e", "CONFIG_STRING",
    I18n.t("logstash.agent.flag.config-string"),
    :attribute_name => :config_string

  option ["-w", "--filterworkers"], "COUNT",
    I18n.t("logstash.agent.flag.filterworkers"),
    :attribute_name => :filter_workers, :default => 1, &:to_i

  option "--watchdog-timeout", "SECONDS",
    I18n.t("logstash.agent.flag.watchdog-timeout"),
    :default => 10, &:to_f

  option ["-l", "--log"], "FILE",
    I18n.t("logstash.agent.flag.log"),
    :attribute_name => :log_file

  # Old support for the '-v' flag'
  option "-v", :flag,
    I18n.t("logstash.agent.flag.verbosity"),
    :attribute_name => :verbosity, :multivalued => true

  option "--quiet", :flag, I18n.t("logstash.agent.flag.quiet")
  option "--verbose", :flag, I18n.t("logstash.agent.flag.verbose")
  option "--debug", :flag, I18n.t("logstash.agent.flag.debug")

  option ["-V", "--version"], :flag,
    I18n.t("logstash.agent.flag.version")

  option ["-p", "--pluginpath"] , "PATH",
    I18n.t("logstash.agent.flag.pluginpath"),
    :multivalued => true,
    :attribute_name => :plugin_paths

  # Emit a warning message.
  # NOTE(review): despite the name, this raises — all warnings are fatal
  # for now, per the comment below.
  def warn(message)
    # For now, all warnings are fatal.
    raise ConfigurationError, message
  end # def warn

  # Emit a failure message and abort.
  def fail(message)
    raise ConfigurationError, message
  end # def fail

  # Run the agent. This method is invoked after clamp parses the
  # flags given to this program.
  #
  # Returns 0 on success, 1 on configuration or runtime error.
  def execute
    require "logstash/pipeline"
    require "cabin" # gem 'cabin'
    require "logstash/plugin"
    @logger = Cabin::Channel.get(LogStash)
    if version?
      show_version
      return 0
    end

    configure

    # You must specify a config_string or config_path
    if config_string.nil? && config_path.nil?
      puts help
      fail(I18n.t("logstash.agent.missing-configuration"))
    end

    # Clamp stores option values in ivars, so @config_path mirrors the
    # config_path accessor used above.
    if @config_path
      @config_string = load_config(@config_path)
    end

    begin
      pipeline = LogStash::Pipeline.new(@config_string)
    rescue LoadError => e
      fail("Configuration problem.")
    end

    # Make SIGINT shutdown the pipeline.
    # NOTE(review): Stud is not required in this file — presumably loaded
    # by a dependency; confirm.
    trap_id = Stud::trap("INT") do
      @logger.warn(I18n.t("logstash.agent.interrupted"))
      pipeline.shutdown
    end

    # TODO(sissel): Get pipeline completion status.
    pipeline.run
    return 0
  rescue ConfigurationError, LogStash::Plugin::ConfigurationError => e
    puts I18n.t("logstash.agent.error", :error => e)
    return 1
  rescue => e
    puts I18n.t("oops", :error => e)
    puts e.backtrace if @logger.debug?
    return 1
  ensure
    Stud::untrap("INT", trap_id) unless trap_id.nil?
  end # def execute

  # Print version information for logstash and, on JRuby, also for the
  # JVM, JRuby, and the bundled elasticsearch.
  def show_version
    show_version_logstash
    if RUBY_PLATFORM == "java"
      show_version_java
      show_version_jruby
      show_version_elasticsearch
    end
    # Was the -v or --v flag given? Show all gems, too.
    # NOTE(review): verbosity? is a multivalued :flag (an array), so this
    # include? test against symbols looks like it can never be true — confirm.
    show_gems if [:info, :debug].include?(verbosity?)
  end # def show_version

  def show_version_logstash
    require "logstash/version"
    puts "logstash #{LOGSTASH_VERSION}"
  end # def show_version_logstash

  def show_version_jruby
    puts "jruby #{JRUBY_VERSION} (ruby #{RUBY_VERSION})"
  end # def show_version_jruby

  def show_version_elasticsearch
    # Not running in the jar, assume elasticsearch jars are
    # in ../../vendor/jar/...
    if __FILE__ !~ /^(?:jar:)?file:/
      jarpath = File.join(File.dirname(__FILE__), "../../vendor/jar/elasticsearch*/lib/*.jar")
      Dir.glob(jarpath).each do |jar|
        require jar
      end
    end
    org.elasticsearch.Version::main([])
  end # def show_version_elasticsearch

  # JRuby only: print the host JVM's version and vendor.
  def show_version_java
    properties = java.lang.System.getProperties
    puts "java #{properties["java.version"]} (#{properties["java.vendor"]})"
    puts "jvm #{properties["java.vm.name"]} / #{properties["java.vm.version"]}"
  end # def show_version_java

  # List every installed gem, one per line.
  def show_gems
    require "rubygems"
    Gem::Specification.each do |spec|
      puts "gem #{spec.name} #{spec.version}"
    end
  end # def show_gems

  # Do any start-time configuration.
  #
  # Log file stuff, plugin path checking, etc.
  def configure
    configure_logging(log_file)
    configure_plugin_path(plugin_paths) if !plugin_paths.nil?
  end # def configure

  # Point logging at a specific path.
  def configure_logging(path)
    # Set with the -v (or -vv...) flag
    if verbosity?
      # this is an array with length of how many times the flag is given
      if verbosity?.length == 1
        @logger.level = :info
      else
        @logger.level = :debug
      end
    end
    # NOTE(review): this chain always assigns a level, so it overrides
    # whatever the -v handling above chose — confirm that is intended.
    if quiet?
      @logger.level = :error
    elsif verbose?
      @logger.level = :info
    elsif debug?
      @logger.level = :debug
    else
      @logger.level = :warn
    end

    if !log_file.nil?
      # TODO(sissel): Implement file output/rotation in Cabin.
      # TODO(sissel): Catch exceptions, report sane errors.
      begin
        file = File.new(path, "a")
      rescue => e
        fail(I18n.t("logstash.agent.configuration.log_file_failed",
                    :path => path, :error => e))
      end
      puts "Sending all output to #{path}."
      @logger.subscribe(file)
    else
      @logger.subscribe(STDOUT)
    end
    # TODO(sissel): redirect stdout/stderr to the log as well
    # http://jira.codehaus.org/browse/JRUBY-7003
  end # def configure_logging

  # Validate and add any paths to the list of locations
  # logstash will look to find plugins.
  def configure_plugin_path(paths)
    # Append any plugin paths to the ruby search path
    paths.each do |path|
      # Verify the path exists
      if !Dir.exists?(path)
        warn(I18n.t("logstash.agent.configuration.plugin_path_missing",
                    :path => path))
      end

      # TODO(sissel): Verify the path looks like the correct form.
      # aka, there must be file in path/logstash/{filters,inputs,outputs}/*.rb
      plugin_glob = File.join(path, "logstash", "{inputs,filters,outputs}", "*.rb")
      if Dir.glob(plugin_glob).empty?
        warn(I18n.t("logstash.agent.configuration.no_plugins_found",
                    :path => path, :plugin_glob => plugin_glob))
      end

      # We push plugin paths to the front of the LOAD_PATH so that folks
      # can override any core logstash plugins if they need to.
      @logger.debug("Adding plugin path", :path => path)
      $LOAD_PATH.unshift(path)
    end
  end # def configure_plugin_path

  # Read configuration from a file — or, if given a directory, from every
  # file in it, concatenated in sorted order.
  def load_config(path)
    path = File.join(path, "*") if File.directory?(path)
    if Dir.glob(path).length == 0
      fail(I18n.t("logstash.agent.configuration.file-not-found", :path => path))
    end
    config = ""
    Dir.glob(path).sort.each do |file|
      next unless File.file?(file)
      @logger.debug("Reading config file", :file => file)
      config << File.read(file) + "\n"
    end
    return config
  end # def load_config
end # class LogStash::Agent2

View file

@ -2,37 +2,17 @@ require "logstash/namespace"
require "logstash/event"
require "logstash/plugin"
require "logstash/logging"
require "extlib"
# This is the base class for logstash codecs.
module LogStash::Codecs
public
def self.for(codec)
return codec unless codec.is_a? String
#TODO: codec paths or just use plugin paths
plugin = File.join('logstash', 'codecs', codec) + ".rb"
#@logger.info "Loading codec", :codec => plugin
require plugin
klass_name = codec.camel_case
if LogStash::Codecs.const_defined?(klass_name)
return LogStash::Codecs.const_get(klass_name)
end
nil
end
class Base < LogStash::Plugin
include LogStash::Config::Mixin
config_name "codec"
attr_reader :on_event
attr_accessor :charset
public
def initialize(params={})
super
config_init(params)
register if respond_to?(:register)
end
public
@ -56,4 +36,4 @@ module LogStash::Codecs
end
end # class LogStash::Codecs::Base
end
end

View file

@ -0,0 +1,17 @@
require "logstash/codecs/base"
# The dots codec emits a single "." for every event it encodes; decoding
# is unsupported. Handy for eyeballing event throughput.
class LogStash::Codecs::Dots < LogStash::Codecs::Base
  config_name "dots"
  plugin_status "experimental"

  # Turning dots back into events is not supported.
  def decode(data)
    raise "Not implemented"
  end # def decode

  # Emit exactly one dot per event, ignoring the event's contents.
  def encode(data)
    @on_event.call(".")
  end # def encode
end # class LogStash::Codecs::Dots

View file

@ -0,0 +1,175 @@
require "logstash/codecs/base"
# The multiline codec is for taking line-oriented text and merging them into a
# single event.
#
# The original goal of this codec was to allow joining of multi-line messages
# from files into a single event. For example - joining java exception and
# stacktrace messages into a single event.
#
# The config looks like this:
#
# input {
# stdin {
# codec => multiline {
# pattern => "pattern, a regexp"
# negate => true or false
# what => "previous" or "next"
# }
# }
# }
#
# The 'pattern' should match what you believe to be an indicator that the field
# is part of a multi-line event.
#
# The 'what' must be "previous" or "next" and indicates the relation
# to the multi-line event.
#
# The 'negate' can be "true" or "false" (defaults false). If true, a
# message not matching the pattern will constitute a match of the multiline
# filter and the what will be applied. (vice-versa is also true)
#
# For example, java stack traces are multiline and usually have the message
# starting at the far-left, then each subsequent line indented. Do this:
#
# input {
# stdin {
# codec => multiline {
# pattern => "^\s"
# what => "previous"
# }
# }
# }
#
# This says that any line starting with whitespace belongs to the previous line.
#
# Another example is C line continuations (backslash). Here's how to do that:
#
# filter {
# multiline {
# type => "somefiletype "
# pattern => "\\$"
# what => "next"
# }
# }
#
# This codec merges separate lines into a single logstash event.
class LogStash::Codecs::Multiline < LogStash::Codecs::Base
  config_name "multiline"
  plugin_status "experimental"

  # The regular expression to match
  config :pattern, :validate => :string, :required => true

  # If the pattern matched, does event belong to the next or previous event?
  config :what, :validate => ["previous", "next"], :required => true

  # Negate the regexp pattern ('if not matched')
  config :negate, :validate => :boolean, :default => false

  # logstash ships by default with a bunch of patterns, so you don't
  # necessarily need to define this yourself unless you are adding additional
  # patterns.
  #
  # Pattern files are plain text with format:
  #
  #   NAME PATTERN
  #
  # For example:
  #
  #   NUMBER \d+
  config :patterns_dir, :validate => :array, :default => []

  # The character encoding used in this input. Examples include "UTF-8"
  # and "cp1252"
  #
  # This setting is useful if your log files are in Latin-1 (aka cp1252)
  # or in another character set other than UTF-8.
  #
  # This only affects "plain" format logs since json is UTF-8 already.
  config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

  public
  # Load grok pattern files, compile the configured pattern, and set up
  # the line buffer plus the "previous"/"next" handler method.
  def register
    require "grok-pure" # rubygem 'jls-grok'

    # Detect if we are running from a jarfile, pick the right path.
    patterns_path = []
    if __FILE__ =~ /file:\/.*\.jar!.*/
      patterns_path += ["#{File.dirname(__FILE__)}/../../patterns/*"]
    else
      patterns_path += ["#{File.dirname(__FILE__)}/../../../patterns/*"]
    end

    @grok = Grok.new

    @patterns_dir = patterns_path.to_a + @patterns_dir
    @patterns_dir.each do |path|
      # Can't read relative paths from jars, try to normalize away '../'
      while path =~ /file:\/.*\.jar!.*\/\.\.\//
        # replace /foo/bar/../baz => /foo/baz
        path = path.gsub(/[^\/]+\/\.\.\//, "")
      end

      if File.directory?(path)
        path = File.join(path, "*")
      end

      Dir.glob(path).each do |file|
        @logger.info("Grok loading patterns from file", :path => file)
        @grok.add_patterns_from_file(file)
      end
    end

    @grok.compile(@pattern)
    @logger.debug("Registered multiline plugin", :type => @type, :config => @config)

    # Lines accumulated for the event currently being assembled.
    @buffer = []
    # Dispatches to do_previous or do_next based on the 'what' setting.
    @handler = method("do_#{@what}".to_sym)
  end # def register

  public
  # Feed one line of text in; yields a merged LogStash::Event whenever a
  # multi-line group is complete.
  def decode(text, &block)
    text.force_encoding(@charset)
    if @charset != "UTF-8"
      # Convert to UTF-8 if not in that character set.
      text = text.encode("UTF-8", :invalid => :replace, :undef => :replace)
    end

    match = @grok.match(text)
    @logger.debug("Multiline", :pattern => @pattern, :text => text,
                  :match => !match.nil?, :negate => @negate)

    # Add negate option
    match = (match and !@negate) || (!match and @negate)
    @handler.call(text, match, &block)
  end # def decode

  # Append a line to the pending buffer, timestamping the first line.
  def buffer(text)
    @time = Time.now if @buffer.empty?
    @buffer << text
  end

  # Yield the buffered lines as a single event (joined by newlines),
  # then reset the buffer.
  def flush(&block)
    if @buffer.any?
      yield LogStash::Event.new("@timestamp" => @time, "message" => @buffer.join("\n"))
      @buffer = []
    end
  end

  # 'what => next': this line belongs with the line(s) that follow it.
  def do_next(text, matched, &block)
    buffer(text)
    flush(&block) if !matched
  end

  # 'what => previous': this line belongs with the line(s) before it.
  def do_previous(text, matched, &block)
    flush(&block) if !matched
    buffer(text)
  end

  public
  # Encoding is pass-through for this codec.
  def encode(data)
    # Nothing to do.
    @on_event.call(data)
  end # def encode
end # class LogStash::Codecs::Multiline

View file

@ -6,7 +6,17 @@ class LogStash::Codecs::Plain < LogStash::Codecs::Base
plugin_status "experimental"
# TODO(sissel): Document.
config :format, :validate => :string, :default => nil
# The character encoding used in this input. Examples include "UTF-8"
# and "cp1252"
#
# This setting is useful if your log files are in Latin-1 (aka cp1252)
# or in another character set other than UTF-8.
#
# This only affects "plain" format logs since json is UTF-8 already.
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"
public
def decode(data)
@ -21,9 +31,9 @@ class LogStash::Codecs::Plain < LogStash::Codecs::Base
public
def encode(data)
if data.is_a? LogStash::Event and !@format.nil?
@on_event.call data.sprintf(@format)
@on_event.call(data.sprintf(@format))
else
@on_event.call data.to_s
@on_event.call(data.to_s)
end
end # def encode

View file

@ -0,0 +1,21 @@
require "logstash/codecs/base"
# The rubydebug codec pretty-prints events with awesome_print, intended
# for human inspection (e.g. via the stdout output).
class LogStash::Codecs::RubyDebug < LogStash::Codecs::Base
  config_name "rubydebug"
  plugin_status "experimental"

  def register
    require "ap" # awesome_print; provides #awesome_inspect
  end

  public
  # Decoding pretty-printed output back into events is not supported.
  def decode(data)
    raise "Not implemented"
  end # def decode

  public
  # Pretty-print the event's hash representation, one event per call.
  def encode(data)
    @on_event.call(data.to_hash.awesome_inspect + "\n")
  end # def encode
end # class LogStash::Codecs::RubyDebug

View file

@ -1,3 +1,4 @@
grammar.rb: grammar.rl
ragel -R grammar.rl
#ragel -R grammar.rl
grammar.rb: grammar.treetop
tt grammar.treetop

View file

@ -0,0 +1,318 @@
require "treetop"
# Extend Treetop's SyntaxNode with the code-generation and tree-walking
# helpers the logstash config compiler relies on.
class Treetop::Runtime::SyntaxNode
  # Default compilation: concatenate the compiled output of all children.
  def compile
    return "" if elements.nil?
    pieces = elements.map { |child| child.compile }
    pieces.reject { |piece| piece.empty? }.join
  end

  # Walk the subtree depth-first, collecting nodes for which the block is
  # truthy. Matched nodes are not descended into.
  def recursive_inject(results=[], &block)
    unless elements.nil?
      elements.each do |child|
        if block.call(child)
          results << child
        else
          child.recursive_inject(results, &block)
        end
      end
    end
    results
  end

  # Collect all descendant nodes that are instances of the given class.
  def recursive_select(klass)
    recursive_inject { |node| node.is_a?(klass) }
  end

  # Walk up the ancestor chain, collecting the first ancestor for which
  # the block is truthy.
  def recursive_inject_parent(results=[], &block)
    unless parent.nil?
      if block.call(parent)
        results << parent
      else
        parent.recursive_inject_parent(results, &block)
      end
    end
    results
  end

  # Collect the nearest ancestor that is an instance of the given class.
  def recursive_select_parent(results=[], klass)
    recursive_inject_parent(results) { |node| node.is_a?(klass) }
  end
end
module LogStash; module Config; module AST
  # Base class for all logstash config AST nodes.
  class Node < Treetop::Runtime::SyntaxNode; end

  # Root of the parsed config. Compiling it emits ruby code that:
  #   * initializes @inputs/@filters/@outputs arrays,
  #   * instantiates every plugin found in the config, and
  #   * defines filter(event) and output(event) dispatch methods on self.
  class Config < Node
    def compile
      # TODO(sissel): Move this into config/config_ast.rb
      code = []
      code << "@inputs = []"
      code << "@filters = []"
      code << "@outputs = []"
      sections = recursive_select(LogStash::Config::AST::PluginSection)
      sections.each do |s|
        code << s.compile_initializer
      end

      # start inputs
      code << "class << self"
      definitions = []

      ["filter", "output"].each do |type|
        definitions << "def #{type}(event)"
        if type == "filter"
          definitions << " extra_events = []"
        end
        definitions << " @logger.info(\"#{type} received\", :event => event)"
        # Emit each matching section's body, indented one level inside the
        # generated method.
        sections.select { |s| s.plugin_type.text_value == type }.each do |s|
          definitions << s.compile.split("\n").map { |e| " #{e}" }.join("\n")
        end
        if type == "filter"
          definitions << " extra_events.each { |e| yield e }"
        end
        definitions << "end"
      end

      code += definitions.join("\n").split("\n").collect { |l| " #{l}" }
      code << "end"
      return code.join("\n")
    end
  end
class Comment < Node; end
class Whitespace < Node; end
class PluginSection < Node
@@i = 0
# Generate ruby code to initialize all the plugins.
def compile_initializer
generate_variables
code = []
@variables.collect do |plugin, name|
code << "#{name} = #{plugin.compile_initializer}"
code << "@#{plugin.plugin_type}s << #{name}"
end
return code.join("\n")
end
def variable(object)
generate_variables
return @variables[object]
end
def generate_variables
return if !@variables.nil?
@variables = {}
plugins = recursive_select(Plugin)
plugins.each do |plugin|
# Unique number for every plugin.
@@i += 1
# store things as ivars, like @filter_grok_3
var = "@#{plugin.plugin_type}_#{plugin.plugin_name}_#{@@i}"
@variables[plugin] = var
end
return @variables
end
end
class Plugins < Node; end
class Plugin < Node
def plugin_type
if recursive_select_parent(Plugin).any?
return "codec"
else
return recursive_select_parent(PluginSection).first.plugin_type.text_value
end
end
def plugin_name
return name.text_value
end
def variable_name
return recursive_select_parent(PluginSection).first.variable(self)
end
def compile_initializer
# If any parent is a Plugin, this must be a codec.
if attributes.elements.nil?
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect})" << (plugin_type == "codec" ? "" : "\n")
else
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})" << (plugin_type == "codec" ? "" : "\n")
end
end
def compile
case plugin_type
when "input"
return "start_input(#{variable_name})"
when "filter"
return [
"newevents = []",
"extra_events.each do |event|",
" #{variable_name}.filter(event) do |newevent|",
" newevents << newevent",
" end",
"end",
"extra_events += newevents",
"#{variable_name}.filter(event) do |newevent|",
" extra_events << newevent",
"end",
"if event.cancelled?",
" extra_events.each { |e| yield e }",
" return",
"end",
].map { |l| "#{l}\n" }.join("")
when "output"
return "#{variable_name}.receive(event)"
when "codec"
settings = attributes.recursive_select(Attribute).collect(&:compile).reject(&:empty?)
attributes_code = "LogStash::Util.hash_merge_many(#{settings.map { |c| "{ #{c} }" }.join(", ")})"
return "plugin(#{plugin_type.inspect}, #{plugin_name.inspect}, #{attributes_code})"
end
end
end
class Name < Node
def compile
return text_value.inspect
end
end
class Attribute < Node
def compile
return %Q(#{name.compile} => #{value.compile})
end
end
class Value < Node; end
class Bareword < Value
def compile
return text_value.inspect
end
end
class String < Value
def compile
return text_value[1...-1].inspect
end
end
class Number < Value
def compile
return text_value
end
end
class Array < Value
def compile
return "[" << recursive_select(Value).collect(&:compile).reject(&:empty?).join(", ") << "]"
end
end
class Hash < Value
def compile
return "{" << recursive_select(HashEntry).collect(&:compile).reject(&:empty?).join(", ") << "}"
end
end
class HashEntries < Node; end
class HashEntry < Node
def compile
return %Q(#{name.compile} => #{value.compile})
end
end
  class BranchOrPlugin < Node; end

  # An if/elsif/else chain. The If/Elsif/Else children emit their ruby
  # keywords and indented bodies; Branch appends the closing "end".
  class Branch < Node
    def compile
      return super + "end\n"
    end
  end

  class If < Node
    def compile
      children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
      return "if #{condition.compile}\n" \
        << children.collect(&:compile).map { |s| s.split("\n").map { |l| " " + l }.join("\n") }.join("") << "\n"
    end
  end

  class Elsif < Node
    def compile
      children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
      return "elsif #{condition.compile}\n" \
        << children.collect(&:compile).map { |s| s.split("\n").map { |l| " " + l }.join("\n") }.join("") << "\n"
    end
  end

  class Else < Node
    def compile
      children = recursive_inject { |e| e.is_a?(Branch) || e.is_a?(Plugin) }
      return "else\n" \
        << children.collect(&:compile).map { |s| s.split("\n").map { |l| " " + l }.join("\n") }.join("") << "\n"
    end
  end

  # Conditions and sub-expressions are wrapped in parens to preserve
  # precedence in the generated ruby.
  class Condition < Node
    def compile
      return "(#{super})"
    end
  end

  module Expression
    def compile
      return "(#{super})"
    end
  end

  class Rvalue < Node
  end

  class MethodCall < Node
    def compile
      # Arguments are any value-like descendants (these are the AST's own
      # String/Number/Array classes, not ruby core's).
      arguments = recursive_inject { |e| [String, Number, Selector, Array, MethodCall].any? { |c| e.is_a?(c) } }
      return "#{method.text_value}(" << arguments.collect(&:compile).join(", ") << ")"
    end
  end

  # Operators compile to themselves, padded with spaces.
  module ComparisonOperator
    def compile
      return " #{text_value} "
    end
  end

  module BooleanOperator
    def compile
      return " #{text_value} "
    end
  end

  # A field reference; compiles to an event[] lookup.
  class Selector < Node
    def compile
      return "event[#{text_value.inspect}]"
    end
  end

  class SelectorElement < Node; end
end; end; end
# Monkeypatch Treetop::Runtime::SyntaxNode's inspect method to skip
# any Whitespace or SyntaxNodes with no children.
class Treetop::Runtime::SyntaxNode
  # Pretty-print this node — class, extension modules, offset, truncated
  # text — then recurse into "interesting" children (non-Whitespace nodes
  # that themselves have children), one indent level deeper.
  def _inspect(indent="")
    em = extension_modules
    interesting_methods = methods-[em.last ? em.last.methods : nil]-self.class.instance_methods
    im = interesting_methods.size > 0 ? " (#{interesting_methods.join(",")})" : ""
    tv = text_value
    # Show only the tail of long text values.
    tv = "...#{tv[-20..-1]}" if tv.size > 20
    indent +
    self.class.to_s.sub(/.*:/,'') +
    em.map{|m| "+"+m.to_s.sub(/.*:/,'')}*"" +
    " offset=#{interval.first}" +
    ", #{tv.inspect}" +
    im +
    (elements && elements.size > 0 ?
      ":" +
      (elements.select { |e| !e.is_a?(LogStash::Config::AST::Whitespace) && e.elements && e.elements.size > 0 }||[]).map{|e|
        begin
          "\n"+e.inspect(indent+" ")
        rescue # Defend against inspect not taking a parameter
          "\n"+indent+" "+e.inspect
        end
      }.join("") :
      ""
    )
  end
end

View file

@ -1,102 +1,38 @@
require "logstash/namespace"
require "logstash/config/grammar"
require "logstash/config/config_ast"
require "logstash/config/registry"
require "logstash/errors"
require "logger"
class LogStash::Config::File
include Enumerable
attr_accessor :logger
public
def initialize(path=nil, string=nil)
@path = path
@string = string
def initialize(text)
@logger = Cabin::Channel.get(LogStash)
if (path.nil? and string.nil?) or (!path.nil? and !string.nil?)
raise "Must give path or string, not both or neither"
end
@text = text
@config = parse(text)
end # def initialize
def _get_config_data
if @string.nil?
File.new(@path).read
else
@string
end
end
def _get_config(data)
grammar = LogStash::Config::Grammar.new
grammar.parse(data)
grammar.config
end
public
def parse
@config = _get_config(_get_config_data);
registry = LogStash::Config::Registry::registry
each do |o|
# Load the base class for the type given (like inputs/base, or filters/base)
# TODO(sissel): Error handling
tryload o[:type], :base
type = registry[o[:type]]
# Load the plugin itself (inputs/file, outputs/rabbitmq, etc)
# TODO(sissel): Error handling
tryload o[:type], o[:plugin]
plugin = registry[o[:plugin]]
if type.nil?
@logger.info("Unknown plugin", :type => o[:type], :plugin => o[:plugin])
end
yield :type => type, :plugin => plugin, :parameters => o[:parameters]
def parse(text)
grammar = LogStashConfigParser.new
result = grammar.parse(text)
if result.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
return result
end # def parse
public
def tryload(parent, child)
child = child.downcase if child.is_a? String
begin
loaded = require("logstash/#{parent}s/#{child}")
rescue LoadError => e
if child == :base
@logger.fatal("Failure loading plugin type '#{parent}' - is that " \
"really a valid plugin type? (check for typos!)")
else
@logger.fatal("Failure loading plugin from config: " \
"'#{parent} { #{child} { ... } }' - is that " \
"really a valid #{parent} plugin? (check for typos!)")
end
raise e
end
end # def tryload
public
def each(&block)
# First level is the components
# Like:
# input {
# ...
# }
@config.each do |type, plugin_config_array|
# plugin_config_array has arrays of each component config:
# input {
# rabbitmq { ... }
# file { ... }
# file { ... }
# }
plugin_config_array.each do |plugin_config|
yield({
:type => type,
:plugin => plugin_config.keys.first,
:parameters => plugin_config.values.first
})
end
end # @config.each
end # def each
def plugin(plugin_type, name, *args)
klass = LogStash::Plugin.lookup(plugin_type, name)
return klass.new(*args)
end
def each
@config.recursive_select(LogStash::Config::AST::Plugin)
end
end # class LogStash::Config::Parser
#agent.config(cfg)

File diff suppressed because it is too large Load diff

View file

@ -1,215 +0,0 @@
require "logstash/namespace"
%%{
machine logstash_config;
action mark {
@tokenstack.push(p)
#puts "Mark: #{self.line(string, p)}##{self.column(string, p)}"
}
action stack_numeric {
startpos = @tokenstack.pop
endpos = p
token = string[startpos ... endpos]
#puts "numeric: #{token}"
#puts "numeric?: #{string[startpos,50]}"
#puts [startpos, endpos].join(",")
# TODO(sissel): Don't do 'to_i' here. Type coersion is the job of the
# plugin and the validator.
@stack << token.to_i
}
action stack_string {
startpos = @tokenstack.pop
endpos = p
token = string[startpos ... endpos]
#puts "string: #{token}"
@stack << token
}
action stack_quoted_string {
startpos = @tokenstack.pop
endpos = p
token = string[startpos + 1 ... endpos - 1] # Skip quotations
# Parse escapes.
token.gsub(/\\./) { |m| m[1,1] }
#puts "quotedstring: #{token}"
@stack << token
}
action array_init {
@array = []
@stack << :array_init
}
action array_push {
while @stack.last != :array_init
@array.unshift @stack.pop
end
@stack.pop # pop :array_init
@stack << @array
}
action parameter_init {
# nothing
}
action parameter {
value = @stack.pop
name = @stack.pop
#puts "parameter: #{name} => #{value}"
if value.is_a?(Array)
@parameters[name] += value
else
@parameters[name] << value
end
}
action plugin {
@components ||= []
name = @stack.pop
#@components << { :name => name, :parameters => @parameters }
@components << { name => @parameters }
@parameters = Hash.new { |h,k| h[k] = [] }
}
action component_init {
@components = []
@parameters = Hash.new { |h,k| h[k] = [] }
}
action component {
name = @stack.pop
@config ||= Hash.new { |h,k| h[k] = [] }
@config[name] += @components
#puts "Config component: #{name}"
}
#%{ e = @tokenstack.pop; puts "Comment: #{string[e ... p]}" };
comment = "#" (any - [\r\n])* >mark ;
ws = ([ \t\r\n] | comment)** ;
#ws = ([ \t\n])** ;
# TODO(sissel): Support floating point values?
numeric = ( ("+" | "-")? [0-9] :>> [0-9]** ) >mark %stack_numeric;
quoted_string = (
( "\"" ( ( (any - [\\"\r\n]) | "\\" any )* ) "\"" )
| ( "'" ( ( (any - [\\'\r\n]) | "\\" any )* ) "'" )
) >mark %stack_quoted_string ;
naked_string = ( [A-Za-z_] :>> [A-Za-z0-9_]* ) >mark %stack_string ;
string = ( quoted_string | naked_string ) ;
# TODO(sissel): allow use of this.
regexp_literal = ( "/" ( ( (any - [\\'\r\n]) | "\\" any )* ) "/" ) ;
array = (
( "[" ( ws | "" ) "]" )
| ( "[" ws ( string | numeric ) ws ("," ws (string | numeric ) ws)* "]" )
) >array_init %array_push;
# TODO(sissel): Implement hash syntax { key => value, ... }
# TODO(sissel): hashes should support arrays as values.
parameter_value = ( numeric | string | array );
parameter = ( string ws "=>" ws parameter_value ) %parameter ;
parameters = ( parameter ( ws parameter )** ) >parameter_init ;
# Statement:
# component {
# plugin_name {
# bar => ...
# baz => ...
# }
# ...
# }
plugin = (
(
naked_string ws "{" ws
parameters
ws "}"
) | (
naked_string ws "{" ws "}"
)
) %plugin ;
component = (
naked_string ws "{"
>component_init
( ws plugin )**
ws "}"
) %component ;
config = (ws component? )** ;
main := config %{ puts "END" }
$err {
# Compute line and column of the cursor (p)
$stderr.puts "Error at line #{self.line(string, p)}, column #{self.column(string, p)}: #{string[p .. -1].inspect}"
# TODO(sissel): Note what we were expecting?
} ;
}%%
# Ruby harness for the ragel-generated config parser. The %% lines are
# ragel directives expanded by `ragel -R` at build time.
class LogStash::Config::Grammar
  attr_accessor :eof
  attr_accessor :config

  def initialize
    # BEGIN RAGEL DATA
    %% write data;
    # END RAGEL DATA
    @tokenstack = Array.new
    @stack = Array.new
    @types = Hash.new { |h,k| h[k] = [] }
    @edges = []
  end

  # Run the state machine over the config string. Returns the final
  # machine state; raises if the input did not reach a final state.
  def parse(string)
    # TODO(sissel): Due to a bug in my parser, we need one trailing whitespace
    # at the end of the string. I'll fix this later.
    string += "\n"
    data = string.unpack("c*")

    # BEGIN RAGEL INIT
    %% write init;
    # END RAGEL INIT

    begin
      # BEGIN RAGEL EXEC
      %% write exec;
      # END RAGEL EXEC
    rescue => e
      # Compute line and column of the cursor (p)
      raise e
    end

    if cs < self.logstash_config_first_final
      $stderr.puts "Error at line #{self.line(string, p)}, column #{self.column(string, p)}: #{string[p .. -1].inspect}"
      raise "Invalid Configuration. Check syntax of config file."
    end
    return cs
  end # def parse

  # 1-based line number of byte offset 'pos' in 'str'.
  def line(str, pos)
    return str[0 .. pos].count("\n") + 1
  end

  # Column of byte offset 'pos' (length of the last line up to pos).
  def column(str, pos)
    return str[0 .. pos].split("\n").last.length
  end
end # class LogStash::Config::Grammar
#def parse(string)
#cfgparser = LogStash::Config::Grammar.new
#result = cfgparser.parse(string)
#puts "result %s" % result
#ap cfgparser.config
#end
#parse(File.open(ARGV[0]).read)

View file

@ -0,0 +1,188 @@
require "treetop"
require "logstash/config/config_ast"

# Treetop (PEG) grammar for the logstash config file format. Each rule tags
# its match with a node class from LogStash::Config::AST, which compiles the
# parse tree into pipeline code.
#
# Reminder: "/" is PEG *ordered* choice — the first alternative that matches
# wins and later alternatives are never tried, so longer literals must be
# listed before their prefixes (see the comparison rule below).
grammar LogStashConfig
  # A whole config file: one or more plugin sections (input/filter/output).
  rule config
    _ plugin_section _ (_ plugin_section)* _ <LogStash::Config::AST::Config>
  end

  # One or more "#"-to-end-of-line comments.
  rule comment
    (whitespace? "#" [^\r\n]+ "\r"? "\n")+ <LogStash::Config::AST::Comment>
  end

  # "_" matches any (possibly empty) run of comments and whitespace.
  rule _
    (comment / whitespace)* <LogStash::Config::AST::Whitespace>
  end

  rule whitespace
    [ \t\r\n]+ <LogStash::Config::AST::Whitespace>
  end

  # input { ... } / filter { ... } / output { ... }
  rule plugin_section
    plugin_type _ "{"
      (_ branch_or_plugin _)*
    "}"
    <LogStash::Config::AST::PluginSection>
  end

  rule branch_or_plugin
    branch / plugin
  end

  rule plugin_type
    ("input" / "filter" / "output")
  end

  rule plugins
    (plugin (_ plugin)*)?
    <LogStash::Config::AST::Plugins>
  end

  # name { attr => value ... }
  rule plugin
    name _ "{"
      _
      attributes:( attribute (whitespace attribute)*)?
      _
    "}"
    <LogStash::Config::AST::Plugin>
  end

  rule name
    [A-Za-z0-9_-]+
    <LogStash::Config::AST::Name>
  end

  rule attribute
    name _ "=>" _ value
    <LogStash::Config::AST::Attribute>
  end

  rule value
    plugin / bareword / string / number / array / hash
    #<LogStash::Config::AST::Value>
  end

  rule array_value
    bareword / string / number / array / hash
  end

  # NOTE(review): one leading letter/underscore plus one-or-more word chars —
  # this rejects single-character barewords; confirm that is intended.
  rule bareword
    [A-Za-z_] [A-Za-z0-9_]+
    <LogStash::Config::AST::Bareword>
  end

  # Double- or single-quoted string with backslash-escaped quotes.
  rule string
    (
      ( '"' ( '\"' / !'"' . )* '"' <LogStash::Config::AST::String>)
      / ( "'" ( "\\'" / !"'" . )* "'" <LogStash::Config::AST::String> )
    )
  end

  rule number
    "-"? [0-9]+ ("." [0-9]*)?
    <LogStash::Config::AST::Number>
  end

  rule array
    "["
    _
    (
      value (_ "," _ value)*
    )?
    _
    "]"
    <LogStash::Config::AST::Array>
  end

  rule hash
    "{"
      _
      hashentries?
      _
    "}"
    <LogStash::Config::AST::Hash>
  end

  rule hashentries
    hashentry (whitespace hashentry)*
    <LogStash::Config::AST::HashEntries>
  end

  rule hashentry
    name:(number / bareword / string) _ "=>" _ value
    <LogStash::Config::AST::HashEntry>
  end

  # Conditions
  rule branch
    if (_ elsif)* (_ else)?
    <LogStash::Config::AST::Branch>
  end

  rule if
    "if" _ condition _ "{" _ (branch_or_plugin _)* "}"
    <LogStash::Config::AST::If>
  end

  rule elsif
    "elsif" _ condition _ "{" _ ( branch_or_plugin _)* "}"
    <LogStash::Config::AST::Elsif>
  end

  rule else
    "else" _ "{" _ (branch_or_plugin _)* "}"
    <LogStash::Config::AST::Else>
  end

  rule condition
    expression (_ boolean_operator _ expression)*
    <LogStash::Config::AST::Condition>
  end

  rule expression
    (
      ("(" _ condition _ ")")
      / ("!" _ condition)
      / (rvalue _ comparison _ rvalue)
      / (rvalue)
    ) <LogStash::Config::AST::Expression>
  end

  rule rvalue
    string / number / selector / array / method_call
  end

  rule method_call
    method _ "(" _
      (
        rvalue ( _ "," _ rvalue )*
      )?
    _ ")"
    <LogStash::Config::AST::MethodCall>
  end

  rule method
    bareword
  end

  # FIX: "<=" and ">=" must precede "<" and ">". With the old ordering the
  # ordered choice matched the bare "<"/">" first and left the trailing "="
  # unconsumed, so "<=" and ">=" conditions could never parse.
  rule comparison
    ("==" / "!=" / "<=" / ">=" / "<" / ">" / "=~" / "!~" / "in")
    <LogStash::Config::AST::ComparisonOperator>
  end

  rule boolean_operator
    ("and" / "or" / "xor" / "nand")
    <LogStash::Config::AST::BooleanOperator>
  end

  # One or more [field][subfield] references.
  rule selector
    selector_element+
    <LogStash::Config::AST::Selector>
  end

  rule selector_element
    "[" [^\], ]+ "]"
    <LogStash::Config::AST::SelectorElement>
  end
end

View file

@ -49,8 +49,17 @@ module LogStash::Config::Mixin
# store the plugin type, turns LogStash::Inputs::Base into 'input'
@plugin_type = self.class.ancestors.find { |a| a.name =~ /::Base$/ }.config_name
# Set defaults from 'config :foo, :default => somevalue'
self.class.get_config.each do |name, opts|
next if params.include?(name.to_s)
if opts.include?(:default) and (name.is_a?(Symbol) or name.is_a?(String))
params[name.to_s] = opts[:default] unless params.include?(name.to_s)
end
end
if !self.class.validate(params)
raise LogStash::Plugin::ConfigurationError,
raise LogStash::ConfigurationError,
I18n.t("logstash.agent.configuration.invalid_plugin_settings")
end
@ -63,23 +72,6 @@ module LogStash::Config::Mixin
end
end
# Set defaults from 'config :foo, :default => somevalue'
self.class.get_config.each do |name, opts|
next if params.include?(name.to_s)
if opts.include?(:default) and (name.is_a?(Symbol) or name.is_a?(String))
if opts[:validate] == :password
@logger.debug("Converting default value in #{self.class.name} (#{name}) to password object")
params[name.to_s] = ::LogStash::Util::Password.new(opts[:default])
else
default = opts[:default]
if default.is_a?(Array) or default.is_a?(Hash)
default = default.clone
end
params[name.to_s] = default
end
end
end
# set instance variables like '@foo' for each config value given.
params.each do |key, value|
next if key[0, 1] == "@"
@ -163,9 +155,8 @@ module LogStash::Config::Mixin
end # def inherited
def validate(params)
@plugin_name = config_name #[superclass.config_name, config_name].join("/")
@plugin_name = config_name
@plugin_type = ancestors.find { |a| a.name =~ /::Base$/ }.config_name
#.name.split("::")[1].downcase.gsub(/s$/,"")
@logger = Cabin::Channel.get(LogStash)
is_valid = true
@ -178,20 +169,16 @@ module LogStash::Config::Mixin
end # def validate
def validate_plugin_status
return true if @@status_notice_given
docmsg = "For more information about plugin statuses, see http://logstash.net/docs/#{LOGSTASH_VERSION}/plugin-status "
plugin_type = ancestors.find { |a| a.name =~ /::Base$/ }.config_name
case @plugin_status
when "unsupported"
@@status_notice_given || @logger.warn("Using unsupported plugin '#{@config_name}'. This plugin isn't well supported by the community and likely has no maintainer. #{docmsg}")
when "experimental"
@@status_notice_given || @logger.warn("Using experimental plugin '#{@config_name}'. This plugin is untested and may change in the future. #{docmsg}")
when "beta"
@@status_notice_given || @logger.info("Using beta plugin '#{@config_name}'. #{docmsg}")
when "stable"
# This is cool. Nothing worth logging.
when nil
raise "#{@config_name} must set a plugin_status. #{docmsg}"
else
raise "#{@config_name} set an invalid plugin status #{@plugin_status}. Valid values are unsupported, experimental, beta and stable. #{docmsg}"
when "unsupported"; @logger.warn(I18n.t("logstash.plugin.unsupported", :type => plugin_type, :name => @config_name, :LOGSTASH_VERSION => LOGSTASH_VERSION))
when "experimental"; @logger.warn(I18n.t("logstash.plugin.experimental", :type => plugin_type, :name => @config_name, :LOGSTASH_VERSION => LOGSTASH_VERSION))
when "beta"; @logger.warn(I18n.t("logstash.plugin.beta", :type => plugin_type, :name => @config_name, :LOGSTASH_VERSION => LOGSTASH_VERSION))
when "stable"; # This is cool. Nothing worth logging.
when nil; raise "#{@config_name} must set a plugin_status. #{docmsg}"
else; raise "#{@config_name} set an invalid plugin status #{@plugin_status}. Valid values are unsupported, experimental, beta and stable. #{docmsg}"
end
@@status_notice_given = true
return true
@ -245,18 +232,8 @@ module LogStash::Config::Mixin
# config /foo.*/ => ...
is_valid = true
# string/symbols are first, then regexes.
config_keys = @config.keys.sort do |a,b|
CONFIGSORT[a.class] <=> CONFIGSORT[b.class]
end
#puts "Key order: #{config_keys.inspect}"
#puts @config.keys.inspect
params.each do |key, value|
config_keys.each do |config_key|
#puts
#puts "Candidate: #{key.inspect} / #{value.inspect}"
#puts "Config: #{config_key} / #{config_val} "
@config.keys.each do |config_key|
next unless (config_key.is_a?(Regexp) && key =~ config_key) \
|| (config_key.is_a?(String) && key == config_key)
config_val = @config[config_key][:validate]
@ -301,8 +278,6 @@ module LogStash::Config::Mixin
if validator.nil?
return true
elsif validator.is_a?(Proc)
return validator.call(value)
elsif validator.is_a?(Array)
value = [*value]
if value.size > 1
@ -319,6 +294,14 @@ module LogStash::Config::Mixin
value = hash_or_array(value)
case validator
when :codec
if value.first.is_a?(String)
value = LogStash::Plugin.lookup("codec", value.first).new
return true, value
else
value = value.first
return true, value
end
when :hash
if value.is_a?(Hash)
return true, value

5
lib/logstash/errors.rb Normal file
View file

@ -0,0 +1,5 @@
# Error hierarchy for LogStash. All LogStash-specific failures descend from
# LogStash::Error, so callers can rescue that one class to catch any of them.
module LogStash
  # Root of the LogStash error hierarchy.
  class Error < ::StandardError
  end

  # Raised when a configuration cannot be validated or applied.
  class ConfigurationError < Error
  end

  # Raised when a plugin fails to load.
  class PluginLoadingError < Error
  end
end

View file

@ -1,12 +1,255 @@
# General event type.
require "json"
require "time"
require "date"
require "logstash/namespace"
# Use a custom serialization for jsonifying Time objects.
# TODO(sissel): Put this in a separate file.
class Time
def to_json(*args)
return iso8601(3).to_json(*args)
end
end
# the logstash event object.
#
# Basically a light wrapper on top of a hash.
# An event is simply a tuple of (timestamp, data).
# The 'timestamp' is an ISO8601 timestamp. Data is anything - any message,
# context, references, etc that are relevant to this event.
#
# Internally, this is represented as a hash with only two guaranteed fields.
#
# * "@timestamp" - an ISO8601 timestamp representing the time the event
# occurred at.
# * "@version" - the version of the schema. Currently "1"
#
# They are prefixed with an "@" symbol to avoid clashing with your
# own custom fields.
#
# When serialized, this is represented in JSON. For example:
#
# {
# "@timestamp": "2013-02-09T20:39:26.234Z",
# "@version": "1",
# message: "hello world"
# }
class LogStash::Event
if ENV["LOGSTASH_SCHEMA"] == "0"
require "logstash/event_v0"
include LogStash::EventV0
class DeprecatedMethod < StandardError; end
public
def initialize(data={})
@cancelled = false
@data = data
@data["@timestamp"] = ::Time.now.utc if !@data.include?("@timestamp")
@data["@version"] = "1" if !@data.include?("@version")
end # def initialize
# Add class methods on inclusion.
public
def self.included(klass)
klass.extend(ClassMethods)
end # def included
module ClassMethods
public
def from_json(json)
return self.new(JSON.parse(json))
end # def from_json
end
public
def cancel
@cancelled = true
end # def cancel
public
def uncancel
@cancelled = false
end # def uncancel
public
def cancelled?
return @cancelled
end # def cancelled?
# Create a deep-ish copy of this event.
public
def clone
copy = {}
@data.each do |k,v|
# TODO(sissel): Recurse if this is a hash/array?
copy[k] = v.clone
end
return self.class.new(copy)
end # def clone
if RUBY_ENGINE == "jruby"
public
def to_s
return self.sprintf("%{+yyyy-MM-dd'T'HH:mm:ss.SSSZ} %{source} %{message}")
end # def to_s
else
require "logstash/event_v1"
include LogStash::EventV1
public
def to_s
return self.sprintf("#{self["@timestamp"].iso8601} %{source} %{message}")
end # def to_s
end
public
def timestamp; return @data["@timestamp"]; end # def timestamp
def timestamp=(val); return @data["@timestamp"] = val; end # def timestamp=
def unix_timestamp
raise DeprecatedMethod
end # def unix_timestamp
def ruby_timestamp
raise DeprecatedMethod
end # def unix_timestamp
# field-related access
public
def [](key)
if key[0] == '['
val = @data
key.gsub(/(?<=\[).+?(?=\])/).each do |tok|
if val.is_a? Array
val = val[tok.to_i]
else
val = val[tok]
end
end
return val
else
return @data[key]
end
end # def []
public
def []=(key, value)
if key[0] == '['
val = @data
keys = key.scan(/(?<=\[).+?(?=\])/)
last = keys.pop
keys.each do |tok|
if val.is_a? Array
val = val[tok.to_i]
else
val = val[tok]
end
end
val[last] = value
else
@data[key] = value
end
end # def []=
public
def fields
raise DeprecatedMethod
end
public
def to_json(*args)
return @data.to_json(*args)
end # def to_json
def to_hash
return @data
end # def to_hash
public
def overwrite(event)
@data = event.to_hash
end
public
def include?(key)
return !self[key].nil?
end # def include?
# Append an event to this one.
public
def append(event)
# non-destructively merge that event with ourselves.
LogStash::Util.hash_merge(@data, event.to_hash)
end # append
# Remove a field. Returns the value of that field when deleted
public
def remove(field)
return @data.delete(field)
end # def remove
# sprintf. This could use a better method name.
# The idea is to take an event and convert it to a string based on
# any format values, delimited by %{foo} where 'foo' is a field or
# metadata member.
#
# For example, if the event has @type == "foo" and @source == "bar"
# then this string:
# "type is %{@type} and source is %{@source}"
# will return
# "type is foo and source is bar"
#
# If a %{name} value is an array, then we will join by ','
# If a %{name} value does not exist, then no substitution occurs.
#
# TODO(sissel): It is not clear what the value of a field that
# is an array (or hash?) should be. Join by comma? Something else?
public
def sprintf(format)
if format.index("%").nil?
return format
end
return format.gsub(/%\{[^}]+\}/) do |tok|
# Take the inside of the %{ ... }
key = tok[2 ... -1]
if key == "+%s"
# Got %{+%s}, support for unix epoch time
next @data["@timestamp"].to_i
elsif key[0,1] == "+"
t = @data["@timestamp"]
formatter = org.joda.time.format.DateTimeFormat.forPattern(key[1 .. -1])\
.withZone(org.joda.time.DateTimeZone::UTC)
#next org.joda.time.Instant.new(t.tv_sec * 1000 + t.tv_usec / 1000).toDateTime.toString(formatter)
# Invoke a specific Instant constructor to avoid this warning in JRuby
# > ambiguous Java methods found, using org.joda.time.Instant(long)
org.joda.time.Instant.java_class.constructor(Java::long).new_instance(
t.tv_sec * 1000 + t.tv_usec / 1000
).to_java.toDateTime.toString(formatter)
else
value = self[key]
case value
when nil
tok # leave the %{foo} if this field does not exist in this event.
when Array
value.join(",") # Join by ',' if value is an array
when Hash
value.to_json # Convert hashes to json
else
value # otherwise return the value
end # case value
end # 'key' checking
end # format.gsub...
end # def sprintf
# Shims to remove after event v1 is the default.
def tags=(value); self["tags"] = value; end
def tags; return self["tags"]; end
def message=(value); self["message"] = value; end
def source=(value); self["source"] = value; end
def type=(value); self["type"] = value; end
def type; return self["type"]; end
def fields; return self.to_hash; end
def tag(value)
# Generalize this method for more usability
self["tags"] ||= []
self["tags"] << value unless self["tags"].include?(value)
end
end # class LogStash::Event

View file

@ -1,305 +0,0 @@
require "json"
require "time"
require "date"
require "logstash/time_addon"
require "logstash/namespace"
require "uri"
# General event type.
# Basically a light wrapper on top of a hash.
#
# TODO(sissel): properly handle lazy properties like parsed time formats, urls,
# etc, as necessary.
# Legacy (v0 schema) logstash event implementation, mixed into
# LogStash::Event when the old schema is selected. A light wrapper over a
# single @data hash using "@source", "@tags", "@fields", etc.
module LogStash::EventV0
  public
  def initialize(data=nil)
    @cancelled = false
    # v0 schema skeleton; caller-provided data is merged over it.
    @data = {
      "@source" => "unknown",
      "@tags" => [],
      "@fields" => {},
    }
    @data.merge!(data) unless data.nil?
    @data["@timestamp"] ||= LogStash::Time.now
  end # def initialize

  # Shared ISO8601 parser; only available under JRuby (joda-time).
  if defined?(RUBY_ENGINE) && RUBY_ENGINE == "jruby"
    @@date_parser = Java::org.joda.time.format.ISODateTimeFormat.dateTimeParser.withOffsetParsed
  else
    # TODO(sissel): LOGSTASH-217
    @@date_parser ||= nil
  end

  public
  def cancel
    @cancelled = true
  end # def cancel

  public
  def uncancel
    @cancelled = false
  end # def uncancel

  public
  def cancelled?
    return @cancelled
  end # def cancelled?

  # Create a deep-ish copy of this event.
  public
  def clone
    return LogStash::Event.new(Marshal.load(Marshal.dump(@data)))
  end # def clone

  public
  def to_s
    return self.sprintf("%{@timestamp} %{@source}: %{@message}")
  end # def to_s

  public
  def timestamp; @data["@timestamp"]; end # def timestamp
  def timestamp=(val); @data["@timestamp"] = val; end # def timestamp=

  # Seconds-since-epoch (Float) of @timestamp.
  public
  def unix_timestamp
    if RUBY_ENGINE != "jruby"
      # This is really slow. See LOGSTASH-217
      # For some reason, ::Time.parse isn't present even after 'require "time"'
      # so use DateTime.parse
      return ::DateTime.parse(timestamp).to_time.to_f
    else
      time = @@date_parser.parseDateTime(timestamp)
      return time.getMillis.to_f / 1000
    end
  end

  # @timestamp parsed into a ruby Time.
  def ruby_timestamp
    return ::DateTime.parse(timestamp).to_time
  end

  public
  def source; @data["@source"]; end # def source
  # Setting source from a URI also populates @source_host / @source_path.
  def source=(val)
    uri = URI.parse(val) rescue nil
    val = uri if uri
    if val.is_a?(URI)
      @data["@source"] = val.to_s
      @data["@source_host"] = val.host if @data["@source_host"].nil?
      @data["@source_path"] = val.path
    else
      @data["@source"] = val
    end
  end # def source=

  public
  def source_host; @data["@source_host"]; end # def source_host
  def source_host=(val); @data["@source_host"] = val; end # def source_host=

  public
  def source_path; @data["@source_path"]; end # def source_path
  def source_path=(val); @data["@source_path"] = val; end # def source_path=

  public
  def message; @data["@message"]; end # def message
  def message=(val); @data["@message"] = val; end # def message=

  public
  def type; @data["@type"]; end # def type
  def type=(val); @data["@type"] = val; end # def type=

  public
  def tags; @data["@tags"]; end # def tags
  def tags=(val); @data["@tags"] = val; end # def tags=

  def id; @data["@id"]; end # def id
  def id=(val); @data["@id"] = val; end # def id=

  # field-related access
  public
  def [](key)
    # If the key isn't in fields and it starts with an "@" sign, get it out of data instead of fields
    if ! @data["@fields"].has_key?(key) and key.slice(0,1) == "@"
      return @data[key]
    elsif key.index(/(?<!\\)\./)
      value = nil
      obj = @data["@fields"]
      # "." is what ES uses to access structured data, so adopt that
      # idea here, too. "foo.bar" will access key "bar" under hash "foo".
      key.split(/(?<!\\)\./).each do |segment|
        segment.gsub!(/\\\./, ".")
        if (obj.is_a?(Array) || (obj.is_a?(Hash) && !obj.member?(segment)) )
          # try to safely cast segment to integer for the 0 in foo.0.bar
          begin
            segment = Integer(segment)
          rescue Exception
            #not an int, do nothing, segment remains a string
          end
        end
        if obj
          value = obj[segment] rescue nil
          obj = obj[segment] rescue nil
        else
          value = nil
          break
        end
      end # key.split.each
      return value
    else
      return @data["@fields"][key.gsub(/\\\./, ".")]
    end
  end # def []

  public
  def []=(key, value)
    # "@"-prefixed (or already top-level) keys live in @data; the rest in @fields.
    if @data.has_key?(key) || key[0,1] == "@"
      @data[key] = value
    else
      @data["@fields"][key] = value
    end
  end # def []=

  def fields; return @data["@fields"] end # def fields

  public
  def to_json(*args); return @data.to_json(*args) end # def to_json
  def to_hash; return @data end # def to_hash

  # Replace this event's data wholesale with another event's.
  public
  def overwrite(event)
    @data = event.to_hash
  end

  public
  def include?(key)
    return !self[key].nil?
  end # def include?

  # Append an event to this one.
  public
  def append(event)
    # Concatenate messages, union tags, and merge fields (promoting scalar
    # field values to arrays on collision).
    if event.message
      if self.message
        self.message += "\n" + event.message
      else
        self.message = event.message
      end
    end
    self.tags |= event.tags

    # Append all fields
    event.fields.each do |name, value|
      if self.fields.include?(name)
        if !self.fields[name].is_a?(Array)
          self.fields[name] = [self.fields[name]]
        end
        if value.is_a?(Array)
          self.fields[name] |= value
        else
          self.fields[name] << value unless self.fields[name].include?(value)
        end
      else
        self.fields[name] = value
      end
    end # event.fields.each
  end # def append

  # Remove a field. Returns the value of that field when deleted
  public
  def remove(field)
    if @data.has_key?(field)
      return @data.delete(field)
    else
      return @data["@fields"].delete(field)
    end
  end # def remove

  # sprintf. This could use a better method name.
  # The idea is to take an event and convert it to a string based on
  # any format values, delimited by %{foo} where 'foo' is a field or
  # metadata member.
  #
  # For example, if the event has @type == "foo" and @source == "bar"
  # then this string:
  #   "type is %{@type} and source is %{@source}"
  # will return
  #   "type is foo and source is bar"
  #
  # If a %{name} value is an array, then we will join by ','
  # If a %{name} value does not exist, then no substitution occurs.
  #
  # TODO(sissel): It is not clear what the value of a field that
  # is an array (or hash?) should be. Join by comma? Something else?
  public
  def sprintf(format)
    if format.index("%").nil?
      return format
    end
    return format.gsub(/%\{[^}]+\}/) do |tok|
      # Take the inside of the %{ ... }
      key = tok[2 ... -1]

      if key == "+%s"
        # Got %{+%s}, support for unix epoch time
        if RUBY_ENGINE != "jruby"
          # This is really slow. See LOGSTASH-217
          Time.parse(self.timestamp).to_i
        else
          datetime = @@date_parser.parseDateTime(self.timestamp)
          (datetime.getMillis / 1000).to_i
        end
      elsif key[0,1] == "+"
        # We got a %{+TIMEFORMAT} so use joda to format it.
        if RUBY_ENGINE != "jruby"
          # This is really slow. See LOGSTASH-217
          datetime = Date.parse(self.timestamp)
          format = key[1 .. -1]
          datetime.strftime(format)
        else
          datetime = @@date_parser.parseDateTime(self.timestamp)
          format = key[1 .. -1]
          datetime.toString(format) # return requested time format
        end
      else
        # Use an event field.
        value = self[key]

        case value
        when nil
          tok # leave the %{foo} if this field does not exist in this event.
        when Array
          value.join(",") # Join by ',' if value is an array
        when Hash
          value.to_json # Convert hashes to json
        else
          value # otherwise return the value
        end
      end
    end
  end # def sprintf

  # Events are equal when their underlying data hashes are equal.
  public
  def ==(other)
    #puts "#{self.class.name}#==(#{other.inspect})"
    if !other.is_a?(self.class)
      return false
    end
    return other.to_hash == self.to_hash
  end # def ==

  # Add class methods on inclusion.
  def self.included(klass)
    klass.extend(ClassMethods)
  end # def included

  module ClassMethods
    public
    # Build an event from a JSON string.
    def from_json(json)
      return self.new(JSON.parse(json))
    end # def from_json
  end
end # module LogStash::EventV0

View file

@ -1,234 +0,0 @@
require "json"
require "time"
require "date"
require "logstash/namespace"
require "uri"
# Use a custom serialization for jsonifying Time objects.
# TODO(sissel): Put this in a separate file.
# Use a custom serialization for jsonifying Time objects: render as an
# ISO8601 string with millisecond precision instead of the default.
class Time
  def to_json(*args)
    iso8601(3).to_json(*args)
  end
end
# the logstash event object.
#
# An event is simply a tuple of (timestamp, data).
# The 'timestamp' is an ISO8601 timestamp. Data is anything - any message,
# context, references, etc that are relevant to this event.
#
# Internally, this is represented as a hash with only two guaranteed fields.
#
# * "@timestamp" - an ISO8601 timestamp representing the time the event
# occurred at.
# * "@version" - the version of the schema. Currently "1"
#
# They are prefixed with an "@" symbol to avoid clashing with your
# own custom fields.
#
# When serialized, this is represented in JSON. For example:
#
# {
# "@timestamp": "2013-02-09T20:39:26.234Z",
# "@version": "1",
# message: "hello world"
# }
# Version 1 schema ("@timestamp"/"@version") implementation of the logstash
# event, mixed into LogStash::Event. A light wrapper over a single @data hash.
module LogStash::EventV1
  # Raised by methods kept only for the old (v0) API surface.
  class DeprecatedMethod < StandardError; end

  public
  def initialize(data={})
    @cancelled = false
    @data = data
    # Guarantee the two schema fields without clobbering caller-provided values.
    @data["@timestamp"] = ::Time.now.utc if !@data.include?("@timestamp")
    @data["@version"] = "1" if !@data.include?("@version")
  end # def initialize

  # Add class methods on inclusion.
  public
  def self.included(klass)
    klass.extend(ClassMethods)
  end # def included

  module ClassMethods
    public
    # Build an event from a JSON string.
    def from_json(json)
      return self.new(JSON.parse(json))
    end # def from_json
  end

  public
  def cancel
    @cancelled = true
  end # def cancel

  public
  def uncancel
    @cancelled = false
  end # def uncancel

  public
  def cancelled?
    return @cancelled
  end # def cancelled?

  # Create a deep-ish copy of this event.
  public
  def clone
    copy = {}
    @data.each do |k,v|
      # TODO(sissel): Recurse if this is a hash/array?
      copy[k] = v.clone
    end
    return self.class.new(copy)
  end # def clone

  public
  def to_s
    return self.sprintf("%{@timestamp} %{source_host} %{message}")
  end # def to_s

  public
  def timestamp; return @data["@timestamp"]; end # def timestamp
  def timestamp=(val); return @data["@timestamp"] = val; end # def timestamp=

  def unix_timestamp
    raise DeprecatedMethod
  end # def unix_timestamp

  def ruby_timestamp
    raise DeprecatedMethod
  end # def ruby_timestamp

  # field-related access
  public
  def [](key)
    # "[foo][bar]" selector syntax walks nested hashes (arrays are indexed by
    # integer); any other key is a plain top-level lookup.
    if key[0] == '['
      val = @data
      key.gsub(/(?<=\[).+?(?=\])/).each do |tok|
        if val.is_a? Array
          val = val[tok.to_i]
        else
          val = val[tok]
        end
      end
      return val
    else
      return @data[key]
    end
  end # def []

  public
  def []=(key, value)
    # NOTE(review): unlike [], this does not handle "[a][b]" selector keys —
    # confirm whether nested assignment is expected here.
    @data[key] = value
  end # def []=

  public
  def fields
    raise DeprecatedMethod
  end

  public
  def to_json(*args)
    return @data.to_json(*args)
  end # def to_json

  def to_hash
    return @data
  end # def to_hash

  # Replace this event's data wholesale with another event's.
  public
  def overwrite(event)
    @data = event.to_hash
  end

  public
  def include?(key)
    return !self[key].nil?
  end # def include?

  # Append an event to this one.
  public
  def append(event)
    # non-destructively merge that event with ourselves.
    LogStash::Util.hash_merge(@data, event.to_hash)
  end # append

  # Remove a field. Returns the value of that field when deleted
  public
  def remove(field)
    return @data.delete(field)
  end # def remove

  # sprintf. This could use a better method name.
  # The idea is to take an event and convert it to a string based on
  # any format values, delimited by %{foo} where 'foo' is a field or
  # metadata member.
  #
  # For example, if the event has @type == "foo" and @source == "bar"
  # then this string:
  #   "type is %{@type} and source is %{@source}"
  # will return
  #   "type is foo and source is bar"
  #
  # If a %{name} value is an array, then we will join by ','
  # If a %{name} value does not exist, then no substitution occurs.
  #
  # TODO(sissel): It is not clear what the value of a field that
  # is an array (or hash?) should be. Join by comma? Something else?
  public
  def sprintf(format)
    if format.index("%").nil?
      return format
    end
    return format.gsub(/%\{[^}]+\}/) do |tok|
      # Take the inside of the %{ ... }
      key = tok[2 ... -1]

      if key == "+%s"
        # Got %{+%s}, support for unix epoch time
        next @data["@timestamp"].to_i
      elsif key[0,1] == "+"
        # %{+FORMAT}: format @timestamp with a joda-time pattern (JRuby only).
        t = @data["@timestamp"]
        formatter = org.joda.time.format.DateTimeFormat.forPattern(key[1 .. -1])\
                       .withZone(org.joda.time.DateTimeZone::UTC)
        #next org.joda.time.Instant.new(t.tv_sec * 1000 + t.tv_usec / 1000).toDateTime.toString(formatter)
        # Invoke a specific Instant constructor to avoid this warning in JRuby
        #   > ambiguous Java methods found, using org.joda.time.Instant(long)
        org.joda.time.Instant.java_class.constructor(Java::long).new_instance(
          t.tv_sec * 1000 + t.tv_usec / 1000
        ).to_java.toDateTime.toString(formatter)
      else
        value = self[key]
        case value
        when nil
          tok # leave the %{foo} if this field does not exist in this event.
        when Array
          value.join(",") # Join by ',' if value is an array
        when Hash
          value.to_json # Convert hashes to json
        else
          value # otherwise return the value
        end # case value
      end # 'key' checking
    end # format.gsub...
  end # def sprintf

  # Shims to remove after event v1 is the default.
  def tags=(value); self["tags"] = value; end
  def tags; return self["tags"]; end
  def message=(value); self["message"] = value; end
  def source=(value); self["source"] = value; end
  def type=(value); self["type"] = value; end
  def type; return self["type"]; end
  # NOTE(review): this shim redefines (and wins over) the DeprecatedMethod
  # 'fields' declared above.
  def fields; return self.to_hash; end

  # Add 'value' to the "tags" field, creating it if needed, without duplicates.
  def tag(value)
    # Generalize this method for more usability
    self["tags"] ||= []
    self["tags"] << value unless self["tags"].include?(value)
  end
end # module LogStash::EventV1

View file

@ -15,32 +15,16 @@ class LogStash::Filters::Base < LogStash::Plugin
# act on messages with the same type. See any input plugin's "type"
# attribute for more.
# Optional.
config :type, :validate => :string, :default => ""
config :type, :validate => :string, :default => "", :deprecated => true
# Only handle events with all/any (controlled by include_any config option) of these tags.
# Optional.
# TODO(piavlo): should we rename/alias this to include_tags for clarity and consistency?
config :tags, :validate => :array, :default => []
config :tags, :validate => :array, :default => [], :deprecated => true
# Only handle events without all/any (controlled by exclude_any config option) of these tags.
# Only handle events without all/any (controlled by exclude_any config
# option) of these tags.
# Optional.
config :exclude_tags, :validate => :array, :default => []
# Only handle events with all/any (controlled by include_any config option) of these fields.
# Optional.
config :include_fields, :validate => :array, :default => []
# Only handle events without all/any (controlled by exclude_any config option) of these fields.
# Optional.
config :exclude_fields, :validate => :array, :default => []
# Should all or any of the specified tags/include_fields be present for event to
# be handled. Defaults to all.
config :include_any, :validate => :boolean, :default => false
# Should all or any of the specified exclude_tags/exclude_fields be missing for event to
# be handled. Defaults to all.
config :exclude_any, :validate => :boolean, :default => true
config :exclude_tags, :validate => :array, :default => [], :deprecated => true
# If this filter is successful, add arbitrary tags to the event.
# Tags can be dynamic and include parts of the event using the %{field}
@ -107,9 +91,6 @@ class LogStash::Filters::Base < LogStash::Plugin
super
config_init(params)
@threadsafe = true
@include_method = @include_any ? :any? : :all?
@exclude_method = @exclude_any ? :any? : :all?
end # def initialize
public
@ -181,30 +162,20 @@ class LogStash::Filters::Base < LogStash::Plugin
end
if !@tags.empty?
return false if event["tags"].nil?
if !@tags.send(@include_method) { |tag| event.tags.include?(tag) }
@logger.debug? and @logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event])
# this filter only works on events with certain tags,
# and this event has no tags.
return false if !event["tags"]
# Is @tags a subset of the event's tags? If not, skip it.
if (event["tags"] & @tags).size != @tags.size
@logger.debug(["Skipping event because tags don't match #{@tags.inspect}", event])
return false
end
end
if !@exclude_tags.empty? && !event["tags"].nil?
if @exclude_tags.send(@exclude_method) {|tag| event.tags.include?(tag)}
@logger.debug? and @logger.debug(["Skipping event because tags contains excluded tags: #{exclude_tags.inspect}", event])
return false
end
end
if !@include_fields.empty?
if !@include_fields.send(@include_method) {|field| event.include?(field)}
@logger.debug? and @logger.debug(["Skipping event because fields don't match #{@include_fields.inspect}", event])
return false
end
end
if !@exclude_fields.empty?
if @exclude_fields.send(@exclude_method) {|field| event.include?(field)}
@logger.debug? and @logger.debug(["Skipping event because fields contain excluded fields #{@exclude_fields.inspect}", event])
if !@exclude_tags.empty? && event["tags"]
if (diff_tags = (event["tags"] & @exclude_tags)).size != 0
@logger.debug(["Skipping event because tags contains excluded tags: #{diff_tags.inspect}", event])
return false
end
end

View file

@ -138,15 +138,19 @@ class LogStash::Filters::Date < LogStash::Filters::Base
end
parser = lambda { |date| joda_parser.parseDateTime(date) }
when "UNIX" # unix epoch
parser = lambda { |date| org.joda.time.Instant.new((date.to_f * 1000).to_i).toDateTime }
joda_instant = org.joda.time.Instant.java_class.constructor(Java::long).method(:new_instance)
parser = lambda { |date| joda_instant.call((date.to_f * 1000).to_i).to_java.toDateTime }
when "UNIX_MS" # unix epoch in ms
parser = lambda { |date| org.joda.time.Instant.new(date.to_i).toDateTime }
joda_instant = org.joda.time.Instant.java_class.constructor(Java::long).method(:new_instance)
parser = lambda do |date|
return joda_instant.call(date.to_i).to_java.toDateTime
end
when "TAI64N" # TAI64 with nanoseconds, -10000 accounts for leap seconds
joda_instant = org.joda.time.Instant.java_class.constructor(Java::long).method(:new_instance)
parser = lambda do |date|
# Skip leading "@" if it is present (common in tai64n times)
date = date[1..-1] if date[0, 1] == "@"
org.joda.time.Instant.new((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).toDateTime
return joda_instant.call((date[1..15].hex * 1000 - 10000)+(date[16..23].hex/1000000)).to_java.toDateTime
end
else
joda_parser = org.joda.time.format.DateTimeFormat.forPattern(format).withDefaultYear(Time.new.year)
@ -200,9 +204,7 @@ class LogStash::Filters::Date < LogStash::Filters::Base
end
end # fieldparsers.each
if !success
raise last_exception
end
raise last_exception unless success
time = time.withZone(UTC)
# Convert joda DateTime to a ruby Time

View file

@ -0,0 +1,31 @@
require "logstash/filters/base"
require "logstash/namespace"
# Drop filter.
#
# Drops everything that gets to this filter.
#
# This is best used in combination with conditionals, for example:
#
# filter {
# if [loglevel] == "debug" {
# drop { }
# }
# }
#
# The above will only pass events to the drop filter if the loglevel field is
# "debug". This will cause all events matching to be dropped.
# Filter that cancels every event it receives; intended for use inside
# conditionals (see the usage example in the header comment above this class).
class LogStash::Filters::Drop < LogStash::Filters::Base
  config_name "drop"
  plugin_status "experimental"

  public
  def register
    # nothing to do.
  end

  public
  # Cancel the event; cancelled events are discarded by the pipeline.
  def filter(event)
    event.cancel
  end # def filter
end # class LogStash::Filters::Drop

View file

@ -1,251 +1,30 @@
# multiline filter
#
# This filter will collapse multiline messages into a single event.
#
require "logstash/filters/base"
require "logstash/namespace"
require "set"
require "logstash/errors"
# The multiline filter is for combining multiple events from a single source
# into the same event.
# ## This filter was replaced by a codec.
#
# The original goal of this filter was to allow joining of multi-line messages
# from files into a single event. For example - joining java exception and
# stacktrace messages into a single event.
#
# The config looks like this:
#
# filter {
# multiline {
# type => "type"
# pattern => "pattern, a regexp"
# negate => boolean
# what => "previous" or "next"
# }
# }
#
# The 'regexp' should match what you believe to be an indicator that
# the field is part of a multi-line event
#
# The 'what' must be "previous" or "next" and indicates the relation
# to the multi-line event.
#
# The 'negate' can be "true" or "false" (defaults false). If true, a
# message not matching the pattern will constitute a match of the multiline
# filter and the what will be applied. (vice-versa is also true)
#
# For example, java stack traces are multiline and usually have the message
# starting at the far-left, then each subsequent line indented. Do this:
#
# filter {
# multiline {
# type => "somefiletype"
# pattern => "^\s"
# what => "previous"
# }
# }
#
# This says that any line starting with whitespace belongs to the previous line.
#
# Another example is C line continuations (backslash). Here's how to do that:
#
# filter {
# multiline {
# type => "somefiletype "
# pattern => "\\$"
# what => "next"
# }
# }
#
# See the multiline codec instead.
class LogStash::Filters::Multiline < LogStash::Filters::Base
config_name "multiline"
plugin_status "stable"
# The regular expression to match
# Leave these config settings until we remove this filter entirely.
# The idea is that we want the register method to cause an abort
# giving the user a clue to use the codec instead of the filter.
config :pattern, :validate => :string, :required => true
# The field to use for matching a multiline event.
config :source, :validate => :string, :default => "message"
# If the pattern matched, does event belong to the next or previous event?
config :what, :validate => ["previous", "next"], :required => true
# Negate the regexp pattern ('if not matched')
config :negate, :validate => :boolean, :default => false
# The stream identity is how the multiline filter determines which stream an
# event belongs. This is generally used for differentiating, say, events
# coming from multiple files in the same file input, or multiple connections
# coming from a tcp input.
#
# The default value here is usually what you want, but there are some cases
# where you want to change it. One such example is if you are using a tcp
# input with only one client connecting at any time. If that client
# reconnects (due to error or client restart), then logstash will identify
# the new connection as a new stream and break any multiline goodness that
# may have occurred between the old and new connection. To solve this use
# case, you can use "%{host}.%{type}" instead.
config :stream_identity , :validate => :string, :default => "%{host}-%{path}-%{type}"
# logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional
# patterns.
#
# Pattern files are plain text with format:
#
# NAME PATTERN
#
# For example:
#
# NUMBER \d+
config :patterns_dir, :validate => :array, :default => []
# Flush inactive multiline streams older than the given number of
# seconds.
#
# This is useful when your event stream is slow and you do not want to wait
# for the next event before seeing the current event.
#config :flush_age, :validate => :number, :default => 5
# Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/
@@patterns_path += ["#{File.dirname(__FILE__)}/../../patterns/*"]
else
@@patterns_path += ["#{File.dirname(__FILE__)}/../../../patterns/*"]
end
public
def initialize(config = {})
super
@threadsafe = false
# This filter needs to keep state.
@types = Hash.new { |h,k| h[k] = [] }
@pending = Hash.new
end # def initialize
public
def register
require "grok-pure" # rubygem 'jls-grok'
@grok = Grok.new
@patterns_dir = @@patterns_path.to_a + @patterns_dir
@patterns_dir.each do |path|
# Can't read relative paths from jars, try to normalize away '../'
while path =~ /file:\/.*\.jar!.*\/\.\.\//
# replace /foo/bar/../baz => /foo/baz
path = path.gsub(/[^\/]+\/\.\.\//, "")
end
if File.directory?(path)
path = File.join(path, "*")
end
Dir.glob(path).each do |file|
@logger.info("Grok loading patterns from file", :path => file)
@grok.add_patterns_from_file(file)
end
end
@grok.compile(@pattern)
@logger.debug("Registered multiline plugin", :type => @type, :config => @config)
raise LogStash::ConfigurationError, "The multiline filter has been replaced by the multiline codec. Please see http://logstash.net/docs/%VERSION%/codecs/multiline.\n"
end # def register
public
def filter(event)
return unless filter?(event)
return unless event.include?(@source)
if event[@source].is_a?(Array)
match = @grok.match(event[@source].first)
else
match = @grok.match(event[@source])
end
key = event.sprintf(@stream_identity)
pending = @pending[key]
@logger.debug("Multiline", :pattern => @pattern, :message => event["message"],
:match => match, :negate => @negate)
# Add negate option
match = (match and !@negate) || (!match and @negate)
case @what
when "previous"
if match
event.tag("multiline")
# this line belongs to the previous event;
# append it to the event and cancel it
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# this line is not part of the previous event
# if we have a pending event, it's done, send it.
# put the current event into pending
if pending
pending.uncancel
pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
yield pending
end
@pending[key] = event
event.cancel
end # if/else match
when "next"
if match
event.tag("multiline")
# this line is part of a multiline event, the next
# line will be part, too, put it into pending.
if pending
pending.append(event)
else
@pending[key] = event
end
event.cancel
else
# if we have something in pending, join it with this message
# and send it. otherwise, this is a new message and not part of
# multiline, send it.
if pending
pending.append(event)
@pending.delete(key)
pending[@source] = pending[@source].join("\n") if pending[@source].is_a?(Array)
yield pending
end
end # if/else match
else
# TODO(sissel): Make this part of the 'register' method.
@logger.warn("Unknown multiline 'what' value.", :what => @what)
end # case @what
if !event.cancelled?
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
filter_matched(event)
end
end # def filter
public
def flush
events = []
#flushed = []
@pending.each do |key, event|
#next unless @flush_age.nil? || (Time.now - event.timestamp) > @flush_age
event.uncancel
event[@source] = event[@source].join("\n") if event[@source].is_a?(Array)
events << event
#flushed << key
end
#flushed.each { |k| @pending.delete(k) }
@pending.clear
return events
end # def flush
end # class LogStash::Filters::Multiline

View file

@ -87,22 +87,22 @@ class LogStash::Filters::Xml < LogStash::Filters::Base
if event[key].length > 1
@logger.warn("XML filter only works on fields of length 1",
:key => key, :value => event[key])
next
return
end
raw = event[key].first
# for some reason, an empty string is considered valid XML
next if raw.strip.length == 0
return if raw.strip.length == 0
if @xpath
if @xpath
begin
doc = Nokogiri::XML(raw)
rescue => e
event.tag("_xmlparsefailure")
@logger.warn("Trouble parsing xml", :key => key, :raw => raw,
:exception => e, :backtrace => e.backtrace)
next
return
end
@xpath.each do |xpath_src, xpath_dest|
@ -115,7 +115,7 @@ class LogStash::Filters::Xml < LogStash::Filters::Base
normalized_nodeset.each do |value|
# some XPath functions return empty arrays as string
if value.is_a?(Array)
next if value.length == 0
return if value.length == 0
end
unless value.nil?
@ -135,7 +135,7 @@ class LogStash::Filters::Xml < LogStash::Filters::Base
event.tag("_xmlparsefailure")
@logger.warn("Trouble parsing xml with XmlSimple", :key => key,
:raw => raw, :exception => e, :backtrace => e.backtrace)
next
return
end
end # if @store_xml

View file

@ -34,10 +34,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
config :format, :validate => ["plain", "json", "json_event", "msgpack_event"], :deprecated => true
# The codec used for input data
config :codec, :validate => :string, :default => 'plain'
# Optional arguments to get passed into the codec
config :codec_args, :validate => :hash, :default => {}
config :codec, :validate => :codec, :default => "plain"
# The character encoding used in this input. Examples include "UTF-8"
# and "cp1252"
@ -69,7 +66,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
attr_accessor :threadable
public
def initialize(params)
def initialize(params={})
super
@threadable = false
config_init(params)
@ -86,12 +83,6 @@ class LogStash::Inputs::Base < LogStash::Plugin
@tags << newtag
end # def tag
protected
def enable_codecs
@codec = LogStash::Codecs.for(@codec).new(@codec_args)
@codec.charset = @charset
end
protected
def to_event(raw, source)
@format ||= "plain"

View file

@ -26,7 +26,6 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base
public
def register
enable_codecs
@logger.info("Registering Exec Input", :type => @type,
:command => @command, :interval => @interval)
end # def register

View file

@ -62,7 +62,6 @@ class LogStash::Inputs::File < LogStash::Inputs::Base
require "addressable/uri"
require "filewatch/tail"
require "digest/md5"
enable_codecs
LogStash::Util::set_thread_name("input|file|#{path.join(":")}")
@logger.info("Registering file input", :path => @path)

View file

@ -47,7 +47,6 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable
public
def register
enable_codecs
@host = Socket.gethostname
if @count.is_a?(Array)

View file

@ -88,7 +88,6 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
public
def register
enable_codecs
@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
@vhost ||= "/"

View file

@ -7,22 +7,21 @@ require "socket" # for Socket.gethostname
# By default, each event is assumed to be one line. If you
# want to join lines, you'll want to use the multiline filter.
class LogStash::Inputs::Stdin < LogStash::Inputs::Base
config_name "stdin"
plugin_status "beta"
public
def register
enable_codecs
@host = Socket.gethostname
end # def register
def run(queue)
require "ap"
while true
begin
@codec.decode($stdin.readline.chomp) do |event|
event["source"] = "stdin://#{@host}/"
line = $stdin.readline.chomp
@codec.decode(line) do |event|
event["source"] = @host
queue << event
end
rescue EOFError => ex
@ -35,7 +34,8 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
public
def teardown
$stdin.close
@logger.debug("stdin shutting down.")
$stdin.close rescue nil
finished
end # def teardown
end # class LogStash::Inputs::Stdin

View file

@ -57,7 +57,6 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
public
def register
enable_codecs
require "socket"
require "timeout"
if @ssl_enable

View file

@ -15,57 +15,23 @@ class LogStash::Outputs::Base < LogStash::Plugin
# act on messages with the same type. See any input plugin's "type"
# attribute for more.
# Optional.
config :type, :validate => :string, :default => ""
config :type, :validate => :string, :default => "", :deprecated => true
# Only handle events with all of these tags. Note that if you specify
# a type, the event must also match that type.
# Optional.
config :tags, :validate => :array, :default => []
config :tags, :validate => :array, :default => [], :deprecated => true
# Only handle events without any of these tags. Note this check is additional to type and tags.
config :exclude_tags, :validate => :array, :default => []
# Only handle events with all of these fields.
# Optional.
config :fields, :validate => :array, :deprecated => true
# Only handle events with all/any (controlled by include_any config option) of these fields.
# Optional.
config :include_fields, :validate => :array, :default => []
# Only handle events without all/any (controlled by exclude_any config option) of these fields.
# Optional.
config :exclude_fields, :validate => :array, :default => []
# Should all or any of the specified tags/include_fields be present for event to
# be handled. Defaults to all.
config :include_any, :validate => :boolean, :default => false
# Should all or any of the specified exclude_tags/exclude_fields be missing for event to
# be handled. Defaults to all.
config :exclude_any, :validate => :boolean, :default => true
# Don't send events that have @timestamp older than specified number of seconds.
config :ignore_older_than, :validate => :number, :default => 0
config :exclude_tags, :validate => :array, :default => [], :deprecated => true
# The codec used for output data
config :codec, :validate => :string, :default => 'plain'
# Optional arguments to get passed into the codec
config :codec_args, :validate => :hash, :default => {}
config :codec, :validate => :codec, :default => "plain"
public
def initialize(params)
def initialize(params={})
super
config_init(params)
@include_method = @include_any ? :any? : :all?
@exclude_method = @exclude_any ? :any? : :all?
# TODO(piavlo): Remove this once fields config will be removed
if @include_fields.empty? && !@fields.nil? && !@fields.empty?
@include_fields = @fields
end
end
public
@ -78,11 +44,6 @@ class LogStash::Outputs::Base < LogStash::Plugin
raise "#{self.class}#receive must be overidden"
end # def receive
protected
def enable_codecs
@codec = LogStash::Codecs.for(@codec).new(@codec_args)
end
public
def handle(event)
if event == LogStash::SHUTDOWN
@ -117,25 +78,6 @@ class LogStash::Outputs::Base < LogStash::Plugin
end
end
if !@include_fields.empty?
if !@include_fields.send(@include_method) {|field| event.include?(field)}
@logger.debug? and @logger.debug(["Dropping event because fields don't match #{@include_fields.inspect}", event])
return false
end
end
if !@exclude_fields.empty?
if @exclude_fields.send(@exclude_method) {|field| event.include?(field)}
@logger.debug? and @logger.debug(["Dropping event because fields contain excluded fields #{@exclude_fields.inspect}", event])
return false
end
end
if @ignore_older_than > 0 && Time.now - event.ruby_timestamp > @ignore_older_than
@logger.debug? and @logger.debug("Skipping metriks for old event", :event => event)
return
end
return true
end
end # class LogStash::Outputs::Base

View file

@ -11,7 +11,7 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
plugin_status "stable"
# Enable debugging. Tries to pretty-print the entire event object.
config :debug, :validate => :boolean
config :debug, :validate => :boolean, :default => false
# Debug output format: ruby (default), json
config :debug_format, :default => "ruby", :validate => ["ruby", "dots"]
@ -21,29 +21,9 @@ class LogStash::Outputs::Stdout < LogStash::Outputs::Base
public
def register
enable_codecs
begin
@codec.format = @message
rescue NoMethodError
end
@print_method = method(:ap) rescue method(:p)
if @debug
case @debug_format
when "ruby"
@codec.on_event do |event|
@print_method.call(event)
end
when "dots"
@codec.on_event do |event|
$stdout.write(".")
end
else
raise "unknown debug_format #{@debug_format}, this should never happen"
end
else
@codec.on_event do |event|
puts event
end
@codec.on_event do |event|
$stdout.write(event)
end
end

View file

@ -1,105 +1,60 @@
require "logstash/config/file"
#require "logstash/agent" # only needed for now for parse_config
require "logstash/namespace"
require "thread" # stdlib
require "stud/trap"
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
class LogStash::Pipeline
class ShutdownSignal < StandardError; end
def initialize(configstr)
# hacks for now to parse a config string
config = LogStash::Config::File.new(nil, configstr)
@inputs, @filters, @outputs = parse_config(config)
@logger = Cabin::Channel.get(LogStash)
grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)
if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
# This will compile the config to ruby and evaluate the resulting code.
# The code will initialize all the plugins and define the
# filter and output methods.
code = @config.compile
# The config code is hard to represent as a log message...
# So just print it.
puts code if @logger.debug?
eval(code)
@input_to_filter = SizedQueue.new(20)
@filter_to_output = SizedQueue.new(20)
# If no filters, pipe inputs directly to outputs
if @filters.empty?
@input_to_filter = @filter_to_output
if !filters?
@filter_to_output = @input_to_filter
else
@filter_to_output = SizedQueue.new(20)
end
@logger = Cabin::Channel.get(LogStash)
(@inputs + @filters + @outputs).each do |plugin|
plugin.logger = @logger
end
@inputs.each(&:register)
@filters.each(&:register)
@outputs.each(&:register)
end # def initialize
# Parses a config and returns [inputs, filters, outputs]
def parse_config(config)
# TODO(sissel): Move this method to config/file.rb
inputs = []
filters = []
outputs = []
config.parse do |plugin|
# 'plugin' is a hash containing:
# :type => the base class of the plugin (LogStash::Inputs::Base, etc)
# :plugin => the class of the plugin (LogStash::Inputs::File, etc)
# :parameters => hash of key-value parameters from the config.
type = plugin[:type].config_name # "input" or "filter" etc...
klass = plugin[:plugin]
# Create a new instance of a plugin, called like:
# -> LogStash::Inputs::File.new( params )
instance = klass.new(plugin[:parameters])
instance.logger = @logger
case type
when "input"
inputs << instance
when "filter"
filters << instance
when "output"
outputs << instance
else
msg = "Unknown config type '#{type}'"
@logger.error(msg)
raise msg
end # case type
end # config.parse
return inputs, filters, outputs
end # def parse_config
def filters?
return @filters.any?
end
def run
#start = Time.now
# one thread per input
@input_threads = @inputs.collect do |input|
Thread.new(input) do |input|
inputworker(input)
end
end
# one filterworker thread
#@filter_threads = @filters.collect do |input
# TODO(sissel): THIS IS WHERE I STOPPED WORKING
# one outputworker thread
@output_thread = Thread.new do
outputworker
end
@input_threads = []
start_inputs
start_filters if filters?
start_outputs
@logger.info("Pipeline started")
@input_threads.each(&:join)
# All input plugins have completed, send a shutdown signal.
#duration = Time.now - start
#puts "Duration: #{duration}"
@input_to_filter.push(ShutdownSignal)
# Wait for filters to stop
@filter_threads.each(&:join) if @filter_threads
# Wait for the outputs to stop
@output_thread.join
wait_inputs
if filters?
shutdown_filters
wait_filters
end
shutdown_outputs
wait_outputs
@logger.info("Pipeline shutdown complete.")
@ -107,33 +62,97 @@ class LogStash::Pipeline
return 0
end # def run
def wait_inputs
@input_threads.each(&:join)
end
def shutdown_filters
@input_to_filter.push(ShutdownSignal)
end
def wait_filters
@filter_threads.each(&:join) if @filter_threads
end
def shutdown_outputs
# nothing, filters will do this
end
def wait_outputs
# Wait for the outputs to stop
@output_thread.join
end
# Start one thread per input plugin.
#
# A threadable input configured with 'threads' > 1 is cloned so that each
# copy runs in its own thread; the clones are added to @inputs before the
# threads are launched.
def start_inputs
  extra_inputs = []
  @inputs.each do |input|
    next unless input.threadable && input.threads > 1
    (input.threads - 1).times { extra_inputs << input.clone }
  end
  @inputs += extra_inputs

  @inputs.each { |input| start_input(input) }
end
def start_filters
@filter_threads = [
Thread.new { filterworker }
]
end
def start_outputs
@output_thread = Thread.new do
outputworker
end
end
def start_input(plugin)
@input_threads << Thread.new { inputworker(plugin) }
end
def inputworker(plugin)
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
begin
plugin.run(@input_to_filter)
rescue ShutdownSignal
plugin.teardown
rescue => e
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
puts e.backtrace
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e,
:stacktrace => e.backtrace.join("\n")))
else
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
end
puts e.backtrace if @logger.debug?
plugin.teardown
sleep 1
retry
end
end # def inputworker
def filterworker
LogStash::Util::set_thread_name("|worker")
@filters.each(&:register)
begin
while true
event = @input_to_filter.pop
break if event == ShutdownSignal
# Apply filters, in order, to the event.
@filters.each do |filter|
filter.execute(event)
events = []
filter(event) do |newevent|
events << newevent
end
events.each do |event|
next if event.cancelled?
@filter_to_output.push(event)
end
next if event.cancelled?
@filter_to_output.push(event)
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
@ -141,21 +160,16 @@ class LogStash::Pipeline
end
@filters.each(&:teardown)
@filter_to_output.push(ShutdownSignal)
end # def filterworker
def outputworker
LogStash::Util::set_thread_name(">output")
@outputs.each(&:register)
while true
event = @filter_to_output.pop
break if event == ShutdownSignal
@outputs.each do |output|
begin
output.receive(event)
rescue => e
@logger.error("Exception in plugin #{output.class}",
"plugin" => output.inspect, "exception" => e)
end
end # @outputs.each
output(event)
end # while true
@outputs.each(&:teardown)
end # def outputworker
@ -175,4 +189,10 @@ class LogStash::Pipeline
# No need to send the ShutdownSignal to the filters/outputs nor to wait for
# the inputs to finish, because in the #run method we wait for that anyway.
end # def shutdown
# Instantiate a plugin by type and name via LogStash::Plugin.lookup.
# plugin_type - e.g. "input", "filter", "output" (see Plugin.lookup)
# name        - the plugin's config name, e.g. "stdin"
# args        - constructor arguments; plugins expect at least a params
#               hash, so default to a single empty hash when none given.
def plugin(plugin_type, name, *args)
  args << {} if args.empty?
  klass = LogStash::Plugin.lookup(plugin_type, name)
  return klass.new(*args)
end
end # class Pipeline

View file

@ -4,8 +4,6 @@ require "logstash/config/mixin"
require "cabin"
class LogStash::Plugin
class ConfigurationError < StandardError; end
attr_accessor :params
attr_accessor :logger
@ -119,4 +117,28 @@ class LogStash::Plugin
return "<#{self.class.name} --->"
end
end
# Look up a plugin class by type and name.
#
# For example, lookup("filter", "grok") will require "logstash/filters/grok"
# and expects to find LogStash::Filters::Grok (constant name matched
# case-insensitively against the plugin name).
#
# Raises LogStash::PluginLoadingError when the plugin cannot be loaded.
public
def self.lookup(type, name)
  path = "logstash/#{type}s/#{name}"
  require(path)
  # e.g. type "filter" => the LogStash::Filters namespace.
  base = LogStash.const_get("#{type.capitalize}s")
  # Find the plugin's class constant, ignoring case.
  klass_sym = base.constants.find { |c| c.to_s =~ /^#{Regexp.quote(name)}$/i }
  # No matching constant: treat it the same as a failed require.
  raise LoadError if klass_sym.nil?
  klass = base.const_get(klass_sym)
  return klass
rescue LoadError => e
  # NOTE(review): backtrace printed to stdout; looks like debug leftover — confirm.
  puts e.backtrace
  raise LogStash::PluginLoadingError,
    I18n.t("logstash.pipeline.plugin-loading-error", :type => type, :name => name, :path => path)
end # def lookup
end # class LogStash::Plugin

View file

@ -1,6 +1,8 @@
$START = Time.now
$DEBUGLIST = (ENV["DEBUG"] || "").split(",")
Thread.abort_on_exception = true
if ENV["PROFILE_BAD_LOG_CALLS"] || $DEBUGLIST.include?("log")
# Set PROFILE_BAD_LOG_CALLS=1 in your environment if you want
# to track down logger calls that cause performance problems
@ -89,12 +91,6 @@ class LogStash::Runner
command = args.shift
commands = {
"version" => lambda { emit_version(args) },
"agent" => lambda do
require "logstash/agent"
agent = LogStash::Agent.new
@runners << agent
return agent.run(args)
end,
"web" => lambda do
require "logstash/web/runner"
web = LogStash::Web::Runner.new
@ -171,8 +167,8 @@ class LogStash::Runner
require "pry"
return binding.pry
end,
"agent2" => lambda do
require "logstash/agent2"
"agent" => lambda do
require "logstash/agent"
# Hack up a runner
runner = Class.new do
def initialize(args)
@ -180,7 +176,7 @@ class LogStash::Runner
end
def run
@thread = Thread.new do
@result = LogStash::Agent2.run($0, @args)
@result = LogStash::Agent.run($0, @args)
end
end
def wait

View file

@ -65,5 +65,52 @@ module LogStash::Util
dst[name] = svalue
end
end
return dst
end # def self.hash_merge
# Merge hash 'src' into 'dst' nondestructively.
#
# When both hashes contain the same key, the values are combined:
#   * two Hash values are merged recursively (via hash_merge)
#   * Array values are concatenated (dst's value first; a scalar on either
#     side is wrapped/appended as needed)
#   * two differing scalars become the array [dst_value, src_value]
# A scalar already present in dst's array value is not appended twice.
def self.hash_merge_with_dups(dst, src)
  src.each do |key, new_value|
    unless dst.include?(key)
      # dst doesn't have this key; just take the src value.
      dst[key] = new_value
      next
    end

    current = dst[key]
    dst[key] =
      if current.is_a?(Hash) && new_value.is_a?(Hash)
        hash_merge(current, new_value)
      elsif new_value.is_a?(Array)
        # Concatenate arrays; wrap a scalar on the dst side first.
        (current.is_a?(Array) ? current : [current]) + new_value
      elsif current.is_a?(Array)
        # Append the scalar only if it is not already present.
        current.include?(new_value) ? current : current + [new_value]
      else
        # Two scalars: keep one copy if equal, otherwise pair them up.
        current == new_value ? current : [current, new_value]
      end
  end
  return dst
end # def self.hash_merge_with_dups
# Merge any number of hashes into a single fresh hash, turning duplicate
# keys into array values (see hash_merge_with_dups). The inputs are not
# modified.
def self.hash_merge_many(*hashes)
  hashes.reduce({}) do |result, hash|
    hash_merge_with_dups(result, hash)
  end
end # def self.hash_merge_many
end # module LogStash::Util

View file

@ -22,13 +22,40 @@ en:
A plugin had an unrecoverable error. Will restart this plugin.
Plugin: %{plugin}
Error: %{error}
worker-error-debug: |-
A plugin had an unrecoverable error. Will restart this plugin.
Plugin: %{plugin}
Error: %{error}
Stack: %{stacktrace}
plugin-loading-error: >-
Couldn't find any %{type} plugin named '%{name}'. Are you
sure this is correct?
plugin-type-loading-error: >-
Could not find any plugin type named '%{type}'. Check for typos.
Valid plugin types are 'input' 'filter' and 'output'
plugin:
unsupported: >-
Using unsupported %{type} plugin '%{name}'. This plugin isn't well
supported by the community and likely has no maintainer.
For more information on plugin statuses, see
http://logstash.net/docs/%{LOGSTASH_VERSION}/plugin_status
experimental: >-
Using experimental %{type} plugin '%{name}'. This plugin should work,
but would benefit from use by folks like you. Please let us know if you
find bugs or have suggestions on how to improve this plugin. For more
information on plugin statuses, see
http://logstash.net/docs/%{LOGSTASH_VERSION}/plugin_status
beta: >-
Using beta %{type} plugin '%{name}'.
For more information on plugin statuses, see
http://logstash.net/docs/%{LOGSTASH_VERSION}/plugin_status
agent:
missing-configuration: |-
missing-configuration: >-
No configuration file was specified. Perhaps you forgot to provide
the '-f yourlogstash.conf' flag?
error: |-
error: >-
Error: %{error}
interrupted: |-
interrupted: >-
Interrupt received. Shutting down the pipeline.
configuration:
file-not-found: |-
@ -54,9 +81,9 @@ en:
...
}
}
invalid_plugin_settings: |-
invalid_plugin_settings: >-
Something is wrong with your configuration.
plugin_path_missing: |-
plugin_path_missing: >-
You specified a plugin path that does not exist: %{path}
no_plugins_found: |-
Could not find any plugins in "%{path}"

43
spec/codecs/multiline.rb Normal file
View file

@ -0,0 +1,43 @@
require "logstash/codecs/multiline"
require "logstash/event"
require "insist"
describe LogStash::Codecs::Multiline do
  context "#decode" do
    it "should be able to handle multiline events with additional lines space-indented" do
      # "what => previous": a line matching the pattern (leading whitespace)
      # is joined onto the previous line's event.
      codec = LogStash::Codecs::Multiline.new("pattern" => "^\\s", "what" => "previous")
      lines = [ "hello world", " second line", "another first line" ]
      events = []
      lines.each do |line|
        codec.decode(line) do |event|
          events << event
        end
      end
      # Flush to emit the final, still-pending event.
      codec.flush { |e| events << e }
      insist { events.size } == 2
      insist { events[0]["message"] } == "hello world\n second line"
      insist { events[1]["message"] } == "another first line"
    end

    it "should allow grok patterns to be used" do
      # "negate => true": lines NOT matching the timestamp pattern are
      # joined onto the previous (timestamped) line's event.
      codec = LogStash::Codecs::Multiline.new(
        "pattern" => "^%{NUMBER} %{TIME}",
        "negate" => true,
        "what" => "previous"
      )

      lines = [ "120913 12:04:33 first line", "second line", "third line" ]

      events = []
      lines.each do |line|
        codec.decode(line) do |event|
          events << event
        end
      end
      codec.flush { |e| events << e }

      # All three lines collapse into one event.
      insist { events.size } == 1
      insist { events.first["message"] } == lines.join("\n")
    end
  end
end

View file

@ -3,26 +3,26 @@ require "insist"
describe LogStash::Event do
subject do
event = LogStash::Event.new
event.timestamp = Time.at(1356998400) #"2013-01-01T00:00:00.000Z"
event.type = "sprintf"
event.message = "hello world"
event.tags = [ "tag1" ]
event.source = "/home/foo"
event.to_hash.merge!(
LogStash::Event.new(
"@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
"type" => "sprintf",
"message" => "hello world",
"tags" => [ "tag1" ],
"source" => "/home/foo",
"a" => "b",
"c" => { "d" => "f", "e.f" => "g" },
"c.d" => "e",
"f.g" => { "h" => "i" },
"c" => {
"d" => "f",
"e" => {"f" => "g"}
},
"f" => { "g" => { "h" => "i" } },
"j" => {
"k1" => "v",
"k2" => [ "w", "x" ],
"k3.4" => "m",
5 => 6,
"5" => 7
"k1" => "v",
"k2" => [ "w", "x" ],
"k3" => {"4" => "m"},
5 => 6,
"5" => 7
}
)
next event
end
context "#sprintf" do
@ -53,19 +53,24 @@ describe LogStash::Event do
end
it "should fetch fields" do
insist { subject["a"] } == "b"
insist { subject["c.d"] } == "e"
insist { subject['c']['d'] } == "f"
end
it "should fetch deep fields" do
insist { subject["[j][k1]"] } == "v"
insist { subject["[c][d]"] } == "f"
insist { subject["[f.g][h]"] } == "i"
insist { subject["[j][k3.4]"] } == "m"
insist { subject["[j][5]"] } == 7
insist { subject['[f][g][h]'] } == "i"
insist { subject['[j][k3][4]'] } == "m"
insist { subject['[j][5]'] } == 7
end
end
context "#append" do
it "should append strings to an array" do
subject.append(LogStash::Event.new("message" => "another thing"))
insist { subject["message"] } == [ "hello world", "another thing" ]
end
it "should concatenate tags" do
subject.append(LogStash::Event.new("tags" => [ "tag2" ]))
insist { subject["tags"] } == [ "tag1", "tag2" ]
@ -73,11 +78,11 @@ describe LogStash::Event do
context "when event field is nil" do
it "should add single value as string" do
subject.append(LogStash::Event.new("field1" => "append1"))
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == "append1"
end
it "should add multi values as array" do
subject.append(LogStash::Event.new("field1" => [ "append1","append2" ]))
subject.append(LogStash::Event.new({"field1" => [ "append1","append2" ]}))
insist { subject[ "field1" ] } == [ "append1","append2" ]
end
end
@ -86,19 +91,19 @@ describe LogStash::Event do
before { subject[ "field1" ] = "original1" }
it "should append string to values, if different from current" do
subject.append(LogStash::Event.new("field1" => "append1"))
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
it "should not change value, if appended value is equal current" do
subject.append(LogStash::Event.new("field1" => "original1"))
subject.append(LogStash::Event.new({"field1" => "original1"}))
insist { subject[ "field1" ] } == "original1"
end
it "should concatenate values in an array" do
subject.append(LogStash::Event.new("field1" => [ "append1" ]))
subject.append(LogStash::Event.new({"field1" => [ "append1" ]}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
it "should join array, removing duplicates" do
subject.append(LogStash::Event.new("field1" => [ "append1","original1" ]))
subject.append(LogStash::Event.new({"field1" => [ "append1","original1" ]}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
end
@ -106,15 +111,15 @@ describe LogStash::Event do
before { subject[ "field1" ] = [ "original1", "original2" ] }
it "should append string values to array, if not present in array" do
subject.append(LogStash::Event.new("field1" => "append1"))
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == [ "original1", "original2", "append1" ]
end
it "should not append string values, if the array already contains it" do
subject.append(LogStash::Event.new("field1" => "original1"))
subject.append(LogStash::Event.new({"field1" => "original1"}))
insist { subject[ "field1" ] } == [ "original1", "original2" ]
end
it "should join array, removing duplicates" do
subject.append(LogStash::Event.new("field1" => [ "append1","original1" ]))
subject.append(LogStash::Event.new({"field1" => [ "append1","original1" ]}))
insist { subject[ "field1" ] } == [ "original1", "original2", "append1" ]
end
end

View file

@ -1,135 +0,0 @@
require "logstash/event"
require "insist"
describe LogStash::Event do
before :each do
@event = LogStash::Event.new({
"a" => "b",
"c" => {
"d" => "f",
"e" => {"f" => "g"}
},
"f" => {"g" => {
"h" => "i"
}
},
"j" => {
"k1" => "v",
"k2" => [
"w",
"x"
],
"k3" => {"4" => "m"},
5 => 6,
"5" => 7
}
})
@event.timestamp = "2013-01-01T00:00:00.000Z"
@event["type"] = "sprintf"
@event["message"] = "hello world"
@event["tags"] = [ "tag1" ]
@event["source"] = "/home/foo"
end
subject { @event }
context "#sprintf" do
it "should report a unix timestamp for %{+%s}" do
insist { @event.sprintf("%{+%s}") } == "1356998400"
end
it "should report a time with %{+format} syntax" do
insist { @event.sprintf("%{+YYYY}") } == "2013"
insist { @event.sprintf("%{+MM}") } == "01"
insist { @event.sprintf("%{+HH}") } == "00"
end
it "should report fields with %{field} syntax" do
insist { @event.sprintf("%{type}") } == "sprintf"
insist { @event.sprintf("%{message}") } == subject["message"]
end
it "should print deep fields" do
insist { @event.sprintf("%{[j][k1]}") } == "v"
insist { @event.sprintf("%{[j][k2][0]}") } == "w"
end
end
context "#[]" do
it "should fetch data" do
insist { @event["type"] } == "sprintf"
end
it "should fetch fields" do
insist { @event["a"] } == "b"
insist { @event['c']['d'] } == "f"
end
it "should fetch deep fields" do
insist { @event["[j][k1]"] } == "v"
insist { @event["[c][d]"] } == "f"
insist { @event['[f][g][h]'] } == "i"
insist { @event['[j][k3][4]'] } == "m"
insist { @event['[j][5]'] } == 7
end
end
context "#append" do
it "should append message with \\n" do
subject.append(LogStash::Event.new("message" => "hello world"))
insist { subject["message"] } == "hello world\nhello world"
end
it "should concatenate tags" do
subject.append(LogStash::Event.new("tags" => [ "tag2" ]))
insist { subject["tags"] } == [ "tag1", "tag2" ]
end
context "when event field is nil" do
it "should add single value as string" do
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == "append1"
end
it "should add multi values as array" do
subject.append(LogStash::Event.new({"field1" => [ "append1","append2" ]}))
insist { subject[ "field1" ] } == [ "append1","append2" ]
end
end
context "when event field is a string" do
before { subject[ "field1" ] = "original1" }
it "should append string to values, if different from current" do
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
it "should not change value, if appended value is equal current" do
subject.append(LogStash::Event.new({"field1" => "original1"}))
insist { subject[ "field1" ] } == [ "original1" ]
end
it "should concatenate values in an array" do
subject.append(LogStash::Event.new({"field1" => [ "append1" ]}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
it "should join array, removing duplicates" do
subject.append(LogStash::Event.new({"field1" => [ "append1","original1" ]}))
insist { subject[ "field1" ] } == [ "original1", "append1" ]
end
end
context "when event field is an array" do
before { subject[ "field1" ] = [ "original1", "original2" ] }
it "should append string values to array, if not present in array" do
subject.append(LogStash::Event.new({"field1" => "append1"}))
insist { subject[ "field1" ] } == [ "original1", "original2", "append1" ]
end
it "should not append string values, if the array already contains it" do
subject.append(LogStash::Event.new({"field1" => "original1"}))
insist { subject[ "field1" ] } == [ "original1", "original2" ]
end
it "should join array, removing duplicates" do
subject.append(LogStash::Event.new({"field1" => [ "append1","original1" ]}))
insist { subject[ "field1" ] } == [ "original1", "original2", "append1" ]
end
end
end
end

View file

@ -15,11 +15,11 @@ describe LogStash::Filters::Clone do
}
CONFIG
sample "hello world" do
insist { subject}.is_a? Array
sample("message" => "hello world", "type" => "original") do
insist { subject }.is_a? Array
insist { subject.length } == 4
subject.each_with_index do |s,i|
if i == 3 # last one should be 'original'
if i == 0 # last one should be 'original'
insist { s["type"] } == "original"
else
insist { s["type"]} == "clone"
@ -45,22 +45,23 @@ describe LogStash::Filters::Clone do
sample("type" => "nginx-access", "tags" => ["TESTLOG"], "message" => "hello world") do
insist { subject }.is_a? Array
insist { subject.length } == 3
#All clones go through filter_matched
insist { subject[0].type } == "nginx-access-clone1"
reject { subject[0].tags }.include? "TESTLOG"
insist { subject[0].tags }.include? "RABBIT"
insist { subject[0].tags }.include? "NO_ES"
insist { subject[1].type } == "nginx-access-clone2"
insist { subject[0].type } == "nginx-access"
#Initial event remains unchanged
insist { subject[0].tags }.include? "TESTLOG"
reject { subject[0].tags }.include? "RABBIT"
reject { subject[0].tags }.include? "NO_ES"
#All clones go through filter_matched
insist { subject[1].type } == "nginx-access-clone1"
reject { subject[1].tags }.include? "TESTLOG"
insist { subject[1].tags }.include? "RABBIT"
insist { subject[1].tags }.include? "NO_ES"
insist { subject[2].type } == "nginx-access"
#Initial event remains unchanged
insist { subject[2].tags }.include? "TESTLOG"
reject { subject[2].tags }.include? "RABBIT"
reject { subject[2].tags }.include? "NO_ES"
insist { subject[2].type } == "nginx-access-clone2"
reject { subject[2].tags }.include? "TESTLOG"
insist { subject[2].tags }.include? "RABBIT"
insist { subject[2].tags }.include? "NO_ES"
end
end
end

View file

@ -1,7 +1,8 @@
require "test_utils"
require "logstash/filters/date"
describe LogStash::Filters::Date do
puts "Skipping date performance tests because this ruby is not jruby" if RUBY_ENGINE != "jruby"
RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do
extend LogStash::RSpec
describe "parsing with ISO8601" do
@ -168,7 +169,7 @@ describe LogStash::Filters::Date do
config <<-'CONFIG'
filter {
date {
match => [ t, TAI64N ]
match => [ "t", TAI64N ]
}
}
CONFIG

View file

@ -1,7 +1,8 @@
require "test_utils"
require "logstash/filters/date"
describe LogStash::Filters::Date do
puts "Skipping date tests because this ruby is not jruby" if RUBY_ENGINE != "jruby"
RUBY_ENGINE == "jruby" and describe LogStash::Filters::Date do
extend LogStash::RSpec
describe "performance test of java syntax parsing" do

View file

@ -102,7 +102,7 @@ describe LogStash::Filters::Grok do
end
end
describe "processing fields other than @message" do
describe "processing selected fields" do
config <<-CONFIG
filter {
grok {

View file

@ -33,7 +33,6 @@ describe LogStash::Filters::Json do
CONFIG
sample '{ "hello": "world", "list": [ 1, 2, 3 ], "hash": { "k": "v" } }' do
puts subject.to_json
insist { subject["data"]["hello"] } == "world"
insist { subject["data"]["list" ] } == [1,2,3]
insist { subject["data"]["hash"] } == { "k" => "v" }

View file

@ -1,91 +0,0 @@
require "test_utils"
require "logstash/filters/multiline"
describe LogStash::Filters::Multiline do
extend LogStash::RSpec
describe "simple multiline" do
config <<-CONFIG
filter {
multiline {
pattern => "^\\s"
what => previous
}
}
CONFIG
sample [ "hello world", " second line", "another first line" ] do
insist { subject.length } == 2
insist { subject[0]["message"] } == "hello world\n second line"
insist { subject[1]["message"] } == "another first line"
end
end
describe "multiline using grok patterns" do
config <<-CONFIG
filter {
multiline {
pattern => "^%{NUMBER} %{TIME}"
negate => true
what => previous
}
}
CONFIG
sample [ "120913 12:04:33 first line", "second line", "third line" ] do
reject { subject }.is_a? Array
insist { subject["message"] } == "120913 12:04:33 first line\nsecond line\nthird line"
end
end
describe "multiline safety among multiple concurrent streams" do
config <<-CONFIG
filter {
multiline {
pattern => "^\\s"
what => previous
}
}
CONFIG
multiline_event = [
"hello world",
]
count = 20
stream_count = 2
id = 0
eventstream = count.times.collect do |i|
stream = "stream#{i % stream_count}"
(
[ "hello world #{stream}" ] \
+ rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" }
) .collect do |line|
{ "message" => line, "source" => stream, "type" => stream, "event" => i }
end
end
alllines = eventstream.flatten
# Take whole events and mix them with other events (maintain order)
# This simulates a mixing of multiple streams being received
# and processed. It requires that the multiline filter correctly partition
# by stream_identity
concurrent_stream = eventstream.flatten.count.times.collect do
index = rand(eventstream.count)
event = eventstream[index].shift
eventstream.delete_at(index) if eventstream[index].empty?
next event
end
sample concurrent_stream do
insist { subject.count } == count
subject.each_with_index do |event, i|
#puts "#{i}/#{event["event"]}: #{event.to_json}"
#insist { event.type } == stream
#insist { event.source } == stream
insist { event["message"].split("\n").first } =~ /hello world /
end
end
end
end

View file

@ -159,5 +159,20 @@ describe LogStash::Filters::Mutate do
reject { subject.fields }.include?("hello")
end
end
describe "convert should work on nested fields" do
config <<-CONFIG
filter {
mutate {
convert => [ "[foo][bar]", "integer" ]
}
}
CONFIG
sample({ "foo" => { "bar" => "1000" } }) do
insist { subject["[foo][bar]"] } == 1000
insist { subject["[foo][bar]"] }.is_a?(Fixnum)
end
end
end

View file

@ -218,200 +218,4 @@ describe LogStash::Filters::NOOP do
reject { subject }.include?("go")
end
end
describe "checking AND include_any logic on tags filter" do
config <<-CONFIG
filter {
noop {
tags => ["two", "three", "four"]
include_any => false
add_tag => ["match"]
}
}
CONFIG
sample("tags" => ["one", "two", "three", "four", "five"]) do
insist { subject["tags"] }.include?("match")
end
sample("tags" => ["one", "two", "four", "five"]) do
reject { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.nil?
end
end
describe "checking OR include_any logic on tags filter" do
config <<-CONFIG
filter {
noop {
tags => ["two", "three", "four"]
include_any => true
add_tag => ["match"]
}
}
CONFIG
sample("tags" => ["one1", "two2", "three", "four4", "five5"]) do
insist { subject["tags"] }.include?("match")
end
sample("tags" => ["one1", "two2", "three3", "four4", "five5"]) do
reject { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.nil?
end
end
describe "checking AND include_any logic on include_fields filter" do
config <<-CONFIG
filter {
noop {
include_fields => ["two", "two", "three", "three", "four", "four"]
include_any => false
add_tag => ["match"]
}
}
CONFIG
sample("one" => "1", "two" => "2", "three" => "3", "four" => "4", "five" => "5") do
insist { subject["tags"] }.include?("match")
end
sample("one" => "1", "two" => "2", "four" => "4", "five" => "5") do
insist { subject["tags"] }.nil?
end
sample({}) do
insist { subject["tags"] }.nil?
end
end
describe "checking OR include_any logic on include_fields filter" do
config <<-CONFIG
filter {
noop {
include_fields => ["two", "two", "three", "three", "four", "four"]
include_any => true
add_tag => ["match"]
}
}
CONFIG
sample("one1" => "1", "two2" => "2", "three" => "3", "four4" => "4", "five5" => "5") do
insist { subject["tags"] }.include?("match")
end
sample("one1" => "1", "two2" => "2", "three3" => "3", "four4" => "4", "five5" => "5") do
insist { subject["tags"] }.nil?
end
sample({}) do
insist { subject["tags"] }.nil?
end
end
describe "checking AND exclude_any logic on exclude_tags filter" do
config <<-CONFIG
filter {
noop {
exclude_tags => ["two", "three", "four"]
exclude_any => false
add_tag => ["match"]
}
}
CONFIG
sample("tags" => ["one", "two", "three", "four", "five"]) do
reject { subject["tags"] }.include?("match")
end
sample("tags" => ["one", "two", "four", "five"]) do
insist { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.include?("match")
end
end
describe "checking OR exclude_any logic on exclude_tags filter" do
config <<-CONFIG
filter {
noop {
exclude_tags => ["two", "three", "four"]
exclude_any => true
add_tag => ["match"]
}
}
CONFIG
sample("tags" => ["one", "two", "three", "four", "five"]) do
reject { subject["tags"] }.include?("match")
end
sample("tags" => ["one1", "two2", "three", "four4", "five5"]) do
reject { subject["tags"] }.include?("match")
end
sample("tags" => ["one1", "two2", "three3", "four4", "five5"]) do
insist { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.include?("match")
end
end
describe "checking AND exclude_any logic on exclude_fields filter" do
config <<-CONFIG
filter {
noop {
exclude_fields => ["two", "two", "three", "three", "four", "four"]
exclude_any => false
add_tag => ["match"]
}
}
CONFIG
sample("one" => "1", "two" => "2", "three" => "3", "four" => "4", "five" => "5") do
insist { subject["tags"] }.nil?
end
sample("one" => "1", "two" => "2", "four" => "4", "five" => "5") do
insist { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.include?("match")
end
end
describe "checking OR exclude_any logic on exclude_fields filter" do
config <<-CONFIG
filter {
noop {
exclude_fields => ["two", "two", "three", "three", "four", "four"]
exclude_any => true
add_tag => ["match"]
}
}
CONFIG
sample("one1" => "1", "two2" => "2", "three" => "3", "four4" => "4", "five5" => "5") do
insist { subject["tags"] }.nil?
end
sample("one1" => "1", "two2" => "2", "three3" => "3", "four4" => "4", "five5" => "5") do
insist { subject["tags"] }.include?("match")
end
sample({}) do
insist { subject["tags"] }.include?("match")
end
end
end

View file

@ -1,5 +1,6 @@
require "insist"
require "logstash/agent"
require "logstash/pipeline"
require "logstash/event"
require "logstash/logging"
require "insist"
@ -42,61 +43,34 @@ module LogStash
end
def sample(event, &block)
default_type = @default_type || "default"
default_tags = @default_tags || nil
config = get_config
agent = LogStash::Agent.new
agent.instance_eval { parse_options(["--quiet"]) }
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
[@inputs, @filters, @outputs].flatten.each do |plugin|
plugin.logger = $logger
plugin.logger.level = :error
plugin.register
end
pipeline = LogStash::Pipeline.new(@config_str)
filters = @filters
name = event.to_s
name = event.is_a?(String) ? event : event.to_json
name = name[0..50] + "..." if name.length > 50
describe "\"#{name}\"" do
before :all do
before :each do
# Coerce to an array of LogStash::Event
event = [event] unless event.is_a?(Array)
event = event.collect do |e|
if e.is_a?(String)
e = { "message" => e, "type" => default_type }
e["tags"] = default_tags.clone unless default_tags.nil?
end
e = { "message" => e } if e.is_a?(String)
next LogStash::Event.new(e)
end
results = []
count = 0
pipeline.instance_eval { @filters.each(&:register) }
event.each do |e|
filters.each do |filter|
next if e.cancelled?
filter.filter(e) do |newevent|
results << newevent unless e.cancelled?
end
extra = []
pipeline.filter(e) do |new_event|
extra << new_event
end
results << e unless e.cancelled?
results << e if !e.cancelled?
results += extra.reject(&:cancelled?)
end
# do any flushing.
filters.each_with_index do |filter, i|
if filter.respond_to?(:flush)
# get any event from flushing
list = filter.flush
if list
list.each do |e|
filters[i+1 .. -1].each do |f|
f.filter(e)
end
results << e unless e.cancelled?
end
end # if list
end # filter.respond_to?(:flush)
end # filters.each_with_index
# TODO(sissel): pipeline flush needs to be implemented.
#results += pipeline.flush
@results = results
end # before :all
@ -115,28 +89,17 @@ module LogStash
end
end # def input
def get_config
if @is_yaml
require "logstash/config/file/yaml"
config = LogStash::Config::File::Yaml.new(nil, @config_str)
else
require "logstash/config/file"
config = LogStash::Config::File.new(nil, @config_str)
end
end # def get_config
def agent(&block)
@agent_count ||= 0
require "logstash/agent"
require "logstash/pipeline"
# scoping is hard, let's go shopping!
config_str = @config_str
describe "agent(#{@agent_count}) #{caller[1]}" do
before :each do
start = ::Time.now
@agent = LogStash::Agent.new
@agent.run(["--quiet", "-e", config_str])
@agent.wait
pipeline = LogStash::Pipeline.new(config_str)
pipeline.run
@duration = ::Time.now - start
end
it("looks good", &block)