- Switch to using SizedQueues for internal messaging

http://code.google.com/p/logstash/issues/detail?id=44
This commit is contained in:
Jordan Sissel 2011-02-21 21:14:40 -08:00
parent cef63977e3
commit 08d2028a98

View file

@ -10,6 +10,9 @@ require "uri"
JThread = java.lang.Thread JThread = java.lang.Thread
# TODO(sissel): only enable this if we are in debug mode.
# JRuby.objectspace=true
# Collect logs, ship them out. # Collect logs, ship them out.
class LogStash::Agent class LogStash::Agent
attr_reader :config attr_reader :config
@ -73,7 +76,8 @@ class LogStash::Agent
end end
# NOTE(petef) we should use a SizedQueue here (w/config params for size) # NOTE(petef) we should use a SizedQueue here (w/config params for size)
filter_queue = Queue.new #filter_queue = Queue.new
filter_queue = SizedQueue.new(10)
output_queue = MultiQueue.new output_queue = MultiQueue.new
input_target = @filters.length > 0 ? filter_queue : output_queue input_target = @filters.length > 0 ? filter_queue : output_queue
@ -89,7 +93,7 @@ class LogStash::Agent
# Create N filter-worker threads # Create N filter-worker threads
if @filters.length > 0 if @filters.length > 0
3.times do |n| 1.times do |n|
@logger.info("Starting filter worker thread #{n}") @logger.info("Starting filter worker thread #{n}")
@threads["filter|worker|#{n}"] = Thread.new do @threads["filter|worker|#{n}"] = Thread.new do
JThread.currentThread().setName("filter|worker|#{n}") JThread.currentThread().setName("filter|worker|#{n}")
@ -120,7 +124,7 @@ class LogStash::Agent
# Create output threads # Create output threads
@outputs.each do |output| @outputs.each do |output|
queue = Queue.new queue = SizedQueue.new(10)
output_queue.add_queue(queue) output_queue.add_queue(queue)
@threads["outputs/#{output}"] = Thread.new do @threads["outputs/#{output}"] = Thread.new do
@ -134,13 +138,9 @@ class LogStash::Agent
end # Thread.new end # Thread.new
end # @outputs.each end # @outputs.each
# # Register any signal handlers
# #register_signal_handler
#
while sleep 5 while sleep 5
end end
end # def register end # def run
public public
def stop def stop
@ -175,39 +175,25 @@ class LogStash::Agent
public public
def register_signal_handler def register_signal_handler
@sigchannel = EventMachine::Channel.new # TODO(sissel): This doesn't work well in jruby since ObjectSpace is disabled
Signal.trap("USR1") do # by default.
@sigchannel.push(:USR1) Signal.trap("USR2") do
end
Signal.trap("INT") do
@sigchannel.push(:INT)
end
@sigchannel.subscribe do |msg|
# TODO(sissel): Make this a function. # TODO(sissel): Make this a function.
case msg counts = Hash.new { |h,k| h[k] = 0 }
when :USR1 ObjectSpace.each_object do |obj|
counts = Hash.new { |h,k| h[k] = 0 } counts[obj.class] += 1
ObjectSpace.each_object do |obj| end
counts[obj.class] += 1
end
@logger.info("SIGUSR1 received. Dumping state") @logger.info("SIGUSR1 received. Dumping state")
@logger.info("#{self.class.name} config") @logger.info("#{self.class.name} config")
@logger.info([" Inputs:", @inputs]) @logger.info([" Inputs:", @inputs])
@logger.info([" Filters:", @filters]) @logger.info([" Filters:", @filters])
@logger.info([" Outputs:", @outputs]) @logger.info([" Outputs:", @outputs])
@logger.info("Dumping counts of objects by class") @logger.info("Dumping counts of objects by class")
counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value| counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value|
@logger.info("Class: [#{value}] #{key}") @logger.info("Class: [#{value}] #{key}")
end end
when :INT end # SIGUSR1
@logger.warn("SIGINT received. Shutting down.")
# TODO(petef): Should have input/output/filter register shutdown
# hooks.
end # case msg
end # @sigchannel.subscribe
end # def register_signal_handler end # def register_signal_handler
end # class LogStash::Agent end # class LogStash::Agent