From 134cb6368784b7d23652f72de250b2a8275f8759 Mon Sep 17 00:00:00 2001 From: Aaron Mildenstein Date: Wed, 19 Feb 2014 18:43:02 -0600 Subject: [PATCH] Updated to streamline multithreaded UDP --- lib/logstash/inputs/udp.rb | 66 +++++++++++--------------------------- 1 file changed, 18 insertions(+), 48 deletions(-) diff --git a/lib/logstash/inputs/udp.rb b/lib/logstash/inputs/udp.rb index 55b7caf31..ace771cb3 100644 --- a/lib/logstash/inputs/udp.rb +++ b/lib/logstash/inputs/udp.rb @@ -67,24 +67,17 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base @udp = UDPSocket.new(Socket::AF_INET) @udp.bind(@host, @port) - @input_to_worker = SizedQueue.new(@queue_size) - @worker_to_output = SizedQueue.new(@queue_size) + @input_to_worker = SizedQueue.new(@queue_size) - @input_workers = @workers.times do - Thread.new { inputworker } - end + @input_workers = @workers.times do |i| + @logger.debug("Starting UDP worker thread", :worker => i) + Thread.new { inputworker(i) } + end - #johnarnold: not adding output workers unless I see a reason... one should be fine. - #@output_workers = @workers.times do - Thread.new { outputworker } - #end - loop do - #collect datagram message and add to queue + #collect datagram message and add to queue payload, client = @udp.recvfrom(@buffer_size) - work = [ payload, client ] - @input_to_worker.push(work) - + @input_to_worker.push([payload,client]) end ensure if @udp @@ -93,51 +86,28 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base end end # def udp_listener - def inputworker - LogStash::Util::set_thread_name("|worker") + def inputworker(number) + LogStash::Util::set_thread_name(" e @logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace) end end # def inputworker - - def outputworker - LogStash::Util::set_thread_name("|worker") - begin - while true - event = @worker_to_output.pop - - if event == LogStash::ShutdownSignal - @worker_to_output.push(payload) - break - end - - @output_queue << event - - end - rescue => e - @logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace) - end - end # def outputworker - public def teardown @udp.close if @udp && !@udp.closed?