mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Updated to streamline multithreaded UDP
This commit is contained in:
parent
ed3ce1ac5e
commit
134cb63687
1 changed files with 18 additions and 48 deletions
|
@ -68,23 +68,16 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
||||||
@udp.bind(@host, @port)
|
@udp.bind(@host, @port)
|
||||||
|
|
||||||
@input_to_worker = SizedQueue.new(@queue_size)
|
@input_to_worker = SizedQueue.new(@queue_size)
|
||||||
@worker_to_output = SizedQueue.new(@queue_size)
|
|
||||||
|
|
||||||
@input_workers = @workers.times do
|
@input_workers = @workers.times do |i|
|
||||||
Thread.new { inputworker }
|
@logger.debug("Starting UDP worker thread", :worker => i)
|
||||||
|
Thread.new { inputworker(i) }
|
||||||
end
|
end
|
||||||
|
|
||||||
#johnarnold: not adding output workers unless I see a reason... one should be fine.
|
|
||||||
#@output_workers = @workers.times do
|
|
||||||
Thread.new { outputworker }
|
|
||||||
#end
|
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
#collect datagram message and add to queue
|
#collect datagram message and add to queue
|
||||||
payload, client = @udp.recvfrom(@buffer_size)
|
payload, client = @udp.recvfrom(@buffer_size)
|
||||||
work = [ payload, client ]
|
@input_to_worker.push([payload,client])
|
||||||
@input_to_worker.push(work)
|
|
||||||
|
|
||||||
end
|
end
|
||||||
ensure
|
ensure
|
||||||
if @udp
|
if @udp
|
||||||
|
@ -93,13 +86,11 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
||||||
end
|
end
|
||||||
end # def udp_listener
|
end # def udp_listener
|
||||||
|
|
||||||
def inputworker
|
def inputworker(number)
|
||||||
LogStash::Util::set_thread_name("|worker")
|
LogStash::Util::set_thread_name("<udp.#{number}")
|
||||||
begin
|
begin
|
||||||
while true
|
while true
|
||||||
work = @input_to_worker.pop
|
payload,client = @input_to_worker.pop
|
||||||
payload = work[0]
|
|
||||||
client = work[1]
|
|
||||||
if payload == LogStash::ShutdownSignal
|
if payload == LogStash::ShutdownSignal
|
||||||
@input_to_worker.push(work)
|
@input_to_worker.push(work)
|
||||||
break
|
break
|
||||||
|
@ -107,37 +98,16 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
||||||
|
|
||||||
@codec.decode(payload) do |event|
|
@codec.decode(payload) do |event|
|
||||||
decorate(event)
|
decorate(event)
|
||||||
|
event["host"] ||= client[3]
|
||||||
event["host"] = client[3]
|
@output_queue.push(event)
|
||||||
@worker_to_output.push(event)
|
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
rescue => e
|
rescue => e
|
||||||
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
|
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
|
||||||
end
|
end
|
||||||
end # def inputworker
|
end # def inputworker
|
||||||
|
|
||||||
|
|
||||||
def outputworker
|
|
||||||
LogStash::Util::set_thread_name("|worker")
|
|
||||||
begin
|
|
||||||
while true
|
|
||||||
event = @worker_to_output.pop
|
|
||||||
|
|
||||||
if event == LogStash::ShutdownSignal
|
|
||||||
@worker_to_output.push(payload)
|
|
||||||
break
|
|
||||||
end
|
|
||||||
|
|
||||||
@output_queue << event
|
|
||||||
|
|
||||||
end
|
|
||||||
rescue => e
|
|
||||||
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
|
|
||||||
end
|
|
||||||
end # def outputworker
|
|
||||||
|
|
||||||
public
|
public
|
||||||
def teardown
|
def teardown
|
||||||
@udp.close if @udp && !@udp.closed?
|
@udp.close if @udp && !@udp.closed?
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue