mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
Remove retry code, moved to agent.rb
This commit is contained in:
parent
472a8de49d
commit
580adda0d5
2 changed files with 7 additions and 41 deletions
|
@ -31,9 +31,6 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
# key. If redis_type is channel, then we will SUBSCRIBE to the key.
|
||||
config :data_type, :validate => [ "list", "channel" ], :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
|
@ -80,7 +77,6 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
private
|
||||
def list_listener redis, output_queue
|
||||
response = redis.blpop @key, 0
|
||||
yield
|
||||
queue_event response[1], output_queue
|
||||
end
|
||||
|
||||
|
@ -89,7 +85,6 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
redis.subscribe @key do |on|
|
||||
on.subscribe do |ch, count|
|
||||
@logger.info "Subscribed to #{ch} (#{count})"
|
||||
yield
|
||||
end
|
||||
|
||||
on.message do |ch, message|
|
||||
|
@ -103,30 +98,17 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
|
|||
end
|
||||
|
||||
# Since both listeners have the same basic loop, we've abstracted the outer
|
||||
# loop. The only problem is that we need to reset retries on a successful
|
||||
# connection, even though the listener might not return (ever). So we've
|
||||
# implement an "onsuccess" callback as a yield.
|
||||
# loop.
|
||||
private
|
||||
def listener_loop listener, output_queue
|
||||
loop do
|
||||
retries = @retries
|
||||
begin
|
||||
# we need seperate instances of redis for each listener
|
||||
@redis ||= connect
|
||||
self.send listener, @redis, output_queue do
|
||||
retries = @retries
|
||||
end
|
||||
self.send listener, @redis, output_queue
|
||||
rescue => e # redis error
|
||||
@logger.warn(["Failed to get event from redis #{@name}. " +
|
||||
"Will retry #{retries} times.", e])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
if retries <= 0
|
||||
raise RuntimeError, "Redis connection failed too many times"
|
||||
end
|
||||
@redis = nil
|
||||
retries -= 1
|
||||
sleep(1)
|
||||
retry
|
||||
@logger.warn(["Failed to get event from redis #{@name}. ", e])
|
||||
raise e
|
||||
end
|
||||
end # loop
|
||||
end # listener_loop
|
||||
|
|
|
@ -31,9 +31,6 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
# If redis_type is channel, then we will PUBLISH to key.
|
||||
config :data_type, :validate => [ "list", "channel" ], :required => true
|
||||
|
||||
# Maximum number of retries on a read before we give up.
|
||||
config :retries, :validate => :number, :default => 5
|
||||
|
||||
def register
|
||||
require 'redis'
|
||||
@redis = nil
|
||||
|
@ -55,12 +52,7 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
end
|
||||
|
||||
|
||||
def receive(event, tries=@retries)
|
||||
if tries <= 0
|
||||
@logger.error "Fatal error, failed to log #{event.to_s} to #{identity}"
|
||||
raise RuntimeError, "Failed to log to #{identity} after #{@retries} tries"
|
||||
end
|
||||
|
||||
def receive(event)
|
||||
begin
|
||||
@redis ||= connect
|
||||
if @data_type == 'list'
|
||||
|
@ -69,16 +61,8 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
|||
@redis.publish event.sprintf(@key), event.to_json
|
||||
end
|
||||
rescue => e
|
||||
# TODO(sissel): Be specific in the exceptions we rescue.
|
||||
# Drop the redis connection to be picked up later during a retry.
|
||||
@redis = nil
|
||||
@logger.warn(["Failed to log #{event.to_s} to #{identity}. " +
|
||||
"Will retry #{retries} times.", $!])
|
||||
@logger.debug(["Backtrace", e.backtrace])
|
||||
Thread.new do
|
||||
sleep 1
|
||||
receive(event, tries - 1)
|
||||
end
|
||||
@logger.warn(["Failed to log #{event.to_s} to #{identity}.", e])
|
||||
raise e
|
||||
end
|
||||
end # def receive
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue