diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 4f2537b3f..4e23dc8a7 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -281,12 +281,12 @@ class LogStash::Agent return unless pipeline.is_a?(LogStash::Pipeline) return if pipeline.ready? @logger.debug("starting pipeline", :id => id) - Thread.new do + t = Thread.new do LogStash::Util.set_thread_name("pipeline.#{id}") begin pipeline.run rescue => e - @reload_metric.namespace([id.to_sym, :reloads]) do |n| + @reload_metric.namespace([id.to_sym, :reloads]).tap do |n| n.increment(:failures) n.gauge(:last_error, { :message => e.message, :backtrace => e.backtrace}) n.gauge(:last_failure_timestamp, LogStash::Timestamp.now) @@ -294,7 +294,15 @@ class LogStash::Agent @logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace) end end - sleep 0.01 until pipeline.ready? + while true do + if !t.alive? + return false + elsif pipeline.ready? + return true + else + sleep 0.01 + end + end end def stop_pipeline(id) @@ -326,10 +334,11 @@ class LogStash::Agent stop_pipeline(pipeline_id) reset_pipeline_metrics(pipeline_id) @pipelines[pipeline_id] = new_pipeline - start_pipeline(pipeline_id) - @reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n| - n.increment(:successes) - n.gauge(:last_success_timestamp, LogStash::Timestamp.now) + if start_pipeline(pipeline_id) # pipeline started successfuly + @reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n| + n.increment(:successes) + n.gauge(:last_success_timestamp, LogStash::Timestamp.now) + end end end diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 21b925c39..bafce331e 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -546,5 +546,43 @@ describe LogStash::Agent do expect(value).to be > 0 end end + + context "when reloading a config that raises exception on pipeline.run" do + let(:new_config) { "input { generator { count => 10000 } }" } + let(:new_config_generator_counter) { 500 } + + class BrokenGenerator < LogStash::Inputs::Generator + def register + raise ArgumentError + end + end + + before :each do + + allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(BrokenGenerator) + + File.open(config_path, "w") do |f| + f.write(new_config) + f.fsync + end + + end + + it "does not increase the successful reload count" do + expect { subject.send(:"reload_pipeline!", "main") }.to_not change { + snapshot = subject.metric.collector.snapshot_metric + reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads] + reload_metrics[:successes].value + } + end + + it "increases the failured reload count" do + expect { subject.send(:"reload_pipeline!", "main") }.to change { + snapshot = subject.metric.collector.snapshot_metric + reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads] + reload_metrics[:failures].value + }.by(1) + end + end end end