Merge pull request #39 from nickethier/master

Add Support for Dynamic AMQP Routing Keys
This commit is contained in:
Jordan Sissel 2011-08-02 14:26:50 -07:00
commit 2b06b8da95
2 changed files with 11 additions and 4 deletions

View file

@ -28,6 +28,9 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
# The name of the exchange
config :name, :validate => :string, :required => true
# The routing key to bind to
config :key, :validate => :string
# The vhost to use
config :vhost, :validate => :string, :default => "/"
@ -87,8 +90,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
@queue = @bunny.queue(@name, :durable => @durable)
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym, :durable => @durable)
@queue.bind(exchange)
@queue.bind(exchange, :key => @key)
@queue.subscribe do |data|
e = to_event(data[:payload], @amqpurl)
if e

View file

@ -28,6 +28,9 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
# The name of the exchange
config :name, :validate => :string, :required => true
# Key to route to
config :key, :validate => :string
# The vhost to use
config :vhost, :validate => :string, :default => "/"
@ -91,11 +94,12 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
public
def receive(event)
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
key = event.sprintf(@key)
@logger.debug(["Sending event", { :destination => to_s, :event => event, :key => key }])
begin
if @target
begin
@target.publish(event.to_json, :persistent => @persistent)
@target.publish(event.to_json, :persistent => @persistent, :key => key)
rescue JSON::GeneratorError
@logger.warn(["Trouble converting event to JSON", $!, event.to_hash])
return