mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Add Stomp input & output plugin to logstash
Signed-off-by: Brice Figureau <brice@daysofwonder.com>
This commit is contained in:
parent
db36afb016
commit
cb0564d19a
5 changed files with 97 additions and 0 deletions
7
etc/logstash-stomp-input.yaml
Normal file
7
etc/logstash-stomp-input.yaml
Normal file
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
inputs:
|
||||
stomp:
|
||||
- stomp://logs:password@localhost:6163/topic/logs
|
||||
outputs:
|
||||
- stdout:///
|
||||
|
7
etc/logstash-stomp.yaml
Normal file
7
etc/logstash-stomp.yaml
Normal file
|
@ -0,0 +1,7 @@
|
|||
---
|
||||
inputs:
|
||||
tail-syslog:
|
||||
- /var/log/syslog
|
||||
outputs:
|
||||
- stomp://logs:password@localhost:6163/topic/logs
|
||||
|
27
lib/logstash/inputs/stomp.rb
Normal file
27
lib/logstash/inputs/stomp.rb
Normal file
|
@ -0,0 +1,27 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/stomp/handler"
|
||||
|
||||
class LogStash::Inputs::Stomp < LogStash::Inputs::Base
|
||||
|
||||
class InputHandler < LogStash::Stomp::Handler
|
||||
def receive_msg(message)
|
||||
super
|
||||
|
||||
unless message.command == "CONNECTED"
|
||||
event = LogStash::Event.from_json(message.body)
|
||||
@input.receive(event)
|
||||
end
|
||||
end # def receive_msg
|
||||
end # class StompHandler
|
||||
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
|
||||
@logger.debug(["Connecting", { :url => @url }])
|
||||
end # def initialize
|
||||
|
||||
def register
|
||||
@logger.info(["Registering input", { :url => @url}])
|
||||
EventMachine::connect(@url.host, @url.port, InputHandler, self, @logger, @url)
|
||||
end # def register
|
||||
end # class LogStash::Inputs::Amqp
|
22
lib/logstash/outputs/stomp.rb
Normal file
22
lib/logstash/outputs/stomp.rb
Normal file
|
@ -0,0 +1,22 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/stomp/handler"
|
||||
|
||||
class LogStash::Outputs::Stomp < LogStash::Outputs::Base
|
||||
attr_reader :url
|
||||
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
|
||||
@logger.debug(["Initialize", { :url => @url }])
|
||||
end # def initialize
|
||||
|
||||
def register
|
||||
@logger.info(["Registering output", { :url => @url }])
|
||||
@connection = EventMachine::connect(@url.host, @url.port, LogStash::Stomp::Handler, self, @logger, @url)
|
||||
end # def register
|
||||
|
||||
def receive(event)
|
||||
@logger.debug(["Sending event", { :url => @url, :event => event }])
|
||||
@connection.send(@url.path, event.to_json)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Stomp
|
34
lib/logstash/stomp/handler.rb
Normal file
34
lib/logstash/stomp/handler.rb
Normal file
|
@ -0,0 +1,34 @@
|
|||
# Base of Stomp Handler
|
||||
# it handles connecting and subscribing to the stomp broker which
|
||||
# is used in both stomp input and output
|
||||
class LogStash::Stomp
|
||||
class Handler < EventMachine::Connection
|
||||
include EM::Protocols::Stomp
|
||||
|
||||
def initialize(*args)
|
||||
super
|
||||
|
||||
@input = args[0]
|
||||
@logger = args[1]
|
||||
@url = args[2]
|
||||
end # def initialize
|
||||
|
||||
def connection_completed
|
||||
@logger.debug("Connected")
|
||||
connect :login => @url.user, :passcode => @url.password
|
||||
end # def connection_completed
|
||||
|
||||
def unbind
|
||||
@logger.error(["Error when connecting to stomp broker", { :url => @url }])
|
||||
end # def unbind
|
||||
|
||||
def receive_msg(message)
|
||||
@logger.debug(["receiving message", { :msg => message }])
|
||||
if message.command == "CONNECTED"
|
||||
@logger.debug(["subscribing to", { :path => @url.path }])
|
||||
subscribe @url.path
|
||||
return
|
||||
end
|
||||
end # def receive_msg
|
||||
end # class Handler
|
||||
end # class LogStash::Stomp
|
Loading…
Add table
Add a link
Reference in a new issue