remove need for extra ShutdownWatcher thread

The creation of a Ruby thread from Java seems to be a trigger
for jruby/jruby#6207.

Pipeline#shutdown now blocks on ShutdownWatcher#start, which waits for
pipeline.finished_execution? to become true.
This removes the need for the pattern:
  `pipeline.shutdown { block } && pipeline.thread.join`
which can now be replaced with just `pipeline.shutdown`.

To avoid having `shutdown` block while waiting for ready? when the pipeline crashes too quickly,
this method returns immediately if finished_execution? is true.

Most uses of pipeline#run have also been replaced by pipeline#start
since the latter will block until the pipeline is ready, again avoiding
the pattern:
  `pipeline.run && sleep 0.1 until pipeline.ready?`

Pipeline tests have been updated to reflect these two changes.

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
This commit is contained in:
Joao Duarte 2020-06-05 15:50:54 +01:00 committed by João Duarte
parent bf7041fa28
commit 29d7dcef94
12 changed files with 67 additions and 157 deletions

View file

@ -378,30 +378,23 @@ module LogStash; class JavaPipeline < JavaBasePipeline
# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
# @param before_stop [Proc] code block called before performing stop operation on input plugins
def shutdown(&before_stop)
# and will block until the pipeline has successfully shut down.
def shutdown
return if finished_execution?
# shutdown can only start once the pipeline has completed its startup.
# avoid potential race condition between the startup sequence and this
# shutdown method which can be called from another thread at any time
sleep(0.1) while !ready?
# TODO: should we also check against calling shutdown multiple times concurrently?
before_stop.call if block_given?
stop_inputs
# We make this call blocking, so we know for sure when the method return the shutdown is
# stopped
wait_for_workers
wait_for_shutdown
clear_pipeline_metrics
@logger.info("Pipeline terminated", "pipeline.id" => pipeline_id)
end # def shutdown
def wait_for_workers
@logger.debug("Closing inputs", default_logging_keys)
@worker_threads.map(&:join)
@logger.debug("Worker closed", default_logging_keys)
def wait_for_shutdown
ShutdownWatcher.new(self).start
end
def stop_inputs

View file

@ -501,30 +501,22 @@ module LogStash; class Pipeline < BasePipeline
# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
# @param before_stop [Proc] code block called before performing stop operation on input plugins
def shutdown(&before_stop)
# and will block until the pipeline has successfully shut down.
def shutdown
return if finished_execution?
# shutdown can only start once the pipeline has completed its startup.
# avoid potential race condition between the startup sequence and this
# shutdown method which can be called from another thread at any time
sleep(0.1) while !ready?
# TODO: should we also check against calling shutdown multiple times concurrently?
before_stop.call if block_given?
stop_inputs
# We make this call blocking, so we know for sure when the method return the shutdown is
# stopped
wait_for_workers
wait_for_shutdown
clear_pipeline_metrics
end # def shutdown
def wait_for_workers
@worker_threads.each do |t|
t.join
@logger.debug("Worker terminated", default_logging_keys(:thread => t.inspect))
end
def wait_for_shutdown
ShutdownWatcher.new(self).start
end
def stop_inputs

View file

@ -66,8 +66,7 @@ module LogStash module PipelineAction
# the block must emit a success boolean value
# First shutdown old pipeline
old_pipeline.shutdown { LogStash::ShutdownWatcher.start(old_pipeline) }
old_pipeline.thread.join
old_pipeline.shutdown
# Then create a new pipeline
new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)

View file

@ -27,8 +27,7 @@ module LogStash module PipelineAction
def execute(agent, pipelines_registry)
pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline|
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
pipeline.thread.join
pipeline.shutdown
end
LogStash::ConvergeResult::SuccessfulAction.new

View file

