mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Use ftw instead of elasticsearch-ruby for bulk indexing
In tests, using ftw directly results in 30% more events/sec than using elasticsearch-ruby for bulk requests.
This commit is contained in:
parent
52a53a605c
commit
fb18f76277
1 changed files with 57 additions and 3 deletions
|
@ -51,10 +51,12 @@ module LogStash::Outputs::Elasticsearch
|
|||
}
|
||||
|
||||
def initialize(options={})
|
||||
require "ftw"
|
||||
super
|
||||
require "elasticsearch" # gem 'elasticsearch-ruby'
|
||||
@options = DEFAULT_OPTIONS.merge(options)
|
||||
@client = client
|
||||
|
||||
end
|
||||
|
||||
def build_client(options)
|
||||
|
@ -62,10 +64,25 @@ module LogStash::Outputs::Elasticsearch
|
|||
:host => [options[:host], options[:port]].join(":")
|
||||
)
|
||||
|
||||
# Use FTW to do indexing requests, for now, until we
|
||||
# can identify and resolve performance problems of elasticsearch-ruby
|
||||
@bulk_url = "http://#{options[:host]}:#{options[:port]}/_bulk"
|
||||
@agent = FTW::Agent.new
|
||||
|
||||
return client
|
||||
end
|
||||
|
||||
def bulk(actions)
|
||||
if ENV["BULK"] == "esruby"
|
||||
def bulk(actions)
|
||||
bulk_esruby(actions)
|
||||
end
|
||||
else
|
||||
def bulk(actions)
|
||||
bulk_ftw(actions)
|
||||
end
|
||||
end
|
||||
|
||||
def bulk_esruby(actions)
|
||||
@client.bulk(:body => actions.collect do |action, args, source|
|
||||
if source
|
||||
next [ { action => args }, source ]
|
||||
|
@ -73,10 +90,47 @@ module LogStash::Outputs::Elasticsearch
|
|||
next { action => args }
|
||||
end
|
||||
end.flatten)
|
||||
end # def bulk
|
||||
end # def bulk_esruby
|
||||
|
||||
# Avoid creating a new string for newline every time
|
||||
NEWLINE = "\n".freeze
|
||||
def bulk_ftw(actions)
|
||||
body = actions.collect do |action, args, source|
|
||||
header = { action => args }
|
||||
if source
|
||||
next [ header.to_json, NEWLINE, source.to_json, NEWLINE ]
|
||||
else
|
||||
next [ header.to_json, NEWLINE ]
|
||||
end
|
||||
end.flatten.join("")
|
||||
begin
|
||||
response = @agent.post!(@bulk_url, :body => body)
|
||||
rescue EOFError
|
||||
@logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
|
||||
return # abort this flush
|
||||
end
|
||||
|
||||
# Consume the body for error checking
|
||||
# This will also free up the connection for reuse.
|
||||
response_body = ""
|
||||
begin
|
||||
response.read_body { |chunk| response_body += chunk }
|
||||
rescue EOFError
|
||||
@logger.warn("EOF while reading response body from elasticsearch",
|
||||
:url => @bulk_url)
|
||||
return # abort this flush
|
||||
end
|
||||
|
||||
if response.status != 200
|
||||
@logger.error("Error writing (bulk) to elasticsearch",
|
||||
:response => response, :response_body => response_body,
|
||||
:request_body => body)
|
||||
return
|
||||
end
|
||||
end # def bulk_ftw
|
||||
|
||||
public(:bulk)
|
||||
end
|
||||
end # class HTTPClient
|
||||
|
||||
class NodeClient < Base
|
||||
private
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue