diff --git a/lib/logstash/agent.rb b/lib/logstash/agent.rb index 4f42c7911..948c6855d 100644 --- a/lib/logstash/agent.rb +++ b/lib/logstash/agent.rb @@ -10,6 +10,9 @@ require "uri" JThread = java.lang.Thread +# TODO(sissel): only enable this if we are in debug mode. +# JRuby.objectspace=true + # Collect logs, ship them out. class LogStash::Agent attr_reader :config @@ -73,7 +76,8 @@ class LogStash::Agent end # NOTE(petef) we should use a SizedQueue here (w/config params for size) - filter_queue = Queue.new + #filter_queue = Queue.new + filter_queue = SizedQueue.new(10) output_queue = MultiQueue.new input_target = @filters.length > 0 ? filter_queue : output_queue @@ -89,7 +93,7 @@ class LogStash::Agent # Create N filter-worker threads if @filters.length > 0 - 3.times do |n| + 1.times do |n| @logger.info("Starting filter worker thread #{n}") @threads["filter|worker|#{n}"] = Thread.new do JThread.currentThread().setName("filter|worker|#{n}") @@ -120,7 +124,7 @@ class LogStash::Agent # Create output threads @outputs.each do |output| - queue = Queue.new + queue = SizedQueue.new(10) output_queue.add_queue(queue) @threads["outputs/#{output}"] = Thread.new do @@ -134,13 +138,9 @@ class LogStash::Agent end # Thread.new end # @outputs.each - -# # Register any signal handlers -# #register_signal_handler -# while sleep 5 end - end # def register + end # def run public def stop @@ -175,39 +175,25 @@ class LogStash::Agent public def register_signal_handler - @sigchannel = EventMachine::Channel.new - Signal.trap("USR1") do - @sigchannel.push(:USR1) - end - - Signal.trap("INT") do - @sigchannel.push(:INT) - end - - @sigchannel.subscribe do |msg| + # TODO(sissel): This doesn't work well in jruby since ObjectSpace is disabled + # by default. + Signal.trap("USR2") do # TODO(sissel): Make this a function. - case msg - when :USR1 - counts = Hash.new { |h,k| h[k] = 0 } - ObjectSpace.each_object do |obj| - counts[obj.class] += 1 - end + counts = Hash.new { |h,k| h[k] = 0 } + ObjectSpace.each_object do |obj| + counts[obj.class] += 1 + end - @logger.info("SIGUSR1 received. Dumping state") - @logger.info("#{self.class.name} config") - @logger.info([" Inputs:", @inputs]) - @logger.info([" Filters:", @filters]) - @logger.info([" Outputs:", @outputs]) + @logger.info("SIGUSR1 received. Dumping state") + @logger.info("#{self.class.name} config") + @logger.info([" Inputs:", @inputs]) + @logger.info([" Filters:", @filters]) + @logger.info([" Outputs:", @outputs]) - @logger.info("Dumping counts of objects by class") - counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value| - @logger.info("Class: [#{value}] #{key}") - end - when :INT - @logger.warn("SIGINT received. Shutting down.") - # TODO(petef): Should have input/output/filter register shutdown - # hooks. - end # case msg - end # @sigchannel.subscribe + @logger.info("Dumping counts of objects by class") + counts.sort { |a,b| a[1] <=> b[1] or a[0] <=> b[0] }.each do |key, value| + @logger.info("Class: [#{value}] #{key}") + end + end # SIGUSR1 end # def register_signal_handler end # class LogStash::Agent