mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
add support for streaming logs to elasticsearch via a "rabbitmq river".
we have to PUT a json config to the elasticsearch HTTP API to tell it where to look, then we use our amqp output to send bulk-data style index messages to elasticsearch. The ES side does bulk-queueing for us.
This commit is contained in:
parent
b250a0218c
commit
ec2ae60d83
2 changed files with 90 additions and 2 deletions
41
etc/logstash-elasticsearch-rabbitmq-river.yaml
Normal file
41
etc/logstash-elasticsearch-rabbitmq-river.yaml
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
---
|
||||||
|
# this is a sample logstash config (code is still highly in change, so
|
||||||
|
# this could change later)
|
||||||
|
#
|
||||||
|
#
|
||||||
|
inputs:
|
||||||
|
# Give a list of inputs. Tag them for easy query/filter later.
|
||||||
|
linux-syslog: # this is the 'linux-syslog' type
|
||||||
|
- /var/log/messages # watch /var/log/messages (uses eventmachine-tail)
|
||||||
|
- /var/log/kern.log
|
||||||
|
- /var/log/auth.log
|
||||||
|
- /var/log/user.log
|
||||||
|
apache-access: # similar, different type.
|
||||||
|
- /var/log/apache2/access.log
|
||||||
|
- /b/access
|
||||||
|
apache-error:
|
||||||
|
- /var/log/apache2/error.log
|
||||||
|
filters:
|
||||||
|
- grok:
|
||||||
|
linux-syslog: # for logs of type 'linux-syslog'
|
||||||
|
patterns:
|
||||||
|
- %{SYSLOGLINE}
|
||||||
|
apache-access: # for logs of type 'apache-error'
|
||||||
|
patterns:
|
||||||
|
- %{COMBINEDAPACHELOG}
|
||||||
|
- date:
|
||||||
|
linux-syslog: # for logs of type 'linux-syslog'
|
||||||
|
# Look for a field 'timestamp' with this format, parse and it for the timestamp
|
||||||
|
# This field comes from the SYSLOGLINE pattern
|
||||||
|
timestamp: "%b %e %H:%M:%S"
|
||||||
|
apache-access:
|
||||||
|
timestamp: "%d/%b/%Y:%H:%M:%S %Z"
|
||||||
|
outputs:
|
||||||
|
- stdout:///
|
||||||
|
- "elasticsearch://localhost:9200/logs/all?method=river&type=rabbitmq&host=127.0.0.1&user=guest&pass=guest&vhost=/&queue=es"
|
||||||
|
# But we could write to mongodb, too.
|
||||||
|
# - mongodb://localhost/parsedlogs
|
||||||
|
# And also write to an AMQP topic
|
||||||
|
# - amqp://localhost/topic/parsedlogs
|
||||||
|
# Write to stdout ... etc.
|
||||||
|
# - stdout:///
|
|
@ -1,4 +1,5 @@
|
||||||
require "logstash/outputs/base"
|
require "logstash/outputs/base"
|
||||||
|
require "logstash/outputs/amqp"
|
||||||
require "em-http-request"
|
require "em-http-request"
|
||||||
|
|
||||||
class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
||||||
|
@ -11,13 +12,59 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base
|
||||||
# Authentication?
|
# Authentication?
|
||||||
@httpurl = @url.clone
|
@httpurl = @url.clone
|
||||||
@httpurl.scheme = "http"
|
@httpurl.scheme = "http"
|
||||||
@http = EventMachine::HttpRequest.new(@httpurl.to_s)
|
defaults = {"method" => "http"}
|
||||||
|
params = defaults.merge(@urlopts)
|
||||||
|
|
||||||
|
case params["method"]
|
||||||
|
when "http"
|
||||||
|
@logger.debug "ElasticSearch using http with URL #{@httpurl.to_s}"
|
||||||
|
@http = EventMachine::HttpRequest.new(@httpurl.to_s)
|
||||||
|
@callback = self.method(:receive_http)
|
||||||
|
when "river"
|
||||||
|
mq_url = URI::parse("amqp://#{params["host"]}/queue/#{params["queue"]}?durable=1")
|
||||||
|
@mq = LogStash::Outputs::Amqp.new(mq_url.to_s)
|
||||||
|
@mq.register
|
||||||
|
@callback = self.method(:receive_river)
|
||||||
|
em_url = URI.parse("http://#{@httpurl.host}:#{@httpurl.port}/_river/logstash#{@httpurl.path.tr("/", "_")}/_meta")
|
||||||
|
unused, @es_index, @es_type = @httpurl.path.split("/", 3)
|
||||||
|
|
||||||
|
river_config = {"type" => params["type"],
|
||||||
|
params["type"] => {"host" => params["host"],
|
||||||
|
"user" => params["user"],
|
||||||
|
"pass" => params["pass"],
|
||||||
|
"vhost" => params["vhost"],
|
||||||
|
"queue" => params["queue"],
|
||||||
|
"exchange" => params["queue"],
|
||||||
|
},
|
||||||
|
"index" => {"bulk_size" => 100,
|
||||||
|
"bulk_timeout" => "10ms",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
@logger.debug "ElasticSearch using river with config #{river_config.awesome_inspect}"
|
||||||
|
http_setup = EventMachine::HttpRequest.new(em_url.to_s)
|
||||||
|
req = http_setup.put :body => river_config.to_json
|
||||||
|
req.errback do
|
||||||
|
@logger.warn "Error setting up river: #{req.response}"
|
||||||
|
end
|
||||||
|
else raise "unknown elasticsearch method #{params["method"].inspect}"
|
||||||
|
end
|
||||||
end # def register
|
end # def register
|
||||||
|
|
||||||
def receive(event)
|
def receive(event)
|
||||||
|
@callback.call(event)
|
||||||
|
end # def receive
|
||||||
|
|
||||||
|
def receive_http(event)
|
||||||
req = @http.post :body => event.to_json
|
req = @http.post :body => event.to_json
|
||||||
req.errback do
|
req.errback do
|
||||||
$stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}"
|
$stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}"
|
||||||
end
|
end
|
||||||
end # def event
|
end # def receive_http
|
||||||
|
|
||||||
|
def receive_river(event)
|
||||||
|
# bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/
|
||||||
|
index_message = {"index" => {"_index" => @es_index, "_type" => @es_type}}.to_json + "\n"
|
||||||
|
index_message += {@es_type => event.to_hash}.to_json + "\n"
|
||||||
|
@mq.receive_raw(index_message)
|
||||||
|
end # def receive_river
|
||||||
end # class LogStash::Outputs::Websocket
|
end # class LogStash::Outputs::Websocket
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue