From 17e1d38a12fa331c4b11e9dcf0e9f8cd53594763 Mon Sep 17 00:00:00 2001
From: Jordan Sissel
Date: Thu, 27 Dec 2012 16:50:19 -0800
Subject: [PATCH 1/4] - translate #240 by hand to a patch. Credit to Py Rho
 for the original implementation.

---
 lib/logstash/filters/mutate.rb | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/lib/logstash/filters/mutate.rb b/lib/logstash/filters/mutate.rb
index 3c53551a7..fcf4f362c 100644
--- a/lib/logstash/filters/mutate.rb
+++ b/lib/logstash/filters/mutate.rb
@@ -207,12 +207,14 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
   def replace(event)
     # TODO(sissel): use event.sprintf on the field names?
     @replace.each do |field, newvalue|
+      next unless event.include?(field)
       event[field] = event.sprintf(newvalue)
     end
   end # def replace
 
   def convert(event)
     @convert.each do |field, type|
+      next unless event.include?(field)
       original = event[field]
       # calls convert_{string,integer,float} depending on type requested.
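
Note: with the two guards added in this patch, 'replace' and 'convert' now
skip fields that are absent from the event rather than creating them (replace)
or coercing a nil value (convert). A minimal sketch of the affected settings
in a logstash config, assuming the 1.1.x-era array syntax; the field names
are illustrative:

    filter {
      mutate {
        # 'message' is rewritten only if the event already has that field.
        replace => [ "message", "%{source_host}: replaced" ]
        # 'status' is converted only if present; absent fields are skipped.
        convert => [ "status", "integer" ]
      }
    }
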
From c5801a7aa9fa7279b30fdc548829f8289d95c173 Mon Sep 17 00:00:00 2001
From: Jordan Sissel
Date: Thu, 27 Dec 2012 17:21:45 -0800
Subject: [PATCH 2/4] - rename 'id' -> 'document_id'
 - add document_id support to all elasticsearch outputs

---
 lib/logstash/outputs/elasticsearch.rb       | 13 +++++++------
 lib/logstash/outputs/elasticsearch_http.rb  | 15 +++++++++++++--
 lib/logstash/outputs/elasticsearch_river.rb | 20 +++++++++-----------
 3 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index f4d7deaf3..34ac6945b 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -37,8 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # similar events to the same 'type'. String expansion '%{foo}' works here.
   config :index_type, :validate => :string, :default => "%{@type}"
 
-  # The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID.
-  config :id, :validate => :string, :default => nil
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
 
   # The name of your cluster if you set it on the ElasticSearch side. Useful
   # for discovery.
@@ -163,11 +164,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
       end
     end
 
-    if id.nil?
-      req = @client.index(index, type, event.to_hash)
+    if @document_id.nil?
+      req = @client.index(index, type, event.to_hash)
     else
-      id = event.sprintf(@id)
-      req = @client.index(index, type, id, event.to_hash)
+      id = event.sprintf(@document_id)
+      req = @client.index(index, type, id, event.to_hash)
     end
 
     increment_inflight_request_count
diff --git a/lib/logstash/outputs/elasticsearch_http.rb b/lib/logstash/outputs/elasticsearch_http.rb
index 8a5e9ce8d..cf493a64c 100644
--- a/lib/logstash/outputs/elasticsearch_http.rb
+++ b/lib/logstash/outputs/elasticsearch_http.rb
@@ -40,6 +40,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   # be used.
   config :flush_size, :validate => :number, :default => 100
 
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
     require "ftw" # gem ftw
@@ -84,9 +88,12 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   end # def receive_single
 
   def receive_bulk(event, index, type)
+    header = { "index" => { "_index" => index, "_type" => type } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
     @queue << [
-      { "index" => { "_index" => index, "_type" => type } }.to_json,
-      event.to_json
+      header.to_json, event.to_json
     ].join("\n")
 
     # Keep trying to flush while the queue is full.
@@ -98,6 +105,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
     puts "Flushing #{@queue.count} events"
     # If we don't tack a trailing newline at the end, elasticsearch
     # doesn't seem to process the last event in this bulk index call.
+    #
+    # as documented here:
+    # http://www.elasticsearch.org/guide/reference/api/bulk.html
+    # "NOTE: the final line of data must end with a newline character \n."
     response = @agent.post!("http://#{@host}:#{@port}/_bulk",
                             :body => @queue.join("\n") + "\n")
diff --git a/lib/logstash/outputs/elasticsearch_river.rb b/lib/logstash/outputs/elasticsearch_river.rb
index 18075c7f5..e464b0b56 100644
--- a/lib/logstash/outputs/elasticsearch_river.rb
+++ b/lib/logstash/outputs/elasticsearch_river.rb
@@ -80,6 +80,10 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   # AMQP persistence setting
   config :persistent, :validate => :boolean, :default => true
 
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
@@ -199,20 +203,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   public
   def receive(event)
     return unless output?(event)
-
-    # TODO(sissel): Refactor this to not use so much string concatonation.
-
     # River events have a format of
     #   "action\ndata\n"
     # where 'action' is index or delete, data is the data to index.
-    index_message = {
-      "index" => {
-        "_index" => event.sprintf(@index),
-        "_type" => event.sprintf(@index_type)
-      }
-    }.to_json + "\n"
+    header = { "index" => { "_index" => event.sprintf(@index), "_type" => event.sprintf(@index_type) } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
 
-    index_message += event.to_json + "\n"
-    @mq.receive_raw(index_message)
+    @mq.receive_raw(header.to_json + "\n" + event.to_json + "\n")
   end # def receive
 end # LogStash::Outputs::ElasticSearchRiver
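
Note: all three elasticsearch outputs now share the same 'document_id'
setting, and the value passes through event.sprintf, so '%{field}' references
work. A hedged usage sketch (the 'request_id' field is illustrative):

    output {
      elasticsearch {
        # Events that share a request_id overwrite one document instead of
        # accumulating duplicates.
        document_id => "%{request_id}"
      }
    }

On the HTTP bulk path this places the id in each action header line, and per
the bulk API note in the patch, the request body must end with a trailing
newline. Roughly, with illustrative values:

    {"index":{"_index":"logstash-2012.12.27","_type":"logs","_id":"abc123"}}
    {"@type":"logs","@message":"hello world"}
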
From 5388d90e94bc8a03b8a233c5beeaac00a4be0767 Mon Sep 17 00:00:00 2001
From: Jordan Sissel
Date: Thu, 27 Dec 2012 22:12:39 -0800
Subject: [PATCH 3/4] - fix term

---
 CHANGELOG | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG b/CHANGELOG
index 55ac0dba3..ae5c709b6 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -23,7 +23,7 @@
   - new: anonymize: supports many hash mechanisms (murmur3, sha, md5, etc) as
     well as IP address anonymization (#280, #261; patches by Richard Pijnenburg
     and Avishai Ish-Shalom)
-  - filter: date: now accepts 'match' as a setting. Use of this is preferable
+  - feature: date: now accepts 'match' as a setting. Use of this is preferable
     to the old syntax.
   - improvement: grok: now accepts (?<foo>...) named captures. This lets you
     compose a pattern in the grok config without needing to define it in a

From 30e4c04d7201363125e4d142e31bb0ef325c8909 Mon Sep 17 00:00:00 2001
From: Jordan Sissel
Date: Thu, 27 Dec 2012 22:13:34 -0800
Subject: [PATCH 4/4] attribution

---
 CHANGELOG | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG b/CHANGELOG
index ae5c709b6..ed19b544b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -24,7 +24,7 @@
     well as IP address anonymization (#280, #261; patches by Richard Pijnenburg
     and Avishai Ish-Shalom)
   - feature: date: now accepts 'match' as a setting. Use of this is preferable
-    to the old syntax.
+    to the old syntax. (#248, LOGSTASH-734, Patch by Louis Zuckerman)
   - improvement: grok: now accepts (?<foo>...) named captures. This lets you
     compose a pattern in the grok config without needing to define it in a
     patterns file. Example: (?<hostport>%{HOST}:%{POSINT}) to capture 'hostport'
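
Note: both CHANGELOG entries above describe config-visible syntax. A hedged
sketch of the two features, assuming the grok 'pattern' setting of this era;
patterns and field names are illustrative:

    filter {
      date {
        # 'match' pairs a source field with its expected time format.
        match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
      grok {
        # (?<name>subpattern) names a capture inline, without a patterns file.
        pattern => "(?<hostport>%{HOST}:%{POSINT})"
      }
    }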