diff --git a/etc/logstash-stomp-input.yaml b/etc/logstash-stomp-input.yaml new file mode 100644 index 000000000..b45132076 --- /dev/null +++ b/etc/logstash-stomp-input.yaml @@ -0,0 +1,7 @@ +--- +inputs: + stomp: + - stomp://logs:password@localhost:6163/topic/logs +outputs: +- stdout:/// + diff --git a/etc/logstash-stomp.yaml b/etc/logstash-stomp.yaml new file mode 100644 index 000000000..f34dd6500 --- /dev/null +++ b/etc/logstash-stomp.yaml @@ -0,0 +1,7 @@ +--- +inputs: + tail-syslog: + - /var/log/syslog +outputs: +- stomp://logs:password@localhost:6163/topic/logs + diff --git a/lib/logstash/inputs/stomp.rb b/lib/logstash/inputs/stomp.rb new file mode 100644 index 000000000..e9c76650a --- /dev/null +++ b/lib/logstash/inputs/stomp.rb @@ -0,0 +1,27 @@ +require "logstash/inputs/base" +require "logstash/stomp/handler" + +class LogStash::Inputs::Stomp < LogStash::Inputs::Base + + class InputHandler < LogStash::Stomp::Handler + def receive_msg(message) + super + + unless message.command == "CONNECTED" + event = LogStash::Event.from_json(message.body) + @input.receive(event) + end + end # def receive_msg + end # class StompHandler + + def initialize(url, config={}, &block) + super + + @logger.debug(["Connecting", { :url => @url }]) + end # def initialize + + def register + @logger.info(["Registering input", { :url => @url}]) + EventMachine::connect(@url.host, @url.port, InputHandler, self, @logger, @url) + end # def register +end # class LogStash::Inputs::Amqp diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb new file mode 100644 index 000000000..ea93c9361 --- /dev/null +++ b/lib/logstash/outputs/stomp.rb @@ -0,0 +1,22 @@ +require "logstash/outputs/base" +require "logstash/stomp/handler" + +class LogStash::Outputs::Stomp < LogStash::Outputs::Base + attr_reader :url + + def initialize(url, config={}, &block) + super + + @logger.debug(["Initialize", { :url => @url }]) + end # def initialize + + def register + @logger.info(["Registering output", { :url => @url }]) + @connection = EventMachine::connect(@url.host, @url.port, LogStash::Stomp::Handler, self, @logger, @url) + end # def register + + def receive(event) + @logger.debug(["Sending event", { :url => @url, :event => event }]) + @connection.send(@url.path, event.to_json) + end # def receive +end # class LogStash::Outputs::Stomp diff --git a/lib/logstash/stomp/handler.rb b/lib/logstash/stomp/handler.rb new file mode 100644 index 000000000..23fc8355d --- /dev/null +++ b/lib/logstash/stomp/handler.rb @@ -0,0 +1,34 @@ +# Base of Stomp Handler +# it handles connecting and subscribing to the stomp broker which +# is used in both stomp input and output +class LogStash::Stomp + class Handler < EventMachine::Connection + include EM::Protocols::Stomp + + def initialize(*args) + super + + @input = args[0] + @logger = args[1] + @url = args[2] + end # def initialize + + def connection_completed + @logger.debug("Connected") + connect :login => @url.user, :passcode => @url.password + end # def connection_completed + + def unbind + @logger.error(["Error when connecting to stomp broker", { :url => @url }]) + end # def unbind + + def receive_msg(message) + @logger.debug(["receiving message", { :msg => message }]) + if message.command == "CONNECTED" + @logger.debug(["subscribing to", { :path => @url.path }]) + subscribe @url.path + return + end + end # def receive_msg + end # class Handler +end # class LogStash::Stomp