Implement template management

Tests pass against ES 1.0.0:
  % bin/logstash rspec spec/outputs/elasticsearch.rb -e 'index template expected behavior' --fail-fast
  Finished in 27.57 seconds
  15 examples, 0 failures
This commit is contained in:
Jordan Sissel 2014-02-21 11:45:01 -08:00
parent 521cd1aeb7
commit 454688fb58
2 changed files with 34 additions and 12 deletions

View file

@ -249,12 +249,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
end end
# TODO(sissel): check if we can manage templates
#@logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
if @manage_template if @manage_template
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
# TODO(sissel): Manage the template @client.template_install(@template_name, get_template, @template_overwrite)
end # if @manage_templates end # if @manage_templates
buffer_initialize( buffer_initialize(
@ -265,7 +262,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
end # def register end # def register
public public
def get_template_json def get_template
if @template.nil? if @template.nil?
if __FILE__ =~ /^(jar:)?file:\/.+!.+/ if __FILE__ =~ /^(jar:)?file:\/.+!.+/
begin begin
@ -285,8 +282,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
end end
end end
end end
@template_json = IO.read(@template).gsub(/\n/,'') template_json = IO.read(@template).gsub(/\n/,'')
@logger.info("Using mapping template", :template => @template_json) @logger.info("Using mapping template", :template => template_json)
return JSON.parse(template_json)
end # def get_template end # def get_template
protected protected

View file

@ -18,11 +18,11 @@ module LogStash::Outputs::Elasticsearch
def template_install(name, template, force=false) def template_install(name, template, force=false)
if template_exists?(name, template) && !force if template_exists?(name) && !force
@logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name) @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
return return
end end
# TODO(sissel): ??? template_put(name, template)
end end
# Do a bulk request with the given actions. # Do a bulk request with the given actions.
@ -40,7 +40,7 @@ module LogStash::Outputs::Elasticsearch
#]) #])
end end
public(:initialize) public(:initialize, :template_install)
end end
class HTTPClient < Base class HTTPClient < Base
@ -56,7 +56,6 @@ module LogStash::Outputs::Elasticsearch
require "elasticsearch" # gem 'elasticsearch-ruby' require "elasticsearch" # gem 'elasticsearch-ruby'
@options = DEFAULT_OPTIONS.merge(options) @options = DEFAULT_OPTIONS.merge(options)
@client = client @client = client
end end
def build_client(options) def build_client(options)
@ -129,6 +128,17 @@ module LogStash::Outputs::Elasticsearch
end end
end # def bulk_ftw end # def bulk_ftw
def template_exists?(name)
@client.indices.get_template(:name => name)
return true
rescue Elasticsearch::Transport::Transport::Errors::NotFound
return false
end # def template_exists?
def template_put(name, template)
@client.indices.put_template(:name => name, :body => template)
end # template_put
public(:bulk) public(:bulk)
end # class HTTPClient end # class HTTPClient
@ -146,7 +156,7 @@ module LogStash::Outputs::Elasticsearch
setup(@options) setup(@options)
@client = client @client = client
end # def initialize end # def initialize
def settings def settings
return @settings return @settings
end end
@ -216,6 +226,20 @@ module LogStash::Outputs::Elasticsearch
return request return request
end # def build_request end # def build_request
def template_exists?(name)
request = org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequestBuilder.new(@client.admin.indices, name)
response = request.get
return !response.getIndexTemplates.isEmpty
end # def template_exists?
def template_put(name, template)
request = org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder.new(@client.admin.indices, name)
request.setSource(template.to_json)
# execute the request and get the response, if it fails, we'll get an exception.
request.get
end # template_put
public(:initialize, :bulk) public(:initialize, :bulk)
end # class NodeClient end # class NodeClient