mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- parse query string on output URLs for additional options
- amqp now properly supports "queue" type (and in general, works now) - amqp supports durable queues (?durable=1) - added amqp#receive_raw for publishing a string directly
This commit is contained in:
parent
3014715d78
commit
84ef233bca
2 changed files with 23 additions and 5 deletions
|
@ -1,5 +1,6 @@
|
|||
require "logstash/outputs/base"
|
||||
require "amqp" # rubygem 'amqp'
|
||||
require "cgi"
|
||||
require "mq" # rubygem 'amqp'
|
||||
|
||||
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
||||
|
@ -25,15 +26,23 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
|
||||
case @type
|
||||
when "fanout"
|
||||
@target = @mq.fanout(@url.path)
|
||||
when "direct"
|
||||
@target = @mq.direct(@url.path)
|
||||
@target = @mq.fanout(@name)
|
||||
when "queue"
|
||||
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
|
||||
when "topic"
|
||||
@target = @mq.topic(@url.path)
|
||||
@target = @mq.topic(@name)
|
||||
end # case @type
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
@target.publish(event.to_json)
|
||||
end # def event
|
||||
end # def receive
|
||||
|
||||
def receive_raw(raw)
|
||||
if @target == nil
|
||||
raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
|
||||
end
|
||||
|
||||
@target.publish(raw)
|
||||
end # def receive_raw
|
||||
end # class LogStash::Outputs::Amqp
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
require "logstash/namespace"
|
||||
require "logstash/event"
|
||||
require "logstash/logging"
|
||||
require "uri"
|
||||
|
||||
class LogStash::Outputs::Base
|
||||
|
@ -7,6 +8,14 @@ class LogStash::Outputs::Base
|
|||
@url = url
|
||||
@url = URI.parse(url) if url.is_a? String
|
||||
@config = config
|
||||
@logger = LogStash::Logger.new(STDOUT)
|
||||
@urlopts = {}
|
||||
if @url.query
|
||||
@urlopts = CGI.parse(@url.query)
|
||||
@urlopts.each do |k, v|
|
||||
@urlopts[k] = v.last if v.is_a?(Array)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def register
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue