diff --git a/lib/logstash/inputs/amqp.rb b/lib/logstash/inputs/amqp.rb index aebfc2031..fde344217 100644 --- a/lib/logstash/inputs/amqp.rb +++ b/lib/logstash/inputs/amqp.rb @@ -27,7 +27,13 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base public def register @logger.info("Registering input #{@url}") - @amqp = AMQP.connect(:host => @url.host) + amqpsettings = { + :host => @url.host, + :port => (@url.port or 5672), + } + amqpsettings[:user] = @url.user if @url.user + amqpsettings[:pass] = @url.password if @url.password + @amqp = AMQP.connect(amqpsettings) @mq = MQ.new(@amqp) @target = nil diff --git a/lib/logstash/outputs/amqp.rb b/lib/logstash/outputs/amqp.rb index 237dcca51..282a2b4b0 100644 --- a/lib/logstash/outputs/amqp.rb +++ b/lib/logstash/outputs/amqp.rb @@ -24,7 +24,13 @@ class LogStash::Outputs::Amqp < LogStash::Outputs::Base public def register @logger.info("Registering output #{@url}") - @amqp = AMQP.connect(:host => @url.host, :port => (@url.port or 5672)) + amqpsettings = { + :host => @url.host, + :port => (@url.port or 5672), + } + amqpsettings[:user] = @url.user if @url.user + amqpsettings[:pass] = @url.password if @url.password + @amqp = AMQP.connect(amqpsettings) @mq = MQ.new(@amqp) @target = nil diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 1dd61f40f..3132bfa2f 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -62,7 +62,8 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base @callback = self.method(:receive_http) when "river" params["port"] ||= 5672 - mq_url = URI::parse("amqp://#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1") + auth = "#{params["user"] or "guest"}:#{params["pass"] or "guest"}" + mq_url = URI::parse("amqp://#{auth}@#{params["host"]}:#{params["port"]}/queue/#{params["queue"]}?durable=1") @mq = LogStash::Outputs::Amqp.new(mq_url.to_s) @mq.register @callback = self.method(:receive_river)