From 64a7a710940f000eeef318c6d759c9c51ed65013 Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Tue, 10 May 2022 15:29:40 +0100 Subject: [PATCH] Hide Shutdown Watcher stall message when PQ draining (#13934) (#14084) Hide shutdown stall message when queue is draining Fixed: #9544 Co-authored-by: Andrea Selva (cherry picked from commit 41cb3d368003808cef4fb926a8b1da3f9d790d9c) --- logstash-core/lib/logstash/java_pipeline.rb | 11 ++- .../lib/logstash/worker_loop_thread.rb | 30 ++++++ .../spec/logstash/shutdown_watcher_spec.rb | 1 + .../execution/ShutdownWatcherExt.java | 21 ++-- .../org/logstash/execution/WorkerLoop.java | 2 +- .../execution/ShutdownWatcherExtTest.java | 96 ++++++++++++------- .../shutdown_watcher_ext_pipeline_template.rb | 30 ++++++ 7 files changed, 145 insertions(+), 46 deletions(-) create mode 100644 logstash-core/lib/logstash/worker_loop_thread.rb create mode 100644 logstash-core/src/test/resources/shutdown_watcher_ext_pipeline_template.rb diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index bc10c2f66..cddd9aee0 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -24,6 +24,7 @@ require "logstash/outputs/base" require "logstash/instrument/collector" require "logstash/compiler" require "logstash/config/lir_serializer" +require "logstash/worker_loop_thread" module LogStash; class JavaPipeline < JavaBasePipeline include LogStash::Util::Loggable @@ -292,7 +293,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline # Once all WorkerLoop have been initialized run them in separate threads worker_loops.each_with_index do |worker_loop, t| - thread = Thread.new do + thread = WorkerLoopThread.new(worker_loop) do Util.set_thread_name("[#{pipeline_id}]>worker#{t}") ThreadContext.put("pipeline.id", pipeline_id) begin @@ -548,6 +549,14 @@ module LogStash; class JavaPipeline < JavaBasePipeline } end + def shutdown_requested? + @shutdownRequested.get + end + + def worker_threads_draining? + @worker_threads.any? {|t| t.worker_loop.draining? } + end + private def close_plugin_and_ignore(plugin) diff --git a/logstash-core/lib/logstash/worker_loop_thread.rb b/logstash-core/lib/logstash/worker_loop_thread.rb new file mode 100644 index 000000000..aa3b6dfcd --- /dev/null +++ b/logstash-core/lib/logstash/worker_loop_thread.rb @@ -0,0 +1,30 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "thread" + +module LogStash + class WorkerLoopThread < Thread + attr_reader :worker_loop + + def initialize(worker_loop) + super + @worker_loop = worker_loop + end + + end +end diff --git a/logstash-core/spec/logstash/shutdown_watcher_spec.rb b/logstash-core/spec/logstash/shutdown_watcher_spec.rb index 3196117f5..eefeaf6c1 100644 --- a/logstash-core/spec/logstash/shutdown_watcher_spec.rb +++ b/logstash-core/spec/logstash/shutdown_watcher_spec.rb @@ -30,6 +30,7 @@ describe LogStash::ShutdownWatcher do allow(pipeline).to receive(:finished_execution?).and_return(false) allow(reporter).to receive(:snapshot).and_return(reporter_snapshot) allow(reporter_snapshot).to receive(:o_simple_hash).and_return({}) + allow(pipeline).to receive(:worker_threads_draining?).and_return(false) end context "when pipeline is stalled" do diff --git a/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java b/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java index 3ac84ec4f..fbb145476 100644 --- a/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/ShutdownWatcherExt.java @@ -154,8 +154,7 @@ public final class ShutdownWatcherExt extends RubyBasicObject { while (true) { TimeUnit.SECONDS.sleep(cyclePeriod); attemptsCount.incrementAndGet(); - if (stopped(context).isTrue() || - pipeline.callMethod(context, "finished_execution?").isTrue()) { + if (stopped(context).isTrue() || pipeline.callMethod(context, "finished_execution?").isTrue()) { break; } reports.add(pipelineReportSnapshot(context)); @@ -163,17 +162,21 @@ public final class ShutdownWatcherExt extends RubyBasicObject { reports.remove(0); } if (cycleNumber == reportEvery - 1) { - LOGGER.warn(reports.get(reports.size() - 1).callMethod(context, "to_s") - .asJavaString()); + boolean isPqDraining = pipeline.callMethod(context, "worker_threads_draining?").isTrue(); + + if (!isPqDraining) { + LOGGER.warn(reports.get(reports.size() - 1).callMethod(context, "to_s").asJavaString()); + } + if (shutdownStalled(context).isTrue()) { if (stalledCount == 0) { - LOGGER.error( - "The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information." - ); + LOGGER.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information."); + if (isPqDraining) { + LOGGER.info("The queue is draining before shutdown."); + } } ++stalledCount; - if (isUnsafeShutdown(context, null).isTrue() && - abortThreshold == stalledCount) { + if (isUnsafeShutdown(context, null).isTrue() && abortThreshold == stalledCount) { LOGGER.fatal("Forcefully quitting Logstash ..."); forceExit(context); } diff --git a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java index 121f0fb57..b60080671 100644 --- a/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java +++ b/logstash-core/src/main/java/org/logstash/execution/WorkerLoop.java @@ -106,7 +106,7 @@ public final class WorkerLoop implements Runnable { } } - private boolean isDraining() { + public boolean isDraining() { return drainQueue && !readClient.isEmpty(); } } diff --git a/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java b/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java index 8a22820b0..c4def6e07 100644 --- a/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java +++ b/logstash-core/src/test/java/org/logstash/execution/ShutdownWatcherExtTest.java @@ -20,68 +20,87 @@ package org.logstash.execution; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.logging.log4j.junit.LoggerContextRule; +import org.apache.logging.log4j.test.appender.ListAppender; import org.assertj.core.api.Assertions; import org.jruby.RubySystemExit; import org.jruby.exceptions.RaiseException; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; import org.logstash.RubyUtil; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + /** * Tests for {@link ShutdownWatcherExt}. */ @NotThreadSafe public final class ShutdownWatcherExtTest { + private static final String CONFIG = "log4j2-test1.xml"; + private static final Path PIPELINE_TEMPLATE = Paths.get("./src/test/resources/shutdown_watcher_ext_pipeline_template.rb").toAbsolutePath(); + private ListAppender appender; + + @ClassRule + public static LoggerContextRule CTX = new LoggerContextRule(CONFIG); + + @Before + public void setup() { + appender = CTX.getListAppender("EventLogger").clear(); + } + @Test - public void testShouldForceShutdown() throws InterruptedException { + public void pipelineWithUnsafeShutdownShouldForceShutdown() throws InterruptedException, IOException { + String pipeline = resolvedPipeline(false); + watcherShutdownStallingPipeline(pipeline); + + // non drain pipeline should print stall msg + boolean printStalling = appender.getMessages().stream().anyMatch((msg) -> msg.contains("stalling")); + assertTrue(printStalling); + } + + + @Test + public void pipelineWithDrainShouldNotPrintStallMsg() throws InterruptedException, IOException { + String pipeline = resolvedPipeline(true); + watcherShutdownStallingPipeline(pipeline); + + boolean printStalling = appender.getMessages().stream().anyMatch((msg) -> msg.contains("stalling")); + assertFalse(printStalling); + } + + private void watcherShutdownStallingPipeline(String rubyScript) throws InterruptedException { final ExecutorService exec = Executors.newSingleThreadExecutor(); try { final Future res = exec.submit(() -> { final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); ShutdownWatcherExt.setUnsafeShutdown(context, null, context.tru); return new ShutdownWatcherExt(context.runtime, RubyUtil.SHUTDOWN_WATCHER_CLASS) - .initialize( - context, new IRubyObject[]{ - RubyUtil.RUBY.evalScriptlet( - String.join( - "\n", - "pipeline = Object.new", - "reporter = Object.new", - "snapshot = Object.new", - "inflight_count = java.util.concurrent.atomic.AtomicInteger.new", - "snapshot.define_singleton_method(:inflight_count) do", - "inflight_count.increment_and_get + 1", - "end", - "threads = {}", - "snapshot.define_singleton_method(:stalling_threads) do", - "threads", - "end", - "reporter.define_singleton_method(:snapshot) do", - "snapshot", - "end", - "pipeline.define_singleton_method(:thread) do", - "Thread.current", - "end", - "pipeline.define_singleton_method(:finished_execution?) do", - "false", - "end", - "pipeline.define_singleton_method(:reporter) do", - "reporter", - "end", - "pipeline" - ) - ), - context.runtime.newFloat(0.01) - } - ).start(context); + .initialize( + context, new IRubyObject[]{ + RubyUtil.RUBY.evalScriptlet(rubyScript), + context.runtime.newFloat(0.01) + } + ).start(context); }); res.get(); Assertions.fail("Shutdown watcher did not invoke system exit(-1)"); @@ -96,6 +115,13 @@ public final class ShutdownWatcherExtTest { Assertions.fail("Failed to shut down shutdown watcher thread"); } } + } + private static String getPipelineTemplate() throws IOException { + return new String(Files.readAllBytes(PIPELINE_TEMPLATE)); + } + + private static String resolvedPipeline(Boolean isDraining) throws IOException { + return getPipelineTemplate().replace("%{value_placeholder}", isDraining.toString()); } } diff --git a/logstash-core/src/test/resources/shutdown_watcher_ext_pipeline_template.rb b/logstash-core/src/test/resources/shutdown_watcher_ext_pipeline_template.rb new file mode 100644 index 000000000..6e7cb792a --- /dev/null +++ b/logstash-core/src/test/resources/shutdown_watcher_ext_pipeline_template.rb @@ -0,0 +1,30 @@ +pipeline = Object.new +reporter = Object.new +snapshot = Object.new +inflight_count = java.util.concurrent.atomic.AtomicInteger.new +snapshot.define_singleton_method(:inflight_count) do + inflight_count.increment_and_get + 1 +end +threads = {} +snapshot.define_singleton_method(:stalling_threads) do + threads +end +snapshot.define_singleton_method(:to_s) do + "inflight_count=>" + inflight_count.get.to_s + ", stalling_threads_info=>{...}" +end +reporter.define_singleton_method(:snapshot) do + snapshot +end +pipeline.define_singleton_method(:thread) do + Thread.current +end +pipeline.define_singleton_method(:finished_execution?) do + false +end +pipeline.define_singleton_method(:reporter) do + reporter +end +pipeline.define_singleton_method(:worker_threads_draining?) do + %{value_placeholder} +end +pipeline \ No newline at end of file