mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Redis channel support
This commit is contained in:
parent
1ba0c4dce6
commit
70291e76a2
2 changed files with 98 additions and 131 deletions
|
@ -8,9 +8,6 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
|
||||
config_name "redis"
|
||||
|
||||
# Name is used for logging in case there are multiple instances.
|
||||
config :name, :validate => :string, :default => "default"
|
||||
|
||||
# The hostname of your redis server.
|
||||
config :host, :validate => :string, :default => "127.0.0.1"
|
||||
|
||||
|
@ -26,15 +23,13 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
# Password to authenticate with. There is no authentication by default.
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of a redis list (we'll use BLPOP against this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :list, :validate => :string
|
||||
# The name of a redis list or channel. Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}".
|
||||
config :key, :validate => :string, :required => true
|
||||
|
||||
# The name of a redis channel (we'll use SUBSCRIBE on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :channel, :validate => :string
|
||||
# Either list or channel. If redis_type is list, then we will BLPOP the
|
||||
# key. If redis_type is channel, then we will SUBSCRIBE to the key.
|
||||
config :data_type, :validate => [ "list", "channel" ], :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
@ -50,14 +45,16 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
|
||||
unless @list or @channel
|
||||
raise "Must specify redis list or channel"
|
||||
end
|
||||
|
||||
@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"
|
||||
@logger.info "Registering redis #{identity}"
|
||||
end # def register
|
||||
|
||||
# A string used to identify a redis instance in log messages
|
||||
private
|
||||
def identity
|
||||
"#{@redis_url} #{@data_type}:#{@key}"
|
||||
end
|
||||
|
||||
private
|
||||
def connect
|
||||
Redis.new(
|
||||
|
@ -69,30 +66,59 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
)
|
||||
end # def connect
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
wait = Proc.new do |command, *args|
|
||||
private
|
||||
def queue_event msg, output_queue
|
||||
begin
|
||||
e = LogStash::Event.new(JSON.parse(msg))
|
||||
output_queue << e if e
|
||||
rescue => e # parse or event creation error
|
||||
@logger.error(["Failed to create event with '#{msg}'", e])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def list_listener redis, output_queue
|
||||
response = redis.blpop @key, 0
|
||||
yield
|
||||
queue_event response[1], output_queue
|
||||
end
|
||||
|
||||
private
|
||||
def channel_listener redis, output_queue
|
||||
redis.subscribe @key do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.info "Subscribed to #{ch} (#{count})"
|
||||
yield
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
queue_event message, output_queue
|
||||
end
|
||||
|
||||
on.unsubscribe do |ch, count|
|
||||
@logger.info "Unsubscribed from #{ch} (#{count})"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Since both listeners have the same basic loop, we've abstracted the outer
|
||||
# loop. The only problem is that we need to reset retries on a successful
|
||||
# connection, even though the listener might not return (ever). So we've
|
||||
# implement an "onsuccess" callback as a yield.
|
||||
private
|
||||
def listener_loop listener, output_queue
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
# we need seperate instances of redis for each listener
|
||||
@redis ||= connect
|
||||
response = nil
|
||||
if command == :subscribe
|
||||
@redis.send(:subscribe, *args) do |on|
|
||||
on.message do |c, r|
|
||||
response = r
|
||||
end
|
||||
end
|
||||
else
|
||||
response = @redis.send(command, *args)
|
||||
end
|
||||
retries = @retries
|
||||
e = to_event(response[1], @redis_url)
|
||||
if e
|
||||
output_queue << e
|
||||
self.send listener, @redis, output_queue do
|
||||
retries = @retries
|
||||
end
|
||||
rescue => e # redis error
|
||||
@logger.warn(["Failed to get event from redis #{@name}. " +
|
||||
"Will retry #{retries} times.", $!])
|
||||
"Will retry #{retries} times.", e])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
if retries <= 0
|
||||
raise RuntimeError, "Redis connection failed too many times"
|
||||
|
@ -100,79 +126,24 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
@redis = nil
|
||||
retries -= 1
|
||||
sleep(1)
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # listener_loop
|
||||
|
||||
public
|
||||
def run(output_queue)
|
||||
if @data_type == 'list'
|
||||
listener_loop :list_listener, output_queue
|
||||
else
|
||||
listener_loop :channel_listener, output_queue
|
||||
end
|
||||
|
||||
if @channel
|
||||
Thread.new do
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.subscribe @channel do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.debug "Subscribed to #{ch} (#{count})"
|
||||
retries = @retries
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(message))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "Failed to create event with '#{message}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
end
|
||||
|
||||
on.unsubscribe do |ch, count|
|
||||
@logger.debug "Unsubscribed from #{ch} (#{count})"
|
||||
end
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @channel
|
||||
|
||||
if @list
|
||||
Thread.new do
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @list, 0
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
rescue => e # redis error
|
||||
@logger.warn(["Failed to get event from redis #{@name}. " +
|
||||
"Will retry #{retries} times.", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
if retries <= 0
|
||||
raise RuntimeError, "Redis connection failed too many times"
|
||||
end
|
||||
@redis = nil
|
||||
retries -= 1
|
||||
sleep(1)
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @list
|
||||
|
||||
end # def run
|
||||
|
||||
public
|
||||
def teardown
|
||||
if @data_type == 'channel' and @redis
|
||||
@redis.unsubscribe
|
||||
end
|
||||
end
|
||||
end # class LogStash::Inputs::Redis
|
||||
|
|
|
@ -8,9 +8,6 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
|
||||
config_name "redis"
|
||||
|
||||
# Name is used for logging in case there are multiple instances.
|
||||
config :name, :validate => :string, :default => 'default'
|
||||
|
||||
# The hostname of your redis server.
|
||||
config :host, :validate => :string, :default => "127.0.0.1"
|
||||
|
||||
|
@ -26,15 +23,13 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
# Password to authenticate with. There is no authentication by default.
|
||||
config :password, :validate => :password
|
||||
|
||||
# The name of a redis list (we'll use RPUSH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :list, :validate => :string
|
||||
# The name of a redis list or channel. Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}".
|
||||
config :key, :validate => :string, :required => true
|
||||
|
||||
# The name of a redis channel (we'll use PUBLISH on this). Dynamic names are
|
||||
# valid here, for example "logstash-%{@type}". You must specify a list
|
||||
# or channel or both.
|
||||
config :channel, :validate => :string
|
||||
# Either list or channel. If redis_type is list, then we will RPUSH to key.
|
||||
# If redis_type is channel, then we will PUBLISH to key.
|
||||
config :data_type, :validate => [ "list", "channel" ], :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
@ -42,10 +37,6 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
|
||||
unless @list or @channel
|
||||
raise "Must specify redis list or channel"
|
||||
end # unless @list or @channel
|
||||
end # def register
|
||||
|
||||
def connect
|
||||
|
@ -58,26 +49,31 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
)
|
||||
end # def connect
|
||||
|
||||
# A string used to identify a redis instance in log messages
|
||||
def identity
|
||||
"redis://#{@password}@#{@host}:#{@port}/#{@db} #{@data_type}:#{@key}"
|
||||
end
|
||||
|
||||
|
||||
def receive(event, tries=@retries)
|
||||
if tries <= 0
|
||||
@logger.error "Fatal error, failed to log #{event.to_s} to redis #{@name}"
|
||||
raise "Failed to log to redis #{@name}"
|
||||
@logger.error "Fatal error, failed to log #{event.to_s} to #{identity}"
|
||||
raise RuntimeError, "Failed to log to #{identity} after #{@retries} tries"
|
||||
end
|
||||
|
||||
begin
|
||||
@redis ||= connect
|
||||
tx = @list and @channel
|
||||
@redis.multi if tx
|
||||
@redis.rpush event.sprintf(@list), event.to_json if @list
|
||||
@redis.publish event.sprintf(@channel), event.to_json if @channel
|
||||
@redis.exec if tx
|
||||
if @data_type == 'list'
|
||||
@redis.rpush event.sprintf(@list), event.to_json if @list
|
||||
else
|
||||
@redis.publish event.sprintf(@channel), event.to_json if @channel
|
||||
end
|
||||
rescue => e
|
||||
# TODO(sissel): Be specific in the exceptions we rescue.
|
||||
# Drop the redis connection to be picked up later during a retry.
|
||||
@redis = nil
|
||||
@logger.warn("Failed to log #{event.to_s} to redis #{@name}. "+
|
||||
"Will retry #{tries} times.")
|
||||
@logger.warn($!)
|
||||
@logger.warn(["Failed to log #{event.to_s} to #{identity}. " +
|
||||
"Will retry #{retries} times.", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
Thread.new do
|
||||
sleep 1
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue