Add Multi-threaded workers and queues to UDP input

This commit is contained in:
johnarnold 2014-01-27 07:01:24 +00:00 committed by Jordan Sissel
parent 4f1272eba5
commit 11239d6d19

View file

@ -23,6 +23,9 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
# Buffer size
config :buffer_size, :validate => :number, :default => 8192
# I/O workers
config :workers, :validate => :number, :default => 2
public
def initialize(params)
@ -37,6 +40,7 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
public
def run(output_queue)
@output_queue = output_queue
begin
# udp server
udp_listener(output_queue)
@ -60,13 +64,24 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
@udp = UDPSocket.new(Socket::AF_INET)
@udp.bind(@host, @port)
@input_to_worker = SizedQueue.new(20000)
@worker_to_output = SizedQueue.new(20000)
@input_workers = @workers.times do
Thread.new { inputworker }
end
#johnarnold: not adding output workers unless I see a reason... one should be fine.
#@output_workers = @workers.times do
Thread.new { outputworker }
#end
loop do
#collect datagram message and add to queue
payload, client = @udp.recvfrom(@buffer_size)
@codec.decode(payload) do |event|
decorate(event)
event["host"] ||= client[3]
output_queue << event
end
work = [ payload, client ]
@input_to_worker.push(work)
end
ensure
if @udp
@ -74,7 +89,52 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
@udp.close_write rescue nil
end
end # def udp_listener
def inputworker
LogStash::Util::set_thread_name("|worker")
begin
while true
work = @input_to_worker.pop
payload = work[0]
client = work[1]
if payload == LogStash::ShutdownSignal
@input_to_worker.push(work)
break
end
@codec.decode(payload) do |event|
decorate(event)
event["host"] = client[3]
@worker_to_output.push(event)
end
end
rescue => e
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
end
end # def inputworker
def outputworker
LogStash::Util::set_thread_name("|worker")
begin
while true
event = @worker_to_output.pop
if event == LogStash::ShutdownSignal
@worker_to_output.push(payload)
break
end
@output_queue << event
end
rescue => e
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
end
end # def outputworker
public
def teardown
@udp.close if @udp && !@udp.closed?