mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge remote branch 'origin/master'
Conflicts: lib/logstash/agent.rb
This commit is contained in:
commit
af2e249d7a
6 changed files with 57 additions and 42 deletions
|
@ -3,22 +3,6 @@ input {
|
|||
path => [ "/var/log/messages", "/var/log/*.log" ]
|
||||
type => "linux-syslog"
|
||||
}
|
||||
|
||||
tcp {
|
||||
port => 1234
|
||||
type => "linux-syslog"
|
||||
}
|
||||
}
|
||||
|
||||
filter {
|
||||
grok {
|
||||
type => "linux-syslog"
|
||||
pattern => ["%{SYSLOG_SUDO}", "%{SYSLOG_KERNEL}", "%{SYSLOGLINE}"]
|
||||
add_tag => "test_tag1"
|
||||
add_tag => ["test_tag2", "test_tag3"]
|
||||
add_field => ["grok_filtered", "true"]
|
||||
add_field => ["test_key", "the pid is %{pid}"]
|
||||
}
|
||||
}
|
||||
|
||||
output {
|
||||
|
|
30
etc/indexer.conf
Normal file
30
etc/indexer.conf
Normal file
|
@ -0,0 +1,30 @@
|
|||
input {
|
||||
amqp {
|
||||
host => "127.0.0.1"
|
||||
user => "guest"
|
||||
pass => "guest"
|
||||
exchange_type => "topic"
|
||||
name => "testing"
|
||||
type => "all"
|
||||
}
|
||||
|
||||
tcp {
|
||||
port => 1234
|
||||
type => "linux-syslog"
|
||||
}
|
||||
}
|
||||
|
||||
filter {
|
||||
grok {
|
||||
type => "linux-syslog"
|
||||
pattern => ["%{SYSLOG_SUDO}", "%{SYSLOG_KERNEL}", "%{SYSLOGLINE}"]
|
||||
add_tag => "grok"
|
||||
add_field => ["test_key", "the pid is %{pid}"]
|
||||
}
|
||||
}
|
||||
|
||||
output {
|
||||
stdout {
|
||||
debug => true
|
||||
}
|
||||
}
|
|
@ -78,7 +78,7 @@ class LogStash::Agent
|
|||
# NOTE(petef) we should use a SizedQueue here (w/config params for size)
|
||||
#filter_queue = Queue.new
|
||||
filter_queue = SizedQueue.new(10)
|
||||
output_queue = MultiQueue.new
|
||||
output_queue = LogStash::MultiQueue.new
|
||||
|
||||
input_target = @filters.length > 0 ? filter_queue : output_queue
|
||||
# Start inputs
|
||||
|
@ -126,15 +126,20 @@ class LogStash::Agent
|
|||
@outputs.each do |output|
|
||||
queue = SizedQueue.new(10)
|
||||
output_queue.add_queue(queue)
|
||||
|
||||
@threads["outputs/#{output}"] = Thread.new do
|
||||
JThread.currentThread().setName("output/#{output}")
|
||||
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
||||
begin
|
||||
JThread.currentThread().setName("output/#{output.to_s}")
|
||||
output.logger = @logger
|
||||
output.register
|
||||
|
||||
while event = queue.pop do
|
||||
@logger.debug("Sending event to #{output.to_s}")
|
||||
output.receive(event)
|
||||
end
|
||||
rescue Exception => e
|
||||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
retry
|
||||
end
|
||||
end # Thread.new
|
||||
end # @outputs.each
|
||||
|
||||
|
|
|
@ -80,6 +80,7 @@ class LogStash::File::Manager
|
|||
"@tags" => config["tag"].dup,
|
||||
})
|
||||
e.source = "file://#{@hostname}/#{path}"
|
||||
@logger.debug(["New event from file input", path, e])
|
||||
@output_queue << e
|
||||
end # f.tail
|
||||
end # File.open
|
||||
|
|
|
@ -1,25 +1,11 @@
|
|||
require "bunny" # rubygem 'bunny'
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "mq" # rubygem 'amqp'
|
||||
#require "uuidtools" # rubygem 'uuidtools'
|
||||
require "cgi"
|
||||
require "uri"
|
||||
|
||||
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
|
||||
config_name "amqp"
|
||||
#config "host" => (lambda do |value|
|
||||
## Use URI to validate.
|
||||
#u = URI.parse("dummy:///")
|
||||
#begin
|
||||
#u.host = value
|
||||
#rescue => e
|
||||
#return false, "Invalid hostname #{value.inspect}"
|
||||
#end
|
||||
#return true
|
||||
#) # config "host"
|
||||
|
||||
config :host => :string
|
||||
config :user => :string
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
class MultiQueue
|
||||
require "logstash/namespace"
|
||||
|
||||
class LogStash::MultiQueue
|
||||
public
|
||||
def initialize(*queues)
|
||||
@mutex = Mutex.new
|
||||
@queues = queues
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
# Push an object to all queues.
|
||||
public
|
||||
def push(object)
|
||||
puts "*** Pushing object into MultiQueue: #{object}"
|
||||
@queues.each { |q| q.push(object) }
|
||||
end
|
||||
end # def push
|
||||
alias :<< :push
|
||||
|
||||
alias_method :<<, :push
|
||||
|
||||
|
@ -19,5 +23,10 @@ class MultiQueue
|
|||
@mutex.synchronize do
|
||||
@queues << queue
|
||||
end
|
||||
end
|
||||
end # def add_queue
|
||||
|
||||
public
|
||||
def size
|
||||
return @queues.collect { |q| q.size }
|
||||
end # def size
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue