mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Merge pull request #1098 from untergeek/udp-untergeek
UDP input plugin tweaks
This commit is contained in:
commit
0bfaf04ce4
1 changed files with 22 additions and 51 deletions
|
@ -21,13 +21,14 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
|||
# than 1024 (privileged ports) may require root or elevated privileges to use.
|
||||
config :port, :validate => :number, :required => true
|
||||
|
||||
# Buffer size
|
||||
# The maximum packet size to read from the network
|
||||
config :buffer_size, :validate => :number, :default => 8192
|
||||
|
||||
# I/O workers
|
||||
# Number of threads processing packets
|
||||
config :workers, :validate => :number, :default => 2
|
||||
|
||||
# Queue depth
|
||||
# This is the number of unprocessed UDP packets you can hold in memory
|
||||
# before packets will start dropping.
|
||||
config :queue_size, :validate => :number, :default => 2000
|
||||
|
||||
public
|
||||
|
@ -67,24 +68,17 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
|||
@udp = UDPSocket.new(Socket::AF_INET)
|
||||
@udp.bind(@host, @port)
|
||||
|
||||
@input_to_worker = SizedQueue.new(@queue_size)
|
||||
@worker_to_output = SizedQueue.new(@queue_size)
|
||||
@input_to_worker = SizedQueue.new(@queue_size)
|
||||
|
||||
@input_workers = @workers.times do
|
||||
Thread.new { inputworker }
|
||||
end
|
||||
@input_workers = @workers.times do |i|
|
||||
@logger.debug("Starting UDP worker thread", :worker => i)
|
||||
Thread.new { inputworker(i) }
|
||||
end
|
||||
|
||||
#johnarnold: not adding output workers unless I see a reason... one should be fine.
|
||||
#@output_workers = @workers.times do
|
||||
Thread.new { outputworker }
|
||||
#end
|
||||
|
||||
loop do
|
||||
#collect datagram message and add to queue
|
||||
#collect datagram message and add to queue
|
||||
payload, client = @udp.recvfrom(@buffer_size)
|
||||
work = [ payload, client ]
|
||||
@input_to_worker.push(work)
|
||||
|
||||
@input_to_worker.push([payload,client])
|
||||
end
|
||||
ensure
|
||||
if @udp
|
||||
|
@ -93,51 +87,28 @@ class LogStash::Inputs::Udp < LogStash::Inputs::Base
|
|||
end
|
||||
end # def udp_listener
|
||||
|
||||
def inputworker
|
||||
LogStash::Util::set_thread_name("|worker")
|
||||
def inputworker(number)
|
||||
LogStash::Util::set_thread_name("<udp.#{number}")
|
||||
begin
|
||||
while true
|
||||
work = @input_to_worker.pop
|
||||
payload = work[0]
|
||||
client = work[1]
|
||||
if payload == LogStash::ShutdownSignal
|
||||
payload,client = @input_to_worker.pop
|
||||
if payload == LogStash::ShutdownSignal
|
||||
@input_to_worker.push(work)
|
||||
break
|
||||
end
|
||||
|
||||
@codec.decode(payload) do |event|
|
||||
decorate(event)
|
||||
|
||||
event["host"] = client[3]
|
||||
@worker_to_output.push(event)
|
||||
|
||||
end
|
||||
|
||||
@codec.decode(payload) do |event|
|
||||
decorate(event)
|
||||
event["host"] ||= client[3]
|
||||
@output_queue.push(event)
|
||||
end
|
||||
end
|
||||
|
||||
rescue => e
|
||||
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
|
||||
end
|
||||
end # def inputworker
|
||||
|
||||
|
||||
def outputworker
|
||||
LogStash::Util::set_thread_name("|worker")
|
||||
begin
|
||||
while true
|
||||
event = @worker_to_output.pop
|
||||
|
||||
if event == LogStash::ShutdownSignal
|
||||
@worker_to_output.push(payload)
|
||||
break
|
||||
end
|
||||
|
||||
@output_queue << event
|
||||
|
||||
end
|
||||
rescue => e
|
||||
@logger.error("Exception in inputworker", "exception" => e, "backtrace" => e.backtrace)
|
||||
end
|
||||
end # def outputworker
|
||||
|
||||
public
|
||||
def teardown
|
||||
@udp.close if @udp && !@udp.closed?
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue