diff --git a/CHANGELOG b/CHANGELOG
index 55ac0dba3..ed19b544b 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -23,8 +23,8 @@
   - new: anonymize: supports many hash mechanisms (murmur3, sha, md5, etc) as
     well as IP address anonymization (#280, #261; patches by Richard
     Pijnenburg and Avishai Ish-Shalom)
-  - filter: date: now accepts 'match' as a setting. Use of this is preferable
-    to the old syntax.
+  - feature: date: now accepts 'match' as a setting. Use of this is preferable
+    to the old syntax. (#248, LOGSTASH-734, Patch by Louis Zuckerman)
   - improvement: grok: now accepts (?<foo>...) named captures. This lets you
     compose a pattern in the grok config without needing to define it in a
     patterns file. Example: (?<hostport>%{HOST}:%{POSINT}) to capture 'hostport'
diff --git a/lib/logstash/filters/mutate.rb b/lib/logstash/filters/mutate.rb
index 3c53551a7..fcf4f362c 100644
--- a/lib/logstash/filters/mutate.rb
+++ b/lib/logstash/filters/mutate.rb
@@ -207,12 +207,14 @@ class LogStash::Filters::Mutate < LogStash::Filters::Base
   def replace(event)
     # TODO(sissel): use event.sprintf on the field names?
     @replace.each do |field, newvalue|
+      next unless event.include?(field)
       event[field] = event.sprintf(newvalue)
     end
   end # def replace

   def convert(event)
     @convert.each do |field, type|
+      next unless event.include?(field)
       original = event[field]
       # calls convert_{string,integer,float} depending on type requested.
diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index f4d7deaf3..34ac6945b 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -37,8 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # similar events to the same 'type'. String expansion '%{foo}' works here.
   config :index_type, :validate => :string, :default => "%{@type}"

-  # The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID.
-  config :id, :validate => :string, :default => nil
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil

   # The name of your cluster if you set it on the ElasticSearch side. Useful
   # for discovery.
@@ -163,11 +164,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
       end
     end

-    if id.nil?
-      req = @client.index(index, type, event.to_hash)
+    if @document_id.nil?
+      req = @client.index(index, type, event.to_hash)
     else
-      id = event.sprintf(@id)
-      req = @client.index(index, type, id, event.to_hash)
+      id = event.sprintf(@document_id)
+      req = @client.index(index, type, id, event.to_hash)
     end

     increment_inflight_request_count
diff --git a/lib/logstash/outputs/elasticsearch_http.rb b/lib/logstash/outputs/elasticsearch_http.rb
index 8a5e9ce8d..cf493a64c 100644
--- a/lib/logstash/outputs/elasticsearch_http.rb
+++ b/lib/logstash/outputs/elasticsearch_http.rb
@@ -40,6 +40,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   # be used.
   config :flush_size, :validate => :number, :default => 100

+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
     require "ftw" # gem ftw
@@ -84,9 +88,12 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   end # def receive_single

   def receive_bulk(event, index, type)
+    header = { "index" => { "_index" => index, "_type" => type } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
     @queue << [
-      { "index" => { "_index" => index, "_type" => type } }.to_json,
-      event.to_json
+      header.to_json, event.to_json
     ].join("\n")

     # Keep trying to flush while the queue is full.
@@ -98,6 +105,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
     puts "Flushing #{@queue.count} events"
     # If we don't tack a trailing newline at the end, elasticsearch
     # doesn't seem to process the last event in this bulk index call.
+    #
+    # as documented here:
+    # http://www.elasticsearch.org/guide/reference/api/bulk.html
+    # "NOTE: the final line of data must end with a newline character \n."
     response = @agent.post!("http://#{@host}:#{@port}/_bulk",
                             :body => @queue.join("\n") + "\n")
diff --git a/lib/logstash/outputs/elasticsearch_river.rb b/lib/logstash/outputs/elasticsearch_river.rb
index 18075c7f5..e464b0b56 100644
--- a/lib/logstash/outputs/elasticsearch_river.rb
+++ b/lib/logstash/outputs/elasticsearch_river.rb
@@ -80,6 +80,10 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   # AMQP persistence setting
   config :persistent, :validate => :boolean, :default => true

+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register

@@ -199,20 +203,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   public
   def receive(event)
     return unless output?(event)
-
-    # TODO(sissel): Refactor this to not use so much string concatonation.
-
     # River events have a format of
     # "action\ndata\n"
     # where 'action' is index or delete, data is the data to index.
-    index_message = {
-      "index" => {
-        "_index" => event.sprintf(@index),
-        "_type" => event.sprintf(@index_type)
-      }
-    }.to_json + "\n"
+    header = { "index" => { "_index" => event.sprintf(@index), "_type" => event.sprintf(@index_type) } }
+    unless @document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end

-    index_message += event.to_json + "\n"
-    @mq.receive_raw(index_message)
+    @mq.receive_raw(header.to_json + "\n" + event.to_json + "\n")
   end # def receive
 end # LogStash::Outputs::ElasticSearchRiver