ensure input plugin close is called upon termination or pipeline reload

This commit is contained in:
Colin Surprenant 2020-08-20 14:53:51 -04:00
parent 4ef0204098
commit 75411cfe0b
4 changed files with 68 additions and 35 deletions

View file

@ -378,13 +378,18 @@ module LogStash; class JavaPipeline < JavaBasePipeline
plugin.run(wrapped_write_client(plugin.id.to_sym)) plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e rescue => e
if plugin.stop? if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.", @logger.debug(
default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace)) "Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
return return
end end
# otherwise, report error and restart # otherwise, report error and restart
@logger.error(I18n.t("logstash.pipeline.worker-error-debug", @logger.error(I18n.t(
"logstash.pipeline.worker-error-debug",
default_logging_keys( default_logging_keys(
:plugin => plugin.inspect, :plugin => plugin.inspect,
:error => e.message, :error => e.message,
@ -394,16 +399,12 @@ module LogStash; class JavaPipeline < JavaBasePipeline
# Assuming the failure that caused this exception is transient, # Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again # let's sleep for a bit and execute #run again
sleep(1) sleep(1)
begin close_plugin_and_ignore(plugin)
plugin.do_close
rescue => close_exception
@logger.debug("Input plugin raised exception while closing, ignoring",
default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message,
:backtrace => close_exception.backtrace))
end
retry retry
ensure
close_plugin_and_ignore(plugin)
end
end end
end # def inputworker
# initiate the pipeline shutdown sequence # initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread # this method is intended to be called from outside the pipeline thread
@ -519,6 +520,19 @@ module LogStash; class JavaPipeline < JavaBasePipeline
private private
def close_plugin_and_ignore(plugin)
begin
plugin.do_close
rescue => e
@logger.warn(
"plugin raised exception while closing, ignoring",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
end
end
# @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception # @return [WorkerLoop] a new WorkerLoop instance or nil upon construction exception
def init_worker_loop def init_worker_loop
begin begin

View file

@ -474,13 +474,18 @@ module LogStash; class Pipeline < BasePipeline
plugin.run(wrapped_write_client(plugin.id.to_sym)) plugin.run(wrapped_write_client(plugin.id.to_sym))
rescue => e rescue => e
if plugin.stop? if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.", @logger.debug(
default_logging_keys(:plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace)) "Input plugin raised exception during shutdown, ignoring it.",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
return return
end end
# otherwise, report error and restart # otherwise, report error and restart
@logger.error(I18n.t("logstash.pipeline.worker-error-debug", @logger.error(I18n.t(
"logstash.pipeline.worker-error-debug",
default_logging_keys( default_logging_keys(
:plugin => plugin.inspect, :plugin => plugin.inspect,
:error => e.message, :error => e.message,
@ -490,16 +495,12 @@ module LogStash; class Pipeline < BasePipeline
# Assuming the failure that caused this exception is transient, # Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again # let's sleep for a bit and execute #run again
sleep(1) sleep(1)
begin close_plugin_and_ignore(plugin)
plugin.do_close
rescue => close_exception
@logger.debug("Input plugin raised exception while closing, ignoring",
default_logging_keys(:plugin => plugin.class.config_name, :exception => close_exception.message,
:backtrace => close_exception.backtrace))
end
retry retry
ensure
close_plugin_and_ignore(plugin)
end
end end
end # def inputworker
# initiate the pipeline shutdown sequence # initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread # this method is intended to be called from outside the pipeline thread
@ -654,6 +655,19 @@ module LogStash; class Pipeline < BasePipeline
private private
def close_plugin_and_ignore(plugin)
begin
plugin.do_close
rescue => e
@logger.warn(
"plugin raised exception while closing, ignoring",
default_logging_keys(
:plugin => plugin.class.config_name,
:exception => e.message,
:backtrace => e.backtrace))
end
end
def maybe_setup_out_plugins def maybe_setup_out_plugins
if @outputs_registered.make_true if @outputs_registered.make_true
register_plugins(@outputs) register_plugins(@outputs)

View file

@ -405,12 +405,14 @@ describe LogStash::JavaPipeline do
eos eos
} }
context "output close" do context "input and output close" do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) } let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first } let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }
it "should call close of output without output-workers" do it "should call close of input and output without output-workers" do
expect(output).to receive(:do_close).once expect(output).to receive(:do_close).once
expect(input).to receive(:do_close).once
pipeline.start pipeline.start
pipeline.shutdown pipeline.shutdown
end end

View file

@ -371,18 +371,21 @@ describe LogStash::Pipeline do
eos eos
} }
context "output close" do context "inputs and output close" do
let(:pipeline) { mock_pipeline_from_string(test_config_without_output_workers) } let(:pipeline) { mock_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first } let(:output) { pipeline.outputs.first }
let(:input) { pipeline.inputs.first }
before do before do
allow(output).to receive(:do_close) allow(output).to receive(:do_close)
allow(input).to receive(:do_close)
end end
it "should call close of output without output-workers" do it "should call close of output without output-workers" do
pipeline.start pipeline.start
pipeline.shutdown pipeline.shutdown
expect(output).to have_received(:do_close).once expect(output).to have_received(:do_close).once
expect(input).to have_received(:do_close).once
end end
end end
end end