From ec2ae60d8347f3c4aed3666f9bb75b6efd7a99ad Mon Sep 17 00:00:00 2001 From: Pete Fritchman Date: Mon, 1 Nov 2010 01:46:20 +0000 Subject: [PATCH] add support for streaming logs to elasticsearch via a "rabbitmq river". we have to PUT a json config to the elasticsearch HTTP API to tell it where to look, then we use our amqp output to send bulk-data style index messages to elasticsearch. The ES side does bulk-queueing for us. --- ...logstash-elasticsearch-rabbitmq-river.yaml | 41 +++++++++++++++ lib/logstash/outputs/elasticsearch.rb | 51 ++++++++++++++++++- 2 files changed, 90 insertions(+), 2 deletions(-) create mode 100644 etc/logstash-elasticsearch-rabbitmq-river.yaml diff --git a/etc/logstash-elasticsearch-rabbitmq-river.yaml b/etc/logstash-elasticsearch-rabbitmq-river.yaml new file mode 100644 index 000000000..24c1758de --- /dev/null +++ b/etc/logstash-elasticsearch-rabbitmq-river.yaml @@ -0,0 +1,41 @@ +--- +# this is a sample logstash config (code is still highly in change, so +# this could change later) +# +# +inputs: +# Give a list of inputs. Tag them for easy query/filter later. + linux-syslog: # this is the 'linux-syslog' type + - /var/log/messages # watch /var/log/messages (uses eventmachine-tail) + - /var/log/kern.log + - /var/log/auth.log + - /var/log/user.log + apache-access: # similar, different type. + - /var/log/apache2/access.log + - /b/access + apache-error: + - /var/log/apache2/error.log +filters: +- grok: + linux-syslog: # for logs of type 'linux-syslog' + patterns: + - %{SYSLOGLINE} + apache-access: # for logs of type 'apache-error' + patterns: + - %{COMBINEDAPACHELOG} +- date: + linux-syslog: # for logs of type 'linux-syslog' + # Look for a field 'timestamp' with this format, parse and it for the timestamp + # This field comes from the SYSLOGLINE pattern + timestamp: "%b %e %H:%M:%S" + apache-access: + timestamp: "%d/%b/%Y:%H:%M:%S %Z" +outputs: +- stdout:/// +- "elasticsearch://localhost:9200/logs/all?method=river&type=rabbitmq&host=127.0.0.1&user=guest&pass=guest&vhost=/&queue=es" +# But we could write to mongodb, too. +# - mongodb://localhost/parsedlogs +# And also write to an AMQP topic +# - amqp://localhost/topic/parsedlogs +# Write to stdout ... etc. +# - stdout:/// diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index c2fa972c9..0b07b0206 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -1,4 +1,5 @@ require "logstash/outputs/base" +require "logstash/outputs/amqp" require "em-http-request" class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base @@ -11,13 +12,59 @@ class LogStash::Outputs::Elasticsearch < LogStash::Outputs::Base # Authentication? @httpurl = @url.clone @httpurl.scheme = "http" - @http = EventMachine::HttpRequest.new(@httpurl.to_s) + defaults = {"method" => "http"} + params = defaults.merge(@urlopts) + + case params["method"] + when "http" + @logger.debug "ElasticSearch using http with URL #{@httpurl.to_s}" + @http = EventMachine::HttpRequest.new(@httpurl.to_s) + @callback = self.method(:receive_http) + when "river" + mq_url = URI::parse("amqp://#{params["host"]}/queue/#{params["queue"]}?durable=1") + @mq = LogStash::Outputs::Amqp.new(mq_url.to_s) + @mq.register + @callback = self.method(:receive_river) + em_url = URI.parse("http://#{@httpurl.host}:#{@httpurl.port}/_river/logstash#{@httpurl.path.tr("/", "_")}/_meta") + unused, @es_index, @es_type = @httpurl.path.split("/", 3) + + river_config = {"type" => params["type"], + params["type"] => {"host" => params["host"], + "user" => params["user"], + "pass" => params["pass"], + "vhost" => params["vhost"], + "queue" => params["queue"], + "exchange" => params["queue"], + }, + "index" => {"bulk_size" => 100, + "bulk_timeout" => "10ms", + }, + } + @logger.debug "ElasticSearch using river with config #{river_config.awesome_inspect}" + http_setup = EventMachine::HttpRequest.new(em_url.to_s) + req = http_setup.put :body => river_config.to_json + req.errback do + @logger.warn "Error setting up river: #{req.response}" + end + else raise "unknown elasticsearch method #{params["method"].inspect}" + end end # def register def receive(event) + @callback.call(event) + end # def receive + + def receive_http(event) req = @http.post :body => event.to_json req.errback do $stderr.puts "Request to index to #{@httpurl.to_s} failed. Event was #{event.to_s}" end - end # def event + end # def receive_http + + def receive_river(event) + # bulk format; see http://www.elasticsearch.com/docs/elasticsearch/river/rabbitmq/ + index_message = {"index" => {"_index" => @es_index, "_type" => @es_type}}.to_json + "\n" + index_message += {@es_type => event.to_hash}.to_json + "\n" + @mq.receive_raw(index_message) + end # def receive_river end # class LogStash::Outputs::Websocket