mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
- move amqp/em into a separate thread.
- Things wanting to block until amqp/em are finished will still call Socket#run, which now only blocks until the new amqp/em thread complete (with Thread#join)
This commit is contained in:
parent
4afa19646b
commit
cd4baf78a6
1 changed files with 20 additions and 13 deletions
|
@ -22,8 +22,27 @@ module LogStash; module Net
|
||||||
@handler = self
|
@handler = self
|
||||||
@outbuffer = Hash.new { |h,k| h[k] = [] }
|
@outbuffer = Hash.new { |h,k| h[k] = [] }
|
||||||
@mq = nil
|
@mq = nil
|
||||||
|
start_amqp
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
|
def start_amqp
|
||||||
|
@amqpthread = Thread.new do
|
||||||
|
# Create connection to AMQP, and in turn, the main EventMachine loop.
|
||||||
|
AMQP.start(:host => "localhost") do
|
||||||
|
@mq = MQ.new
|
||||||
|
mq_q = @mq.queue(@id, :auto_delete => true)
|
||||||
|
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
|
||||||
|
handle_new_subscriptions
|
||||||
|
|
||||||
|
EM.add_periodic_timer(5) { handle_new_subscriptions }
|
||||||
|
EM.add_periodic_timer(1) do
|
||||||
|
@outbuffer.each_key { |dest| flushout(dest) }
|
||||||
|
@outbuffer.clear
|
||||||
|
end
|
||||||
|
end # AMQP.start
|
||||||
|
end
|
||||||
|
end # def start_amqp
|
||||||
|
|
||||||
def subscribe(name)
|
def subscribe(name)
|
||||||
@want_queues << name
|
@want_queues << name
|
||||||
end # def subscribe
|
end # def subscribe
|
||||||
|
@ -72,19 +91,7 @@ module LogStash; module Net
|
||||||
end # def handle_message
|
end # def handle_message
|
||||||
|
|
||||||
def run
|
def run
|
||||||
# Create connection to AMQP, and in turn, the main EventMachine loop.
|
@amqpthread.join
|
||||||
AMQP.start(:host => "localhost") do
|
|
||||||
@mq = MQ.new
|
|
||||||
mq_q = @mq.queue(@id, :auto_delete => true)
|
|
||||||
mq_q.subscribe(:ack =>true) { |hdr, msg| handle_message(hdr, msg) }
|
|
||||||
handle_new_subscriptions
|
|
||||||
|
|
||||||
EM.add_periodic_timer(5) { handle_new_subscriptions }
|
|
||||||
EM.add_periodic_timer(1) do
|
|
||||||
@outbuffer.each_key { |dest| flushout(dest) }
|
|
||||||
@outbuffer.clear
|
|
||||||
end
|
|
||||||
end # AMQP.start
|
|
||||||
end # run
|
end # run
|
||||||
|
|
||||||
def handle_new_subscriptions
|
def handle_new_subscriptions
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue