- go back to using @queue.subscribe because @queue.pop returns

immediately "queue_empty" if the queue is empty, rather than blocking.
  I could find no way to change this behavior in Bunny based on the
  docs here: http://rubydoc.info/gems/bunny/Bunny/Queue#pop-instance_method
This commit is contained in:
Jordan Sissel 2012-02-09 20:57:31 -08:00
parent bb4b6235af
commit 7415a52023

View file

@ -104,7 +104,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
@amqpurl += amqp_credentials unless amqp_credentials.nil?
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"
@metric_queue_pop = @logger.metrics.timer(self, "queue-pop")
@metric_amqp_read = @logger.metrics.timer(self, "amqp-read")
@metric_queue_write = @logger.metrics.timer(self, "internal-queue-write")
end # def register
def run(queue)
@ -118,14 +119,18 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
@queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive})
@queue.bind(@exchange, :key => @key)
while true
data = ""
@metric_queue_pop.time do
data = @queue.pop(:ack => @ack)
end
timer = @metric_amqp_read.time
@queue.subscribe({:ack => @ack}) do |data|
timer.stop
e = to_event(data[:payload], @amqpurl)
queue << e if e
end # looping forever
if e
@metric_queue_write.time do
queue << e
end
end
timer = @metric_amqp_read.time
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.