Added support for amqp routing keys

This commit is contained in:
Nick Ethier 2011-08-02 11:27:36 -06:00
parent 093acf59b9
commit a14f1aff17
2 changed files with 28 additions and 5 deletions

View file

@ -28,6 +28,9 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
# The name of the exchange
config :name, :validate => :string, :required => true
# The routing key to bind to
config :key, :validate => :string
# The vhost to use
config :vhost, :validate => :string, :default => "/"
@ -79,8 +82,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue.bind(exchange, :key => @key)
@queue.subscribe do |data|
e = to_event(data[:payload], @amqpurl)
if e

View file

@ -1,6 +1,6 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "socket"
# Push events to an AMQP exchange.
#
# AMQP is a messaging system. It requires you to run an AMQP server or 'broker'
@ -8,6 +8,10 @@ require "logstash/namespace"
# [QPid](http://qpid.apache.org/)
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
MQTYPES = [ "fanout", "direct", "topic" ]
KEY_VARS = {
:hostname => Socket.gethostname,
:type => ''
}
config_name "amqp"
@ -28,6 +32,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
# The name of the exchange
config :name, :validate => :string, :required => true
# Key to route to
config :key, :validate => :array
# The vhost to use
config :vhost, :validate => :string, :default => "/"
@ -83,11 +90,24 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
public
def receive(event)
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
if @key
KEY_VARS[:type] = event.type
#build routing key
key = ''
@key.each do |k|
if k.match(/^:/)
key = "#{key}.#{KEY_VARS[k[1..-1].to_sym]}"
else
key = "#{key}.#{k}"
end
end
key = key[1..-1] #strip first .
end
@logger.debug(["Sending event", { :destination => to_s, :event => event, :key => key }])
begin
if @target
begin
@target.publish(event.to_json, :persistent => @persistent)
@target.publish(event.to_json, :persistent => @persistent, :key => key)
rescue JSON::GeneratorError
@logger.warn(["Trouble converting event to JSON", $!, event.to_hash])
return