mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 06:37:19 -04:00)
- rename 'id' -> 'document_id'
- add document_id support to all elasticsearch outputs
commit c5801a7aa9
parent 17e1d38a12
3 changed files with 29 additions and 19 deletions
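
For context, a minimal pipeline config using the renamed option might look like this (a sketch only; the `uuid` field is a hypothetical example, and any event field works via the '%{foo}' string expansion noted in the diff below):

    output {
      elasticsearch {
        # was `id => "%{uuid}"` before this commit
        document_id => "%{uuid}"
      }
    }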
@@ -37,8 +37,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # similar events to the same 'type'. String expansion '%{foo}' works here.
   config :index_type, :validate => :string, :default => "%{@type}"
 
-  # The document ID for the index. Overwrites any existing entry in elasticsearch with the same ID.
-  config :id, :validate => :string, :default => nil
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
 
   # The name of your cluster if you set it on the ElasticSearch side. Useful
   # for discovery.
@@ -163,11 +164,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
       end
     end
 
-    if @id.nil?
+    if @document_id.nil?
       req = @client.index(index, type, event.to_hash)
     else
-      id = event.sprintf(@id)
+      id = event.sprintf(@document_id)
       req = @client.index(index, type, id, event.to_hash)
     end
 
     increment_inflight_request_count
@@ -40,6 +40,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   # be used.
   config :flush_size, :validate => :number, :default => 100
 
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
     require "ftw" # gem ftw
@@ -84,9 +88,12 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
   end # def receive_single
 
   def receive_bulk(event, index, type)
+    header = { "index" => { "_index" => index, "_type" => type } }
+    if !@document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
     @queue << [
-      { "index" => { "_index" => index, "_type" => type } }.to_json,
-      event.to_json
+      header.to_json, event.to_json
     ].join("\n")
 
     # Keep trying to flush while the queue is full.
@@ -98,6 +105,10 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
     puts "Flushing #{@queue.count} events"
+    # If we don't tack a trailing newline at the end, elasticsearch
+    # doesn't seem to process the last event in this bulk index call.
+    #
+    # as documented here:
+    # http://www.elasticsearch.org/guide/reference/api/bulk.html
+    # "NOTE: the final line of data must end with a newline character \n."
     response = @agent.post!("http://#{@host}:#{@port}/_bulk",
-                            :body => @queue.join("\n"))
+                            :body => @queue.join("\n") + "\n")
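
Taken together, the two hunks above mean each queued entry is now a header line plus a source line, and the flush posts them joined by newlines with the required trailing newline. A sketch of one resulting _bulk body, assuming `document_id => "%{uuid}"` and hypothetical index, type, and event values:

    {"index":{"_index":"logstash-2013.01.01","_type":"logs","_id":"abc123"}}
    {"@type":"logs","uuid":"abc123","@message":"hello world"}

(The body ends with a final newline after the last line, per the bulk API note quoted above.)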
@@ -80,6 +80,10 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   # AMQP persistence setting
   config :persistent, :validate => :boolean, :default => true
 
+  # The document ID for the index. Useful for overwriting existing entries in
+  # elasticsearch with the same ID.
+  config :document_id, :validate => :string, :default => nil
+
   public
   def register
@@ -199,20 +203,14 @@ class LogStash::Outputs::ElasticSearchRiver < LogStash::Outputs::Base
   public
   def receive(event)
     return unless output?(event)
 
     # TODO(sissel): Refactor this to not use so much string concatenation.
 
     # River events have a format of
     #   "action\ndata\n"
     # where 'action' is index or delete, data is the data to index.
-    index_message = {
-      "index" => {
-        "_index" => event.sprintf(@index),
-        "_type" => event.sprintf(@index_type)
-      }
-    }.to_json + "\n"
-
-    index_message += event.to_json + "\n"
-    @mq.receive_raw(index_message)
+    header = { "index" => { "_index" => event.sprintf(@index),
+                            "_type" => event.sprintf(@index_type) } }
+    if !@document_id.nil?
+      header["index"]["_id"] = event.sprintf(@document_id)
+    end
+    @mq.receive_raw(header.to_json + "\n" + event.to_json + "\n")
   end # def receive
 end # LogStash::Outputs::ElasticSearchRiver
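
A self-contained Ruby sketch of the river message construction above, with hypothetical stand-in values for the sprintf'd config options and the event:

    require "json"

    # Hypothetical stand-ins for event.sprintf(@index),
    # event.sprintf(@index_type), event.sprintf(@document_id),
    # and event.to_json.
    index  = "logstash-2013.01.01"
    type   = "logs"
    doc_id = "abc123"
    event_json = { "@message" => "hello world", "uuid" => "abc123" }.to_json

    header = { "index" => { "_index" => index, "_type" => type } }
    header["index"]["_id"] = doc_id unless doc_id.nil?

    # Compose the "action\ndata\n" message the river consumes over AMQP.
    message = header.to_json + "\n" + event_json + "\n"
    puts message

The output is the same two-line format as the _bulk body shown earlier, which is what lets the river feed the message straight into elasticsearch's bulk indexing.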