- Support teardown

- Fix off-by-one on flushing
- Remove embedded ES support
This commit is contained in:
Jordan Sissel 2012-09-01 16:25:11 -07:00
parent 1f991c7417
commit 4b287d8b87

View file

@ -12,7 +12,7 @@ require "logstash/outputs/base"
class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
config_name "elasticsearch_http"
plugin_status "experimental"
plugin_status "beta"
# ElasticSearch server name. This is optional if your server is discoverable.
config :host, :validate => :string
@ -39,17 +39,6 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
# REST API port (normally 9200).
config :port, :validate => :number, :default => 9200
# Run the elasticsearch server embedded in this process.
# This option is useful if you want to run a single logstash process that
# handles log processing and indexing; it saves you from needing to run
# a separate elasticsearch process.
config :embedded, :validate => :boolean, :default => false
# If you are running the embedded elasticsearch server, you can set the http
# port it listens on here; it is not common to need this setting changed from
# default.
config :embedded_http_port, :validate => :string, :default => "9200-9300"
# Set the number of events to queue up before writing to elasticsearch.
#
# If this value is set to 1, the normal ['index
@ -61,33 +50,6 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
public
def register
# TODO(sissel): find a better way of declaring where the elasticsearch
# libraries are
# TODO(sissel): can skip this step if we're running from a jar.
jarpath = File.join(File.dirname(__FILE__), "../../../vendor/**/*.jar")
Dir[jarpath].each do |jar|
require jar
end
# setup log4j properties for elasticsearch
@logger.setup_log4j
if @embedded
# Check for settings that are incompatible with @embedded
%w(host).each do |name|
if instance_variable_get("@#{name}")
@logger.error("outputs/elasticsearch: You cannot specify " \
"'embedded => true' and also set '#{name}'")
raise "Invalid configuration detected. Please fix."
end
# Force localhost for embedded elasticsearch
@host = "localhost"
end
# Start elasticsearch local.
start_local_elasticsearch
end
require "ftw" # gem ftw
@agent = FTW::Agent.new
@ -103,19 +65,6 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
@queue = []
end # def register
protected
def start_local_elasticsearch
@logger.info("Starting embedded ElasticSearch local node.")
builder = org.elasticsearch.node.NodeBuilder.nodeBuilder
# Disable 'local only' - LOGSTASH-277
#builder.local(true)
builder.settings.put("cluster.name", @cluster) if !@cluster.nil?
builder.settings.put("http.port", @embedded_http_port)
@embedded_elasticsearch = builder.node
@embedded_elasticsearch.start
end # def start_local_elasticsearch
public
def receive(event)
return unless output?(event)
@ -151,21 +100,31 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
{ "index" => { "_index" => index, "_type" => type } }.to_json,
event.to_json
].join("\n")
if @queue.size > @flush_size
response = @agent.post!("http://#{@host}:#{@port}/_bulk",
:body => @queue.join("\n"))
@queue.clear
# We must read the body to free up this connection for reuse.
body = "";
response.read_body { |chunk| body += chunk }
#if response.status != 201
if response.status != 200
@logger.error("Error writing (bulk) to elasticsearch",
:response => response, :response_body => body)
end
end
flush if @queue.size >= @flush_size
end # def receive_bulk
end # class LogStash::Outputs::Elasticsearch
def flush
puts "Flushing #{@queue.count} events"
# If we don't tack a trailing newline at the end, elasticsearch
# doesn't seem to process the last event in this bulk index call.
response = @agent.post!("http://#{@host}:#{@port}/_bulk",
:body => @queue.join("\n") + "\n")
# Consume the body for error checking
# This will also free up the connection for reuse.
body = ""
response.read_body { |chunk| body += chunk }
if response.status != 200
@logger.error("Error writing (bulk) to elasticsearch",
:response => response, :response_body => body,
:request_body => @queue.join("\n"))
end
@queue.clear
end # def flush
def teardown
flush if @queue.size > 0
end # def teardown
end # class LogStash::Outputs::ElasticSearchHTTP