From e308e16067c7f60196f421d9a51e0378ea884457 Mon Sep 17 00:00:00 2001 From: Pete Fritchman Date: Sun, 13 Feb 2011 16:25:22 -0800 Subject: [PATCH 1/4] first cut at jruby/threaded logstash --- bin/logstash | 3 +- etc/logstash-jruby-test.yaml | 13 ++++ etc/logstash-reader.yaml | 14 ++-- lib/logstash/agent.rb | 145 ++++++++++++++++++++--------------- lib/logstash/inputs.rb | 20 ++--- lib/logstash/inputs/base.rb | 30 ++++---- lib/logstash/inputs/file.rb | 91 ++++++++++++---------- lib/logstash/multiqueue.rb | 17 ++++ lib/logstash/outputs.rb | 4 +- lib/logstash/outputs/base.rb | 3 +- 10 files changed, 201 insertions(+), 139 deletions(-) create mode 100644 etc/logstash-jruby-test.yaml create mode 100644 lib/logstash/multiqueue.rb diff --git a/bin/logstash b/bin/logstash index ca25c7b0e..f14c247dc 100755 --- a/bin/logstash +++ b/bin/logstash @@ -1,9 +1,8 @@ -#!/usr/bin/env ruby +#!/usr/bin/env jruby $: << File.dirname($0) + "/../lib" require "rubygems" -require "eventmachine" require "logstash/agent" require "optparse" require "yaml" diff --git a/etc/logstash-jruby-test.yaml b/etc/logstash-jruby-test.yaml new file mode 100644 index 000000000..0965774fa --- /dev/null +++ b/etc/logstash-jruby-test.yaml @@ -0,0 +1,13 @@ +# Example config that reads parsed logs from AMQP and prints to stdout +inputs: + linux-syslog: + - file:///var/log/messages +filters: +- grep: + linux-syslog: + - match: + @message: test + add_fields: + filter_test: hello world +outputs: + - stdout:/// diff --git a/etc/logstash-reader.yaml b/etc/logstash-reader.yaml index 18c2c8924..0965774fa 100644 --- a/etc/logstash-reader.yaml +++ b/etc/logstash-reader.yaml @@ -1,9 +1,13 @@ # Example config that reads parsed logs from AMQP and prints to stdout inputs: - all: - - amqp://localhost/topic/parsedlogs -#filters: - #field: - #- progname.include?("tester") + linux-syslog: + - file:///var/log/messages +filters: +- grep: + linux-syslog: + - match: + @message: test + add_fields: + filter_test: hello world outputs: - stdout:/// diff --git a/lib/logstash/agent.rb b/lib/logstash/agent.rb index c0a843d89..b0e60ca19 100644 --- a/lib/logstash/agent.rb +++ b/lib/logstash/agent.rb @@ -1,10 +1,13 @@ -require "eventmachine" -require "eventmachine-tail" require "logstash/filters" require "logstash/inputs" require "logstash/logging" +require "logstash/multiqueue" require "logstash/namespace" require "logstash/outputs" +require "java" +require "uri" + +JThread = java.lang.Thread # Collect logs, ship them out. class LogStash::Agent @@ -18,6 +21,7 @@ class LogStash::Agent log_to(STDERR) @config = config + @threads = {} @outputs = [] @inputs = [] @filters = [] @@ -25,6 +29,8 @@ class LogStash::Agent # - list of logs to monitor # - log config # - where to ship to + + Thread::abort_on_exception = true end # def initialize public @@ -32,80 +38,100 @@ class LogStash::Agent @logger = LogStash::Logger.new(target) end # def log_to - # Register any event handlers with EventMachine - # Technically, this agent could listen for anything (files, sockets, amqp, - # stomp, etc). public - def register - # TODO(sissel): warn when no inputs and no outputs are defined. - # TODO(sissel): Refactor this madness into a config lib - - if (["inputs", "outputs"] & @config.keys).length == 0 - $stderr.puts "No inputs or no outputs configured. This probably isn't what you want." + def run + if @config["inputs"].length == 0 or @config["outputs"].length == 0 + raise "Must have both inputs and outputs configured." end - # Register input and output stuff - inputs = @config["inputs"] - inputs.each do |value| - # If 'url' is an array, then inputs is a hash and the key is the type - if inputs.is_a?(Hash) - type, urls = value - else - raise "config error, no type for url #{urls.inspect}" - end + # XXX we should use a SizedQueue here (w/config params for size) + filter_queue = Queue.new + output_queue = MultiQueue.new + # Register input and output stuff + input_configs = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } } + @config["inputs"].each do |url_type, urls| # url could be a string or an array. urls = [urls] if !urls.is_a?(Array) - - urls.each do |url| - @logger.debug("Using input #{url} of type #{type}") - input = LogStash::Inputs.from_url(url, type) { |event| receive(event) } - input.logger = @logger - input.register - @inputs << input + urls.each do |url_str| + url = URI.parse(url_str) + input_type = url.scheme + input_configs[input_type][url_type] = url end end # each input - if @config.include?("filters") - filters = @config["filters"] - filters.collect { |x| x.to_a[0] }.each do |filter| - name, value = filter - @logger.debug("Using filter #{name} => #{value.inspect}") - filter = LogStash::Filters.from_name(name, value) - filter.logger = @logger - filter.register - @filters << filter - end # each filter - end # if we have filters + input_configs.each do |input_type, config| + if @config.include?("filters") + queue = filter_queue + else + queue = output_queue + end + input = LogStash::Inputs.from_name(input_type, config, queue) + @threads["input/#{input_type}"] = Thread.new do + JThread.currentThread().setName("input/#{input_type}") + input.run + end + end + # Create N filter-worker threads + if @config.include?("filters") + 3.times do |n| + @threads["worker/filter/#{n}"] = Thread.new do + JThread.currentThread().setName("worker/filter/#{n}") + puts "top of worker/filter/#{n} thread" + filters = [] + + @config["filters"].collect { |x| x.to_a[0] }.each do |filter| + name, value = filter + @logger.info("Using filter #{name} => #{value.inspect}") + filter = LogStash::Filters.from_name(name, value) + filter.logger = @logger + filter.register + filters << filter + end + + while event = filter_queue.pop + filters.each do |filter| + filter.filter(event) + if event.cancelled? + @logger.debug({:message => "Event cancelled", + :event => event, + :filter => filter.class, + }) + break + end + end # filters.each + + output_queue.push(event) unless event.cancelled? + end # event pop + end # Thread + end # N.times + end # if @config.include?("filters") + + # Create output threads @config["outputs"].each do |url| - @logger.debug("Using output #{url}") - output = LogStash::Outputs.from_url(url) - output.logger = @logger - output.register - @outputs << output - end # each output + queue = Queue.new + @threads["outputs/#{url}"] = Thread.new do + JThread.currentThread().setName("output:#{url}") + output = LogStash::Outputs.from_url(url) + while event = queue.pop + output.receive(event) + end + end # Thread + output_queue.add_queue(queue) + end # Register any signal handlers - register_signal_handler + #register_signal_handler + + while sleep 5 + end end # def register - public - def run(&block) - EventMachine.run do - self.register - yield if block_given? - end # EventMachine.run - end # def run - public def stop - # TODO(sissel): Stop inputs, fluch outputs, wait for finish, + # TODO(petef): Stop inputs, fluch outputs, wait for finish, # then stop the event loop - EventMachine.stop_event_loop - - # EventMachine has no default way to indicate a 'stopping' state. - $EVENTMACHINE_STOPPING = true end # def stop protected @@ -164,8 +190,7 @@ class LogStash::Agent end when :INT @logger.warn("SIGINT received. Shutting down.") - EventMachine::stop_event_loop - # TODO(sissel): Should have input/output/filter register shutdown + # TODO(petef): Should have input/output/filter register shutdown # hooks. end # case msg end # @sigchannel.subscribe diff --git a/lib/logstash/inputs.rb b/lib/logstash/inputs.rb index 02919eebd..1cb365c5e 100644 --- a/lib/logstash/inputs.rb +++ b/lib/logstash/inputs.rb @@ -3,21 +3,11 @@ require "logstash/ruby_fixes" require "uri" module LogStash::Inputs - # Given a URL, try to load the class that supports it. - # That is, if we have an input of "foo://blah/" then - # we will try to load logstash/inputs/foo and will - # expect a class LogStash::Inputs::Foo public - def self.from_url(url, type, &block) - # Assume file paths if we start with "/" - url = "file://#{url}" if url.start_with?("/") - - uri = URI.parse(url) - # TODO(sissel): Add error handling - # TODO(sissel): Allow plugin paths - klass = uri.scheme.capitalize - file = uri.scheme.downcase + def self.from_name(type, configs, output_queue) + klass = type.capitalize + file = type.downcase require "logstash/inputs/#{file}" - LogStash::Inputs.const_get(klass).new(uri, type, &block) - end # def from_url + LogStash::Inputs.const_get(klass).new(configs, output_queue) + end # def from_name end # module LogStash::Inputs diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index e025c5a22..42f9e5945 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -7,22 +7,24 @@ class LogStash::Inputs::Base attr_accessor :logger public - def initialize(url, type, config={}, &block) + def initialize(configs, output_queue) @logger = LogStash::Logger.new(STDERR) - @url = url - @url = URI.parse(url) if url.is_a? String - @config = config - @callback = block - @type = type - @tags = [] + @configs = configs + @output_queue = output_queue + #@url = url + #@url = URI.parse(url) if url.is_a? String + #@config = config + #@callback = block + #@type = type + #@tags = [] - @urlopts = {} - if @url.query - @urlopts = CGI.parse(@url.query) - @urlopts.each do |k, v| - @urlopts[k] = v.last if v.is_a?(Array) - end - end + #@urlopts = {} + #if @url.query + # @urlopts = CGI.parse(@url.query) + # @urlopts.each do |k, v| + # @urlopts[k] = v.last if v.is_a?(Array) + # end + #end end # def initialize public diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index 6c5cf1a7f..bca23a733 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -1,53 +1,66 @@ -require "eventmachine-tail" +require "file/tail" require "logstash/inputs/base" require "logstash/namespace" require "socket" # for Socket.gethostname class LogStash::Inputs::File < LogStash::Inputs::Base public - def initialize(url, type, config={}, &block) + def initialize(configs, output_queue) super - # Hack the hostname into the url. - # This works since file:// urls don't generally have a host in it. - @url.host = Socket.gethostname + @output_queue = output_queue + @file_threads = {} end # def initialize public - def register - @logger.info("Registering #{@url}") - EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60, - exclude=[], receiver=self) - end # def register - - public - def receive(filetail, event) - url = @url.clone - url.path = filetail.path - @logger.debug(["original url", { :originalurl => @url, :newurl => url }]) - event = LogStash::Event.new({ - "@message" => event, - "@type" => @type, - "@tags" => @tags.clone, - }) - event.source = url - @logger.debug(["Got event", event]) - @callback.call(event) - end # def receive - - private - class Reader < EventMachine::FileTail - def initialize(path, receiver) - super(path) - @receiver = receiver - @buffer = BufferedTokenizer.new # From eventmachine + def run + @configs.each do |type, url| + glob = url.path + if File.exists?(glob) + files = [glob] + else + files = Dir.glob(glob) + end + files.each do |file| + @file_threads[file] = Thread.new do + JThread.currentThread().setName("inputs/file/reader:#{file}") + watch(file, url, type, [type]) + end + end end - def receive_data(data) - # TODO(2.0): Support multiline log data - @buffer.extract(data).each do |line| - @receiver.receive(self, line) - end - end # def receive_data - end # class Reader + # http://jira.codehaus.org/browse/JRUBY-891 + # NOTE(petef): IO::Select is broken in jruby, so we start a + # thread per file for now. + while sleep 5 + # foo + end + + #event = LogStash::Event.new({ + # "@message" => event, + # "@type" => @type, + # "@tags" => @tags.clone, + #}) + #event.source = url + #@logger.debug(["Got event", event]) + #@callback.call(event) + end # def run + + private + def watch(file, source_url, type, tags) + File.open(file, "r") do |f| + f.extend(File::Tail) + f.interval = 5 + f.backward(0) + f.tail do |line| + e = LogStash::Event.new({ + "@message" => line, + "@type" => type, + "@tags" => tags, + }) + e.source = source_url.to_s + @output_queue.push(e) + end # f.tail + end # File.open + end end # class LogStash::Inputs::File diff --git a/lib/logstash/multiqueue.rb b/lib/logstash/multiqueue.rb new file mode 100644 index 000000000..4def0cbfe --- /dev/null +++ b/lib/logstash/multiqueue.rb @@ -0,0 +1,17 @@ +class MultiQueue + def initialize(*queues) + @mutex = Mutex.new + @queues = queues + end + + def push(object) + @queues.each { |q| q.push(object) } + end + + public + def add_queue(queue) + @mutex.synchronize do + @queues << queue + end + end +end diff --git a/lib/logstash/outputs.rb b/lib/logstash/outputs.rb index 46ab6188e..7abfe93fc 100644 --- a/lib/logstash/outputs.rb +++ b/lib/logstash/outputs.rb @@ -3,13 +3,13 @@ require "uri" module LogStash::Outputs public - def self.from_url(url, &block) + def self.from_url(url) uri = URI.parse(url) # TODO(sissel): Add error handling # TODO(sissel): Allow plugin paths klass = uri.scheme.capitalize file = uri.scheme require "logstash/outputs/#{file}" - LogStash::Outputs.const_get(klass).new(uri, &block) + LogStash::Outputs.const_get(klass).new(uri) end # def from_url end # module LogStash::Outputs diff --git a/lib/logstash/outputs/base.rb b/lib/logstash/outputs/base.rb index 5b3a2db03..56af53090 100644 --- a/lib/logstash/outputs/base.rb +++ b/lib/logstash/outputs/base.rb @@ -8,10 +8,9 @@ class LogStash::Outputs::Base attr_accessor :logger public - def initialize(url, config={}, &block) + def initialize(url) @url = url @url = URI.parse(url) if url.is_a? String - @config = config @logger = LogStash::Logger.new(STDOUT) @urlopts = {} if @url.query From 40a48ab9e5bc8a243369648e606a52e9b1d9cf1b Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 14 Feb 2011 20:01:11 -0800 Subject: [PATCH 2/4] - Start working on a DSL for describing how each component can be configured. --- lib/logstash/config.rb | 42 ++++++++++++++++++++++++++++++++++++ lib/logstash/filters/date.rb | 6 ++++++ lib/logstash/filters/grok.rb | 4 ++++ lib/logstash/inputs/base.rb | 6 ++++++ 4 files changed, 58 insertions(+) create mode 100644 lib/logstash/config.rb diff --git a/lib/logstash/config.rb b/lib/logstash/config.rb new file mode 100644 index 000000000..3a9c72bbf --- /dev/null +++ b/lib/logstash/config.rb @@ -0,0 +1,42 @@ + +require "logstash/namespace" + +# This module is meant as a mixin to classes wishing to be configurable from +# config files +# +# The idea is that you can do this: +# +# class Foo < LogStash::Config +# config "path" => ... +# config "tag" => ... +# end +# +# And the config file should let you do: +# +# foo { +# "path" => ... +# "tag" => ... +# } +# +# TODO(sissel): This is not yet fully designed. +module LogStash::Config + # This method is called when someone does 'include LogStash::Config' + def self.included(base) + # Add ClassMethods module methods to the 'base' given. + base.extend(ClassMethods) + end + + module ClassMethods + def section(name) + @section = name + end # def self.section + + def config(cfg) + # cfg should be hash with one entry of { "key" => "val" } + key, value = cfg.to_a.first + puts "#{@section} {" + puts " #{key} => #{value}" + puts "}" + end # def self.config + end # module ClassMethods +end # module LogStash::Config diff --git a/lib/logstash/filters/date.rb b/lib/logstash/filters/date.rb index d88172550..5b8bcc847 100644 --- a/lib/logstash/filters/date.rb +++ b/lib/logstash/filters/date.rb @@ -3,6 +3,12 @@ require "logstash/namespace" require "logstash/time" class LogStash::Filters::Date < LogStash::Filters::Base + + # Config for date is: + # fieldname: dateformat + # Allow arbitrary keys for this config. + config /[A-Za-z0-9_-]+/ => :string + # The 'date' filter will take a value from your event and use it as the # event timestamp. This is useful for parsing logs generated on remote # servers or for importing old logs. diff --git a/lib/logstash/filters/grok.rb b/lib/logstash/filters/grok.rb index 4c81aad05..0def51692 100644 --- a/lib/logstash/filters/grok.rb +++ b/lib/logstash/filters/grok.rb @@ -5,6 +5,10 @@ gem "jls-grok", ">=0.2.3071" require "grok" # rubygem 'jls-grok' class LogStash::Filters::Grok < LogStash::Filters::Base + + config :pattern => LogStash::Config::String, + :patterns_dir => LogStash::Config::Path + public def initialize(config = {}) super diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index 42f9e5945..a455d5758 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -1,11 +1,17 @@ require "logstash/namespace" require "logstash/event" require "logstash/logging" +require "logstash/config" require "uri" class LogStash::Inputs::Base + include LogStash::Config attr_accessor :logger + # Define the basic config + config "path" => :string #LogStash::Config::Path + config "tag" => :string #LogStash::Config::Array + public def initialize(configs, output_queue) @logger = LogStash::Logger.new(STDERR) From 8fb2eefc1d29a1df626cca343d8579b13b02a6d1 Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 14 Feb 2011 22:24:49 -0800 Subject: [PATCH 3/4] - Make configs inheritable and other goodies in this prototype Example: % RUBYLIB=lib ruby -rubygems -e 'require "logstash/config"; require "logstash/inputs/file"; require "logstash/inputs/amqp"; LogStash::Inputs::Amqp.dsl_gen; LogStash::Inputs::File.dsl_gen' input { #parent amqp { #node "somename": tag => string, path => string, pantscon => string, } #node } #parent input { #parent file { #node "somename": tag => string, path => string, } #node } #parent --- lib/logstash/config.rb | 61 ++++++++++++++++++++++++++++++------- lib/logstash/inputs/amqp.rb | 3 ++ lib/logstash/inputs/base.rb | 6 ++-- lib/logstash/inputs/file.rb | 4 +++ 4 files changed, 61 insertions(+), 13 deletions(-) diff --git a/lib/logstash/config.rb b/lib/logstash/config.rb index 3a9c72bbf..9a1cdc2ed 100644 --- a/lib/logstash/config.rb +++ b/lib/logstash/config.rb @@ -23,20 +23,59 @@ module LogStash::Config # This method is called when someone does 'include LogStash::Config' def self.included(base) # Add ClassMethods module methods to the 'base' given. - base.extend(ClassMethods) + base.extend(LogStash::Config::DSL) end - module ClassMethods - def section(name) - @section = name - end # def self.section + module DSL + attr_accessor :dsl_name + attr_accessor :dsl_parent - def config(cfg) + # Set the parent config for this class. + def dsl_parent(*args) + @dsl_parent = args[0] if args.length > 0 + return @dsl_parent + end + + # Set the config name for this class. + def dsl_name(*args) + @dsl_name = args[0] if args.length > 0 + return @dsl_name + end + + def dsl_config(cfg) # cfg should be hash with one entry of { "key" => "val" } + @dsl_config ||= Hash.new key, value = cfg.to_a.first - puts "#{@section} {" - puts " #{key} => #{value}" - puts "}" - end # def self.config - end # module ClassMethods + @dsl_config[key] = value + end # def config + + def dsl_gen + puts "#{@dsl_parent.dsl_name} { #parent" if @dsl_parent + config = [] + config << "#{@dsl_name} { #node" + config << " \"somename\":" + attrs = [] + (@dsl_config || Hash.new).each do |key, value| + attrs << " #{key} => #{value}," + end + config += attrs + config << "} #node" + config = config.collect { |p| "#{@dsl_parent.nil? ? "" : " "}#{p}" } + puts config.join("\n") + puts "} #parent" if @dsl_parent + end + + def inherited(subclass) + # Copy our parent's config to a subclass. + # This method is invoked whenever someone subclasses us, like: + # class Foo < Bar ... + config = Hash.new + @dsl_config.each do |key, val| + #puts "#{self}: Sharing config '#{key}' with subclass #{subclass}" + config[key] = val + end + subclass.instance_variable_set("@dsl_config", config) + subclass.dsl_parent = self + end # def inherited + end # module LogStash::Config::DSL end # module LogStash::Config diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index fde344217..afba6750c 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -7,6 +7,9 @@ require "uuidtools" # rubygem 'uuidtools' class LogStash::Inputs::Amqp < LogStash::Inputs::Base MQTYPES = [ "fanout", "queue", "topic" ] + dsl_name "amqp" + dsl_config "pantscon" => :string #LogStash::Config::Path + public def initialize(url, type, config={}, &block) super diff --git a/lib/logstash/inputs/base.rb b/lib/logstash/inputs/base.rb index a455d5758..bce42ad33 100644 --- a/lib/logstash/inputs/base.rb +++ b/lib/logstash/inputs/base.rb @@ -8,9 +8,11 @@ class LogStash::Inputs::Base include LogStash::Config attr_accessor :logger + dsl_name "input" + dsl_parent nil # Define the basic config - config "path" => :string #LogStash::Config::Path - config "tag" => :string #LogStash::Config::Array + dsl_config "path" => :string #LogStash::Config::Path + dsl_config "tag" => :string #LogStash::Config::Array public def initialize(configs, output_queue) diff --git a/lib/logstash/inputs/file.rb b/lib/logstash/inputs/file.rb index bca23a733..6cf02f549 100644 --- a/lib/logstash/inputs/file.rb +++ b/lib/logstash/inputs/file.rb @@ -4,6 +4,10 @@ require "logstash/namespace" require "socket" # for Socket.gethostname class LogStash::Inputs::File < LogStash::Inputs::Base + + dsl_name "file" + #dsl_parent LogStash::Inputs::Base + public def initialize(configs, output_queue) super From bc2c59c15d8c414ecaf581437277f9d60ea4acfa Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 14 Feb 2011 23:27:49 -0800 Subject: [PATCH 4/4] - Add more config settings --- lib/logstash/filters/base.rb | 6 ++++++ lib/logstash/filters/grok.rb | 5 +++-- lib/logstash/filters/multiline.rb | 5 +++++ lib/logstash/outputs/amqp.rb | 4 ++++ lib/logstash/outputs/base.rb | 6 ++++++ lib/logstash/outputs/elasticsearch.rb | 7 +++++++ lib/logstash/outputs/nagios.rb | 2 ++ lib/logstash/outputs/stdout.rb | 3 +++ lib/logstash/outputs/websocket.rb | 2 ++ 9 files changed, 38 insertions(+), 2 deletions(-) diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 4d89c7643..d5a2aff31 100644 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -1,9 +1,15 @@ require "logstash/namespace" require "logstash/logging" +require "logstash/config" class LogStash::Filters::Base + include LogStash::Config + attr_accessor :logger + dsl_name "filters" + dsl_parent nil + public def initialize(config = {}) @logger = LogStash::Logger.new(STDERR) diff --git a/lib/logstash/filters/grok.rb b/lib/logstash/filters/grok.rb index 0def51692..355be7bbf 100644 --- a/lib/logstash/filters/grok.rb +++ b/lib/logstash/filters/grok.rb @@ -6,8 +6,9 @@ require "grok" # rubygem 'jls-grok' class LogStash::Filters::Grok < LogStash::Filters::Base - config :pattern => LogStash::Config::String, - :patterns_dir => LogStash::Config::Path + config :pattern => :string + config :patterns_dir => :path + config :drop_if_match => :boolean # googlecode/issue/26 public def initialize(config = {}) diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index d6420fb0f..7600aafee 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -7,6 +7,11 @@ require "logstash/filters/base" require "logstash/namespace" class LogStash::Filters::Multiline < LogStash::Filters::Base + + config :pattern => :string + config :negate => :boolean + config :what => ["previous", "next"] + # The 'date' filter will take a value from your event and use it as the # event timestamp. This is useful for parsing logs generated on remote # servers or for importing old logs. diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb index 282a2b4b0..b01b7f0fe 100644 --- a/lib/logstash/outputs/amqp.rb +++ b/lib/logstash/outputs/amqp.rb @@ -6,6 +6,10 @@ require "mq" # rubygem 'amqp' class LogStash::Outputs::Amqp < LogStash::Outputs::Base MQTYPES = [ "fanout", "queue", "topic" ] + config :host => :string + config :queue_type => :string + config :queue_name => :string + public def initialize(url, config={}, &block) super diff --git a/lib/logstash/outputs/base.rb b/lib/logstash/outputs/base.rb index 56af53090..6887b801f 100644 --- a/lib/logstash/outputs/base.rb +++ b/lib/logstash/outputs/base.rb @@ -2,11 +2,17 @@ require "cgi" require "logstash/event" require "logstash/logging" require "logstash/namespace" +require "logstash/config" require "uri" class LogStash::Outputs::Base + include LogStash::Config + attr_accessor :logger + dsl_name "outputs" + dsl_parent nil + public def initialize(url) @url = url diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 4eca6f3c3..85a58b39a 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -4,6 +4,13 @@ require "logstash/outputs/amqp" require "logstash/outputs/base" class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base + + # http://host/index/type + config :host => :string + config :index => :string + config :type => :string + # TODO(sissel): Config for river? + public def register @pending = [] diff --git a/lib/logstash/outputs/nagios.rb b/lib/logstash/outputs/nagios.rb index eedd931f6..fd09b3e57 100644 --- a/lib/logstash/outputs/nagios.rb +++ b/lib/logstash/outputs/nagios.rb @@ -5,6 +5,8 @@ class LogStash::Outputs::Nagios < LogStash::Outputs::Base NAGIOS_CRITICAL = 2 NAGIOS_WARN = 1 + config :commandfile => :string + public def initialize(url, config={}, &block) super diff --git a/lib/logstash/outputs/stdout.rb b/lib/logstash/outputs/stdout.rb index 591afa268..83c668085 100644 --- a/lib/logstash/outputs/stdout.rb +++ b/lib/logstash/outputs/stdout.rb @@ -2,6 +2,9 @@ require "logstash/outputs/base" require "logstash/namespace" class LogStash::Outputs::Stdout < LogStash::Outputs::Base + + config :debug => :boolean + public def register # nothing to do diff --git a/lib/logstash/outputs/websocket.rb b/lib/logstash/outputs/websocket.rb index 7ea3509b4..61117f0d5 100644 --- a/lib/logstash/outputs/websocket.rb +++ b/lib/logstash/outputs/websocket.rb @@ -3,6 +3,8 @@ require "logstash/namespace" require "logstash/outputs/base" class LogStash::Outputs::Websocket < LogStash::Outputs::Base + config :address => :string + public def register @channel = EventMachine::Channel.new