diff --git a/etc/agent.conf b/etc/agent.conf index dcae05a43..1ad1ff91c 100644 --- a/etc/agent.conf +++ b/etc/agent.conf @@ -25,4 +25,12 @@ output { stdout { debug => true } + + amqp { + host => "127.0.0.1" + user => "guest" + pass => "guest" + exchange_type => "topic" + name => "testing" + } } diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb index 6eca58388..7dc165523 100644 --- a/lib/logstash/outputs/amqp.rb +++ b/lib/logstash/outputs/amqp.rb @@ -1,23 +1,27 @@ -require "amqp" # rubygem 'amqp' +require "bunny" # rubygem 'bunny' require "logstash/outputs/base" require "logstash/namespace" -require "mq" # rubygem 'amqp' -require "cgi" class LogStash::Outputs::Amqp < LogStash::Outputs::Base MQTYPES = [ "fanout", "queue", "topic" ] config_name "amqp" config :host => :string + config :user => :string + config :pass => :string config :exchange_type => :string config :name => :string config :vhost => :string + config :durable => :boolean + config :debug => :boolean public def initialize(params) super - p @exchange_type => MQTYPES + @debug ||= false + @durable ||= false + if !MQTYPES.include?(@exchange_type) raise "Invalid exchange_type, #{@exchange_type.inspect}, must be one of #{MQTYPES.join(", ")}" end @@ -25,44 +29,44 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base public def register - @logger.info("Registering output #{@url}") - query_args = @url.query ? CGI.parse(@url.query) : {} + @logger.info("Registering output #{to_s}") amqpsettings = { :vhost => (@vhost or "/"), - :host => @url.host, - :port => (@url.port or 5672), + :host => @host, + :port => (@port or 5672), } - amqpsettings[:user] = @url.user if @url.user - amqpsettings[:pass] = @url.password if @url.password - amqpsettings[:logging] = query_args.include? "debug" - @logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@exchange_type.inspect} queue #{@name.inspect}") - @amqp = AMQP.connect(amqpsettings) - @mq = MQ.new(@amqp) - @target = nil + amqpsettings[:user] = @user if @user + amqpsettings[:pass] = @pass if @pass + amqpsettings[:logging] = @debug + @logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name]) + @bunny = Bunny.new(amqpsettings) + @bunny.start + @target = nil case @exchange_type when "fanout" - @target = @mq.fanout(@name) + @target = @bunny.exchange(@name, :type => :fanout) when "queue" - @target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false) + @target = @bunny.queue(@name, :durable => @durable) when "topic" - @target = @mq.topic(@name) + @target = @bunny.exchange(@name, :type => :topic) end # case @exchange_type end # def register public def receive(event) - @logger.debug(["Sending event", { :url => @url, :event => event }]) + @logger.debug(["Sending event", { :destination => to_s, :event => event }]) @target.publish(event.to_json) end # def receive # This is used by the ElasticSearch AMQP/River output. public def receive_raw(raw) - if @target == nil - raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil" - end - @target.publish(raw) end # def receive_raw + + public + def to_s + return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}" + end end # class LogStash::Outputs::Amqp