amqp: respect durable flag, use stable queue names; support ElasticSearch bulk API if es_type and es_index are specified

This commit is contained in:
Charles Duffy 2011-02-23 14:00:38 -06:00
parent e2ad4929b6
commit 46685688ef
2 changed files with 29 additions and 14 deletions

View file

@ -6,7 +6,7 @@ require "uuidtools" # rubygem 'uuidtools'
require "cgi"
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
MQTYPES = [ "fanout", "queue", "topic" ]
MQTYPES = [ "fanout", "direct", "topic" ]
public
def initialize(url, type, config={}, &block)
@ -39,20 +39,22 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
amqpsettings[:user] = @url.user if @url.user
amqpsettings[:pass] = @url.password if @url.password
amqpsettings[:logging] = query_args.include? "debug"
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
queue_name = ((@urlopts["queue"].nil? or @urlopts["queue"].empty?) ? "logstash-#{@name}" : @urlopts["queue"])
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{queue_name} on exchange #{@name.inspect}")
@amqp = AMQP.connect(amqpsettings)
@mq = MQ.new(@amqp)
@target = nil
@target = @mq.queue(UUIDTools::UUID.timestamp_create)
@durable_exchange = @urlopts["durable_exchange"] ? true : false
@durable_queue = @urlopts["durable_queue"] ? true : false
@target = @mq.queue(queue_name, :durable => @durable_queue)
case @mqtype
when "fanout"
#@target.bind(MQ.fanout(@url.path, :durable => true))
@target.bind(@mq.fanout(@name))
@target.bind(@mq.fanout(@name, :durable => @durable_exchange))
when "direct"
@target.bind(@mq.direct(@name))
@target.bind(@mq.direct(@name, :durable => @durable_exchange))
when "topic"
@target.bind(@mq.topic(@name))
@target.bind(@mq.topic(@name, :durable => @durable_exchange))
end # case @mqtype
@target.subscribe(:ack => true) do |header, message|

View file

@ -5,12 +5,15 @@ require "mq" # rubygem 'amqp'
require "cgi"
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
MQTYPES = [ "fanout", "queue", "topic" ]
MQTYPES = [ "fanout", "direct", "topic" ]
public
def initialize(url, config={}, &block)
super
@mq = nil
@bulk_prefix = nil
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
# vhost allowed to contain slashes
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
@ -36,25 +39,35 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
amqpsettings[:user] = @url.user if @url.user
amqpsettings[:pass] = @url.password if @url.password
amqpsettings[:logging] = query_args.include? "debug"
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} exchange #{@name.inspect}")
@amqp = AMQP.connect(amqpsettings)
@mq = MQ.new(@amqp)
@target = nil
if @urlopts.include? "es_index" and @urlopts.include? "es_type"
@bulk_prefix = { "index" => { "_index" => @urlopts["es_index"], "_type" => @urlopts["es_type"] } }.to_json + "\n"
@logger.debug "Preparing ElasticSearch bulk API header for injection: #{@bulk_prefix.inspect}"
end
@durable = @urlopts["durable"] ? true : false
case @mqtype
when "fanout"
@target = @mq.fanout(@name)
when "queue"
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
@target = @mq.fanout(@name, :durable => @durable)
when "direct"
@target = @mq.direct(@name, :durable => @durable)
when "topic"
@target = @mq.topic(@name)
@target = @mq.topic(@name, :durable => @durable)
end # case @mqtype
end # def register
public
def receive(event)
@logger.debug(["Sending event", { :url => @url, :event => event }])
@target.publish(event.to_json)
if @bulk_prefix
@target.publish(@bulk_prefix + event.to_json + "\n")
else
@target.publish(event.to_json)
end
end # def receive
# This is used by the ElasticSearch AMQP/River output.