diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index a0b4f582c..c44a81f72 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -378,30 +378,23 @@ module LogStash; class JavaPipeline < JavaBasePipeline # initiate the pipeline shutdown sequence # this method is intended to be called from outside the pipeline thread - # @param before_stop [Proc] code block called before performing stop operation on input plugins - def shutdown(&before_stop) + # and will block until the pipeline has successfully shut down. + def shutdown + return if finished_execution? # shutdown can only start once the pipeline has completed its startup. # avoid potential race condition between the startup sequence and this # shutdown method which can be called from another thread at any time sleep(0.1) while !ready? # TODO: should we also check against calling shutdown multiple times concurrently? - - before_stop.call if block_given? - stop_inputs - - # We make this call blocking, so we know for sure when the method return the shutdown is - # stopped - wait_for_workers + wait_for_shutdown clear_pipeline_metrics @logger.info("Pipeline terminated", "pipeline.id" => pipeline_id) end # def shutdown - def wait_for_workers - @logger.debug("Closing inputs", default_logging_keys) - @worker_threads.map(&:join) - @logger.debug("Worker closed", default_logging_keys) + def wait_for_shutdown + ShutdownWatcher.new(self).start end def stop_inputs diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 3afc80677..30d1663d1 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -501,30 +501,22 @@ module LogStash; class Pipeline < BasePipeline # initiate the pipeline shutdown sequence # this method is intended to be called from outside the pipeline thread - # @param before_stop [Proc] code block called before performing stop operation on input plugins - def shutdown(&before_stop) + # and will block until the pipeline has successfully shut down. + def shutdown + return if finished_execution? + # shutdown can only start once the pipeline has completed its startup. # avoid potential race condition between the startup sequence and this # shutdown method which can be called from another thread at any time sleep(0.1) while !ready? - # TODO: should we also check against calling shutdown multiple times concurrently? - - before_stop.call if block_given? - stop_inputs - - # We make this call blocking, so we know for sure when the method return the shutdown is - # stopped - wait_for_workers + wait_for_shutdown clear_pipeline_metrics end # def shutdown - def wait_for_workers - @worker_threads.each do |t| - t.join - @logger.debug("Worker terminated", default_logging_keys(:thread => t.inspect)) - end + def wait_for_shutdown + ShutdownWatcher.new(self).start end def stop_inputs diff --git a/logstash-core/lib/logstash/pipeline_action/reload.rb b/logstash-core/lib/logstash/pipeline_action/reload.rb index 4dc9bc77c..56e430914 100644 --- a/logstash-core/lib/logstash/pipeline_action/reload.rb +++ b/logstash-core/lib/logstash/pipeline_action/reload.rb @@ -66,8 +66,7 @@ module LogStash module PipelineAction # the block must emit a success boolean value # First shutdown old pipeline - old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) } - old_pipeline.thread.join + old_pipeline.shutdown # Then create a new pipeline new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent) diff --git a/logstash-core/lib/logstash/pipeline_action/stop.rb b/logstash-core/lib/logstash/pipeline_action/stop.rb index 50d6367fa..be631e54c 100644 --- a/logstash-core/lib/logstash/pipeline_action/stop.rb +++ b/logstash-core/lib/logstash/pipeline_action/stop.rb @@ -27,8 +27,7 @@ module LogStash module PipelineAction def execute(agent, pipelines_registry) pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline| - pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) } - pipeline.thread.join + pipeline.shutdown end LogStash::ConvergeResult::SuccessfulAction.new diff --git a/logstash-core/spec/logstash/java_pipeline_spec.rb b/logstash-core/spec/logstash/java_pipeline_spec.rb index a82a63460..430d51a65 100644 --- a/logstash-core/spec/logstash/java_pipeline_spec.rb +++ b/logstash-core/spec/logstash/java_pipeline_spec.rb @@ -212,22 +212,15 @@ describe LogStash::JavaPipeline do Thread.abort_on_exception = true pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } Timeout.timeout(timeout) do - sleep(0.1) until pipeline.ready? - end - Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do - wait(3).for do - # give us a bit of time to flush the events - # puts("*****" + output.events.map{|e| e.message}.to_s) - output.events.map{|e| e.get("message")}.include?("END") - end.to be_truthy + pipeline.start + sleep 0.01 until pipeline.stopped? end + pipeline.shutdown + expect(output.events.map{|e| e.get("message")}).to include("END") expect(output.events.size).to eq(2) expect(output.events[0].get("tags")).to eq(["notdropped"]) expect(output.events[1].get("tags")).to eq(["notdropped"]) - pipeline.shutdown - t.join Thread.abort_on_exception = abort_on_exception_state end @@ -289,7 +282,7 @@ describe LogStash::JavaPipeline do pipeline = mock_java_pipeline_from_string(test_config_with_filters) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]})) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(safe_thread_count) pipeline.shutdown end @@ -302,7 +295,7 @@ describe LogStash::JavaPipeline do " not work with multiple worker threads" pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]})) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(override_thread_count) pipeline.shutdown end @@ -329,7 +322,7 @@ describe LogStash::JavaPipeline do it "starts multiple filter threads" do skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245") pipeline = mock_java_pipeline_from_string(test_config_with_filters) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(worker_thread_count) pipeline.shutdown end @@ -374,18 +367,10 @@ describe LogStash::JavaPipeline do let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) } let(:output) { pipeline.outputs.first } - before do - allow(output).to receive(:do_close) - end - - after do - pipeline.shutdown - end - it "should call close of output without output-workers" do - pipeline.run - - expect(output).to have_received(:do_close).once + expect(output).to receive(:do_close).once + pipeline.start + pipeline.shutdown end end end @@ -433,7 +418,7 @@ describe LogStash::JavaPipeline do expect(pipeline).to receive(:transition_to_running).ordered.and_call_original expect(pipeline).to receive(:start_flusher).ordered.and_call_original - pipeline.run + pipeline.start pipeline.shutdown end end @@ -604,15 +589,10 @@ describe LogStash::JavaPipeline do allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) allow(logger).to receive(:warn) - # pipeline must be first called outside the thread context because it lazily initialize and will create a - # race condition if called in the thread - p = pipeline - t = Thread.new { p.run } - Timeout.timeout(timeout) do - sleep(0.1) until pipeline.ready? - end + pipeline.start + # the only input auto-closes, so the pipeline will automatically stop. + sleep(0.01) until pipeline.stopped? pipeline.shutdown - t.join end it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do @@ -778,9 +758,8 @@ describe LogStash::JavaPipeline do it "flush periodically" do Thread.abort_on_exception = true pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } Timeout.timeout(timeout) do - sleep(0.1) until pipeline.ready? + pipeline.start end Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do wait(10).for do @@ -792,8 +771,6 @@ describe LogStash::JavaPipeline do expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) pipeline.shutdown - - t.join end end @@ -824,9 +801,8 @@ describe LogStash::JavaPipeline do it "flush periodically without error on nil flush return" do Thread.abort_on_exception = true pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } Timeout.timeout(timeout) do - sleep(0.1) until pipeline.ready? + pipeline.start end Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do wait(10).for do @@ -838,8 +814,6 @@ describe LogStash::JavaPipeline do expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) pipeline.shutdown - - t.join end end @@ -877,9 +851,8 @@ describe LogStash::JavaPipeline do it "flush periodically" do Thread.abort_on_exception = true pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } Timeout.timeout(timeout) do - sleep(0.1) until pipeline.ready? + pipeline.start end Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do wait(11).for do @@ -891,8 +864,6 @@ describe LogStash::JavaPipeline do expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) pipeline.shutdown - - t.join end end @@ -931,7 +902,7 @@ describe LogStash::JavaPipeline do it "correctly distributes events" do pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj) - pipeline.run + pipeline.start pipeline.shutdown expect(output.events.size).to eq(60) expect(output.events.count {|e| e.get("cloned") == "cloned"}).to eq(30) @@ -961,7 +932,7 @@ describe LogStash::JavaPipeline do end it "return when the pipeline started working" do - subject.run + subject.start expect(subject.started_at).to be < Time.now subject.shutdown end @@ -989,18 +960,12 @@ describe LogStash::JavaPipeline do context "when the pipeline is started" do it "return the duration in milliseconds" do - # subject must be first call outside the thread context because of lazy initialization - s = subject - t = Thread.new { s.run } Timeout.timeout(timeout) do - sleep(0.1) until subject.ready? - end - Timeout.timeout(timeout) do - sleep(0.1) + subject.start end + sleep(0.1) expect(subject.uptime).to be > 0 subject.shutdown - t.join end end end @@ -1042,12 +1007,6 @@ describe LogStash::JavaPipeline do end let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) } let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store } - let(:pipeline_thread) do - # subject has to be called for the first time outside the thread because it will create a race condition - # with the subject.ready? call since subject is lazily initialized - s = subject - Thread.new { s.run } - end before :each do allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput) @@ -1056,9 +1015,8 @@ describe LogStash::JavaPipeline do allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(LogStash::Filters::DummyFilter) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) - pipeline_thread Timeout.timeout(timeout) do - sleep(0.1) until subject.ready? + subject.start end # make sure we have received all the generated events @@ -1072,7 +1030,6 @@ describe LogStash::JavaPipeline do after :each do subject.shutdown - pipeline_thread.join end context "global metric" do diff --git a/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb b/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb index 03fdaeabc..15ae5459b 100644 --- a/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb +++ b/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb @@ -92,13 +92,13 @@ describe LogStash::Pipeline do it "retrieves proper pipeline-level DLQ writer" do expect_any_instance_of(org.logstash.common.io.DeadLetterQueueWriter).to receive(:close).and_call_original - subject.run + subject.start + subject.shutdown dlq_path = java.nio.file.Paths.get(pipeline_settings_obj.get("path.dead_letter_queue"), pipeline_id) dlq_reader = org.logstash.common.io.DeadLetterQueueReader.new(dlq_path) entry = dlq_reader.pollEntry(40) expect(entry).to_not be_nil expect(entry.reason).to eq("my reason") - subject.shutdown end end @@ -109,7 +109,7 @@ describe LogStash::Pipeline do it "does not write to the DLQ" do expect(LogStash::Util::DummyDeadLetterQueueWriter).to receive(:new).and_call_original expect_any_instance_of(LogStash::Util::DummyDeadLetterQueueWriter).to receive(:close).and_call_original - subject.run + subject.start dlq_path = java.nio.file.Paths.get(pipeline_settings_obj.get("path.dead_letter_queue"), pipeline_id) expect(java.nio.file.Files.exists(dlq_path)).to eq(false) subject.shutdown diff --git a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb index 3c18176cc..e82da96fb 100644 --- a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb +++ b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb @@ -100,13 +100,6 @@ describe LogStash::Pipeline do let(:max_bytes) { 1024 * 1024 * 1024 } # 1 gb let(:times) { [] } - let(:pipeline_thread) do - # subject has to be called for the first time outside the thread because it will create a race condition - # with the subject.ready? call since subject is lazily initialized - s = subject - Thread.new { s.run } - end - let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") } before :each do @@ -126,7 +119,7 @@ describe LogStash::Pipeline do pipeline_settings_obj.set("queue.max_bytes", max_bytes) times.push(Time.now.to_f) - pipeline_thread + subject.start sleep(0.1) until subject.ready? # make sure we have received all the generated events @@ -139,7 +132,6 @@ describe LogStash::Pipeline do after :each do subject.shutdown - pipeline_thread.join # Dir.rm_rf(this_queue_folder) end diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb index 54a865d59..e2590a94d 100644 --- a/logstash-core/spec/logstash/pipeline_reporter_spec.rb +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -37,12 +37,11 @@ shared_examples "a pipeline reporter" do |pipeline_setup| @pre_snapshot = reporter.snapshot - pipeline.run - @post_snapshot = reporter.snapshot - end - - after do + pipeline.start + # wait for stopped? so the input can produce all events + sleep 0.01 until pipeline.stopped? pipeline.shutdown + @post_snapshot = reporter.snapshot end describe "stalling threads info" do @@ -87,6 +86,10 @@ shared_examples "a pipeline reporter" do |pipeline_setup| end describe LogStash::PipelineReporter do - it_behaves_like "a pipeline reporter", :mock_pipeline_from_string - it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string + context "with ruby execution" do + it_behaves_like "a pipeline reporter", :mock_pipeline_from_string + end + context "with java execution" do + it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string + end end diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index d1058e762..2187b68a2 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -210,7 +210,7 @@ describe LogStash::Pipeline do Thread.abort_on_exception = true pipeline = mock_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } + pipeline.start Timeout.timeout(timeout) do sleep(0.1) until pipeline.ready? end @@ -225,7 +225,6 @@ describe LogStash::Pipeline do expect(output.events[0].get("tags")).to eq(["notdropped"]) expect(output.events[1].get("tags")).to eq(["notdropped"]) pipeline.shutdown - t.join Thread.abort_on_exception = abort_on_exception_state end @@ -305,7 +304,7 @@ describe LogStash::Pipeline do pipeline = mock_pipeline_from_string(test_config_with_filters) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]})) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(safe_thread_count) pipeline.shutdown end @@ -318,7 +317,7 @@ describe LogStash::Pipeline do " not work with multiple worker threads" pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]})) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(override_thread_count) pipeline.shutdown end @@ -345,7 +344,7 @@ describe LogStash::Pipeline do it "starts multiple filter threads" do skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245") pipeline = mock_pipeline_from_string(test_config_with_filters) - pipeline.run + pipeline.start expect(pipeline.worker_threads.size).to eq(worker_thread_count) pipeline.shutdown end @@ -399,7 +398,7 @@ describe LogStash::Pipeline do end it "should call close of output without output-workers" do - pipeline.run + pipeline.start expect(output).to have_received(:do_close).once end @@ -446,7 +445,7 @@ describe LogStash::Pipeline do expect(pipeline).to receive(:transition_to_running).ordered.and_call_original expect(pipeline).to receive(:start_flusher).ordered.and_call_original - pipeline.run + pipeline.start pipeline.shutdown end end @@ -493,13 +492,11 @@ describe LogStash::Pipeline do # pipeline must be first called outside the thread context because it lazily initialize and will create a # race condition if called in the thread - p = pipeline - t = Thread.new { p.run } + pipeline.start Timeout.timeout(timeout) do sleep(0.1) until pipeline.ready? end pipeline.shutdown - t.join end it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do @@ -694,7 +691,7 @@ describe LogStash::Pipeline do it "flush periodically" do Thread.abort_on_exception = true pipeline = mock_pipeline_from_string(config, pipeline_settings_obj) - t = Thread.new { pipeline.run } + pipeline.start Timeout.timeout(timeout) do sleep(0.1) until pipeline.ready? end @@ -708,8 +705,6 @@ describe LogStash::Pipeline do expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true) pipeline.shutdown - - t.join end end @@ -772,7 +767,7 @@ describe LogStash::Pipeline do end it "return when the pipeline started working" do - subject.run + subject.start expect(subject.started_at).to be < Time.now subject.shutdown end @@ -800,9 +795,7 @@ describe LogStash::Pipeline do context "when the pipeline is started" do it "return the duration in milliseconds" do - # subject must be first call outside the thread context because of lazy initialization - s = subject - t = Thread.new { s.run } + subject.start Timeout.timeout(timeout) do sleep(0.1) until subject.ready? end @@ -811,7 +804,6 @@ describe LogStash::Pipeline do end expect(subject.uptime).to be > 0 subject.shutdown - t.join end end end @@ -853,12 +845,6 @@ describe LogStash::Pipeline do end let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) } let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store } - let(:pipeline_thread) do - # subject has to be called for the first time outside the thread because it will create a race condition - # with the subject.ready? call since subject is lazily initialized - s = subject - Thread.new { s.run } - end before :each do allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput) @@ -867,7 +853,7 @@ describe LogStash::Pipeline do allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(LogStash::Filters::DummyFilter) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) - pipeline_thread + subject.start Timeout.timeout(timeout) do sleep(0.1) until subject.ready? end @@ -883,7 +869,6 @@ describe LogStash::Pipeline do after :each do subject.shutdown - pipeline_thread.join end context "global metric" do diff --git a/logstash-core/spec/logstash/shutdown_watcher_spec.rb b/logstash-core/spec/logstash/shutdown_watcher_spec.rb index b563a798e..3196117f5 100644 --- a/logstash-core/spec/logstash/shutdown_watcher_spec.rb +++ b/logstash-core/spec/logstash/shutdown_watcher_spec.rb @@ -27,7 +27,7 @@ describe LogStash::ShutdownWatcher do before :each do allow(pipeline).to receive(:reporter).and_return(reporter) - allow(pipeline).to receive(:thread).and_return(Thread.current) + allow(pipeline).to receive(:finished_execution?).and_return(false) allow(reporter).to receive(:snapshot).and_return(reporter_snapshot) allow(reporter_snapshot).to receive(:o_simple_hash).and_return({}) end diff --git a/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java b/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java index adbafc4aa..12930b51a 100644 --- a/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java @@ -60,18 +60,6 @@ public final class ShutdownWatcherExt extends RubyBasicObject { private IRubyObject pipeline; - @JRubyMethod(meta = true, required = 1, optional = 3) - public static RubyThread start(final ThreadContext context, final IRubyObject recv, final IRubyObject[] args) { - return new RubyThread(context.runtime, context.runtime.getThread(), () -> { - try { - new ShutdownWatcherExt(context.runtime, RubyUtil.SHUTDOWN_WATCHER_CLASS) - .initialize(context, args).start(context); - } catch (final InterruptedException ex) { - throw new IllegalStateException(ex); - } - }); - } - @JRubyMethod(name = "unsafe_shutdown?", meta = true) public static IRubyObject isUnsafeShutdown(final ThreadContext context, final IRubyObject recv) { @@ -164,8 +152,7 @@ public final class ShutdownWatcherExt extends RubyBasicObject { TimeUnit.SECONDS.sleep(cyclePeriod); attemptsCount.incrementAndGet(); if (stopped(context).isTrue() || - !pipeline.callMethod(context, "thread") - .callMethod(context, "alive?").isTrue()) { + pipeline.callMethod(context, "finished_execution?").isTrue()) { break; } reports.add(pipelineReportSnapshot(context)); diff --git a/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java b/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java index a48c8a8cb..8a22820b0 100644 --- a/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java +++ b/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java @@ -70,6 +70,9 @@ public final class ShutdownWatcherExtTest { "pipeline.define_singleton_method(:thread) do", "Thread.current", "end", + "pipeline.define_singleton_method(:finished_execution?) do", + "false", + "end", "pipeline.define_singleton_method(:reporter) do", "reporter", "end",