Merge pull request #459 from jkoppe/jkoppe-bunny-rabbitmq-fixes

Fixing both LOGSTASH-1003 and LOGSTASH-1038
This commit is contained in:
Jordan Sissel 2013-04-30 16:36:28 -07:00
commit d8e2e96965
2 changed files with 33 additions and 8 deletions

View file

@ -64,10 +64,20 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
# Is the queue exclusive? (aka: Will other clients connect to this named queue?)
config :exclusive, :validate => :boolean, :default => true
# Prefetch count. Number of messages to prefetch
# Using the prefetch_count option means that if ack is true, the server will
# only send the number of messages specified in the prefetch_count option
# to logstash and then the server will wait until logstash acknowledges
# a message prior to the server sending logstash more messages. In practice,
# if ack is true, logstash acknowledges each message. So increasing
# prefetch_count might not yield any practical benefit today.
# Must be 0 or a positive integer.
config :prefetch_count, :validate => :number, :default => 1
# Enable message acknowledgement
# Enable message acknowledgement. The ack only matters if prefetch_count is
# more than 0. Message acknowledgement improves reliablity but it reduces
# throughput since logstash needs to tell rabbitmq-server that logstash
# received the message. Logstash will acknowledge only after it is able to
# process the message into a Logstash Event
config :ack, :validate => :boolean, :default => true
# Enable or disable debugging
@ -123,6 +133,12 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
@rabbitmq_url << "@"
end
@rabbitmq_url += "#{@host}:#{@port}#{@vhost}/#{@queue}"
if @prefetch_count < 0
raise RuntimeError.new(
"Cannot specify prefetch_count less than 0"
)
end
end # def register
def run(queue)
@ -131,12 +147,12 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
@bunny = Bunny.new(@rabbitmq_settings)
return if terminating?
@bunny.start
@bunny.qos({:prefetch_count => @prefetch_count})
@bunny.default_channel.prefetch(@prefetch_count)
@arguments_hash = Hash[*@arguments]
@bunnyqueue = @bunny.queue(@queue, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
@bunnyqueue.bind(@exchange, :key => @key)
@bunnyqueue.bind(@exchange, :routing_key => @key)
# need to get metadata from data
@bunnyqueue.subscribe({:ack => @ack, :block => true}) do |delivery_info, metadata, data|
@ -154,6 +170,15 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
end # headers_add.each do
end # if !@headers_fields.empty?
queue << e
# if these conditions are met, the server won't send any more
# messages until we specifically ack this message
# TODO(jkoppe): to improve throughput, we could ack less often
# but, I definitely want to get community buy-in before enabling
# one method or another.
if @ack and @prefetch_count > 0
@bunny.default_channel.ack(delivery_info[:delivery_tag])
end
end # if e
end # @bunnyqueue.subscribe do

View file

@ -148,12 +148,12 @@ class LogStash::Outputs::RabbitMQ < LogStash::Outputs::Base
if @bunnyexchange
if @headers_add.empty?
#tags2headers is empty, so we send the message normally
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key }])
@bunnyexchange.publish(message, :persistent => @persistent, :key => key)
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :routing_key => key }])
@bunnyexchange.publish(message, :persistent => @persistent, :routing_key => key)
else
#publishing messages WITH headers, that are stored in "headers_add"
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key , :headers => @headers_add.inspect}])
@bunnyexchange.publish(message, :persistent => @persistent, :key => key, :headers => @headers_add)
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :routing_key => key , :headers => @headers_add.inspect}])
@bunnyexchange.publish(message, :persistent => @persistent, :routing_key => key, :headers => @headers_add)
end # if @headers2headers.empty?
else