Merge pull request #1182 from jordansissel/retry-on-failure

Retry errors writing to Elasticsearch
This commit is contained in:
Jordan Sissel 2014-03-18 17:38:27 -07:00
commit 289d5d1b8d
3 changed files with 9 additions and 6 deletions

View file

@ -332,6 +332,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# TODO(sissel): Handle errors. Since bulk requests could mostly succeed
# (aka partially fail), we need to figure out what documents need to be
# retried.
#
# In the worst case, a failing flush (exception) will incur a retry from Stud::Buffer.
end # def flush
def teardown

View file

@ -106,7 +106,7 @@ module LogStash::Outputs::Elasticsearch
response = @agent.post!(@bulk_url, :body => body)
rescue EOFError
@logger.warn("EOF while writing request or reading response header from elasticsearch", :host => @host, :port => @port)
return # abort this flush
raise
end
# Consume the body for error checking
@ -117,14 +117,14 @@ module LogStash::Outputs::Elasticsearch
rescue EOFError
@logger.warn("EOF while reading response body from elasticsearch",
:url => @bulk_url)
return # abort this flush
raise
end
if response.status != 200
@logger.error("Error writing (bulk) to elasticsearch",
:response => response, :response_body => response_body,
:request_body => body)
return
raise "Non-OK response code from Elasticsearch: #{response.status}"
end
end # def bulk_ftw

View file

@ -219,6 +219,7 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
[ header.to_json, newline, event.to_json, newline ]
end.flatten
post(body.join(""))
end # def receive_bulk
@ -228,7 +229,7 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
rescue EOFError
@logger.warn("EOF while writing request or reading response header from elasticsearch",
:host => @host, :port => @port)
return # abort this flush
raise
end
# Consume the body for error checking
@ -239,14 +240,14 @@ class LogStash::Outputs::ElasticSearchHTTP < LogStash::Outputs::Base
rescue EOFError
@logger.warn("EOF while reading response body from elasticsearch",
:host => @host, :port => @port)
return # abort this flush
raise
end
if response.status != 200
@logger.error("Error writing (bulk) to elasticsearch",
:response => response, :response_body => body,
:request_body => @queue.join("\n"))
return
raise
end
end # def post