Merge pull request #144 from logstash/zmq-topic-fixes

Zmq topic fixes
This commit is contained in:
John E. Vincent 2012-05-01 08:06:57 -07:00
commit 93fe6a05e8
2 changed files with 48 additions and 18 deletions

View file

@ -13,7 +13,7 @@ require "timeout"
class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
config_name "zeromq"
plugin_status "experimental"
plugin_status "beta"
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
@ -40,13 +40,20 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# On outputs, this allows you to tag a message for routing
# NOTE: ZeroMQ does subscriber side filtering.
# NOTE: All topics have an implicit wildcard at the end
config :topic, :validate => :string, :default => ""
# You can specify multiple topics here
config :topic, :validate => :array
# mode
# server mode binds/listens
# client mode connects
config :mode, :validate => ["server", "client"], :default => "server"
# sender
# overrides the sender to
# set the source of the event
# default is "zmq+topology://type/"
config :sender, :validate => :string
# 0mq socket options
# This exposes zmq_setsockopt
# for advanced tuning
@ -56,7 +63,6 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
# ZMQ::HWM - high water mark
# ZMQ::IDENTITY - named queues
# ZMQ::SWAP_SIZE - space for disk overflow
# ZMQ::SUBSCRIBE - topic filters for pubsub
#
# example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"]
config :sockopt, :validate => :hash
@ -77,7 +83,6 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
zmq_const = ZMQ::SUB
end # case socket_type
@zsocket = context.socket(zmq_const)
error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
"while setting ZMQ::LINGER == 1)")
@ -85,11 +90,24 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
setopts(@zsocket, @sockopt)
end
setopts(@zsocket, {"ZMQ::SUBSCRIBE" => @topic}) if @topology == "pubsub"
@address.each do |addr|
setup(@zsocket, addr)
end
if @topology == "pubsub"
if @topic.nil?
@logger.debug("ZMQ - No topic provided. Subscribing to all messages")
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
"while setting ZMQ::SUBSCRIBE")
else
@topic.each do |t|
@logger.debug("ZMQ subscribing to topic: #{t}")
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
"while setting ZMQ::SUBSCRIBE == #{t}")
end
end
end
end # def register
def teardown
@ -103,17 +121,25 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
def run(output_queue)
begin
loop do
if @topology == "pubsub"
topic = ''
rc = @zsocket.recv_string(topic)
error_check(rc, "in recv_string")
@logger.debug("0mq input: got topic #{topic}")
end
msg = ''
rc = @zsocket.recv_string(msg)
# Here's the unified receiver
# Get the first part as the msg
m1 = ''
rc = @zsocket.recv_string(m1)
error_check(rc, "in recv_string")
@logger.debug("0mq: receiving", :event => msg)
e = self.to_event(msg, @source)
@logger.debug("ZMQ receiving", :event => m1)
msg = m1
# If we have more parts, we'll eat the first as the topic
# and set the message to the second part
if @zsocket.more_parts?
@logger.debug("Multipart message detected. Setting @message to second part. First part was: #{m1}")
m2 = ''
rc2 = @zsocket.recv_string(m2)
error_check(rc2, "in recv_string")
@logger.debug("ZMQ receiving", :event => m2)
msg = m2
end
@sender ||= "zmq+#{@topology}://#{@type}/"
e = self.to_event(msg, @sender)
if e
output_queue << e
end
@ -125,4 +151,9 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
@logger.debug("Read timeout", subscriber => @zsocket)
end # begin
end # def run
private
def build_source_string
id = @address.first.clone
end
end # class LogStash::Inputs::ZeroMQ

View file

@ -12,7 +12,7 @@ require "logstash/namespace"
class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
config_name "zeromq"
plugin_status "experimental"
plugin_status "beta"
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
@ -56,7 +56,6 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
# ZMQ::HWM - high water mark
# ZMQ::IDENTITY - named queues
# ZMQ::SWAP_SIZE - space for disk overflow
# ZMQ::SUBSCRIBE - topic filters for pubsub
#
# example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"]
config :sockopt, :validate => :hash