diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb
index c803aad3b..572845956 100644
--- a/lib/logstash/outputs/redis.rb
+++ b/lib/logstash/outputs/redis.rb
@@ -63,6 +63,7 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
   #
   # If true, we send an RPUSH every "batch_events" events or
   # "batch_timeout" seconds (whichever comes first).
+  # Only supported for list redis data_type.
   config :batch, :validate => :boolean, :default => false
 
   # If batch is set to true, the number of events we queue up for an RPUSH.
@@ -72,6 +73,21 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
   # when there are pending events to flush.
   config :batch_timeout, :validate => :number, :default => 5
 
+  # Interval for reconnecting to failed redis connections.
+  config :reconnect_interval, :validate => :number, :default => 1
+
+  # If the redis data_type is list and the key holds more than @congestion_threshold items, block
+  # until a consumer drains the list and reduces the congestion; otherwise, with no consumers,
+  # redis will run out of memory unless it was configured with OOM protection. Even with OOM
+  # protection, a single redis list can block all other users of redis, and redis CPU consumption
+  # degrades once it reaches the maximum allowed RAM size. The default value of 0 disables this limit.
+  # Only supported for list redis data_type.
+  config :congestion_threshold, :validate => :number, :default => 0
+
+  # How often to check for congestion, defaults to 1 second.
+  # Zero means to check on every event.
+  config :congestion_interval, :validate => :number, :default => 1
+
   def register
     require 'redis'
 
@@ -112,22 +128,26 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
       @host.shuffle!
     end
     @host_idx = 0
+
+    @congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }
   end # def register
 
   def receive(event)
     return unless output?(event)
 
-    if @batch
+    if @batch and @data_type == 'list' # Don't use batched method for pubsub.
       # Stud::Buffer
       buffer_receive(event.to_json, event.sprintf(@key))
       return
     end
 
-    event_key_and_payload = [event.sprintf(@key), event.to_json]
+    event_key = event.sprintf(@key)
+    event_key_and_payload = [event_key, event.to_json]
 
     begin
       @redis ||= connect
       if @data_type == 'list'
+        congestion_check(event_key)
         @redis.rpush *event_key_and_payload
       else
         @redis.publish *event_key_and_payload
@@ -136,15 +156,29 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
       @logger.warn("Failed to send event to redis", :event => event,
                    :identity => identity, :exception => e,
                    :backtrace => e.backtrace)
-      sleep 1
+      sleep @reconnect_interval
       @redis = nil
       retry
     end
   end # def receive
 
+  def congestion_check(key)
+    return if @congestion_threshold == 0
+    if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since the last check.
+      @congestion_check_times[key] = Time.now.to_i
+      while @redis.llen(key) > @congestion_threshold # Don't push events to a redis key that has reached @congestion_threshold.
+        @logger.warn? and @logger.warn("Redis key size has hit the congestion threshold #{@congestion_threshold}, suspending output for #{@congestion_interval} seconds")
+        sleep @congestion_interval
+      end
+    end
+  end
+
   # called from Stud::Buffer#buffer_flush when there are events to flush
-  def flush(events, key)
+  def flush(events, key, teardown=false)
     @redis ||= connect
+    # We should not block due to congestion on teardown;
+    # to support this, Stud::Buffer#buffer_flush should pass its :final boolean value here.
+    congestion_check(key) unless teardown
     @redis.rpush(key, events)
   end
   # called from Stud::Buffer#buffer_flush when an error occurs
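For reference, a minimal sketch of how the new options could be combined with the plugin's existing host, data_type, and key settings in a pipeline config; the numeric values below are illustrative, not recommendations:

    output {
      redis {
        host      => "127.0.0.1"
        data_type => "list"
        key       => "logstash"
        # Stop pushing once the list holds more than 5000 pending events and
        # re-check its length every 2 seconds until a consumer drains it.
        congestion_threshold => 5000
        congestion_interval  => 2
        # Wait 3 seconds before retrying a failed redis connection.
        reconnect_interval   => 3
      }
    }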