diff --git a/etc/agent.conf b/etc/agent.conf index 1ad1ff91c..a01b25bf9 100644 --- a/etc/agent.conf +++ b/etc/agent.conf @@ -3,22 +3,6 @@ input { path => [ "/var/log/messages", "/var/log/*.log" ] type => "linux-syslog" } - - tcp { - port => 1234 - type => "linux-syslog" - } -} - -filter { - grok { - type => "linux-syslog" - pattern => ["%{SYSLOG_SUDO}", "%{SYSLOG_KERNEL}", "%{SYSLOGLINE}"] - add_tag => "test_tag1" - add_tag => ["test_tag2", "test_tag3"] - add_field => ["grok_filtered", "true"] - add_field => ["test_key", "the pid is %{pid}"] - } } output { diff --git a/etc/indexer.conf b/etc/indexer.conf new file mode 100644 index 000000000..1437a49c3 --- /dev/null +++ b/etc/indexer.conf @@ -0,0 +1,30 @@ +input { + amqp { + host => "127.0.0.1" + user => "guest" + pass => "guest" + exchange_type => "topic" + name => "testing" + type => "all" + } + + tcp { + port => 1234 + type => "linux-syslog" + } +} + +filter { + grok { + type => "linux-syslog" + pattern => ["%{SYSLOG_SUDO}", "%{SYSLOG_KERNEL}", "%{SYSLOGLINE}"] + add_tag => "grok" + add_field => ["test_key", "the pid is %{pid}"] + } +} + +output { + stdout { + debug => true + } +} diff --git a/lib/logstash/agent.rb b/lib/logstash/agent.rb index 948c6855d..a1b58feaa 100644 --- a/lib/logstash/agent.rb +++ b/lib/logstash/agent.rb @@ -78,7 +78,7 @@ class LogStash::Agent # NOTE(petef) we should use a SizedQueue here (w/config params for size) #filter_queue = Queue.new filter_queue = SizedQueue.new(10) - output_queue = MultiQueue.new + output_queue = LogStash::MultiQueue.new input_target = @filters.length > 0 ? filter_queue : output_queue # Start inputs @@ -126,14 +126,19 @@ class LogStash::Agent @outputs.each do |output| queue = SizedQueue.new(10) output_queue.add_queue(queue) + @threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue| + begin + JThread.currentThread().setName("output/#{output.to_s}") + output.logger = @logger + output.register - @threads["outputs/#{output}"] = Thread.new do - JThread.currentThread().setName("output/#{output}") - output.logger = @logger - output.register - - while event = queue.pop do - output.receive(event) + while event = queue.pop do + @logger.debug("Sending event to #{output.to_s}") + output.receive(event) + end + rescue Exception => e + @logger.warn(["Output #{output.to_s} thread exception", e]) + retry end end # Thread.new end # @outputs.each diff --git a/lib/logstash/file/manager.rb b/lib/logstash/file/manager.rb index 76158f39f..d7ca63f76 100644 --- a/lib/logstash/file/manager.rb +++ b/lib/logstash/file/manager.rb @@ -80,6 +80,7 @@ class LogStash::File::Manager "@tags" => config["tag"].dup, }) e.source = "file://#{@hostname}/#{path}" + @logger.debug(["New event from file input", path, e]) @output_queue << e end # f.tail end # File.open diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index ce06fa526..2ee13dc8f 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -1,25 +1,11 @@ require "bunny" # rubygem 'bunny' require "logstash/inputs/base" require "logstash/namespace" -require "mq" # rubygem 'amqp' -#require "uuidtools" # rubygem 'uuidtools' -require "cgi" -require "uri" class LogStash::Inputs::Amqp < LogStash::Inputs::Base MQTYPES = [ "fanout", "queue", "topic" ] config_name "amqp" - #config "host" => (lambda do |value| - ## Use URI to validate. - #u = URI.parse("dummy:///") - #begin - #u.host = value - #rescue => e - #return false, "Invalid hostname #{value.inspect}" - #end - #return true - #) # config "host" config :host => :string config :user => :string diff --git a/lib/logstash/multiqueue.rb b/lib/logstash/multiqueue.rb index 97397f63a..547510296 100644 --- a/lib/logstash/multiqueue.rb +++ b/lib/logstash/multiqueue.rb @@ -1,15 +1,19 @@ -class MultiQueue +require "logstash/namespace" + +class LogStash::MultiQueue public def initialize(*queues) @mutex = Mutex.new @queues = queues - end + end # def initialize # Push an object to all queues. public def push(object) + puts "*** Pushing object into MultiQueue: #{object}" @queues.each { |q| q.push(object) } - end + end # def push + alias :<< :push alias_method :<<, :push @@ -19,5 +23,10 @@ class MultiQueue @mutex.synchronize do @queues << queue end - end + end # def add_queue + + public + def size + return @queues.collect { |q| q.size } + end # def size end