mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Initial version of the new RabbitMQ plugin version
This version uses different clients on different Ruby implementations: * Bunny [1] 0.9 on MRI (CRuby) * HotBunnies [2] on JRuby This eliminates a few limitations of the existing plugin, based on Bunny 0.8: * Payload of large size [< than client machine's available RAM] now can be published without issues * Both are fully capable, well maintained clients that open the door to using more recent RabbitMQ features * Both Bunny 0.9 and especially HotBunnies offer double or triple (!) digits throughput improvements for event publishing * Bunny 0.9 offers automatic connection recovery as an option The choice of Hot Bunnies on JRuby is primarily drive by Bunny triggering a JRuby-specific issue with high CPU consumption by `IO#select` implementation. In addition, it is based on an officially supported RabbitMQ Java client and offers the best possible throughput of all Ruby clients/engines, matching the Java client. This version fully fleshed out yet, needs more work, automated tests and network failure recovery QA. 1. http://rubybunny.info 2. http://hotbunnies.info
This commit is contained in:
parent
431745046f
commit
94609ccd8a
4 changed files with 204 additions and 63 deletions
|
@ -69,7 +69,7 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
# Enable message acknowledgement
|
||||
config :ack, :validate => :boolean, :default => true
|
||||
|
||||
# Enable or disable debugging
|
||||
# Enable or disable logging
|
||||
config :debug, :validate => :boolean, :default => false
|
||||
|
||||
# Enable or disable SSL
|
||||
|
@ -78,71 +78,27 @@ class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable
|
|||
# Validate SSL certificate
|
||||
config :verify_ssl, :validate => :boolean, :default => false
|
||||
|
||||
public
|
||||
|
||||
def initialize(params)
|
||||
params["codec"] = "json" if !params["codec"]
|
||||
|
||||
super
|
||||
end # def initialize
|
||||
end
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info("Registering input #{@url}")
|
||||
require "bunny" # rubygem 'bunny'
|
||||
@vhost ||= "/"
|
||||
@port ||= 5672
|
||||
@key ||= "#"
|
||||
@amqpsettings = {
|
||||
:vhost => @vhost,
|
||||
:host => @host,
|
||||
:port => @port,
|
||||
}
|
||||
@amqpsettings[:user] = @user if @user
|
||||
@amqpsettings[:pass] = @password.value if @password
|
||||
@amqpsettings[:logging] = @debug
|
||||
@amqpsettings[:ssl] = @ssl if @ssl
|
||||
@amqpsettings[:verify_ssl] = @verify_ssl if @verify_ssl
|
||||
@amqpurl = "amqp://"
|
||||
if @user
|
||||
@amqpurl << @user if @user
|
||||
@amqpurl << ":#{CGI.escape(@password.to_s)}" if @password
|
||||
@amqpurl << "@"
|
||||
end
|
||||
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@queue}"
|
||||
end # def register
|
||||
# Use HotBunnies on JRuby to avoid IO#select CPU spikes
|
||||
# (see github.com/ruby-amqp/bunny/issues/95).
|
||||
#
|
||||
# On MRI, use Bunny 0.9.
|
||||
#
|
||||
# See http://rubybunny.info and http://hotbunnies.info
|
||||
# for the docs.
|
||||
if RUBY_ENGINE == "jruby"
|
||||
require "logstash/inputs/rabbitmq/hot_bunnies"
|
||||
|
||||
def run(queue)
|
||||
begin
|
||||
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@queue.inspect}")
|
||||
@bunny = Bunny.new(@amqpsettings)
|
||||
return if terminating?
|
||||
@bunny.start
|
||||
@bunny.qos({:prefetch_count => @prefetch_count})
|
||||
include HotBunniesImpl
|
||||
else
|
||||
require "logstash/inputs/rabbitmq/bunny"
|
||||
|
||||
@arguments_hash = Hash[*@arguments]
|
||||
|
||||
@bunnyqueue = @bunny.queue(@queue, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
|
||||
@bunnyqueue.bind(@exchange, :key => @key)
|
||||
|
||||
@bunnyqueue.subscribe({:ack => @ack}) do |data|
|
||||
@codec.decode(data[:payload]) do |event|
|
||||
event["source"] = @amqpurl
|
||||
queue << event
|
||||
end
|
||||
end # @bunnyqueue.subscribe
|
||||
|
||||
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
|
||||
@logger.error("AMQP connection error, will reconnect: #{e}")
|
||||
# Sleep for a bit before retrying.
|
||||
# TODO(sissel): Write 'backoff' method?
|
||||
sleep(1)
|
||||
retry
|
||||
end # begin/rescue
|
||||
end # def run
|
||||
|
||||
def teardown
|
||||
@bunnyqueue.unsubscribe unless @durable == true
|
||||
@bunnyqueue.delete unless @durable == true
|
||||
@bunny.close if @bunny
|
||||
finished
|
||||
end # def teardown
|
||||
include BunnyImpl
|
||||
end
|
||||
end # class LogStash::Inputs::RabbitMQ
|
||||
|
|
89
lib/logstash/inputs/rabbitmq/bunny.rb
Normal file
89
lib/logstash/inputs/rabbitmq/bunny.rb
Normal file
|
@ -0,0 +1,89 @@
|
|||
class LogStash::Inputs::RabbitMQ
|
||||
module BunnyImpl
|
||||
def register
|
||||
require "bunny"
|
||||
|
||||
@vhost ||= Bunny::DEFAULT_HOST
|
||||
# 5672. Will be switched to 5671 by Bunny if TLS is enabled.
|
||||
@port ||= AMQ::Protocol::DEFAULT_PORT
|
||||
@routing_key ||= "#"
|
||||
|
||||
@settings = {
|
||||
:vhost => @vhost,
|
||||
:host => @host,
|
||||
:port => @port
|
||||
}
|
||||
@settings[:user] = @user || Bunny::DEFAULT_USER
|
||||
@settings[:pass] = if @password
|
||||
@password.value
|
||||
else
|
||||
Bunny::DEFAULT_PASSWORD
|
||||
end
|
||||
|
||||
@settings[:log_level] = if @debug
|
||||
:debug
|
||||
else
|
||||
:error
|
||||
end
|
||||
|
||||
@settings[:tls] = @ssl if @ssl
|
||||
@settings[:verify_ssl] = @verify_ssl if @verify_ssl
|
||||
|
||||
proto = if @ssl
|
||||
"amqp"
|
||||
else
|
||||
"amqps"
|
||||
end
|
||||
@connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{vhost}/#{@queue}"
|
||||
|
||||
@logger.info("Registering input #{@connection_url}")
|
||||
end
|
||||
|
||||
def run(queue)
|
||||
begin
|
||||
@connection = Bunny.new(@settings)
|
||||
|
||||
@logger.debug("Connecting to RabbitMQ. Settings: #{@settings.inspect}, queue: #{@queue.inspect}")
|
||||
return if terminating?
|
||||
@conn.start
|
||||
|
||||
@ch = @conn.create_channel
|
||||
|
||||
@ch.prefetch(@prefetch_count)
|
||||
|
||||
@arguments_hash = Hash[*@arguments]
|
||||
|
||||
@q = @ch.queue(@queue,
|
||||
:durable => @durable,
|
||||
:auto_delete => @auto_delete,
|
||||
:exclusive => @exclusive,
|
||||
:arguments => @arguments_hash)
|
||||
@q.bind(@exchange, :routing_key => @key)
|
||||
|
||||
@consumer = @q.subscribe(:manual_ack => @ack) do |delivery_info, properties, data|
|
||||
@codec.decode(data) do |event|
|
||||
event["source"] = @connection_url
|
||||
queue << event
|
||||
end
|
||||
end
|
||||
rescue Bunny::NetworkFailure, Bunny::ConnectionClosedError, Bunny::ConnectionLevelException => e
|
||||
@logger.error("RabbitMQ connection error, will reconnect: #{e}")
|
||||
|
||||
n = Bunny::DEFAULT_NETWORK_RECOVERY_INTERVAL / 2
|
||||
|
||||
sleep n
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
def teardown
|
||||
@consumer.cancel
|
||||
@q.delete unless @durable
|
||||
|
||||
@ch.close if @ch && @ch.open?
|
||||
@connection.close if @connection && @connection.open?
|
||||
|
||||
finished
|
||||
end
|
||||
end # BunnyImpl
|
||||
end
|
91
lib/logstash/inputs/rabbitmq/hot_bunnies.rb
Normal file
91
lib/logstash/inputs/rabbitmq/hot_bunnies.rb
Normal file
|
@ -0,0 +1,91 @@
|
|||
class LogStash::Inputs::RabbitMQ
|
||||
# HotBunnies-based implementation for JRuby
|
||||
module HotBunniesImpl
|
||||
def register
|
||||
require "hot_bunnies"
|
||||
|
||||
@vhost ||= Bunny::DEFAULT_HOST
|
||||
# 5672. Will be switched to 5671 by Bunny if TLS is enabled.
|
||||
@port ||= AMQ::Protocol::DEFAULT_PORT
|
||||
@routing_key ||= "#"
|
||||
|
||||
@settings = {
|
||||
:vhost => @vhost,
|
||||
:host => @host,
|
||||
:port => @port
|
||||
}
|
||||
@settings[:user] = @user || Bunny::DEFAULT_USER
|
||||
@settings[:pass] = if @password
|
||||
@password.value
|
||||
else
|
||||
Bunny::DEFAULT_PASSWORD
|
||||
end
|
||||
|
||||
@settings[:log_level] = if @debug
|
||||
:debug
|
||||
else
|
||||
:error
|
||||
end
|
||||
|
||||
@settings[:tls] = @ssl if @ssl
|
||||
|
||||
proto = if @ssl
|
||||
"amqp"
|
||||
else
|
||||
"amqps"
|
||||
end
|
||||
@connection_url = "#{proto}://#{@user}@#{@host}:#{@port}#{vhost}/#{@queue}"
|
||||
|
||||
@logger.info("Registering input #{@connection_url}")
|
||||
end
|
||||
|
||||
def run(queue)
|
||||
begin
|
||||
@connection = HotBunnies.connect(@settings)
|
||||
|
||||
@logger.debug("Connecting to RabbitMQ. Settings: #{@settings.inspect}, queue: #{@queue.inspect}")
|
||||
return if terminating?
|
||||
@conn.start
|
||||
|
||||
@ch = @conn.create_channel
|
||||
|
||||
@ch.prefetch = @prefetch_count
|
||||
|
||||
@arguments_hash = Hash[*@arguments]
|
||||
|
||||
@q = @ch.queue(@queue,
|
||||
:durable => @durable,
|
||||
:auto_delete => @auto_delete,
|
||||
:exclusive => @exclusive,
|
||||
:arguments => @arguments_hash)
|
||||
@q.bind(@exchange, :routing_key => @key)
|
||||
|
||||
# we manually build a consumer here to be able to keep a reference to it
|
||||
# in an @ivar even though we use a blocking version of HB::Queue#subscribe
|
||||
@consumer = @q.build_consumer(:block => true)
|
||||
@q.subscribe_with(@consumer, :manual_ack => @ack, :block => true) do |_, data|
|
||||
@codec.decode(data) do |event|
|
||||
event["source"] = @connection_url
|
||||
queue << event
|
||||
end
|
||||
end
|
||||
rescue HotBunnies::Exception, java.lang.Throwable => e
|
||||
@logger.error("RabbitMQ connection error, will reconnect: #{e}")
|
||||
|
||||
sleep 3
|
||||
retry
|
||||
end
|
||||
end
|
||||
|
||||
def teardown
|
||||
@consumer.cancel
|
||||
@consumer.gracefully_shut_down
|
||||
@q.delete unless @durable
|
||||
|
||||
@ch.close if @ch && @ch.open?
|
||||
@connection.close if @connection && @connection.open?
|
||||
|
||||
finished
|
||||
end
|
||||
end # HotBunniesImpl
|
||||
end
|
|
@ -39,7 +39,6 @@ Gem::Specification.new do |gem|
|
|||
gem.add_runtime_dependency "aws-sdk" #{Apache 2.0 license}
|
||||
gem.add_runtime_dependency "heroku" #(MIT license)
|
||||
gem.add_runtime_dependency "addressable" #(Apache 2.0 license)
|
||||
gem.add_runtime_dependency "bunny", ["0.8.0"] #(MIT license)
|
||||
gem.add_runtime_dependency "extlib", ["0.9.16"] #(MIT license)
|
||||
gem.add_runtime_dependency "ffi" #(LGPL-3 license)
|
||||
gem.add_runtime_dependency "ffi-rzmq", ["1.0.0"] #(MIT license)
|
||||
|
@ -88,6 +87,12 @@ Gem::Specification.new do |gem|
|
|||
gem.add_runtime_dependency "msgpack" #(Apache 2.0 license)
|
||||
end
|
||||
|
||||
if RUBY_PLATFORM != 'java'
|
||||
gem.add_runtime_dependency "bunny", ["~> 0.9.0.rc2"] #(MIT license)
|
||||
else
|
||||
gem.add_runtime_dependency "hot_bunnies", ["~> 2.0.0.pre4"] #(MIT license)
|
||||
end
|
||||
|
||||
if RUBY_VERSION >= '1.9.1'
|
||||
gem.add_runtime_dependency "cinch" # cinch requires 1.9.1+ #(MIT license)
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue