mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Add messages to zmq error_check
This commit is contained in:
parent
7468b8e229
commit
7fdb2fb636
3 changed files with 21 additions and 14 deletions
|
@ -36,14 +36,17 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
|
|||
def register
|
||||
self.class.send(:include, LogStash::Util::ZeroMQ)
|
||||
@subscriber = context.socket(ZMQ::SUB)
|
||||
error_check(@subscriber.setsockopt(ZMQ::HWM, @queue_length))
|
||||
error_check(@subscriber.setsockopt(ZMQ::SUBSCRIBE, @queue))
|
||||
error_check(@subscriber.setsockopt(ZMQ::LINGER, 1))
|
||||
error_check(@subscriber.setsockopt(ZMQ::HWM, @queue_size),
|
||||
"while setting ZMQ:HWM == #{@queue_size.inspect}")
|
||||
error_check(@subscriber.setsockopt(ZMQ::SUBSCRIBE, @queue),
|
||||
"while setting ZMQ:SUBSCRIBE == #{@queue.inspect}")
|
||||
error_check(@subscriber.setsockopt(ZMQ::LINGER, 1),
|
||||
"while setting ZMQ::LINGER == 1)")
|
||||
setup(@subscriber, @address)
|
||||
end # def register
|
||||
|
||||
def teardown
|
||||
error_check(@subscriber.close)
|
||||
error_check(@subscriber.close, "while closing the zmq socket")
|
||||
end # def teardown
|
||||
|
||||
def server?
|
||||
|
@ -55,7 +58,7 @@ class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
|
|||
loop do
|
||||
msg = ''
|
||||
rc = @subscriber.recv_string(msg)
|
||||
error_check(rc)
|
||||
error_check(rc, "in recv_string")
|
||||
@logger.debug("0mq: receiving", :event => msg)
|
||||
e = self.to_event(msg, @source)
|
||||
if e
|
||||
|
|
|
@ -33,14 +33,18 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
|
|||
# because the Config mixin thinks we're the included module and not the base-class
|
||||
self.class.send(:include, LogStash::Util::ZeroMQ)
|
||||
@publisher = context.socket(ZMQ::PUB)
|
||||
error_check(@publisher.setsockopt(ZMQ::SUBSCRIBE, @queue)) if @queue != ""
|
||||
error_check(@publisher.setsockopt(ZMQ::LINGER, 1))
|
||||
if !@queue.empty?
|
||||
error_check(@publisher.setsockopt(ZMQ::SUBSCRIBE, @queue),
|
||||
"while setting ZMQ::SUBSCRIBE to #{@queue.inspect}")
|
||||
end
|
||||
error_check(@publisher.setsockopt(ZMQ::LINGER, 1),
|
||||
"while setting ZMQ::SUBSCRIBE to 1")
|
||||
setup(@publisher, @address)
|
||||
end # def register
|
||||
|
||||
public
|
||||
def teardown
|
||||
error_check(@publisher.close)
|
||||
error_check(@publisher.close, "while closing the socket")
|
||||
end # def teardown
|
||||
|
||||
private
|
||||
|
@ -58,7 +62,7 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
|
|||
|
||||
begin
|
||||
@logger.debug("0mq: sending", :event => wire_event)
|
||||
error_check(@publisher.send_string(wire_event))
|
||||
error_check(@publisher.send_string(wire_event), "in send_string")
|
||||
rescue => e
|
||||
@logger.warn("0mq output exception", :address => @address, :queue => @queue, :exception => e, :backtrace => e.backtrace)
|
||||
end
|
||||
|
|
|
@ -10,17 +10,17 @@ module LogStash::Util::ZeroMQ
|
|||
|
||||
def setup(socket, address)
|
||||
if server?
|
||||
error_check(socket.bind(address))
|
||||
error_check(socket.bind(address), "binding to #{address}")
|
||||
else
|
||||
error_check(socket.connect(address))
|
||||
error_check(socket.connect(address), "connecting to #{address}")
|
||||
end
|
||||
@logger.info("0mq: #{server? ? 'connected' : 'bound'}", :address => address)
|
||||
end
|
||||
|
||||
def error_check(rc)
|
||||
def error_check(rc, doing)
|
||||
unless ZMQ::Util.resultcode_ok?(rc)
|
||||
@logger.error("ZeroMQ error: ", { :error_code => rc })
|
||||
raise "ZeroMQ Error"
|
||||
@logger.error("ZeroMQ error while #{doing}", { :error_code => rc })
|
||||
raise "ZeroMQ Error while #{doing}"
|
||||
end
|
||||
end # def error_check
|
||||
end # module LogStash::Util::ZeroMQ
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue