Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 22:57:16 -04:00)

Merge pull request #1105 from jordansissel/elasticsearch-deletes-and-more

    Elasticsearch deletes and unification (protocol => http)

Commit f7b7cc3248: 4 changed files with 571 additions and 126 deletions.

lib/logstash/outputs/elasticsearch.rb
@@ -5,12 +5,11 @@ require "stud/buffer"
 # This output lets you store logs in Elasticsearch and is the most recommended
 # output for Logstash. If you plan on using the Kibana web interface, you'll
-# need to use this output or the elasticsearch_http output.
+# need to use this output.
 #
 # *VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch
 # %ELASTICSEARCH_VERSION%. If you use any other version of Elasticsearch,
-# you should consider using the [elasticsearch_http](elasticsearch_http)
-# output instead.
+# you should set `protocol => http` in this plugin.
 #
 # If you want to set other Elasticsearch options that are not exposed directly
 # as config options, there are two options:
@@ -18,21 +17,21 @@ require "stud/buffer"
 # * create an elasticsearch.yml file in the $PWD of the Logstash process
 # * pass in es.* java properties (java -Des.node.foo= or ruby -J-Des.node.foo=)
 #
-# This plugin will join your Elasticsearch cluster, so it will show up in
-# Elasticsearch's cluster health status.
+# With the default `protocol` setting ("node"), this plugin will join your
+# Elasticsearch cluster as a client node, so it will show up in Elasticsearch's
+# cluster status.
 #
 # You can learn more about Elasticsearch at <http://www.elasticsearch.org>
 #
 # ## Operational Notes
 #
-# Template management is a new feature and requires at least version
-# Elasticsearch 0.90.5+
+# Template management requires at least Elasticsearch 0.90.7. If you
+# are still using a version older than this, please upgrade and receive
+# more benefits than just template management!
 #
-# If you are still using a version older than this, please upgrade for
-# more benefits than just template management.
-#
-# Your firewalls might need to permit port 9300 in *both* directions (from
-# Logstash to Elasticsearch, and Elasticsearch to Logstash)
+# If using the default `protocol` setting ("node"), your firewalls might need
+# to permit port 9300 in *both* directions (from Logstash to Elasticsearch, and
+# Elasticsearch to Logstash)
 class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   include Stud::Buffer
 
@@ -88,9 +87,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # work in your environment.
   config :host, :validate => :string
 
-  # The port for Elasticsearch transport to use. This is *not* the Elasticsearch
-  # REST API port (normally 9200).
-  config :port, :validate => :string, :default => "9300-9305"
+  # The port for Elasticsearch transport to use.
+  #
+  # If you do not set this, the following defaults are used:
+  # * `protocol => http` - port 9200
+  # * `protocol => transport` - port 9300-9305
+  # * `protocol => node` - port 9300-9305
+  config :port, :validate => :string
 
   # The name/address of the host to bind to for Elasticsearch clustering
   config :bind_host, :validate => :string
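Editor's note: with `port` now optional, the plugin picks the right default for whichever protocol is in use at register time. A rough sketch of the effect, using the same construction style as the specs further down (the settings values are illustrative):

    require "logstash/outputs/elasticsearch"

    # port is omitted, so register will fill in the protocol default:
    # 9200 for http; 9300-9305 for transport and node.
    output = LogStash::Outputs::ElasticSearch.new(
      "protocol" => "http",
      "host"     => "localhost"
    )
    # output.register  # connects to Elasticsearch and applies the defaults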
@@ -125,7 +128,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   # events before flushing that out to Elasticsearch. This setting
   # controls how many events will be buffered before sending a batch
   # of events.
-  config :flush_size, :validate => :number, :default => 100
+  config :flush_size, :validate => :number, :default => 5000
 
   # The amount of time since last flush before a flush is forced.
   #
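Editor's note: the raised default leans on Stud::Buffer batching, which the register hunk below wires up via buffer_initialize: events accumulate until `flush_size` is reached (or the idle interval fires) and then go out as one bulk request. A minimal sketch of that contract, assuming stud's buffer API as this plugin uses it:

    require "stud/buffer"

    class BatchingSink
      include Stud::Buffer

      def initialize
        # flush after 5000 pending items, or after 1 second of inactivity
        buffer_initialize(:max_items => 5000, :max_interval => 1)
      end

      # Stud::Buffer hands this the accumulated items.
      def flush(items, teardown=false)
        puts "flushing #{items.size} items"
      end
    end

    sink = BatchingSink.new
    10_000.times { |i| sink.buffer_receive(i) }  # triggers two flushes of 5000
    sink.buffer_flush(:final => true)            # drain the remainder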
@@ -142,28 +145,100 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   #
   # The 'node' protocol will connect to the cluster as a normal Elasticsearch
   # node (but will not store data). This allows you to use things like
-  # multicast discovery.
+  # multicast discovery. If you use the `node` protocol, you must permit
+  # bidirectional communication on port 9300 (or whichever port you have
+  # configured).
   #
   # The 'transport' protocol will connect to the host you specify and will
   # not show up as a 'node' in the Elasticsearch cluster. This is useful
   # in situations where you cannot permit connections outbound from the
   # Elasticsearch cluster to this Logstash server.
-  config :protocol, :validate => [ "node", "transport" ], :default => "node"
+  #
+  # The default `protocol` setting under java/jruby is "node". The default
+  # `protocol` on non-java rubies is "http".
+  config :protocol, :validate => [ "node", "transport", "http" ]
+
+  # The Elasticsearch action to perform. Valid actions are: `index`, `delete`.
+  #
+  # Use of this setting *REQUIRES* you also configure the `document_id` setting
+  # because `delete` actions all require a document id.
+  #
+  # What does each action do?
+  #
+  # - index: indexes a document (an event from logstash).
+  # - delete: deletes a document by id
+  #
+  # For more details on actions, check out the [Elasticsearch bulk API documentation](http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html)
+  config :action, :validate => :string, :default => "index"
+
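Editor's note: for the delete case, the document id typically comes from the event itself via sprintf interpolation. A hypothetical sketch (the `%{id}` field name is an assumption, not something this commit prescribes):

    # Delete the Elasticsearch document whose id is in the event's "id" field.
    output = LogStash::Outputs::ElasticSearch.new(
      "protocol"    => "http",
      "host"        => "localhost",
      "action"      => "delete",
      "document_id" => "%{id}"
    )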
   public
   def register
-    # TODO(sissel): find a better way of declaring where the Elasticsearch
-    # libraries are
-    # TODO(sissel): can skip this step if we're running from a jar.
-    jarpath = File.join(File.dirname(__FILE__), "../../../vendor/jar/elasticsearch*/lib/*.jar")
-    Dir[jarpath].each do |jar|
-      require jar
+    client_settings = {}
+    client_settings["cluster.name"] = @cluster if @cluster
+    client_settings["network.host"] = @bind_host if @bind_host
+    client_settings["transport.tcp.port"] = @bind_port if @bind_port
+
+    if @node_name
+      client_settings["node.name"] = @node_name
+    else
+      client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
     end
 
-    # setup log4j properties for Elasticsearch
-    LogStash::Logger.setup_log4j(@logger)
+    if @protocol.nil?
+      @protocol = (RUBY_PLATFORM == "java") ? "node" : "http"
+    end
+
+    if ["node", "transport"].include?(@protocol)
+      # Node or TransportClient; requires JRuby
+      if RUBY_PLATFORM != "java"
+        raise LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }"
+      end
+
+      require "logstash/loadlibs"
+      # setup log4j properties for Elasticsearch
+      LogStash::Logger.setup_log4j(@logger)
+    end
+
+    require "logstash/outputs/elasticsearch/protocol"
+
+    if @port.nil?
+      @port = case @protocol
+        when "http"; "9200"
+        when "transport", "node"; "9300-9305"
+      end
+    end
+
+    if @host.nil? && @protocol == "http"
+      @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
+      @host = "localhost"
+    end
+
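Editor's note: taken together, the defaulting above means a bare output resolves differently per runtime. A standalone restatement of the rules (illustrative only, not plugin code):

    # On JRuby:  ["node", nil, "9300-9305"]
    # Elsewhere: ["http", "localhost", "9200"]
    def resolve_defaults(protocol=nil, host=nil, port=nil)
      protocol ||= (RUBY_PLATFORM == "java") ? "node" : "http"
      port ||= (protocol == "http") ? "9200" : "9300-9305"
      host ||= "localhost" if protocol == "http"
      [protocol, host, port]
    end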
+    options = {
+      :host => @host,
+      :port => @port,
+      :client_settings => client_settings
+    }
+
+    client_class = case @protocol
+      when "transport"
+        LogStash::Outputs::Elasticsearch::Protocols::TransportClient
+      when "node"
+        LogStash::Outputs::Elasticsearch::Protocols::NodeClient
+      when "http"
+        LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
+    end
+
+    @client = client_class.new(options)
+
+    @logger.info("New Elasticsearch output", :cluster => @cluster,
+                 :host => @host, :port => @port, :embedded => @embedded,
+                 :protocol => @protocol)
+
     if @embedded
+      if RUBY_PLATFORM != "java"
+        raise LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. You are running #{RUBY_DESCRIPTION}"
+      end
       # Default @host with embedded to localhost. This should help avoid
       # newbies tripping on ubuntu and other distros that have a default
       # firewall that blocks multicast.
@@ -172,85 +247,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
       # Start Elasticsearch local.
       start_local_elasticsearch
     end
-    require "jruby-elasticsearch"
-
-    @logger.info("New Elasticsearch output", :cluster => @cluster,
-                 :host => @host, :port => @port, :embedded => @embedded)
-    options = {
-      :cluster => @cluster,
-      :host => @host,
-      :port => @port,
-      :bind_host => @bind_host,
-      :node_name => @node_name,
-    }
-
-    # :node or :transport protocols
-    options[:type] = @protocol.to_sym
-
-    options[:bind_port] = @bind_port unless @bind_port.nil?
-
-    # TransportClient requires a number for a port.
-    options[:port] = options[:port].to_i if options[:type] == :transport
-
-    @client = ElasticSearch::Client.new(options)
-
-    # Check to see if we *can* get the template
-    java_client = @client.instance_eval{@client}
-    begin
-      check_template = ElasticSearch::GetIndexTemplatesRequest.new(java_client, @template_name)
-      result = check_template.execute #=> Run previously...
-    rescue Exception => e
-      @logger.error("Unable to check template. Automatic template management disabled.", :error => e.to_s)
-      @manage_template = false
-    end
-
     if @manage_template
       @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
-      if @template_overwrite
-        @logger.info("Template overwrite enabled. Deleting template if it exists.", :template_overwrite => @template_overwrite.to_s)
-        if !result.getIndexTemplates.isEmpty
-          delete_template = ElasticSearch::DeleteIndexTemplateRequest.new(java_client, @template_name)
-          result = delete_template.execute
-          if result.isAcknowledged
-            @logger.info("Successfully deleted template", :template_name => @template_name)
-          else
-            @logger.error("Failed to delete template", :template_name => @template_name)
-          end
-        end
-      end # end if @template_overwrite
-      has_template = false
-      @logger.debug("Fetching all templates...")
-      gettemplates = ElasticSearch::GetIndexTemplatesRequest.new(java_client, "*")
-      result = gettemplates.execute
-      # Results of this come as a list, so we need to iterate through it
-      if !result.getIndexTemplates.isEmpty
-        template_metadata_list = result.getIndexTemplates
-        templates = {}
-        i = 0
-        template_metadata_list.size.times do
-          template_data = template_metadata_list.get(i)
-          templates[template_data.name] = template_data.template
-          i += 1
-        end
-        template_idx_name = @index.sub(/%{[^}]+}/,'*')
-        alt_template_idx_name = @index.sub(/-%{[^}]+}/,'*')
-        if !templates.any? { |k,v| v == template_idx_name || v == alt_template_idx_name }
-          @logger.debug("No Logstash template found in Elasticsearch", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name)
-        else
-          has_template = true
-          @logger.info("Found existing Logstash template match.", :has_template => has_template, :name => template_idx_name, :alt => alt_template_idx_name, :templates => templates.to_s)
-        end
-      end
-      if !has_template #=> No template found, we're going to add one
-        get_template_json
-        put_template = ElasticSearch::PutIndexTemplateRequest.new(java_client, @template_name, @template_json)
-        result = put_template.execute
-        if result.isAcknowledged
-          @logger.info("Successfully inserted template", :template_name => @template_name)
-        else
-          @logger.error("Failed to insert template", :template_name => @template_name)
-        end
-      end
+      @client.template_install(@template_name, get_template, @template_overwrite)
     end # if @manage_templates
 
     buffer_initialize(
@@ -261,7 +262,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   end # def register
 
   public
-  def get_template_json
+  def get_template
     if @template.nil?
       if __FILE__ =~ /^(jar:)?file:\/.+!.+/
         begin
@@ -281,8 +282,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
         end
       end
     end
-    @template_json = IO.read(@template).gsub(/\n/,'')
-    @logger.info("Using mapping template", :template => @template_json)
+    template_json = IO.read(@template).gsub(/\n/,'')
+    @logger.info("Using mapping template", :template => template_json)
+    return JSON.parse(template_json)
   end # def get_template
 
   protected
@@ -291,8 +293,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
     builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
     # Disable 'local only' - LOGSTASH-277
     #builder.local(true)
-    builder.settings.put("cluster.name", @cluster) if !@cluster.nil?
-    builder.settings.put("node.name", @node_name) if !@node_name.nil?
+    builder.settings.put("cluster.name", @cluster) if @cluster
+    builder.settings.put("node.name", @node_name) if @node_name
     builder.settings.put("http.port", @embedded_http_port)
 
     @embedded_elasticsearch = builder.node
@@ -302,28 +304,22 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
   public
   def receive(event)
     return unless output?(event)
-    buffer_receive([event, index, type])
-  end # def receive
-
-  def flush(events, teardown=false)
-    request = @client.bulk
-    events.each do |event, index, type|
-      index = event.sprintf(@index)
-
-      # Set the 'type' value for the index.
-      if @index_type.nil?
-        type = event["type"] || "logs"
-      else
-        type = event.sprintf(@index_type)
-      end
-      if @document_id
-        request.index(index, type, event.sprintf(@document_id), event.to_json)
-      else
-        request.index(index, type, nil, event.to_json)
-      end
+    # Set the 'type' value for the index.
+    if @index_type
+      type = event.sprintf(@index_type)
+    else
+      type = event["type"] || "logs"
     end
 
-    request.execute!
+    index = event.sprintf(@index)
+
+    document_id = @document_id ? event.sprintf(@document_id) : nil
+    buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type }, event.to_hash])
+  end # def receive
+
+  def flush(actions, teardown=false)
+    @client.bulk(actions)
     # TODO(sissel): Handle errors. Since bulk requests could mostly succeed
     # (aka partially fail), we need to figure out what documents need to be
     # retried.
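Editor's note: each buffered entry is now an [action, metadata, source] triple rather than a raw event, so the protocol layer can render one bulk line per entry. An illustrative triple, and the two NDJSON lines the http protocol's bulk_ftw (in the new file below) would emit for it (values invented for the example):

    require "json"

    action = ["index",
              { :_id => nil, :_index => "logstash-2014.01.01", :_type => "logs" },
              { "message" => "hello world" }]
    header = { action[0] => action[1] }
    puts header.to_json    # {"index":{"_id":null,"_index":"logstash-2014.01.01","_type":"logs"}}
    puts action[2].to_json # {"message":"hello world"}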

lib/logstash/outputs/elasticsearch/protocol.rb (new file, 271 lines)
@@ -0,0 +1,271 @@
require "logstash/outputs/elasticsearch"
|
||||||
|
require "cabin"
|
||||||
|
|
||||||
|
module LogStash::Outputs::Elasticsearch
|
||||||
|
module Protocols
|
||||||
|
class Base
|
||||||
|
private
|
||||||
|
def initialize(options={})
|
||||||
|
# host(s), port, cluster
|
||||||
|
@logger = Cabin::Channel.get
|
||||||
|
end
|
||||||
|
|
||||||
|
def client
|
||||||
|
return @client if @client
|
||||||
|
@client = build_client(@options)
|
||||||
|
return @client
|
||||||
|
end # def client
|
||||||
|
|
||||||
|
|
||||||
|
def template_install(name, template, force=false)
|
||||||
|
if template_exists?(name) && !force
|
||||||
|
@logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
|
||||||
|
return
|
||||||
|
end
|
||||||
|
template_put(name, template)
|
||||||
|
end
|
||||||
|
|
||||||
|
# Do a bulk request with the given actions.
|
||||||
|
#
|
||||||
|
# 'actions' is expected to be an array of bulk requests as string json
|
||||||
|
# values.
|
||||||
|
#
|
||||||
|
# Each 'action' becomes a single line in the bulk api call. For more
|
||||||
|
# details on the format of each.
|
||||||
|
def bulk(actions)
|
||||||
|
raise NotImplemented, "You must implement this yourself"
|
||||||
|
# bulk([
|
||||||
|
# '{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }',
|
||||||
|
# '{ "field1" : "value1" }'
|
||||||
|
#])
|
||||||
|
end
|
||||||
|
|
||||||
|
public(:initialize, :template_install)
|
||||||
|
end
|
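Editor's note: template_install is deliberately idempotent. It defers to the subclass's template_exists?/template_put and only PUTs when the template is missing, unless force is set. A rough usage sketch (assumes a reachable Elasticsearch node on localhost:9200; the template file path and name are hypothetical):

    require "json"
    require "logstash/outputs/elasticsearch/protocol"

    client = LogStash::Outputs::Elasticsearch::Protocols::HTTPClient.new(
      :host => "localhost", :port => 9200
    )
    template = JSON.parse(File.read("my-template.json"))  # hypothetical path
    client.template_install("logstash", template)         # PUTs only if missing
    client.template_install("logstash", template, true)   # force: always PUTs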
+
+    class HTTPClient < Base
+      private
+
+      DEFAULT_OPTIONS = {
+        :port => 9200
+      }
+
+      def initialize(options={})
+        require "ftw"
+        super
+        require "elasticsearch" # gem 'elasticsearch-ruby'
+        @options = DEFAULT_OPTIONS.merge(options)
+        @client = client
+      end
+
+      def build_client(options)
+        client = Elasticsearch::Client.new(
+          :host => [options[:host], options[:port]].join(":")
+        )
+
+        # Use FTW to do indexing requests, for now, until we
+        # can identify and resolve performance problems of elasticsearch-ruby
+        @bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk"
+        @agent = FTW::Agent.new
+
+        return client
+      end
+
+      if ENV["BULK"] == "esruby"
+        def bulk(actions)
+          bulk_esruby(actions)
+        end
+      else
+        def bulk(actions)
+          bulk_ftw(actions)
+        end
+      end
+
+      def bulk_esruby(actions)
+        @client.bulk(:body => actions.collect do |action, args, source|
+          if source
+            next [ { action => args }, source ]
+          else
+            next { action => args }
+          end
+        end.flatten)
+      end # def bulk_esruby
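Editor's note: for the same [action, metadata, source] triple, bulk_esruby hands elasticsearch-ruby a flat array alternating header hashes and source hashes. A runnable plain-Ruby trace of that transform (values illustrative):

    actions = [["index",
                { :_index => "logstash-2014.01.01", :_type => "logs" },
                { "message" => "hello world" }]]
    body = actions.collect do |action, args, source|
      source ? [ { action => args }, source ] : { action => args }
    end.flatten
    # => [{"index"=>{:_index=>"logstash-2014.01.01", :_type=>"logs"}},
    #     {"message"=>"hello world"}]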
+
+      # Avoid creating a new string for newline every time
+      NEWLINE = "\n".freeze
+      def bulk_ftw(actions)
+        body = actions.collect do |action, args, source|
+          header = { action => args }
+          if source
+            next [ header.to_json, NEWLINE, source.to_json, NEWLINE ]
+          else
+            next [ header.to_json, NEWLINE ]
+          end
+        end.flatten.join("")
+        begin
+          response = @agent.post!(@bulk_url, :body => body)
+        rescue EOFError
+          @logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
+          return # abort this flush
+        end
+
+        # Consume the body for error checking
+        # This will also free up the connection for reuse.
+        response_body = ""
+        begin
+          response.read_body { |chunk| response_body += chunk }
+        rescue EOFError
+          @logger.warn("EOF while reading response body from elasticsearch",
+                       :url => @bulk_url)
+          return # abort this flush
+        end
+
+        if response.status != 200
+          @logger.error("Error writing (bulk) to elasticsearch",
+                        :response => response, :response_body => response_body,
+                        :request_body => body)
+          return
+        end
+      end # def bulk_ftw
+
+      def template_exists?(name)
+        @client.indices.get_template(:name => name)
+        return true
+      rescue Elasticsearch::Transport::Transport::Errors::NotFound
+        return false
+      end # def template_exists?
+
+      def template_put(name, template)
+        @client.indices.put_template(:name => name, :body => template)
+      end # template_put
+
+      public(:bulk)
+    end # class HTTPClient
+
+    class NodeClient < Base
+      private
+
+      DEFAULT_OPTIONS = {
+        :port => 9300,
+      }
+
+      def initialize(options={})
+        super
+        require "java"
+        @options = DEFAULT_OPTIONS.merge(options)
+        setup(@options)
+        @client = client
+      end # def initialize
+
+      def settings
+        return @settings
+      end
+
+      def setup(options={})
+        @settings = org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder
+        if options[:host]
+          @settings.put("discovery.zen.ping.multicast.enabled", false)
+          @settings.put("discovery.zen.ping.unicast.hosts", hosts(options))
+        end
+
+        @settings.put("node.client", true)
+        @settings.put("http.enabled", false)
+
+        if options[:client_settings]
+          options[:client_settings].each do |key, value|
+            @settings.put(key, value)
+          end
+        end
+
+        return @settings
+      end
+
+      def hosts(options)
+        if options[:port].to_s =~ /^\d+-\d+$/
+          # port ranges are 'host[port1-port2]' according to
+          # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
+          # However, it seems to only query the first port.
+          # So generate our own list of unicast hosts to scan.
+          range = Range.new(*options[:port].split("-"))
+          return range.collect { |p| "#{options[:host]}:#{p}" }.join(",")
+        else
+          return "#{options[:host]}:#{options[:port]}"
+        end
+      end # def hosts
+
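Editor's note: a quick demonstration of the port-range expansion above (plain Ruby, runnable standalone; string Ranges enumerate numerically here):

    range = Range.new(*"9300-9305".split("-"))
    puts range.collect { |p| "localhost:#{p}" }.join(",")
    # => localhost:9300,localhost:9301,localhost:9302,localhost:9303,localhost:9304,localhost:9305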
+      def build_client(options)
+        nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder
+        return nodebuilder.settings(@settings).node.client
+      end # def build_client
+
+      def bulk(actions)
+        # 'actions' is an array of [ action, action_metadata, source ] tuples.
+        prep = @client.prepareBulk
+        actions.each do |action, args, source|
+          prep.add(build_request(action, args, source))
+        end
+        response = prep.execute.actionGet()
+
+        # TODO(sissel): What format should the response be in?
+      end # def bulk
+
+      def build_request(action, args, source)
+        case action
+        when "index"
+          request = org.elasticsearch.action.index.IndexRequest.new(args[:_index])
+          request.id(args[:_id]) if args[:_id]
+          request.source(source)
+        when "delete"
+          request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index])
+          request.id(args[:_id])
+        #when "update"
+        #when "create"
+        end # case action
+
+        request.type(args[:_type]) if args[:_type]
+        return request
+      end # def build_request
+
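Editor's note: the real method builds org.elasticsearch.action.* objects under JRuby, but the dispatch is easy to mirror in plain Ruby to show the contract: index tolerates a nil :_id, delete needs one (illustrative sketch only):

    def describe_request(action, args)
      case action
      when "index"  then "IndexRequest(index=#{args[:_index]}, id=#{args[:_id].inspect})"
      when "delete" then "DeleteRequest(index=#{args[:_index]}, id=#{args[:_id]})"
      end
    end

    puts describe_request("index",  { :_index => "logstash-2014.01.01", :_id => nil })
    puts describe_request("delete", { :_index => "logstash-2014.01.01", :_id => "abc123" })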
+      def template_exists?(name)
+        request = org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequestBuilder.new(@client.admin.indices, name)
+        response = request.get
+        return !response.getIndexTemplates.isEmpty
+      end # def template_exists?
+
+      def template_put(name, template)
+        request = org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder.new(@client.admin.indices, name)
+        request.setSource(template.to_json)
+
+        # execute the request and get the response, if it fails, we'll get an exception.
+        request.get
+      end # template_put
+
+      public(:initialize, :bulk)
+    end # class NodeClient
+
+    class TransportClient < NodeClient
+      private
+      def build_client(options)
+        client = org.elasticsearch.client.transport.TransportClient.new(settings.build)
+
+        if options[:host]
+          client.addTransportAddress(
+            org.elasticsearch.common.transport.InetSocketTransportAddress.new(
+              options[:host], options[:port].to_i
+            )
+          )
+        end
+
+        return client
+      end # def build_client
+    end # class TransportClient
+
+  end # module Protocols
+
+  module Requests
+    class GetIndexTemplates; end
+    class Bulk; end
+    class Index; end
+    class Delete; end
+  end
+end
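Editor's note: one detail worth noting in TransportClient#build_client is `options[:port].to_i`. Ruby's String#to_i reads leading digits only, so if the port came through as the default range string, the transport client quietly connects to the first port of the range:

    "9300-9305".to_i  # => 9300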
@@ -63,7 +63,6 @@ Gem::Specification.new do |gem|
 
   if RUBY_PLATFORM == 'java'
     gem.platform = RUBY_PLATFORM
-    gem.add_runtime_dependency "jruby-elasticsearch", ["0.0.17"] #(BSD license)
    gem.add_runtime_dependency "jruby-httpclient" #(Apache 2.0 license)
     gem.add_runtime_dependency "bouncy-castle-java", "1.5.0147" #(MIT license)
     gem.add_runtime_dependency "jruby-openssl", "0.8.7" #(CPL/GPL/LGPL license)
@@ -158,4 +158,183 @@ describe "outputs/elasticsearch" do
       end
     end
   end
+
+  describe "action => ..." do
+    index_name = 10.times.collect { rand(10).to_s }.join("")
+
+    config <<-CONFIG
+      input {
+        generator {
+          message => "hello world"
+          count => 100
+        }
+      }
+      output {
+        elasticsearch {
+          host => "127.0.0.1"
+          index => "#{index_name}"
+        }
+      }
+    CONFIG
+
+
+    agent do
+      ftw = FTW::Agent.new
+      ftw.post!("http://localhost:9200/#{index_name}/_refresh")
+
+      # Wait until all events are available.
+      Stud::try(10.times) do
+        data = ""
+        response = ftw.get!("http://127.0.0.1:9200/#{index_name}/_count?q=*")
+        response.read_body { |chunk| data << chunk }
+        result = JSON.parse(data)
+        count = result["count"]
+        insist { count } == 100
+      end
+
+      response = ftw.get!("http://127.0.0.1:9200/#{index_name}/_search?q=*&size=1000")
+      data = ""
+      response.read_body { |chunk| data << chunk }
+      result = JSON.parse(data)
+      result["hits"]["hits"].each do |doc|
+        insist { doc["_type"] } == "logs"
+      end
+    end
+
+    describe "default event type value" do
+      # Generate a random index name
+      index = 10.times.collect { rand(10).to_s }.join("")
+      event_count = 100 + rand(100)
+      flush_size = rand(200) + 1
+
+      config <<-CONFIG
+        input {
+          generator {
+            message => "hello world"
+            count => #{event_count}
+            type => "generated"
+          }
+        }
+        output {
+          elasticsearch {
+            host => "127.0.0.1"
+            index => "#{index}"
+            flush_size => #{flush_size}
+          }
+        }
+      CONFIG
+
+      agent do
+        ftw = FTW::Agent.new
+        ftw.post!("http://localhost:9200/#{index}/_refresh")
+
+        # Wait until all events are available.
+        Stud::try(10.times) do
+          data = ""
+          response = ftw.get!("http://127.0.0.1:9200/#{index}/_count?q=*")
+          response.read_body { |chunk| data << chunk }
+          result = JSON.parse(data)
+          count = result["count"]
+          insist { count } == event_count
+        end
+
+        response = ftw.get!("http://127.0.0.1:9200/#{index}/_search?q=*&size=1000")
+        data = ""
+        response.read_body { |chunk| data << chunk }
+        result = JSON.parse(data)
+        result["hits"]["hits"].each do |doc|
+          insist { doc["_type"] } == "generated"
+        end
+      end
+    end
+  end
+
+  describe "index template expected behavior" do
+    ["node", "transport", "http"].each do |protocol|
+      context "with protocol => #{protocol}" do
+        subject do
+          require "logstash/outputs/elasticsearch"
+          settings = {
+            "manage_template" => true,
+            "template_overwrite" => true,
+            "protocol" => protocol,
+            "host" => "localhost"
+          }
+          next LogStash::Outputs::ElasticSearch.new(settings)
+        end
+
+        before :each do
+          # Delete all templates first.
+          require "elasticsearch"
+
+          # Clean ES of data before we start.
+          @es = Elasticsearch::Client.new
+          @es.indices.delete_template(:name => "*")
+
+          # This can fail if there are no indexes, ignore failure.
+          @es.indices.delete(:index => "*") rescue nil
+
+          subject.register
+
+          subject.receive(LogStash::Event.new("message" => "sample message here"))
+          subject.receive(LogStash::Event.new("somevalue" => 100))
+          subject.receive(LogStash::Event.new("somevalue" => 10))
+          subject.receive(LogStash::Event.new("somevalue" => 1))
+          subject.receive(LogStash::Event.new("country" => "us"))
+          subject.receive(LogStash::Event.new("country" => "at"))
+          subject.receive(LogStash::Event.new("geoip" => { "location" => [ 0.0, 0.0 ] }))
+          subject.buffer_flush(:final => true)
+          @es.indices.refresh
+
+          # Wait or fail until everything's indexed.
+          Stud::try(20.times) do
+            r = @es.search
+            insist { r["hits"]["total"] } == 7
+          end
+        end
+
+        it "permits phrase searching on string fields" do
+          results = @es.search(:q => "message:\"sample message\"")
+          insist { results["hits"]["total"] } == 1
+          insist { results["hits"]["hits"][0]["_source"]["message"] } == "sample message here"
+        end
+
+        it "numbers dynamically map to a numeric type and permit range queries" do
+          results = @es.search(:q => "somevalue:[5 TO 105]")
+          insist { results["hits"]["total"] } == 2
+
+          values = results["hits"]["hits"].collect { |r| r["_source"]["somevalue"] }
+          insist { values }.include?(10)
+          insist { values }.include?(100)
+          reject { values }.include?(1)
+        end
+
+        it "creates a not_analyzed .raw field for any string field" do
+          results = @es.search(:q => "message.raw:\"sample message here\"")
+          insist { results["hits"]["total"] } == 1
+          insist { results["hits"]["hits"][0]["_source"]["message"] } == "sample message here"
+
+          # partial or terms should not work.
+          results = @es.search(:q => "message.raw:\"sample\"")
+          insist { results["hits"]["total"] } == 0
+        end
+
+        it "makes [geoip][location] a geo_point" do
+          results = @es.search(:body => { "filter" => { "geo_distance" => { "distance" => "1000km", "geoip.location" => { "lat" => 0.5, "lon" => 0.5 } } } })
+          insist { results["hits"]["total"] } == 1
+          insist { results["hits"]["hits"][0]["_source"]["geoip"]["location"] } == [ 0.0, 0.0 ]
+        end
+
+        it "should index stopwords like 'at'" do
+          results = @es.search(:body => { "facets" => { "t" => { "terms" => { "field" => "country" } } } })["facets"]["t"]
+          terms = results["terms"].collect { |t| t["term"] }
+
+          insist { terms }.include?("us")
+
+          # 'at' is a stopword, make sure stopwords are not ignored.
+          insist { terms }.include?("at")
+        end
+      end
+    end
+  end
 end