adding new redis 'pattern_channel' input support for PSUB

This commit is contained in:
John E. Vincent 2011-07-07 04:41:01 -04:00
parent e0181f1af0
commit a4e3bba50f

View file

@ -39,8 +39,9 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
# Either list or channel. If redis_type is list, then we will BLPOP the
# key. If redis_type is channel, then we will SUBSCRIBE to the key.
# If redis_type is pattern_channel, then we will PSUBSCRIBE to the key.
# TODO: change required to true
config :data_type, :validate => [ "list", "channel" ], :required => false
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false
public
def initialize(params)
@ -127,6 +128,23 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
end
end
private
def pattern_channel_listener redis, output_queue
redis.psubscribe @key do |on|
on.psubscribe do |ch, count|
@logger.info "Subscribed to #{ch} (#{count})"
end
on.pmessage do |ch, event, message|
queue_event message, output_queue
end
on.punsubscribe do |ch, count|
@logger.info "Unsubscribed from #{ch} (#{count})"
end
end
end
# Since both listeners have the same basic loop, we've abstracted the outer
# loop.
private
@ -146,8 +164,10 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
def run(output_queue)
if @data_type == 'list'
listener_loop :list_listener, output_queue
else
elsif @data_type == 'channel'
listener_loop :channel_listener, output_queue
else
listener_loop :pattern_channel_listener, output_queue
end
end # def run
@ -158,5 +178,10 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
@redis.quit
@redis = nil
end
if @data_type == 'pattern_channel' and @redis
@redis.punsubscribe
@redis.quit
@redis = nil
end
end
end # class LogStash::Inputs::Redis