diff --git a/lib/logstash/inputs/zeromq.rb b/lib/logstash/inputs/zeromq.rb index 6e8ed38d9..5bc08936c 100644 --- a/lib/logstash/inputs/zeromq.rb +++ b/lib/logstash/inputs/zeromq.rb @@ -73,17 +73,10 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") - # TODO (lusis) - # wireup sockopt hash if @sockopt - @sockopt.each do |opt,value| - sockopt = opt.split('::')[1] - option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) - error_check(@zsocket.setsockopt(option, value), - "while setting #{opt} == 1)") - end + setopts(@zsocket, @sockopt) end - + @address.each do |addr| setup(@zsocket, addr) end diff --git a/lib/logstash/outputs/zeromq.rb b/lib/logstash/outputs/zeromq.rb index 48dfd6c35..0434b6b92 100644 --- a/lib/logstash/outputs/zeromq.rb +++ b/lib/logstash/outputs/zeromq.rb @@ -73,16 +73,8 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base error_check(@zsocket.setsockopt(ZMQ::LINGER, 1), "while setting ZMQ::LINGER == 1)") - # TODO (lusis) - # wireup sockopt hash better - # making assumptions on split if @sockopt - @sockopt.each do |opt,value| - sockopt = opt.split('::')[1] - option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) - error_check(@zsocket.setsockopt(option, value), - "while setting #{opt} == 1)") - end + setopts(@zsocket, @sockopt) end @address.each do |addr| diff --git a/lib/logstash/util/zeromq.rb b/lib/logstash/util/zeromq.rb index d5da3572d..254aeaa9f 100644 --- a/lib/logstash/util/zeromq.rb +++ b/lib/logstash/util/zeromq.rb @@ -3,6 +3,9 @@ require "logstash/namespace" module LogStash::Util::ZeroMQ CONTEXT = ZMQ::Context.new + # LOGSTASH-400 + # see https://github.com/chuckremes/ffi-rzmq/blob/master/lib/ffi-rzmq/socket.rb#L93-117 + STRING_OPTS = %w{IDENTITY SUBSCRIBE UNSUBSCRIBE} def context CONTEXT @@ -23,4 +26,21 @@ module LogStash::Util::ZeroMQ raise "ZeroMQ Error while #{doing}" end end # def error_check + + def setopts(socket, options) + options.each do |opt,value| + sockopt = opt.split('::')[1] + option = ZMQ.const_defined?(sockopt) ? ZMQ.const_get(sockopt) : ZMQ.const_missing(sockopt) + unless STRING_OPTS.include?(sockopt) + begin + Float(value) + value = value.to_i + rescue ArgumentError + raise "#{sockopt} requires a numeric value. #{value} is not numeric" + end + end # end unless + error_check(socket.setsockopt(option, value), + "while setting #{opt} == #{value}") + end # end each + end # end setopts end # module LogStash::Util::ZeroMQ