From 75411cfe0bd816e74e63f995d73f189d74bc9706 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 20 Aug 2020 14:53:51 -0400 Subject: [PATCH] ensure input plugin close is called upon termination or pipeline reload --- logstash-core/lib/logstash/java_pipeline.rb | 46 ++++++++++++------- logstash-core/lib/logstash/pipeline.rb | 46 ++++++++++++------- .../spec/logstash/java_pipeline_spec.rb | 6 ++- logstash-core/spec/logstash/pipeline_spec.rb | 5 +- 4 files changed, 68 insertions(+), 35 deletions(-) diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index 7e7ba3016..c20138e36 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -378,32 +378,33 @@ module LogStash; class JavaPipeline < JavaBasePipeline plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e if plugin.stop? - @logger.debug("Input plugin raised exception during shutdown, ignoring it.", - default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace)) + @logger.debug( + "Input plugin raised exception during shutdown, ignoring it.", + default_logging_keys( + :plugin => plugin.class.config_name, + :exception => e.message, + :backtrace => e.backtrace)) return end # otherwise, report error and restart - @logger.error(I18n.t("logstash.pipeline.worker-error-debug", - default_logging_keys( - :plugin => plugin.inspect, - :error => e.message, - :exception => e.class, - :stacktrace => e.backtrace.join("\n")))) + @logger.error(I18n.t( + "logstash.pipeline.worker-error-debug", + default_logging_keys( + :plugin => plugin.inspect, + :error => e.message, + :exception => e.class, + :stacktrace => e.backtrace.join("\n")))) # Assuming the failure that caused this exception is transient, # let's sleep for a bit and execute #run again sleep(1) - begin - plugin.do_close - rescue => close_exception - @logger.debug("Input plugin raised exception while closing, ignoring", - default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message, - :backtrace => close_exception.backtrace)) - end + close_plugin_and_ignore(plugin) retry + ensure + close_plugin_and_ignore(plugin) end - end # def inputworker + end # initiate the pipeline shutdown sequence # this method is intended to be called from outside the pipeline thread @@ -519,6 +520,19 @@ module LogStash; class JavaPipeline < JavaBasePipeline private + def close_plugin_and_ignore(plugin) + begin + plugin.do_close + rescue => e + @logger.warn( + "plugin raised exception while closing, ignoring", + default_logging_keys( + :plugin => plugin.class.config_name, + :exception => e.message, + :backtrace => e.backtrace)) + end + end + # @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception def init_worker_loop begin diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index ccec00bab..54089d518 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -474,32 +474,33 @@ module LogStash; class Pipeline < BasePipeline plugin.run(wrapped_write_client(plugin.id.to_sym)) rescue => e if plugin.stop? - @logger.debug("Input plugin raised exception during shutdown, ignoring it.", - default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace)) + @logger.debug( + "Input plugin raised exception during shutdown, ignoring it.", + default_logging_keys( + :plugin => plugin.class.config_name, + :exception => e.message, + :backtrace => e.backtrace)) return end # otherwise, report error and restart - @logger.error(I18n.t("logstash.pipeline.worker-error-debug", - default_logging_keys( - :plugin => plugin.inspect, - :error => e.message, - :exception => e.class, - :stacktrace => e.backtrace.join("\n")))) + @logger.error(I18n.t( + "logstash.pipeline.worker-error-debug", + default_logging_keys( + :plugin => plugin.inspect, + :error => e.message, + :exception => e.class, + :stacktrace => e.backtrace.join("\n")))) # Assuming the failure that caused this exception is transient, # let's sleep for a bit and execute #run again sleep(1) - begin - plugin.do_close - rescue => close_exception - @logger.debug("Input plugin raised exception while closing, ignoring", - default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message, - :backtrace => close_exception.backtrace)) - end + close_plugin_and_ignore(plugin) retry + ensure + close_plugin_and_ignore(plugin) end - end # def inputworker + end # initiate the pipeline shutdown sequence # this method is intended to be called from outside the pipeline thread @@ -654,6 +655,19 @@ module LogStash; class Pipeline < BasePipeline private + def close_plugin_and_ignore(plugin) + begin + plugin.do_close + rescue => e + @logger.warn( + "plugin raised exception while closing, ignoring", + default_logging_keys( + :plugin => plugin.class.config_name, + :exception => e.message, + :backtrace => e.backtrace)) + end + end + def maybe_setup_out_plugins if @outputs_registered.make_true register_plugins(@outputs) diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index 69f5ceab9..ac7f0f209 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -405,12 +405,14 @@ describe LogStash::JavaPipeline do eos } - context "output close" do + context "input and output close" do let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) } let(:output) { pipeline.outputs.first } + let(:input) { pipeline.inputs.first } - it "should call close of output without output-workers" do + it "should call close of input and output without output-workers" do expect(output).to receive(:do_close).once + expect(input).to receive(:do_close).once pipeline.start pipeline.shutdown end diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index a4561e237..01f8d411f 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -371,18 +371,21 @@ describe LogStash::Pipeline do eos } - context "output close" do + context "inputs and output close" do let(:pipeline) { mock_pipeline_from_string(test_config_without_output_workers) } let(:output) { pipeline.outputs.first } + let(:input) { pipeline.inputs.first } before do allow(output).to receive(:do_close) + allow(input).to receive(:do_close) end it "should call close of output without output-workers" do pipeline.start pipeline.shutdown expect(output).to have_received(:do_close).once + expect(input).to have_received(:do_close).once end end end