@ -212,22 +212,15 @@ describe LogStash::JavaPipeline do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(3).for do
# give us a bit of time to flush the events
# puts("*****" + output.events.map{|e| e.message}.to_s)
output.events.map{|e| e.get("message")}.include?("END")
end.to be_truthy
pipeline.start
sleep 0.01 until pipeline.stopped?
end
pipeline.shutdown
expect(output.events.map{|e| e.get("message")}).to include("END")
expect(output.events.size).to eq(2)
expect(output.events[0].get("tags")).to eq(["notdropped"])
expect(output.events[1].get("tags")).to eq(["notdropped"])
pipeline.shutdown
t.join
Thread.abort_on_exception = abort_on_exception_state
end
@ -289,7 +282,7 @@ describe LogStash::JavaPipeline do
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
expect(pipeline.logger).to receive(:warn).with(msg,
hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(safe_thread_count)
pipeline.shutdown
end
@ -302,7 +295,7 @@ describe LogStash::JavaPipeline do
" not work with multiple worker threads"
pipeline = mock_java_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(override_thread_count)
pipeline.shutdown
end
@ -329,7 +322,7 @@ describe LogStash::JavaPipeline do
it "starts multiple filter threads" do
skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245")
pipeline = mock_java_pipeline_from_string(test_config_with_filters)
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(worker_thread_count)
pipeline.shutdown
end
@ -374,18 +367,10 @@ describe LogStash::JavaPipeline do
let(:pipeline) { mock_java_pipeline_from_string(test_config_without_output_workers) }
let(:output) { pipeline.outputs.first }
before do
allow(output).to receive(:do_close)
end
after do
pipeline.shutdown
end
it "should call close of output without output-workers" do
pipeline.run
expect(output).to have_received(:do_close).once
expect(output).to receive(:do_close).once
pipeline.start
pipeline.shutdown
end
end
end
@ -433,7 +418,7 @@ describe LogStash::JavaPipeline do
expect(pipeline).to receive(:transition_to_running).ordered.and_call_original
expect(pipeline).to receive(:start_flusher).ordered.and_call_original
pipeline.run
pipeline.start
pipeline.shutdown
end
end
@ -604,15 +589,10 @@ describe LogStash::JavaPipeline do
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(logger).to receive(:warn)
# pipeline must be first called outside the thread context because it lazily initialize and will create a
# race condition if called in the thread
p = pipeline
t = Thread.new { p.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
pipeline.start
# the only input auto-closes, so the pipeline will automatically stop.
sleep(0.01) until pipeline.stopped?
pipeline.shutdown
t.join
end
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
@ -778,9 +758,8 @@ describe LogStash::JavaPipeline do
it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
pipeline.start
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
@ -792,8 +771,6 @@ describe LogStash::JavaPipeline do
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
pipeline.shutdown
t.join
end
end
@ -824,9 +801,8 @@ describe LogStash::JavaPipeline do
it "flush periodically without error on nil flush return" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
pipeline.start
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(10).for do
@ -838,8 +814,6 @@ describe LogStash::JavaPipeline do
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
pipeline.shutdown
t.join
end
end
@ -877,9 +851,8 @@ describe LogStash::JavaPipeline do
it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
pipeline.start
end
Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
wait(11).for do
@ -891,8 +864,6 @@ describe LogStash::JavaPipeline do
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
pipeline.shutdown
t.join
end
end
@ -931,7 +902,7 @@ describe LogStash::JavaPipeline do
it "correctly distributes events" do
pipeline = mock_java_pipeline_from_string(config, pipeline_settings_obj)
pipeline.run
pipeline.start
pipeline.shutdown
expect(output.events.size).to eq(60)
expect(output.events.count {|e| e.get("cloned") == "cloned"}).to eq(30)
@ -961,7 +932,7 @@ describe LogStash::JavaPipeline do
end
it "return when the pipeline started working" do
subject.run
subject.start
expect(subject.started_at).to be < Time.now
subject.shutdown
end
@ -989,18 +960,12 @@ describe LogStash::JavaPipeline do
context "when the pipeline is started" do
it "return the duration in milliseconds" do
# subject must be first call outside the thread context because of lazy initialization
s = subject
t = Thread.new { s.run }
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
end
Timeout.timeout(timeout) do
sleep(0.1)
subject.start
end
sleep(0.1)
expect(subject.uptime).to be > 0
subject.shutdown
t.join
end
end
end
@ -1042,12 +1007,6 @@ describe LogStash::JavaPipeline do
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
@ -1056,9 +1015,8 @@ describe LogStash::JavaPipeline do
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(LogStash::Filters::DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
pipeline_thread
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
subject.start
end
# make sure we have received all the generated events
@ -1072,7 +1030,6 @@ describe LogStash::JavaPipeline do
after :each do
subject.shutdown
pipeline_thread.join
end
context "global metric" do

View file

@ -92,13 +92,13 @@ describe LogStash::Pipeline do
it "retrieves proper pipeline-level DLQ writer" do
expect_any_instance_of(org.logstash.common.io.DeadLetterQueueWriter).to receive(:close).and_call_original
subject.run
subject.start
subject.shutdown
dlq_path = java.nio.file.Paths.get(pipeline_settings_obj.get("path.dead_letter_queue"), pipeline_id)
dlq_reader = org.logstash.common.io.DeadLetterQueueReader.new(dlq_path)
entry = dlq_reader.pollEntry(40)
expect(entry).to_not be_nil
expect(entry.reason).to eq("my reason")
subject.shutdown
end
end
@ -109,7 +109,7 @@ describe LogStash::Pipeline do
it "does not write to the DLQ" do
expect(LogStash::Util::DummyDeadLetterQueueWriter).to receive(:new).and_call_original
expect_any_instance_of(LogStash::Util::DummyDeadLetterQueueWriter).to receive(:close).and_call_original
subject.run
subject.start
dlq_path = java.nio.file.Paths.get(pipeline_settings_obj.get("path.dead_letter_queue"), pipeline_id)
expect(java.nio.file.Files.exists(dlq_path)).to eq(false)
subject.shutdown

View file

@ -100,13 +100,6 @@ describe LogStash::Pipeline do
let(:max_bytes) { 1024 * 1024 * 1024 } # 1 gb
let(:times) { [] }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
before :each do
@ -126,7 +119,7 @@ describe LogStash::Pipeline do
pipeline_settings_obj.set("queue.max_bytes", max_bytes)
times.push(Time.now.to_f)
pipeline_thread
subject.start
sleep(0.1) until subject.ready?
# make sure we have received all the generated events
@ -139,7 +132,6 @@ describe LogStash::Pipeline do
after :each do
subject.shutdown
pipeline_thread.join
# Dir.rm_rf(this_queue_folder)
end

View file

@ -37,12 +37,11 @@ shared_examples "a pipeline reporter" do |pipeline_setup|
@pre_snapshot = reporter.snapshot
pipeline.run
@post_snapshot = reporter.snapshot
end
after do
pipeline.start
# wait for stopped? so the input can produce all events
sleep 0.01 until pipeline.stopped?
pipeline.shutdown
@post_snapshot = reporter.snapshot
end
describe "stalling threads info" do
@ -87,6 +86,10 @@ shared_examples "a pipeline reporter" do |pipeline_setup|
end
describe LogStash::PipelineReporter do
it_behaves_like "a pipeline reporter", :mock_pipeline_from_string
it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string
context "with ruby execution" do
it_behaves_like "a pipeline reporter", :mock_pipeline_from_string
end
context "with java execution" do
it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string
end
end

View file

@ -210,7 +210,7 @@ describe LogStash::Pipeline do
Thread.abort_on_exception = true
pipeline = mock_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
pipeline.start
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
@ -225,7 +225,6 @@ describe LogStash::Pipeline do
expect(output.events[0].get("tags")).to eq(["notdropped"])
expect(output.events[1].get("tags")).to eq(["notdropped"])
pipeline.shutdown
t.join
Thread.abort_on_exception = abort_on_exception_state
end
@ -305,7 +304,7 @@ describe LogStash::Pipeline do
pipeline = mock_pipeline_from_string(test_config_with_filters)
expect(pipeline.logger).to receive(:warn).with(msg,
hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(safe_thread_count)
pipeline.shutdown
end
@ -318,7 +317,7 @@ describe LogStash::Pipeline do
" not work with multiple worker threads"
pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj)
expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]}))
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(override_thread_count)
pipeline.shutdown
end
@ -345,7 +344,7 @@ describe LogStash::Pipeline do
it "starts multiple filter threads" do
skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245")
pipeline = mock_pipeline_from_string(test_config_with_filters)
pipeline.run
pipeline.start
expect(pipeline.worker_threads.size).to eq(worker_thread_count)
pipeline.shutdown
end
@ -399,7 +398,7 @@ describe LogStash::Pipeline do
end
it "should call close of output without output-workers" do
pipeline.run
pipeline.start
expect(output).to have_received(:do_close).once
end
@ -446,7 +445,7 @@ describe LogStash::Pipeline do
expect(pipeline).to receive(:transition_to_running).ordered.and_call_original
expect(pipeline).to receive(:start_flusher).ordered.and_call_original
pipeline.run
pipeline.start
pipeline.shutdown
end
end
@ -493,13 +492,11 @@ describe LogStash::Pipeline do
# pipeline must be first called outside the thread context because it lazily initialize and will create a
# race condition if called in the thread
p = pipeline
t = Thread.new { p.run }
pipeline.start
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
pipeline.shutdown
t.join
end
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
@ -694,7 +691,7 @@ describe LogStash::Pipeline do
it "flush periodically" do
Thread.abort_on_exception = true
pipeline = mock_pipeline_from_string(config, pipeline_settings_obj)
t = Thread.new { pipeline.run }
pipeline.start
Timeout.timeout(timeout) do
sleep(0.1) until pipeline.ready?
end
@ -708,8 +705,6 @@ describe LogStash::Pipeline do
expect(output.events.any? {|e| e.get("message") == "dummy_flush"}).to eq(true)
pipeline.shutdown
t.join
end
end
@ -772,7 +767,7 @@ describe LogStash::Pipeline do
end
it "return when the pipeline started working" do
subject.run
subject.start
expect(subject.started_at).to be < Time.now
subject.shutdown
end
@ -800,9 +795,7 @@ describe LogStash::Pipeline do
context "when the pipeline is started" do
it "return the duration in milliseconds" do
# subject must be first call outside the thread context because of lazy initialization
s = subject
t = Thread.new { s.run }
subject.start
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
end
@ -811,7 +804,6 @@ describe LogStash::Pipeline do
end
expect(subject.uptime).to be > 0
subject.shutdown
t.join
end
end
end
@ -853,12 +845,6 @@ describe LogStash::Pipeline do
end
let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
let(:pipeline_thread) do
# subject has to be called for the first time outside the thread because it will create a race condition
# with the subject.ready? call since subject is lazily initialized
s = subject
Thread.new { s.run }
end
before :each do
allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
@ -867,7 +853,7 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("filter", "dummyfilter").and_return(LogStash::Filters::DummyFilter)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
pipeline_thread
subject.start
Timeout.timeout(timeout) do
sleep(0.1) until subject.ready?
end
@ -883,7 +869,6 @@ describe LogStash::Pipeline do
after :each do
subject.shutdown
pipeline_thread.join
end
context "global metric" do

