mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge commit 'c0551dfaaf
' into pre-jruby
This commit is contained in:
commit
e2ad4929b6
4 changed files with 26 additions and 11 deletions
|
@ -8,3 +8,4 @@ Contributors:
|
||||||
* Alexandre Dulaunoy (adulau)
|
* Alexandre Dulaunoy (adulau)
|
||||||
* Ryan Ausanka-Crues (rausanka) / tello.com (startupseven)
|
* Ryan Ausanka-Crues (rausanka) / tello.com (startupseven)
|
||||||
* Piotr Usewicz (pusewicz)
|
* Piotr Usewicz (pusewicz)
|
||||||
|
* Charles Duffy (charles-dyfis-net)
|
||||||
|
|
|
@ -3,6 +3,7 @@ require "logstash/inputs/base"
|
||||||
require "logstash/namespace"
|
require "logstash/namespace"
|
||||||
require "mq" # rubygem 'amqp'
|
require "mq" # rubygem 'amqp'
|
||||||
require "uuidtools" # rubygem 'uuidtools'
|
require "uuidtools" # rubygem 'uuidtools'
|
||||||
|
require "cgi"
|
||||||
|
|
||||||
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
||||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||||
|
@ -13,26 +14,32 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
||||||
|
|
||||||
@mq = nil
|
@mq = nil
|
||||||
|
|
||||||
# Handle path /<type>/<name>
|
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
|
||||||
unused, @mqtype, @name = @url.path.split("/", 3)
|
# vhost allowed to contain slashes
|
||||||
if @mqtype == nil or @name == nil
|
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
|
||||||
raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
|
unused, @vhost, @mqtype, @name = $~.captures
|
||||||
|
else
|
||||||
|
raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
|
||||||
end
|
end
|
||||||
|
|
||||||
if !MQTYPES.include?(@mqtype)
|
if !MQTYPES.include?(@mqtype)
|
||||||
raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.JOIN(", ")}"
|
raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
|
||||||
end
|
end
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
public
|
public
|
||||||
def register
|
def register
|
||||||
@logger.info("Registering input #{@url}")
|
@logger.info("Registering input #{@url}")
|
||||||
|
query_args = @url.query ? CGI.parse(@url.query) : {}
|
||||||
amqpsettings = {
|
amqpsettings = {
|
||||||
|
:vhost => (@vhost or "/"),
|
||||||
:host => @url.host,
|
:host => @url.host,
|
||||||
:port => (@url.port or 5672),
|
:port => (@url.port or 5672),
|
||||||
}
|
}
|
||||||
amqpsettings[:user] = @url.user if @url.user
|
amqpsettings[:user] = @url.user if @url.user
|
||||||
amqpsettings[:pass] = @url.password if @url.password
|
amqpsettings[:pass] = @url.password if @url.password
|
||||||
|
amqpsettings[:logging] = query_args.include? "debug"
|
||||||
|
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||||
@amqp = AMQP.connect(amqpsettings)
|
@amqp = AMQP.connect(amqpsettings)
|
||||||
@mq = MQ.new(@amqp)
|
@mq = MQ.new(@amqp)
|
||||||
@target = nil
|
@target = nil
|
||||||
|
|
|
@ -2,6 +2,7 @@ require "amqp" # rubygem 'amqp'
|
||||||
require "logstash/outputs/base"
|
require "logstash/outputs/base"
|
||||||
require "logstash/namespace"
|
require "logstash/namespace"
|
||||||
require "mq" # rubygem 'amqp'
|
require "mq" # rubygem 'amqp'
|
||||||
|
require "cgi"
|
||||||
|
|
||||||
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
||||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||||
|
@ -10,26 +11,32 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
||||||
def initialize(url, config={}, &block)
|
def initialize(url, config={}, &block)
|
||||||
super
|
super
|
||||||
|
|
||||||
# Handle path /<type>/<name>
|
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
|
||||||
unused, @mqtype, @name = @url.path.split("/", 3)
|
# vhost allowed to contain slashes
|
||||||
if @mqtype == nil or @name == nil
|
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
|
||||||
raise "amqp urls must have a path of /<type>/name where <type> is #{MQTYPES.join(", ")}"
|
unused, @vhost, @mqtype, @name = $~.captures
|
||||||
|
else
|
||||||
|
raise "amqp urls must have a path of /<type>/name or /vhost/<type>/name where <type> is #{MQTYPES.join(", ")}"
|
||||||
end
|
end
|
||||||
|
|
||||||
if !MQTYPES.include?(@mqtype)
|
if !MQTYPES.include?(@mqtype)
|
||||||
raise "Invalid type '#{@mqtype}' must be one #{MQTYPES.join(", ")}"
|
raise "Invalid type '#{@mqtype}' must be one of #{MQTYPES.join(", ")}"
|
||||||
end
|
end
|
||||||
end # def initialize
|
end # def initialize
|
||||||
|
|
||||||
public
|
public
|
||||||
def register
|
def register
|
||||||
@logger.info("Registering output #{@url}")
|
@logger.info("Registering output #{@url}")
|
||||||
|
query_args = @url.query ? CGI.parse(@url.query) : {}
|
||||||
amqpsettings = {
|
amqpsettings = {
|
||||||
|
:vhost => (@vhost or "/"),
|
||||||
:host => @url.host,
|
:host => @url.host,
|
||||||
:port => (@url.port or 5672),
|
:port => (@url.port or 5672),
|
||||||
}
|
}
|
||||||
amqpsettings[:user] = @url.user if @url.user
|
amqpsettings[:user] = @url.user if @url.user
|
||||||
amqpsettings[:pass] = @url.password if @url.password
|
amqpsettings[:pass] = @url.password if @url.password
|
||||||
|
amqpsettings[:logging] = query_args.include? "debug"
|
||||||
|
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||||
@amqp = AMQP.connect(amqpsettings)
|
@amqp = AMQP.connect(amqpsettings)
|
||||||
@mq = MQ.new(@amqp)
|
@mq = MQ.new(@amqp)
|
||||||
@target = nil
|
@target = nil
|
||||||
|
|
|
@ -46,7 +46,7 @@
|
||||||
|
|
||||||
jQuery.getJSON("/api/histogram", logstash.params, function(histogram, text, jqxhr) {
|
jQuery.getJSON("/api/histogram", logstash.params, function(histogram, text, jqxhr) {
|
||||||
/* Load the data into the graph */
|
/* Load the data into the graph */
|
||||||
flot_data = [];
|
var flot_data = [];
|
||||||
// histogram is an array of { "key": ..., "count": ... }
|
// histogram is an array of { "key": ..., "count": ... }
|
||||||
for (var i in histogram) {
|
for (var i in histogram) {
|
||||||
flot_data.push([parseInt(histogram[i]["key"]), histogram[i]["count"]])
|
flot_data.push([parseInt(histogram[i]["key"]), histogram[i]["count"]])
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue