diff --git a/lib/logstash/inputs/zeromq.rb b/lib/logstash/inputs/zeromq.rb index 192967f36..5fb951523 100644 --- a/lib/logstash/inputs/zeromq.rb +++ b/lib/logstash/inputs/zeromq.rb @@ -17,138 +17,75 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base # 0mq socket address to connect or bind # Please note that `inproc://` will not work with logstash - # As each we use a context per thread - # To listen on all addresses, use `*` as the ip address - config :address, :validate => :string, :default => "tcp://127.0.0.1:2120" + # as each we use a context per thread. + # By default, inputs bind/listen + # and outputs connect + config :address, :validate => :array, :default => ["tcp://*:2120"] - # 0mq queue size - config :queue_size, :validate => :number, :default => 20 + # 0mq topology + # The default logstash topologies work as follows: + # * pushpull - inputs are pull, outputs are push + # * pubsub - inputs are subscribers, outputs are publishers + # * pair - inputs are clients, inputs are servers + # + # If the predefined topology flows don't work for you, + # you can change the 'mode' setting + # TODO (lusis) add req/rep MAYBE + # TODO (lusis) add router/dealer + config :topology, :validate => ["pushpull", "pubsub", "pair"] - # 0mq topic (Used with ZMQ_SUBSCRIBE, see http://api.zeromq.org/2-1:zmq-setsockopt - # for 'ZMQ_SUBSCRIBE: Establish message filter') - config :topic, :validate => :string, :default => "" # default all - - # 0mq message pattern - # Instead of declaring socket_types in logstash, - # we abstract this to a 'pattern': - # This should cover most standard use cases - # In the event that you need finer-grained control, - # please use `socket_type` and not this option. - # - # To control if this is a sender or receiver, please set `mode` below - # TODO (lusis) fix req/rep - config :pattern, :validate => ["pushpull", "pubsub", "pair", "reqrep"] - - # 0mq socket type - # There is no default. - # This setting conflicts with `pattern` - # TODO (lusis) fix req/rep - config :socket_type, :validate => ["req", "rep", "push", "pull", "sub", "pair"] - - # 0mq swap size - # Controls buffering to disk - # in the event of messages counts exceeding the queue_size - # size in bytes - # Default: 15MB - # (ZMQ_SWAP) - config :swap_size, :validate => :number - - # 0mq identity - # (ZMQ_IDENTITY) - config :identity, :validate => :string + # mode + # server mode binds/listens + # client mode connects + config :mode, :validate => ["server", "client"], :default => "server" # 0mq socket options # This exposes zmq_setsockopt # for advanced tuning # see http://api.zeromq.org/2-1:zmq-setsockopt for details + # + # This is where you would set values like: + # ZMQ::HWM - high water mark + # ZMQ::IDENTITY - named queues + # ZMQ::SWAP_SIZE - space for disk overflow + # ZMQ::SUBSCRIBE - topic filters for pubsub + # + # example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"] config :sockopt, :validate => :hash - # mode - # server mode binds/listens - # client mode connects - config :mode, :validate => ["server", "client"], :default => "client" - - @source = "0mq_#{@address}/#{@queue}" - public def register require "ffi-rzmq" require "logstash/util/zeromq" self.class.send(:include, LogStash::Util::ZeroMQ) - if @pattern - %w{socket_type}.each do |var| - if instance_variable_get("@#{var}") - @logger.error("inputs/zeromq: You cannot specify "\ - "'pattern' and also set '#{var}'") - raise "Invalid configuration detected. Please fix." - end # if instance_variable_get - end # socket_type each - case @pattern - when "pubsub" - # Input with pubsub only makes sense as a subscriber - @socket_type = "sub" - when "pushpull" - # Input with pushpull - # We can either listen for events - # or poll for events - @mode == "client" ? (@socket_type = "pull") : (@socket_type = "push") - when "pair" - # Input with pair - # We can either listen for events - # or poll for them - @socket_type = "pair" - when "reqrep" - # Input with reqrep - # We can either listen or events - # or poll for them - @mode == "client" ? (@socket_type = "req") : (@socket_type = "rep") - end # case pattern - end # if pattern - - case @socket_type - when "req" - zmq_const = ZMQ::REQ - @mode = "client" # req is by nature a client. it REQuests - when "rep" - zmq_const = ZMQ::REP - @mode = "server" # rep is by nature a server. it REPlies + case @topology when "pair" zmq_const = ZMQ::PAIR - @mode ||= "server" # pair can be either server or client. - when "pull" + when "pushpull" zmq_const = ZMQ::PULL - @mode = "client" # pull is by nature a client. - when "push" - zmq_const = ZMQ::PUSH - @mode = "server" # push is by nature a server - when "sub" + when "pubsub" zmq_const = ZMQ::SUB - @mode = "client" # sub is by nature a client end # case socket_type @zsocket = context.socket(zmq_const) - error_check(@zsocket.setsockopt(ZMQ::HWM, @queue_size), - "while setting ZMQ:HWM == #{@queue_size.inspect}") error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") - if @swap_size - error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size), - "while setting ZMQ::SWAP == #{@swap_size.inspect}") - end # if swap_size - - if @identity - error_check(@zsocket.setsockopt(ZMQ::IDENTITY, @identity), - "while setting ZMQ::IDENTITY == #{@identity.inspect}") - end # if identity - - if @socket_type == "sub" - error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, @topic), - "while setting ZMQ:SUBSCRIBE == #{@topic.inspect}") - end # if sub - - setup(@zsocket, @address) + # TODO (lusis) + # wireup sockopt hash + if @sockopt + @sockopt.each do |opt,value| + sockopt = opt.split('::')[1] + option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) + error_check(@zsocket.setsockopt(option, value), + "while setting #{opt} == 1)") + end + end + + @address.each do |addr| + setup(@zsocket, addr) + end end # def register def teardown diff --git a/lib/logstash/outputs/zeromq.rb b/lib/logstash/outputs/zeromq.rb index a99c0857f..48dfd6c35 100644 --- a/lib/logstash/outputs/zeromq.rb +++ b/lib/logstash/outputs/zeromq.rb @@ -14,72 +14,80 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base config_name "zeromq" plugin_status "experimental" - # 0mq socket address to connect or bind to - config :address, :validate => :string, :default => "tcp://127.0.0.1:2120" + # 0mq socket address to connect or bind + # Please note that `inproc://` will not work with logstash + # As each we use a context per thread + # By default, inputs bind/listen + # and outputs connect + config :address, :validate => :array, :default => ["tcp://127.0.0.1:2120"] - # 0mq queue size - config :queue_size, :validate => :number, :default => 20 + # 0mq topology + # The default logstash topologies work as follows: + # * pushpull - inputs are pull, outputs are push + # * pubsub - inputs are subscribers, outputs are publishers + # * pair - inputs are clients, inputs are servers + # + # If the predefined topology flows don't work for you, + # you can change the 'mode' setting + # TODO (lusis) add req/rep MAYBE + # TODO (lusis) add router/dealer + config :topology, :validate => ["pushpull", "pubsub", "pair"] - # 0mq topic (Used with ZMQ_SUBSCRIBE, see http://api.zeromq.org/2-1:zmq-setsockopt - # for 'ZMQ_SUBSCRIBE: Establish message filter') - config :queue_name, :validate => :string, :default => "" - - # 0mq socket type - # There is no default. - config :socket_type, :validate => ["rep","push","pub","router","pair"], :required => true - - # 0mq swap size - # Controls buffering to disk - # in the event of messages counts exceeding the queue_size - # size in bytes - # Default: 15MB - # (ZMQ_SWAP) - config :swap_size, :validate => :number, :default => 15728640 - - # 0mq identity - # (ZMQ_IDENTITY) - config :identity, :validate => :string + # mode + # server mode binds/listens + # client mode connects + config :mode, :validate => ["server", "client"], :default => "client" # 0mq socket options # This exposes zmq_setsockopt # for advanced tuning # see http://api.zeromq.org/2-1:zmq-setsockopt for details + # + # This is where you would set values like: + # ZMQ::HWM - high water mark + # ZMQ::IDENTITY - named queues + # ZMQ::SWAP_SIZE - space for disk overflow + # ZMQ::SUBSCRIBE - topic filters for pubsub + # + # example: sockopt => ["ZMQ::HWM", 50, "ZMQ::IDENTITY", "my_named_queue"] config :sockopt, :validate => :hash - # mode - # server mode binds/listens - # client mode connects - # This only makes sense with "pair" types - # default pair mode is server - config :mode, :validate => [ "server", "client"], :default => "server" - public def register require "ffi-rzmq" require "logstash/util/zeromq" self.class.send(:include, LogStash::Util::ZeroMQ) - case @socket_type - when "rep" - zmq_const = ZMQ::REP - @mode = "server" + + # Translate topology shorthand to socket types + case @topology when "pair" zmq_const = ZMQ::PAIR - @mode ||= "client" - when "push" + when "pushpull" zmq_const = ZMQ::PUSH - @mode = "server" - when "pub" + when "pubsub" zmq_const = ZMQ::PUB - @mode = "server" - end + end # case socket_type + @zsocket = context.socket(zmq_const) - error_check(@zsocket.setsockopt(ZMQ::HWM, @queue_size), - "while setting ZMQ:HWM == #{@queue_size.inspect}") + error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") - error_check(@zsocket.setsockopt(ZMQ::SWAP, @swap_size), - "while setting ZMQ::SWAP == #{@swap_size}") - setup(@zsocket, @address) + + # TODO (lusis) + # wireup sockopt hash better + # making assumptions on split + if @sockopt + @sockopt.each do |opt,value| + sockopt = opt.split('::')[1] + option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) + error_check(@zsocket.setsockopt(option, value), + "while setting #{opt} == 1)") + end + end + + @address.each do |addr| + setup(@zsocket, addr) + end end # def register public