mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
LOGSTASH-1003 LOGSTASH-1038 fixing both issues in one commit because the rabbitmq input is functionally broken until both are fixed
This commit is contained in:
parent
05e43556e1
commit
eb9ae3beb2
2 changed files with 33 additions and 8 deletions
|
@ -64,10 +64,20 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
# Is the queue exclusive? (aka: Will other clients connect to this named queue?)
|
||||
config :exclusive, :validate => :boolean, :default => true
|
||||
|
||||
# Prefetch count. Number of messages to prefetch
|
||||
# Using the prefetch_count option means that if ack is true, the server will
|
||||
# only send the number of messages specified in the prefetch_count option
|
||||
# to logstash and then the server will wait until logstash acknowledges
|
||||
# a message prior to the server sending logstash more messages. In practice,
|
||||
# if ack is true, logstash acknowledges each message. So increasing
|
||||
# prefetch_count might not yield any practical benefit today.
|
||||
# Must be 0 or a positive integer.
|
||||
config :prefetch_count, :validate => :number, :default => 1
|
||||
|
||||
# Enable message acknowledgement
|
||||
# Enable message acknowledgement. The ack only matters if prefetch_count is
|
||||
# more than 0. Message acknowledgement improves reliablity but it reduces
|
||||
# throughput since logstash needs to tell rabbitmq-server that logstash
|
||||
# received the message. Logstash will acknowledge only after it is able to
|
||||
# process the message into a Logstash Event
|
||||
config :ack, :validate => :boolean, :default => true
|
||||
|
||||
# Enable or disable debugging
|
||||
|
@ -123,6 +133,12 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
@rabbitmq_url << "@"
|
||||
end
|
||||
@rabbitmq_url += "#{@host}:#{@port}#{@vhost}/#{@queue}"
|
||||
|
||||
if @prefetch_count < 0
|
||||
raise RuntimeError.new(
|
||||
"Cannot specify prefetch_count less than 0"
|
||||
)
|
||||
end
|
||||
end # def register
|
||||
|
||||
def run(queue)
|
||||
|
@ -131,12 +147,12 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
@bunny = Bunny.new(@rabbitmq_settings)
|
||||
return if terminating?
|
||||
@bunny.start
|
||||
@bunny.qos({:prefetch_count => @prefetch_count})
|
||||
@bunny.default_channel.prefetch(@prefetch_count)
|
||||
|
||||
@arguments_hash = Hash[*@arguments]
|
||||
|
||||
@bunnyqueue = @bunny.queue(@queue, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
|
||||
@bunnyqueue.bind(@exchange, :key => @key)
|
||||
@bunnyqueue.bind(@exchange, :routing_key => @key)
|
||||
|
||||
# need to get metadata from data
|
||||
@bunnyqueue.subscribe({:ack => @ack, :block => true}) do |delivery_info, metadata, data|
|
||||
|
@ -154,6 +170,15 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
end # headers_add.each do
|
||||
end # if !@headers_fields.empty?
|
||||
queue << e
|
||||
|
||||
# if these conditions are met, the server won't send any more
|
||||
# messages until we specifically ack this message
|
||||
# TODO(jkoppe): to improve throughput, we could ack less often
|
||||
# but, I definitely want to get community buy-in before enabling
|
||||
# one method or another.
|
||||
if @ack and @prefetch_count > 0
|
||||
@bunny.default_channel.ack(delivery_info[:delivery_tag])
|
||||
end
|
||||
end # if e
|
||||
end # @bunnyqueue.subscribe do
|
||||
|
||||
|
|
|
@ -148,12 +148,12 @@ class LogStash::Outputs::RabbitMQ < LogStash::Outputs::Base
|
|||
if @bunnyexchange
|
||||
if @headers_add.empty?
|
||||
#tags2headers is empty, so we send the message normally
|
||||
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key }])
|
||||
@bunnyexchange.publish(message, :persistent => @persistent, :key => key)
|
||||
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :routing_key => key }])
|
||||
@bunnyexchange.publish(message, :persistent => @persistent, :routing_key => key)
|
||||
else
|
||||
#publishing messages WITH headers, that are stored in "headers_add"
|
||||
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :key => key , :headers => @headers_add.inspect}])
|
||||
@bunnyexchange.publish(message, :persistent => @persistent, :key => key, :headers => @headers_add)
|
||||
@logger.debug(["Publishing message", { :destination => to_s, :message => message, :routing_key => key , :headers => @headers_add.inspect}])
|
||||
@bunnyexchange.publish(message, :persistent => @persistent, :routing_key => key, :headers => @headers_add)
|
||||
|
||||
end # if @headers2headers.empty?
|
||||
else
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue