mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- unify input/output for amqp. This fixes a bug where we amqp:///topic/foo
would register a topic as "/topic/foo" for outputs but "foo" for inputs.
This commit is contained in:
parent
7f4b1192f8
commit
75910bb656
1 changed files with 8 additions and 8 deletions
|
@ -3,18 +3,18 @@ require "amqp" # rubygem 'amqp'
|
|||
require "mq" # rubygem 'amqp'
|
||||
|
||||
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
||||
TYPES = [ "fanout", "queue", "topic" ]
|
||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
|
||||
# Handle path /<type>/<name>
|
||||
unused, @type, @name = @url.path.split("/", 3)
|
||||
if @type == nil or @name == nil
|
||||
raise "amqp urls must have a path of /<type>/name where <type> is #{TYPES.join(", ")}"
|
||||
unused, @mqtype, @name = @url.path.split("/", 3)
|
||||
if @mqtype == nil or @name == nil
|
||||
raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
|
||||
end
|
||||
|
||||
if !TYPES.include?(@type)
|
||||
raise "Invalid type '#{@type}' must be one #{TYPES.join(", ")}"
|
||||
if !MQTYPES.include?(@mqtype)
|
||||
raise "Invalid type '#{@mqtype}' must be one #{MQTYPES.join(", ")}"
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
|
@ -23,14 +23,14 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
||||
case @type
|
||||
case @mqtype
|
||||
when "fanout"
|
||||
@target = @mq.fanout(@name)
|
||||
when "queue"
|
||||
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
|
||||
when "topic"
|
||||
@target = @mq.topic(@name)
|
||||
end # case @type
|
||||
end # case @mqtype
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue