diff --git a/lib/net/socket.rb b/lib/net/socket.rb index 71496974f..05255399b 100644 --- a/lib/net/socket.rb +++ b/lib/net/socket.rb @@ -22,8 +22,27 @@ module LogStash; module Net @handler = self @outbuffer = Hash.new { |h,k| h[k] = [] } @mq = nil + start_amqp end # def initialize + def start_amqp + @amqpthread = Thread.new do + # Create connection to AMQP, and in turn, the main EventMachine loop. + AMQP.start(:host => "localhost") do + @mq = MQ.new + mq_q = @mq.queue(@id, :auto_delete => true) + mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) } + handle_new_subscriptions + + EM.add_periodic_timer(5) { handle_new_subscriptions } + EM.add_periodic_timer(1) do + @outbuffer.each_key { |dest| flushout(dest) } + @outbuffer.clear + end + end # AMQP.start + end + end # def start_amqp + def subscribe(name) @want_queues << name end # def subscribe @@ -72,19 +91,7 @@ module LogStash; module Net end # def handle_message def run - # Create connection to AMQP, and in turn, the main EventMachine loop. - AMQP.start(:host => "localhost") do - @mq = MQ.new - mq_q = @mq.queue(@id, :auto_delete => true) - mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) } - handle_new_subscriptions - - EM.add_periodic_timer(5) { handle_new_subscriptions } - EM.add_periodic_timer(1) do - @outbuffer.each_key { |dest| flushout(dest) } - @outbuffer.clear - end - end # AMQP.start + @amqpthread.join end # run def handle_new_subscriptions