- add codec support to zeromq output

This commit is contained in:
Jordan Sissel 2013-08-24 22:47:27 -07:00
parent d756181ac1
commit 0b028b5224

View file

@ -14,6 +14,8 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
config_name "zeromq"
milestone 2
default :codec, "json"
# 0mq socket address to connect or bind.
# Please note that `inproc://` will not work with logstashi.
# For each we use a context per thread.
@ -82,11 +84,13 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
@address.each do |addr|
setup(@zsocket, addr)
end
@codec.on_event(&method(:publish))
end # def register
public
def teardown
error_check(@publisher.close, "while closing the socket")
error_check(@zsocket.close, "while closing the socket")
end # def teardown
private
@ -98,19 +102,19 @@ class LogStash::Outputs::ZeroMQ < LogStash::Outputs::Base
def receive(event)
return unless output?(event)
# TODO(sissel): Figure out why masterzen has '+ "\n"' here
#wire_event = event.to_hash.to_json + "\n"
wire_event = event.to_json
begin
@logger.debug("0mq: sending", :event => wire_event)
if @topology == "pubsub"
@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE), "in topic send_string")
end
error_check(@zsocket.send_string(wire_event), "in send_string")
rescue => e
@logger.warn("0mq output exception", :address => @address, :queue => @queue_name, :exception => e, :backtrace => e.backtrace)
end
@codec.encode(event)
end # def receive
def publish(payload)
@logger.debug("0mq: sending", :event => payload)
if @topology == "pubsub"
# TODO(sissel): Need to figure out how to fit this into the codecs system.
#@logger.debug("0mq output: setting topic to: #{event.sprintf(@topic)}")
#error_check(@zsocket.send_string(event.sprintf(@topic), ZMQ::SNDMORE),
#"in topic send_string")
end
error_check(@zsocket.send_string(payload), "in send_string")
rescue => e
@logger.warn("0mq output exception", :address => @address, :exception => e)
end
end # class LogStash::Outputs::ZeroMQ