mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge branch 'es-vhost-support' of https://github.com/charles-dyfis-net/logstash into pull/13
This commit is contained in:
commit
280837fb47
3 changed files with 64 additions and 32 deletions
|
@ -6,7 +6,7 @@ require "uuidtools" # rubygem 'uuidtools'
|
|||
require "cgi"
|
||||
|
||||
class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
MQTYPES = [ "fanout", "direct", "topic" ]
|
||||
|
||||
public
|
||||
def initialize(url, type, config={}, &block)
|
||||
|
@ -39,20 +39,22 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
|
|||
amqpsettings[:user] = @url.user if @url.user
|
||||
amqpsettings[:pass] = @url.password if @url.password
|
||||
amqpsettings[:logging] = query_args.include? "debug"
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||
queue_name = ((@urlopts["queue"].nil? or @urlopts["queue"].empty?) ? "logstash-#{@name}" : @urlopts["queue"])
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{queue_name} on exchange #{@name.inspect}")
|
||||
@amqp = AMQP.connect(amqpsettings)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
||||
@target = @mq.queue(UUIDTools::UUID.timestamp_create)
|
||||
@durable_exchange = @urlopts["durable_exchange"] ? true : false
|
||||
@durable_queue = @urlopts["durable_queue"] ? true : false
|
||||
@target = @mq.queue(queue_name, :durable => @durable_queue)
|
||||
case @mqtype
|
||||
when "fanout"
|
||||
#@target.bind(MQ.fanout(@url.path, :durable => true))
|
||||
@target.bind(@mq.fanout(@name))
|
||||
@target.bind(@mq.fanout(@name, :durable => @durable_exchange))
|
||||
when "direct"
|
||||
@target.bind(@mq.direct(@name))
|
||||
@target.bind(@mq.direct(@name, :durable => @durable_exchange))
|
||||
when "topic"
|
||||
@target.bind(@mq.topic(@name))
|
||||
@target.bind(@mq.topic(@name, :durable => @durable_exchange))
|
||||
end # case @mqtype
|
||||
|
||||
@target.subscribe(:ack => true) do |header, message|
|
||||
|
|
|
@ -5,12 +5,15 @@ require "mq" # rubygem 'amqp'
|
|||
require "cgi"
|
||||
|
||||
class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
||||
MQTYPES = [ "fanout", "queue", "topic" ]
|
||||
MQTYPES = [ "fanout", "direct", "topic" ]
|
||||
|
||||
public
|
||||
def initialize(url, config={}, &block)
|
||||
super
|
||||
|
||||
@mq = nil
|
||||
@bulk_prefix = nil
|
||||
|
||||
# Handle path /<vhost>/<type>/<name> or /<type>/<name>
|
||||
# vhost allowed to contain slashes
|
||||
if @url.path =~ %r{^/((.*)/)?([^/]+)/([^/]+)}
|
||||
|
@ -36,25 +39,35 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
|
|||
amqpsettings[:user] = @url.user if @url.user
|
||||
amqpsettings[:pass] = @url.password if @url.password
|
||||
amqpsettings[:logging] = query_args.include? "debug"
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} queue #{@name.inspect}")
|
||||
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@mqtype.inspect} exchange #{@name.inspect}")
|
||||
@amqp = AMQP.connect(amqpsettings)
|
||||
@mq = MQ.new(@amqp)
|
||||
@target = nil
|
||||
|
||||
if @urlopts.include? "es_index" and @urlopts.include? "es_type"
|
||||
@bulk_prefix = { "index" => { "_index" => @urlopts["es_index"], "_type" => @urlopts["es_type"] } }.to_json + "\n"
|
||||
@logger.debug "Preparing ElasticSearch bulk API header for injection: #{@bulk_prefix.inspect}"
|
||||
end
|
||||
|
||||
@durable = @urlopts["durable"] ? true : false
|
||||
case @mqtype
|
||||
when "fanout"
|
||||
@target = @mq.fanout(@name)
|
||||
when "queue"
|
||||
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
|
||||
@target = @mq.fanout(@name, :durable => @durable)
|
||||
when "direct"
|
||||
@target = @mq.direct(@name, :durable => @durable)
|
||||
when "topic"
|
||||
@target = @mq.topic(@name)
|
||||
@target = @mq.topic(@name, :durable => @durable)
|
||||
end # case @mqtype
|
||||
end # def register
|
||||
|
||||
public
|
||||
def receive(event)
|
||||
@logger.debug(["Sending event", { :url => @url, :event => event }])
|
||||
@target.publish(event.to_json)
|
||||
if @bulk_prefix
|
||||
@target.publish(@bulk_prefix + event.to_json + "\n")
|
||||
else
|
||||
@target.publish(event.to_json)
|
||||
end
|
||||
end # def receive
|
||||
|
||||
# This is used by the ElasticSearch AMQP/River output.
|
||||
|
|
|
@ -2,6 +2,7 @@ require "em-http-request"
|
|||
require "logstash/namespace"
|
||||
require "logstash/outputs/amqp"
|
||||
require "logstash/outputs/base"
|
||||
require "cgi"
|
||||
|
||||
class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
||||
public
|
||||
|
@ -63,34 +64,50 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
|||
|
||||
public
|
||||
def ready(params)
|
||||
case params["method"]
|
||||
method = params.delete("method")
|
||||
case method
|
||||
when "http"
|
||||
@logger.debug "ElasticSearch using http with URL #{@url.to_s}"
|
||||
@http = EventMachine::HttpRequest.new(@url.to_s)
|
||||
@callback = self.method(:receive_http)
|
||||
when "river"
|
||||
params["port"] ||= 5672
|
||||
auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}"
|
||||
mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1")
|
||||
river_type = params.delete("type") || "rabbitmq"
|
||||
amqp_host = params.delete("host") || 'localhost'
|
||||
amqp_port = params.delete("port") || 5672
|
||||
amqp_exchange_type = params.delete("exchange_type") || "direct"
|
||||
amqp_queue_name = params.delete("queue") || "es"
|
||||
amqp_exchange_name = params.delete("exchange") || amqp_queue_name
|
||||
amqp_exchange_durable = (params["durable"] || "false") =~ /^[ty1]/
|
||||
amqp_user = params.delete("user") or "guest"
|
||||
amqp_pass = params.delete("pass") or "guest"
|
||||
amqp_vhost = params.delete("vhost") || "/"
|
||||
vhost_str = (amqp_vhost == "/") ? "" : "/#{amqp_vhost}"
|
||||
qs = params.map {|k,v| "#{CGI.escape(k)}=#{CGI.escape(v)}"}.join("&")
|
||||
mq_url = URI::parse("amqp://#{amqp_user}:#{amqp_pass}@#{amqp_host}:#{amqp_port}#{vhost_str}/#{amqp_exchange_type}/#{amqp_exchange_name}?#{qs}")
|
||||
@mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
|
||||
@mq.register
|
||||
@callback = self.method(:receive_river)
|
||||
em_url = URI.parse("http://#{@url.host}:#{@url.port}/_river/logstash#{@url.path.tr("/", "_")}/_meta")
|
||||
unused, @es_index, @es_type = @url.path.split("/", 3)
|
||||
|
||||
river_config = {"type" => params["type"],
|
||||
params["type"] => {"host" => params["host"],
|
||||
"user" => params["user"],
|
||||
"port" => params["port"],
|
||||
"pass" => params["pass"],
|
||||
"vhost" => params["vhost"],
|
||||
"queue" => params["queue"],
|
||||
"exchange" => params["queue"],
|
||||
},
|
||||
"index" => {"bulk_size" => 100,
|
||||
"bulk_timeout" => "10ms",
|
||||
},
|
||||
}
|
||||
river_config = {
|
||||
"type" => river_type,
|
||||
river_type => {
|
||||
"host" => amqp_host,
|
||||
"user" => amqp_user,
|
||||
"port" => amqp_port,
|
||||
"pass" => amqp_pass,
|
||||
"vhost" => amqp_vhost,
|
||||
"queue" => amqp_queue_name,
|
||||
"exchange" => amqp_exchange_name,
|
||||
"exchange_durable" => amqp_exchange_durable ? "true" : "false",
|
||||
"exchange_type" => amqp_exchange_type,
|
||||
},
|
||||
"index" => {
|
||||
"bulk_size" => 100,
|
||||
"bulk_timeout" => "10ms",
|
||||
},
|
||||
}
|
||||
@logger.debug(["ElasticSearch using river", river_config])
|
||||
http_setup = EventMachine::HttpRequest.new(em_url.to_s)
|
||||
req = http_setup.put :body => river_config.to_json
|
||||
|
@ -98,7 +115,7 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
|||
@logger.warn "Error setting up river: #{req.response}"
|
||||
end
|
||||
@callback = self.method(:receive_river)
|
||||
else raise "unknown elasticsearch method #{params["method"].inspect}"
|
||||
else raise "unknown elasticsearch method #{method.inspect}"
|
||||
end
|
||||
|
||||
#receive(LogStash::Event.new({
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue