mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Merge pull request #419 from piavlo/feature/redis-output-congestion
redis output congestion blocking check
This commit is contained in:
commit
d28a4cbf75
1 changed files with 39 additions and 4 deletions
|
@ -63,6 +63,7 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
||||||
#
|
#
|
||||||
# If true, we send an RPUSH every "batch_events" events or
|
# If true, we send an RPUSH every "batch_events" events or
|
||||||
# "batch_timeout" seconds (whichever comes first).
|
# "batch_timeout" seconds (whichever comes first).
|
||||||
|
# Only supported for list redis data_type.
|
||||||
config :batch, :validate => :boolean, :default => false
|
config :batch, :validate => :boolean, :default => false
|
||||||
|
|
||||||
# If batch is set to true, the number of events we queue up for an RPUSH.
|
# If batch is set to true, the number of events we queue up for an RPUSH.
|
||||||
|
@ -72,6 +73,21 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
||||||
# when there are pending events to flush.
|
# when there are pending events to flush.
|
||||||
config :batch_timeout, :validate => :number, :default => 5
|
config :batch_timeout, :validate => :number, :default => 5
|
||||||
|
|
||||||
|
# Interval for reconnecting to failed redis connections
|
||||||
|
config :reconnect_interval, :validate => :number, :default => 1
|
||||||
|
|
||||||
|
# In case redis data_type is list and has more than @congestion_threshold items, block until someone consumes them and reduces
|
||||||
|
# congestion, otherwise if there are no consumers redis will run out of memory, unless it was configured with OOM protection.
|
||||||
|
# But even with OOM protection, a single redis list can block all other users of redis, and redis CPU consumption
|
||||||
|
# becomes bad when it reaches the max allowed RAM size.
|
||||||
|
# Default value of 0 means that this limit is disabled.
|
||||||
|
# Only supported for list redis data_type.
|
||||||
|
config :congestion_threshold, :validate => :number, :default => 0
|
||||||
|
|
||||||
|
# How often to check for congestion, defaults to 1 second.
|
||||||
|
# Zero means to check on every event.
|
||||||
|
config :congestion_interval, :validate => :number, :default => 1
|
||||||
|
|
||||||
def register
|
def register
|
||||||
require 'redis'
|
require 'redis'
|
||||||
|
|
||||||
|
@ -112,22 +128,26 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
||||||
@host.shuffle!
|
@host.shuffle!
|
||||||
end
|
end
|
||||||
@host_idx = 0
|
@host_idx = 0
|
||||||
|
|
||||||
|
@congestion_check_times = Hash.new { |h,k| h[k] = Time.now.to_i - @congestion_interval }
|
||||||
end # def register
|
end # def register
|
||||||
|
|
||||||
def receive(event)
|
def receive(event)
|
||||||
return unless output?(event)
|
return unless output?(event)
|
||||||
|
|
||||||
if @batch
|
if @batch and @data_type == 'list' # Don't use batched method for pubsub.
|
||||||
# Stud::Buffer
|
# Stud::Buffer
|
||||||
buffer_receive(event.to_json, event.sprintf(@key))
|
buffer_receive(event.to_json, event.sprintf(@key))
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
event_key_and_payload = [event.sprintf(@key), event.to_json]
|
event_key = event.sprintf(@key)
|
||||||
|
event_key_and_payload = [event_key, event.to_json]
|
||||||
|
|
||||||
begin
|
begin
|
||||||
@redis ||= connect
|
@redis ||= connect
|
||||||
if @data_type == 'list'
|
if @data_type == 'list'
|
||||||
|
congestion_check(event_key)
|
||||||
@redis.rpush *event_key_and_payload
|
@redis.rpush *event_key_and_payload
|
||||||
else
|
else
|
||||||
@redis.publish *event_key_and_payload
|
@redis.publish *event_key_and_payload
|
||||||
|
@ -136,15 +156,30 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
|
||||||
@logger.warn("Failed to send event to redis", :event => event,
|
@logger.warn("Failed to send event to redis", :event => event,
|
||||||
:identity => identity, :exception => e,
|
:identity => identity, :exception => e,
|
||||||
:backtrace => e.backtrace)
|
:backtrace => e.backtrace)
|
||||||
sleep 1
|
sleep @reconnect_interval
|
||||||
@redis = nil
|
@redis = nil
|
||||||
retry
|
retry
|
||||||
end
|
end
|
||||||
end # def receive
|
end # def receive
|
||||||
|
|
||||||
|
# Blocks while the redis list stored at +key+ holds more than
# @congestion_threshold items, so that events are not pushed onto a list
# that has already reached the threshold.
#
# A threshold of 0 disables the check entirely. Otherwise the list length is
# only queried once at least @congestion_interval seconds have passed since
# the previous check of the same key; the time of the last check is kept per
# key in @congestion_check_times.
#
# key - name of the redis list that is about to be pushed to.
def congestion_check(key)
  return if @congestion_threshold == 0

  # Check congestion only if enough time has passed since the last check of this key.
  return if (Time.now.to_i - @congestion_check_times[key]) < @congestion_interval

  # Don't push events to a redis key which has reached @congestion_threshold.
  while @redis.llen(key) > @congestion_threshold
    if @logger.warn?
      @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds")
    end
    sleep @congestion_interval
  end

  # Remember when this key was last checked. The timestamp must be stored in
  # the per-key hash that the interval test above reads; writing it anywhere
  # else leaves the hash entry stale and makes every later call query redis.
  # It is recorded after the loop so the interval is measured from the most
  # recent length query, not from before any time spent waiting.
  @congestion_check_times[key] = Time.now.to_i
end
|
||||||
|
end
|
||||||
|
|
||||||
# called from Stud::Buffer#buffer_flush when there are events to flush
|
# called from Stud::Buffer#buffer_flush when there are events to flush
|
||||||
def flush(events, key)
|
def flush(events, key, teardown=false)
|
||||||
@redis ||= connect
|
@redis ||= connect
|
||||||
|
# we should not block due to congestion on teardown
|
||||||
|
# To support this, Stud::Buffer#buffer_flush should pass the :final boolean value here.
|
||||||
|
congestion_check(key) unless teardown
|
||||||
@redis.rpush(key, events)
|
@redis.rpush(key, events)
|
||||||
end
|
end
|
||||||
# called from Stud::Buffer#buffer_flush when an error occurs
|
# called from Stud::Buffer#buffer_flush when an error occurs
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue