mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
channel input
This commit is contained in:
parent
20f8cf8ea0
commit
ea29f9ebb0
1 changed files with 87 additions and 22 deletions
|
@ -58,35 +58,78 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
)
|
||||
end
|
||||
|
||||
def wait command, *args
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.send(command, *args)
|
||||
def run(output_queue)
|
||||
wait = Proc.new do |command, *args|
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
@redis ||= connect
|
||||
response = nil
|
||||
if command == :subscribe
|
||||
@redis.send(:subscribe, *args) do |on|
|
||||
on.message do |c, r|
|
||||
response = r
|
||||
end
|
||||
end
|
||||
else
|
||||
response = @redis.send(command, *args)
|
||||
end
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
retries -= 1
|
||||
sleep 1
|
||||
end
|
||||
end
|
||||
|
||||
def run(output_queue)
|
||||
retries = @retries
|
||||
|
||||
if @channel
|
||||
Thread.new do
|
||||
loop do
|
||||
wait :subscribe
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
@redis.subscribe @channel do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.debug "Subscribed to #{ch} (#{count})"
|
||||
retries = @retries
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(message))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "Failed to create event with '#{message}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
end
|
||||
|
||||
on.unsubscribe do |ch, count|
|
||||
@logger.debug "Unsubscribed from #{ch} (#{count})"
|
||||
end
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @channel
|
||||
|
@ -94,7 +137,29 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
if @list
|
||||
Thread.new do
|
||||
loop do
|
||||
wait :blpop, 0
|
||||
retries = @retries
|
||||
begin
|
||||
@redis ||= connect
|
||||
response = @redis.blpop @list, 0
|
||||
retries = @retries
|
||||
begin
|
||||
output_queue << LogStash::Event.new(JSON.parse(response[1]))
|
||||
rescue # parse or event creation error
|
||||
@logger.error "failed to create event with '#{response[1]}'"
|
||||
@logger.error $!
|
||||
@logger.error $!.backtrace
|
||||
end
|
||||
rescue # redis error
|
||||
raise "Redis connection failed too many times" if retries <= 0
|
||||
@redis = nil
|
||||
@logger.warn "Failed to get event from redis #{@name}. "+
|
||||
"Will retry #{retries} times."
|
||||
@logger.warn $!
|
||||
@logger.warn $!.backtrace
|
||||
retries -= 1
|
||||
sleep 1
|
||||
retry
|
||||
end
|
||||
end # loop
|
||||
end # Thread.new
|
||||
end # if @list
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue