Merge pull request #26 from dokipen/master

Redis channel support.
This commit is contained in:
Jordan Sissel 2011-06-08 15:50:45 -07:00
commit 023e637666
2 changed files with 161 additions and 47 deletions

View file

@ -7,10 +7,12 @@ require "logstash/namespace"
class LogStash::Inputs::Redis < LogStash::Inputs::Base
config_name "redis"
# Name is used for logging in case there are multiple instances.
config :name, :validate => :string, :default => "default"
# Name is used for logging in case there are multiple instances.
# TODO: remove
config :name, :validate => :string, :default => "default",
:deprecated => true
# The hostname of your redis server.
config :host, :validate => :string, :default => "127.0.0.1"
@ -27,10 +29,17 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
config :password, :validate => :password
# The name of the redis queue (we'll use BLPOP against this).
config :queue, :validate => :string, :required => true
# TODO: remove
config :queue, :validate => :string, :deprecated => true
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5
# The name of a redis list or channel.
# TODO: change required to true
config :key, :validate => :string, :required => false
# Either list or channel. If redis_type is list, then we will BLPOP the
# key. If redis_type is channel, then we will SUBSCRIBE to the key.
# TODO: change required to true
config :data_type, :validate => [ "list", "channel" ], :required => false
public
def initialize(params)
@ -44,8 +53,34 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
require 'redis'
@redis = nil
@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"
# TODO remove after setting key and data_type to true
if @queue
if @key or @data_type
raise RuntimeError.new(
"Cannot specify queue parameter and key or data_type"
)
end
@key = @queue
@data_type = 'list'
end
if not @key or not @data_type
raise RuntimeError.new(
"Must define queue, or key and data_type parameters"
)
end
# end TODO
@logger.info "Registering redis #{identity}"
end # def register
# A string used to identify a redis instance in log messages
private
def identity
@name || "#{@redis_url} #{@data_type}:#{@key}"
end
private
def connect
Redis.new(
@ -57,29 +92,70 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
)
end # def connect
public
def run(output_queue)
retries = @retries
private
def queue_event msg, output_queue
begin
event = to_event msg, identity
output_queue << event if event
rescue => e # parse or event creation error
@logger.error(["Failed to create event with '#{msg}'", e])
@logger.debug(["Backtrace", e.backtrace])
end
end
private
def list_listener redis, output_queue
response = redis.blpop @key, 0
queue_event response[1], output_queue
end
private
def channel_listener redis, output_queue
redis.subscribe @key do |on|
on.subscribe do |ch, count|
@logger.info "Subscribed to #{ch} (#{count})"
end
on.message do |ch, message|
queue_event message, output_queue
end
on.unsubscribe do |ch, count|
@logger.info "Unsubscribed from #{ch} (#{count})"
end
end
end
# Since both listeners have the same basic loop, we've abstracted the outer
# loop.
private
def listener_loop listener, output_queue
loop do
begin
@redis ||= connect
response = @redis.blpop @queue, 0
retries = @retries
e = to_event(response[1], @redis_url)
if e
output_queue << e
end
self.send listener, @redis, output_queue
rescue => e # redis error
@logger.warn(["Failed to get event from redis #{@name}. " +
"Will retry #{retries} times.", $!])
@logger.debug(["Backtrace", e.backtrace])
if retries <= 0
raise RuntimeError, "Redis connection failed too many times"
end
@redis = nil
retries -= 1
sleep(1)
@logger.warn(["Failed to get event from redis #{@name}. ", e])
raise e
end
end # loop
end # listener_loop
public
def run(output_queue)
if @data_type == 'list'
listener_loop :list_listener, output_queue
else
listener_loop :channel_listener, output_queue
end
end # def run
public
def teardown
if @data_type == 'channel' and @redis
@redis.unsubscribe
@redis.quit
@redis = nil
end
end
end # class LogStash::Inputs::Redis

View file

@ -7,10 +7,12 @@ require "logstash/namespace"
class LogStash::Outputs::Redis < LogStash::Outputs::Base
config_name "redis"
# Name is used for logging in case there are multiple instances.
config :name, :validate => :string, :default => 'default'
# Name is used for logging in case there are multiple instances.
# TODO: delete
config :name, :validate => :string, :default => 'default',
:deprecated => true
# The hostname of your redis server.
config :host, :validate => :string, :default => "127.0.0.1"
@ -28,16 +30,45 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
# The name of the redis queue (we'll use RPUSH on this). Dynamic names are
# valid here, for example "logstash-%{@type}"
config :queue, :validate => :string, :required => true
# TODO: delete
config :queue, :validate => :string, :deprecated => true
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5
# The name of a redis list or channel. Dynamic names are
# valid here, for example "logstash-%{@type}".
# TODO set required true
config :key, :validate => :string, :required => false
# Either list or channel. If redis_type is list, then we will RPUSH to key.
# If redis_type is channel, then we will PUBLISH to key.
# TODO set required true
config :data_type, :validate => [ "list", "channel" ], :required => false
public
def register
require 'redis'
# TODO remove after setting key and data_type to true
if @queue
if @key or @data_type
raise RuntimeError.new(
"Cannot specify queue parameter and key or data_type"
)
end
@key = @queue
@data_type = 'list'
end
if not @key or not @data_type
raise RuntimeError.new(
"Must define queue, or key and data_type parameters"
)
end
# end TODO
@redis = nil
end # def register
private
def connect
Redis.new(
:host => @host,
@ -48,27 +79,34 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
)
end # def connect
def receive(event, tries=@retries)
if tries <= 0
@logger.error "Fatal error, failed to log #{event.to_s} to redis #{@name}"
raise RuntimeError.new "Failed to log to redis #{@name}"
end
# A string used to identify a redis instance in log messages
private
def identity
@name || "redis://#{@password}@#{@host}:#{@port}/#{@db} #{@data_type}:#{@key}"
end
public
def receive(event)
begin
@redis ||= connect
@redis.rpush event.sprintf(@queue), event.to_json
rescue => e
# TODO(sissel): Be specific in the exceptions we rescue.
# Drop the redis connection to be picked up later during a retry.
@redis = nil
@logger.warn("Failed to log #{event.to_s} to redis #{@name}. "+
"Will retry #{tries} times.")
@logger.warn($!)
@logger.debug(["Backtrace", e.backtrace])
Thread.new do
sleep 1
receive(event, tries - 1)
if @data_type == 'list'
@redis.rpush event.sprintf(@key), event.to_json
else
@redis.publish event.sprintf(@key), event.to_json
end
rescue => e
@logger.warn(["Failed to log #{event.to_s} to #{identity}.", e])
raise e
end
end # def receive
public
def teardown
if @data_type == 'channel' and @redis
@redis.quit
@redis = nil
end
end
end