View file

@ -27,7 +27,7 @@ describe LogStash::ShutdownWatcher do
before :each do
allow(pipeline).to receive(:reporter).and_return(reporter)
allow(pipeline).to receive(:thread).and_return(Thread.current)
allow(pipeline).to receive(:finished_execution?).and_return(false)
allow(reporter).to receive(:snapshot).and_return(reporter_snapshot)
allow(reporter_snapshot).to receive(:o_simple_hash).and_return({})
end

View file

@ -60,18 +60,6 @@ public final class ShutdownWatcherExt extends RubyBasicObject {
private IRubyObject pipeline;
@JRubyMethod(meta = true, required = 1, optional = 3)
public static RubyThread start(final ThreadContext context, final IRubyObject recv, final IRubyObject[] args) {
return new RubyThread(context.runtime, context.runtime.getThread(), () -> {
try {
new ShutdownWatcherExt(context.runtime, RubyUtil.SHUTDOWN_WATCHER_CLASS)
.initialize(context, args).start(context);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
});
}
@JRubyMethod(name = "unsafe_shutdown?", meta = true)
public static IRubyObject isUnsafeShutdown(final ThreadContext context,
final IRubyObject recv) {
@ -164,8 +152,7 @@ public final class ShutdownWatcherExt extends RubyBasicObject {
TimeUnit.SECONDS.sleep(cyclePeriod);
attemptsCount.incrementAndGet();
if (stopped(context).isTrue() ||
!pipeline.callMethod(context, "thread")
.callMethod(context, "alive?").isTrue()) {
pipeline.callMethod(context, "finished_execution?").isTrue()) {
break;
}
reports.add(pipelineReportSnapshot(context));

View file

@ -70,6 +70,9 @@ public final class ShutdownWatcherExtTest {
"pipeline.define_singleton_method(:thread) do",
"Thread.current",
"end",
"pipeline.define_singleton_method(:finished_execution?) do",
"false",
"end",
"pipeline.define_singleton_method(:reporter) do",
"reporter",
"end",