- Add port support to amqp (and elasticsearch river)

This commit is contained in:
Jordan Sissel 2011-01-05 11:58:49 -08:00
parent 2ec791cda0
commit 80be042c24
2 changed files with 9 additions and 4 deletions

View file

@ -20,7 +20,7 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base
def register
@logger.info("Registering output #{@url}")
@amqp = AMQP.connect(:host => @url.host)
@amqp = AMQP.connect(:host => @url.host, :port => (@url.port or 5672))
@mq = MQ.new(@amqp)
@target = nil

View file

@ -21,7 +21,8 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
@http = EventMachine::HttpRequest.new(@httpurl.to_s)
@callback = self.method(:receive_http)
when "river"
mq_url = URI::parse("amqp://#{params["host"]}/queue/#{params["queue"]}?durable=1")
params["port"] ||= 5672
mq_url = URI::parse("amqp://#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1")
@mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
@mq.register
@callback = self.method(:receive_river)
@ -31,6 +32,7 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
river_config = {"type" => params["type"],
params["type"] => {"host" => params["host"],
"user" => params["user"],
"port" => params["port"],
"pass" => params["pass"],
"vhost" => params["vhost"],
"queue" => params["queue"],
@ -54,10 +56,13 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
@callback.call(event)
end # def receive
def receive_http(event)
def receive_http(event, tries=5)
req = @http.post :body => event.to_json
req.errback do
$stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}"
$stderr.puts "Request to index to #{@httpurl.to_s} failed (will retry, #{tries} tries left). Event was #{event.to_s}"
EventMachine::add_timer(2) do
receive_http(event, tries - 1)
end
end
end # def receive_http