mark reload as failure if pipeline.run aborts

currently a reload is only marked as a failured if it fails the classic
config test check, where we check for parameter names, existing plugins, etc.

This change waits for the pipeline to transition to running before
marking the reload as success, otherwise it is a failure.

fixes #6195

Fixes #6196
This commit is contained in:
Joao Duarte 2016-11-04 14:21:18 +00:00 committed by João Duarte
parent 286cfdf784
commit 4e75d4dc60
2 changed files with 54 additions and 7 deletions

View file

@ -281,12 +281,12 @@ class LogStash::Agent
return unless pipeline.is_a?(LogStash::Pipeline) return unless pipeline.is_a?(LogStash::Pipeline)
return if pipeline.ready? return if pipeline.ready?
@logger.debug("starting pipeline", :id => id) @logger.debug("starting pipeline", :id => id)
Thread.new do t = Thread.new do
LogStash::Util.set_thread_name("pipeline.#{id}") LogStash::Util.set_thread_name("pipeline.#{id}")
begin begin
pipeline.run pipeline.run
rescue => e rescue => e
@reload_metric.namespace([id.to_sym, :reloads]) do |n| @reload_metric.namespace([id.to_sym, :reloads]).tap do |n|
n.increment(:failures) n.increment(:failures)
n.gauge(:last_error, { :message => e.message, :backtrace => e.backtrace}) n.gauge(:last_error, { :message => e.message, :backtrace => e.backtrace})
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now) n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
@ -294,7 +294,15 @@ class LogStash::Agent
@logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace) @logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace)
end end
end end
sleep 0.01 until pipeline.ready? while true do
if !t.alive?
return false
elsif pipeline.ready?
return true
else
sleep 0.01
end
end
end end
def stop_pipeline(id) def stop_pipeline(id)
@ -326,12 +334,13 @@ class LogStash::Agent
stop_pipeline(pipeline_id) stop_pipeline(pipeline_id)
reset_pipeline_metrics(pipeline_id) reset_pipeline_metrics(pipeline_id)
@pipelines[pipeline_id] = new_pipeline @pipelines[pipeline_id] = new_pipeline
start_pipeline(pipeline_id) if start_pipeline(pipeline_id) # pipeline started successfuly
@reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n| @reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n|
n.increment(:successes) n.increment(:successes)
n.gauge(:last_success_timestamp, LogStash::Timestamp.now) n.gauge(:last_success_timestamp, LogStash::Timestamp.now)
end end
end end
end
def clean_state? def clean_state?
@pipelines.empty? @pipelines.empty?

View file

@ -546,5 +546,43 @@ describe LogStash::Agent do
expect(value).to be > 0 expect(value).to be > 0
end end
end end
context "when reloading a config that raises exception on pipeline.run" do
let(:new_config) { "input { generator { count => 10000 } }" }
let(:new_config_generator_counter) { 500 }
class BrokenGenerator < LogStash::Inputs::Generator
def register
raise ArgumentError
end
end
before :each do
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(BrokenGenerator)
File.open(config_path, "w") do |f|
f.write(new_config)
f.fsync
end
end
it "does not increase the successful reload count" do
expect { subject.send(:"reload_pipeline!", "main") }.to_not change {
snapshot = subject.metric.collector.snapshot_metric
reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads]
reload_metrics[:successes].value
}
end
it "increases the failured reload count" do
expect { subject.send(:"reload_pipeline!", "main") }.to change {
snapshot = subject.metric.collector.snapshot_metric
reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads]
reload_metrics[:failures].value
}.by(1)
end
end
end end
end end