mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- stomp works now in jruby. Except for this bug:
http://jira.codehaus.org/browse/JRUBY-4941
This commit is contained in:
parent
81f9cb9f32
commit
e671dc1166
2 changed files with 46 additions and 26 deletions
|
@ -1,33 +1,49 @@
|
|||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/stomp/handler"
|
||||
|
||||
# TODO(sissel): This class doesn't work yet in JRuby. Haven't debugged it much.
|
||||
|
||||
class LogStash::Inputs::Stomp < LogStash::Inputs::Base
|
||||
|
||||
config_name "stomp"
|
||||
config :host, :validate => :string
|
||||
config :port, :validate => :number
|
||||
config :user, :validate => :string
|
||||
config :password, :validate => :string
|
||||
config :destination, :validate => :string
|
||||
config :debug, :validate => :boolean
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
raise "issue/17: needs refactor to support configfile"
|
||||
@logger.debug(["Connecting", { :url => @url }])
|
||||
|
||||
@debug ||= false
|
||||
@port ||= 61613
|
||||
@user ||= ''
|
||||
@password ||= ''
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info(["Registering input", { :url => @url}])
|
||||
EventMachine::connect(@url.host, @url.port, InputHandler, self, @logger, @url)
|
||||
require "stomp"
|
||||
|
||||
if @destination == "" or @destination.nil?
|
||||
@logger.error("No destination path given for stomp")
|
||||
return
|
||||
end
|
||||
|
||||
begin
|
||||
@client = Stomp::Client.new(@user, @password, @host, @port)
|
||||
rescue Errno::ECONNREFUSED
|
||||
@logger.error("Connection refused to #{@host}:#{@port}...")
|
||||
# TODO(sissel): Retry?
|
||||
end
|
||||
end # def register
|
||||
|
||||
private
|
||||
class InputHandler < LogStash::Stomp::Handler
|
||||
def receive_msg(message)
|
||||
super
|
||||
|
||||
unless message.command == "CONNECTED"
|
||||
event = LogStash::Event.from_json(message.body)
|
||||
@input.receive(event)
|
||||
end
|
||||
end # def receive_msg
|
||||
end # class StompHandler
|
||||
def run(queue)
|
||||
@client.subscribe(@destination) do |msg|
|
||||
@logger.debug(["Got message from stomp", { :msg => msg }])
|
||||
#event = LogStash::Event.from_json(message.body)
|
||||
#queue << event
|
||||
end
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Stomp
|
||||
|
|
|
@ -1,28 +1,32 @@
|
|||
require "logstash/outputs/base"
|
||||
require "logstash/namespace"
|
||||
require "logstash/stomp/handler"
|
||||
|
||||
class LogStash::Outputs::Stomp < LogStash::Outputs::Base
|
||||
attr_reader :url
|
||||
|
||||
config_name "stomp"
|
||||
config :host, :validate => :string
|
||||
config :port, :validate => :number
|
||||
config :user, :validate => :string
|
||||
config :password, :validate => :string
|
||||
config :destination, :validate => :string
|
||||
config :debug, :validate => :boolean
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@logger.debug(["Initialize", { :url => @url }])
|
||||
@debug ||= false
|
||||
@port ||= 61613
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info(["Registering output", { :url => @url }])
|
||||
@connection = EventMachine::connect(@url.host, @url.port, LogStash::Stomp::Handler, self, @logger, @url)
|
||||
require "stomp"
|
||||
@client = Stomp::Client.new(@user, @password, @host, @port)
|
||||
end # def register
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
@logger.debug(["Sending event", { :url => @url, :event => event }])
|
||||
@connection.send(@url.path, event.to_json)
|
||||
@logger.debug(["stomp sending event", { :host => @host, :event => event }])
|
||||
@client.publish(@destination, event.to_json)
|
||||
end # def receive
|
||||
end # class LogStash::Outputs::Stomp
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue