Implement protocol => http in elasticsearch output

Also add `action` setting to allow logstash to signal document
deletions, updates, etc. The goal is to support any bulk request with
this output plugin, but the early target is simply to allow deletes.
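
For example (an illustrative sketch, not captured output), a delete
action for an event would serialize to a bulk API action line like:

    { "delete" : { "_index" : "logstash-2014.02.07", "_type" : "logs", "_id" : "1" } }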

This is one step in the path to having only one elasticsearch output
plugin and eventually removing elasticsearch_http.

The following tests pass:
    % bin/logstash rspec spec/outputs/elasticsearch.rb
    ...
    Finished in 1 minute 33.94 seconds
    3 examples, 0 failures

Tested manually 'elasticsearch { protocol => http }' with success.

Still to be implemented:
* template management
* tests to cover all protocols: node, transport, and http
* mark elasticsearch_http as deprecated?
commit 52a53a605c (parent f7519373fc)
Author: Jordan Sissel
Date:   2014-02-07 18:38:22 -08:00

3 changed files with 304 additions and 121 deletions

lib/logstash/outputs/elasticsearch.rb

@@ -5,12 +5,11 @@ require "stud/buffer"
# This output lets you store logs in Elasticsearch and is the most recommended
# output for Logstash. If you plan on using the Kibana web interface, you'll
# need to use this output or the elasticsearch_http output.
# need to use this output.
#
# *VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch
# %ELASTICSEARCH_VERSION%. If you use any other version of Elasticsearch,
# you should consider using the [elasticsearch_http](elasticsearch_http)
# output instead.
# you should set `protocol => http` in this plugin.
#
# If you want to set other Elasticsearch options that are not exposed directly
# as config options, there are two options:
@@ -18,21 +17,21 @@ require "stud/buffer"
# * create an elasticsearch.yml file in the $PWD of the Logstash process
# * pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)
#
# This plugin will join your Elasticsearch cluster, so it will show up in
# Elasticsearch's cluster health status.
# With the default `protocol` setting ("node"), this plugin will join your
# Elasticsearch cluster as a client node, so it will show up in Elasticsearch's
# cluster status.
#
# You can learn more about Elasticsearch at <http://www.elasticsearch.org>
#
# ## Operational Notes
#
# Template management is a new feature and requires at least version
# Elasticsearch 0.90.5+
# Template management requires at least Elasticsearch version 0.90.7. If you
# are still using a version older than this, please upgrade and receive
# more benefits than just template management!
#
# If you are still using a version older than this, please upgrade for
# more benefits than just template management.
#
# Your firewalls might need to permit port 9300 in *both* directions (from
# Logstash to Elasticsearch, and Elasticsearch to Logstash)
# If using the default `protocol` setting ("node"), your firewalls might need
# to permit port 9300 in *both* directions (from Logstash to Elasticsearch, and
# Elasticsearch to Logstash)
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
include Stud::Buffer
@@ -88,9 +87,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# work in your environment.
config :host, :validate => :string
# The port for Elasticsearch transport to use. This is *not* the Elasticsearch
# REST API port (normally 9200).
config :port, :validate => :string, :default => "9300-9305"
# The port for Elasticsearch transport to use.
#
# If you do not set this, the following defaults are used:
# * `protocol => http` - port 9200
# * `protocol => transport` - port 9300-9305
# * `protocol => node` - port 9300-9305
config :port, :validate => :string
# The name/address of the host to bind to for Elasticsearch clustering
config :bind_host, :validate => :string
@@ -125,7 +128,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# events before flushing that out to Elasticsearch. This setting
# controls how many events will be buffered before sending a batch
# of events.
config :flush_size, :validate => :number, :default => 100
config :flush_size, :validate => :number, :default => 5000
# The amount of time since last flush before a flush is forced.
#
@@ -148,22 +151,87 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# not show up as a 'node' in the Elasticsearch cluster. This is useful
# in situations where you cannot permit connections outbound from the
# Elasticsearch cluster to this Logstash server.
config :protocol, :validate => [ "node", "transport" ], :default => "node"
#
# The default `protocol` setting under java/jruby is "node". The default
# `protocol` on non-java rubies is "http".
config :protocol, :validate => [ "node", "transport", "http" ]
# The Elasticsearch action to perform. Valid actions are: `index`, `delete`.
#
# Use of this setting *REQUIRES* you also configure the `document_id` setting
# because `delete` actions all require a document id.
#
# What does each action do?
#
# - index: indexes a document (an event from logstash).
# - delete: deletes a document by id.
#
# For more details on actions, check out the [Elasticsearch bulk API documentation](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html)
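#
# For example (a hypothetical sketch, not part of this commit, assuming
# the event carries an `id` field), deleting documents by that id:
#
#     output {
#       elasticsearch {
#         action => "delete"
#         document_id => "%{id}"
#       }
#     }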
config :action, :validate => :string, :default => "index"
public
def register
# TODO(sissel): find a better way of declaring where the Elasticsearch
# libraries are
# TODO(sissel): can skip this step if we're running from a jar.
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
Dir[jarpath].each do |jar|
require jar
client_settings = {}
client_settings["cluster.name"] = @cluster if @cluster
client_settings["network.host"] = @bind_host if @bind_host
client_settings["transport.tcp.port"] = @bind_port if @bind_port
client_settings["node.name"] = @node_name if @node_name
if @protocol.nil?
@protocol = (RUBY_PLATFORM == "java") ? "node" : "http"
end
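# (RUBY_PLATFORM is "java" under JRuby; under MRI it is a platform string
# such as "x86_64-linux", so non-JRuby rubies default to "http" here.)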
# setup log4j properties for Elasticsearch
LogStash::Logger.setup_log4j(@logger)
if ["node", "transport"].include?(@protocol)
# Node or TransportClient; requires JRuby
if RUBY_PLATFORM != "java"
raise LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }"
end
require "logstash/loadlibs"
# setup log4j properties for Elasticsearch
LogStash::Logger.setup_log4j(@logger)
end
require "logstash/outputs/elasticsearch/protocol"
if @port.nil?
@port = case @protocol
when "http"; "9200"
when "transport", "node"; "9300-9305"
end
end
if @host.nil? && @protocol == "http"
@logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
@host = "localhost"
end
options = {
:host => @host,
:port => @port,
:client_settings => client_settings
}
client_class = case @protocol
when "transport"
LogStash::Outputs::Elasticsearch::Protocols::TransportClient
when "node"
LogStash::Outputs::Elasticsearch::Protocols::NodeClient
when "http"
LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
end
@client = client_class.new(options)
@logger.info("New Elasticsearch output", :cluster => @cluster,
:host => @host, :port => @port, :embedded => @embedded,
:protocol => @protocol)
if @embedded
if RUBY_PLATFORM != "java"
raise LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. You are running #{RUBY_DESCRIPTION}"
end
# Default @host with embedded to localhost. This should help avoid
# newbies tripping on ubuntu and other distros that have a default
# firewall that blocks multicast.
@@ -172,85 +240,14 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Start Elasticsearch local.
start_local_elasticsearch
end
require "jruby-elasticsearch"
@logger.info("New Elasticsearch output", :cluster => @cluster,
:host => @host, :port => @port, :embedded => @embedded)
options = {
:cluster => @cluster,
:host => @host,
:port => @port,
:bind_host => @bind_host,
:node_name => @node_name,
}
# :node or :transport protocols
options[:type] = @protocol.to_sym
options[:bind_port] = @bind_port unless @bind_port.nil?
# TransportClient requires a number for a port.
options[:port] = options[:port].to_i if options[:type] == :transport
@client = ElasticSearch::Client.new(options)
# Check to see if we *can* get the template
java_client = @client.instance_eval{@client}
begin
check_template = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name)
result = check_template.execute #=> Run previously...
rescue Exception => e
@logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
@manage_template = false
end
# TODO(sissel): check if we can manage templates
#@logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
if @manage_template
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
if @template_overwrite
@logger.info("Template overwrite enabled. Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
if !result.getIndexTemplates.isEmpty
delete_template = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name)
result = delete_template.execute
if result.isAcknowledged
@logger.info("Successfully deleted template", :template_name => @template_name)
else
@logger.error("Failed to delete template", :template_name => @template_name)
end
end
end # end if @template_overwrite
has_template = false
@logger.debug("Fetching all templates...")
gettemplates = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*")
result = gettemplates.execute
# Results of this come as a list, so we need to iterate through it
if !result.getIndexTemplates.isEmpty
template_metadata_list = result.getIndexTemplates
templates = {}
i = 0
template_metadata_list.size.times do
template_data = template_metadata_list.get(i)
templates[template_data.name] = template_data.template
i += 1
end
template_idx_name = @index.sub(/%{[^}]+}/,'*')
alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
if !templates.any? { |k,v| v == template_idx_name || v == alt_template_idx_name }
@logger.debug("No Logstash template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
else
has_template = true
@logger.info("Found existing Logstash template match.", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
end
end
if !has_template #=> No template found, we're going to add one
get_template_json
put_template = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json)
result = put_template.execute
if result.isAcknowledged
@logger.info("Successfully inserted template", :template_name => @template_name)
else
@logger.error("Failed to insert template", :template_name => @template_name)
end
end
# TODO(sissel): Manage the template
end # if @manage_templates
buffer_initialize(
@@ -291,8 +288,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
# Disable 'local only' - LOGSTASH-277
#builder.local(true)
builder.settings.put("cluster.name", @cluster) if !@cluster.nil?
builder.settings.put("node.name", @node_name) if !@node_name.nil?
builder.settings.put("cluster.name", @cluster) if @cluster
builder.settings.put("node.name", @node_name) if @node_name
builder.settings.put("http.port", @embedded_http_port)
@embedded_elasticsearch = builder.node
@@ -302,28 +299,22 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
public
def receive(event)
return unless output?(event)
buffer_receive([event, index, type])
end # def receive
def flush(events, teardown=false)
request = @client.bulk
events.each do |event, index, type|
index = event.sprintf(@index)
# Set the 'type' value for the index.
if @index_type.nil?
type = event["type"] || "logs"
else
type = event.sprintf(@index_type)
end
if @document_id
request.index(index, type, event.sprintf(@document_id), event.to_json)
else
request.index(index, type, nil, event.to_json)
end
# Set the 'type' value for the index.
if @index_type
type = event.sprintf(@index_type)
else
type = event["type"] || "logs"
end
request.execute!
index = event.sprintf(@index)
document_id = @document_id ? event.sprintf(@document_id) : nil
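# Each buffered entry is an [action, metadata, source] tuple. For example
# (illustrative values), an "index" action for an event of type "apache"
# buffers:
#   ["index", { :_id => nil, :_index => "logstash-2014.02.07", :_type => "apache" }, event.to_hash]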
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
end # def receive
def flush(actions, teardown=false)
@client.bulk(actions)
# TODO(sissel): Handle errors. Since bulk requests could mostly succeed
# (aka partially fail), we need to figure out what documents need to be
# retried.
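#
# A hypothetical sketch of that retry selection (not part of this commit),
# assuming the HTTP protocol's parsed response with the bulk API's
# top-level "errors" flag and per-action "items" entries, ordered like
# the submitted actions:
#
#   response = @client.bulk(actions)
#   if response.is_a?(Hash) && response["errors"]
#     retryable = response["items"].each_with_index.select do |item, i|
#       status = item.values.first["status"].to_i
#       status == 429 || status >= 500  # overloaded or server-side failure
#     end
#     retryable.each { |item, i| buffer_receive(actions[i]) }  # re-enqueue
#   end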

lib/logstash/outputs/elasticsearch/protocol.rb

@@ -0,0 +1,193 @@
require "logstash/outputs/elasticsearch"
require "cabin"
module LogStash::Outputs::Elasticsearch
module Protocols
class Base
private
def initialize(options={})
# host(s), port, cluster
@logger = Cabin::Channel.get
end
def client
return @client if @client
@client = build_client(@options)
return @client
end # def client
def template_install(name, template, force=false)
if template_exists?(name, template) && !force
@logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
return
end
# TODO(sissel): ???
end
# Do a bulk request with the given actions.
#
# 'actions' is expected to be an array of bulk requests as string json
# values.
#
# Each 'action' becomes a single line in the bulk api call. For more
# details on the format of each action, see the Elasticsearch bulk API
# documentation.
def bulk(actions)
raise NotImplementedError, "You must implement this yourself"
# bulk([
# '{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }',
# '{ "field1" : "value1" }'
#])
end
public(:initialize)
end
class HTTPClient < Base
private
DEFAULT_OPTIONS = {
:port => 9200
}
def initialize(options={})
super
require "elasticsearch" # gem 'elasticsearch-ruby'
@options = DEFAULT_OPTIONS.merge(options)
@client = client
end
def build_client(options)
client = Elasticsearch::Client.new(
:host => [options[:host], options[:port]].join(":")
)
return client
end
def bulk(actions)
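# Interleave metadata and source lines the way the elasticsearch-ruby
# client's bulk body expects; e.g. (illustrative) the tuple
#   ["index", { :_index => "logstash", :_type => "logs" }, { "message" => "hi" }]
# contributes { "index" => {...} } followed by the source hash, while a
# "delete" tuple (no source) contributes only its metadata hash.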
@client.bulk(:body => actions.collect do |action, args, source|
if source
next [ { action => args }, source ]
else
next { action => args }
end
end.flatten)
end # def bulk
public(:bulk)
end
class NodeClient < Base
private
DEFAULT_OPTIONS = {
:port => 9300,
}
def initialize(options={})
super
require "java"
@options = DEFAULT_OPTIONS.merge(options)
setup(@options)
@client = client
end # def initialize
def settings
return @settings
end
def setup(options={})
@settings = org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder
if options[:host]
@settings.put("discovery.zen.ping.multicast.enabled", false)
@settings.put("discovery.zen.ping.unicast.hosts", hosts(options))
end
@settings.put("node.client", true)
@settings.put("http.enabled", false)
if options[:client_settings]
options[:client_settings].each do |key, value|
@settings.put(key, value)
end
end
return @settings
end
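# Expand a host plus port (or port range) into the comma-separated unicast
# host list used above; e.g. (illustrative)
#   hosts(:host => "es1", :port => "9300-9302") #=> "es1:9300,es1:9301,es1:9302"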
def hosts(options)
if options[:port].to_s =~ /^\d+-\d+$/
# port ranges are 'host[port1-port2]' according to
# http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
# However, it seems to only query the first port.
# So generate our own list of unicast hosts to scan.
range = Range.new(*options[:port].split("-"))
return range.collect { |p| "#{options[:host]}:#{p}" }.join(",")
else
return "#{options[:host]}:#{options[:port]}"
end
end # def hosts
def build_client(options)
nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder
return nodebuilder.settings(@settings).node.client
end # def build_client
def bulk(actions)
# Actions an array of [ action, action_metadata, source ]
prep = @client.prepareBulk
actions.each do |action, args, source|
prep.add(build_request(action, args, source))
end
response = prep.execute.actionGet()
# TODO(sissel): What format should the response be in?
end # def bulk
def build_request(action, args, source)
case action
when "index"
request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
request.id(args[:_id]) if args[:_id]
request.source(source)
when "delete"
request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
request.id(args[:_id])
#when "update"
#when "create"
end # case action
request.type(args[:_type]) if args[:_type]
return request
end # def build_request
public(:initialize, :bulk)
end # class NodeClient
class TransportClient < NodeClient
private
def build_client(options)
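# NOTE: InetSocketTransportAddress takes a single numeric port, so the
# transport default of "9300-9305" cannot be passed through as-is; the
# removed jruby-elasticsearch code called .to_i on the port for this case.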
client = org.elasticsearch.client.transport.TransportClient.new(settings.build)
if options[:host]
client.addTransportAddress(
org.elasticsearch.common.transport.InetSocketTransportAddress.new(
options[:host], options[:port]
)
)
end
return client
end # def build_client
end # class TransportClient
end # module Protocols
module Requests
class GetIndexTemplates; end
class Bulk; end
class Index; end
class Delete; end
end
end

logstash.gemspec

@@ -64,7 +64,6 @@ Gem::Specification.new do |gem|
if RUBY_PLATFORM == 'java'
gem.platform = RUBY_PLATFORM
gem.add_runtime_dependency "jruby-elasticsearch", ["0.0.17"] #(BSD license)
gem.add_runtime_dependency "jruby-httpclient" #(Apache 2.0 license)
gem.add_runtime_dependency "bouncy-castle-java", "1.5.0147" #(MIT license)
gem.add_runtime_dependency "jruby-openssl", "0.8.7" #(CPL/GPL/LGPL license)