Adds redis pubsub channel support

This commit is contained in:
Bob Corsaro 2011-05-10 06:37:29 -04:00
parent f8dce8d5df
commit 3997a60761
2 changed files with 72 additions and 30 deletions

View file

@ -1,7 +1,7 @@
require "logstash/inputs/base"
require "logstash/namespace"
# read events from a redis using BLPOP
# Read events from a redis using BLPOP
#
# For more information about redis, see <http://redis.io/>
class LogStash::Inputs::Redis < LogStash::Inputs::Base
@ -27,8 +27,15 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
# Password to authenticate with
config :password, :validate => :password
# The name of the redis queue (we'll use BLPOP against this)
config :queue, :validate => :string, :required => true
# The name of a redis list (we'll use BLPOP against this). Dynamic names are
# valid here, for example "logstash-%{@type}". You must specify a list
# or channel or both.
config :list, :validate => :string
# The name of a redis channel (we'll use SUBSCRIBE on this). Dynamic names are
# valid here, for example "logstash-%{@type}". You must specify a list
# or channel or both.
config :channel, :validate => :string
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5
@ -36,6 +43,10 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
def register
require 'redis'
@redis = nil
unless @list or @channel
raise "Must specify redis list or channel"
end
end
def connect
@ -48,28 +59,46 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
)
end
def wait command, *args
begin
@redis ||= connect
response = @redis.send(command, *args)
retries = @retries
begin
output_queue << LogStash::Event.new(JSON.parse(response[1]))
rescue # parse or event creation error
@logger.error "failed to create event with '#{response[1]}'"
@logger.error $!
end
rescue # redis error
raise "Redis connection failed too many times" if retries <= 0
@redis = nil
@logger.warn "Failed to get event from redis #{@name}. "+
"Will retry #{retries} times."
@logger.warn $!
retries -= 1
sleep 1
end
end
def run(output_queue)
retries = @retries
loop do
begin
@redis ||= connect
response = @redis.blpop @queue, 0
retries = @retries
begin
output_queue << LogStash::Event.new(JSON.parse(response[1]))
rescue # parse or event creation error
@logger.error "failed to create event with '#{response[1]}'"
@logger.error $!
end
rescue # redis error
raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
@redis = nil
@logger.warn "Failed to get event from redis #{@name}. "+
"Will retry #{retries} times."
@logger.warn $!
retries -= 1
sleep 1
end
end # loop
if @channel
Thread.new do
loop do
wait :subscribe
end # loop
end # Thread.new
end # if @channel
if @list
Thread.new do
loop do
wait :blpop, 0
end # loop
end # Thread.new
end # if @list
end # def run
end # class LogStash::Inputs::Redis

View file

@ -1,6 +1,5 @@
require "logstash/outputs/base"
require "logstash/namespace"
require 'eventmachine'
# send events to a redis databse using RPUSH
#
@ -28,9 +27,15 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
# Password to authenticate with. There is no authentication by default.
config :password, :validate => :password
# The name of the redis queue (we'll use RPUSH on this). Dynamic names are
# valid here, for example "logstash-%{@type}"
config :queue, :validate => :string, :required => true
# The name of a redis list (we'll use RPUSH on this). Dynamic names are
# valid here, for example "logstash-%{@type}". You must specify a list
# or channel or both.
config :list, :validate => :string
# The name of a redis channel (we'll use PUBLISH on this). Dynamic names are
# valid here, for example "logstash-%{@type}". You must specify a list
# or channel or both.
config :channel, :validate => :string
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5
@ -38,6 +43,10 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
def register
require 'redis'
@redis = nil
unless @list or @channel
raise "Must specify redis list or channel"
end # unless @list or @channel
end # def register
def connect
@ -53,12 +62,16 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
def receive(event, tries=@retries)
if tries <= 0
@logger.error "Fatal error, failed to log #{event.to_s} to redis #{@name}"
raise RuntimeError.new "Failed to log to redis #{@name}"
raise "Failed to log to redis #{@name}"
end
begin
@redis ||= connect
@redis.rpush event.sprintf(@queue), event.to_json
tx = @list and @channel
@redis.multi if tx
@redis.rpush event.sprintf(@list), event.to_json if @list
@redis.publish event.sprintf(@channel), event.to_json if @channel
@redis.exec if tx
rescue
# TODO(sissel): Be specific in the exceptions we rescue.
# Drop the redis connection to be picked up later during a retry.