mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
- Update amqp and stdin inputs for jrubyland
This commit is contained in:
parent
bb110ff82f
commit
85db3348df
2 changed files with 57 additions and 73 deletions
|
@ -1,8 +1,8 @@
|
|||
require "amqp" # rubygem 'amqp'
|
||||
require "bunny" # rubygem 'bunny'
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/namespace"
|
||||
require "mq" # rubygem 'amqp'
|
||||
require "uuidtools" # rubygem 'uuidtools'
|
||||
#require "uuidtools" # rubygem 'uuidtools'
|
||||
require "cgi"
|
||||
require "uri"
|
||||
|
||||
|
@ -10,68 +10,67 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
|||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
|
||||
config_name "amqp"
|
||||
config "host" => (lambda do |value|
|
||||
# Use URI to validate.
|
||||
u = URI.parse("dummy:///")
|
||||
begin
|
||||
u.host = value
|
||||
rescue => e
|
||||
return false, "Invalid hostname #{value.inspect}"
|
||||
end
|
||||
return true
|
||||
) # config "host"
|
||||
#config "host" => (lambda do |value|
|
||||
## Use URI to validate.
|
||||
#u = URI.parse("dummy:///")
|
||||
#begin
|
||||
#u.host = value
|
||||
#rescue => e
|
||||
#return false, "Invalid hostname #{value.inspect}"
|
||||
#end
|
||||
#return true
|
||||
#) # config "host"
|
||||
|
||||
config :host => :string
|
||||
config :user => :string
|
||||
config :pass => :string
|
||||
config :exchange_type => :string
|
||||
config :name => :string
|
||||
config :vhost => :string
|
||||
config :durable => :boolean
|
||||
config :debug => :boolean
|
||||
|
||||
|
||||
public
|
||||
def initialize(params)
|
||||
super
|
||||
|
||||
@mq = nil
|
||||
@debug ||= false
|
||||
@durable ||= false
|
||||
|
||||
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
|
||||
# vhost allowed to contain slashes
|
||||
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
|
||||
unused, @vhost, @mqtype, @name = $~.captures
|
||||
else
|
||||
raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
|
||||
end
|
||||
|
||||
if !MQTYPES.include?(@mqtype)
|
||||
raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
|
||||
if !MQTYPES.include?(@exchange_type)
|
||||
raise "Invalid type '#{@exchange_type}' must be one of #{MQTYPES.join(", ")}"
|
||||
end
|
||||
end # def initialize
|
||||
|
||||
public
|
||||
def register
|
||||
@logger.info("Registering input #{@url}")
|
||||
query_args = @url.query ? CGI.parse(@url.query) : {}
|
||||
amqpsettings = {
|
||||
:vhost => (@vhost or "/"),
|
||||
:host => @url.host,
|
||||
:port => (@url.port or 5672),
|
||||
:host => @host,
|
||||
:port => (@port or 5672),
|
||||
}
|
||||
amqpsettings[:user] = @url.user if @url.user
|
||||
amqpsettings[:pass] = @url.password if @url.password
|
||||
amqpsettings[:logging] = query_args.include? "debug"
|
||||
amqpsettings[:user] = @user if @user
|
||||
amqpsettings[:pass] = @password if @password
|
||||
amqpsettings[:logging] = @debug
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||
@amqp = AMQP.connect(amqpsettings)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
@bunny = Bunny.new(amqpsettings)
|
||||
|
||||
@target = @mq.queue(UUIDTools::UUID.timestamp_create)
|
||||
case @mqtype
|
||||
when "fanout"
|
||||
#@target.bind(MQ.fanout(@url.path, :durable => true))
|
||||
@target.bind(@mq.fanout(@name))
|
||||
when "direct"
|
||||
@target.bind(@mq.direct(@name))
|
||||
when "topic"
|
||||
@target.bind(@mq.topic(@name))
|
||||
end # case @mqtype
|
||||
# TODO(sissel): Check for errors here.
|
||||
@bunny.start
|
||||
|
||||
@target.subscribe(:ack => true) do |header, message|
|
||||
event = LogStash::Event.from_json(message)
|
||||
receive(event)
|
||||
header.ack
|
||||
end
|
||||
@queue = @bunny.queue(@name)
|
||||
exchange = @bunny.exchange(@name, :type => @exchange_type.to_sym)
|
||||
@queue.bind(exchange)
|
||||
end # def register
|
||||
|
||||
def run(queue)
|
||||
loop do
|
||||
@queue.subscribe do |data|
|
||||
@logger.info(:amqp_input => data)
|
||||
end
|
||||
end
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Amqp
|
||||
|
|
|
@ -9,33 +9,18 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base
|
|||
|
||||
public
|
||||
def register
|
||||
EventMachine::attach($stdin, InputHandler, self)
|
||||
@url.host = Socket.gethostname
|
||||
@host = Socket.gethostname
|
||||
end # def register
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
event = LogStash::Event.new({
|
||||
"@message" => event,
|
||||
"@type" => @type,
|
||||
"@tags" => @tags.clone,
|
||||
})
|
||||
event.source = @url
|
||||
@logger.debug(["Got event", event])
|
||||
@callback.call(event)
|
||||
end # def receive
|
||||
|
||||
class InputHandler < EventMachine::Connection
|
||||
def initialize(obj)
|
||||
@receiver = obj
|
||||
end # def initialize
|
||||
|
||||
def receive_data(data)
|
||||
@buffer ||= BufferedTokenizer.new
|
||||
@buffer.extract(data).each do |line|
|
||||
@receiver.receive(line)
|
||||
end
|
||||
end # def receive_data
|
||||
end # class InputHandler
|
||||
|
||||
def run(queue)
|
||||
loop do
|
||||
event = LogStash::Event.new
|
||||
event.message = $stdin.readline.chomp
|
||||
event.type = @type
|
||||
event.tags = @tags.clone rescue []
|
||||
event.source = "stdin://#{@host}/"
|
||||
@logger.debug(["Got event", event])
|
||||
queue << event
|
||||
end # loop
|
||||
end # def run
|
||||
end # class LogStash::Inputs::Stdin
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue