mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
6c4beadd29
commit
417afd1c5a
9 changed files with 281 additions and 150 deletions
|
@ -4,7 +4,6 @@ require "concurrent"
|
|||
require "logstash/filters/base"
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/outputs/base"
|
||||
require "logstash/shutdown_watcher"
|
||||
require "logstash/instrument/collector"
|
||||
require "logstash/queue_factory"
|
||||
require "logstash/compiler"
|
||||
|
|
|
@ -8,7 +8,6 @@ require "logstash/config/file"
|
|||
require "logstash/filters/base"
|
||||
require "logstash/inputs/base"
|
||||
require "logstash/outputs/base"
|
||||
require "logstash/shutdown_watcher"
|
||||
require "logstash/instrument/collector"
|
||||
require "logstash/filter_delegator"
|
||||
require "logstash/queue_factory"
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/pipeline_action/base"
|
||||
require "logstash/shutdown_watcher"
|
||||
require "logstash/converge_result"
|
||||
|
||||
module LogStash module PipelineAction
|
||||
|
|
|
@ -27,7 +27,6 @@ LogStash::Environment.load_locale!
|
|||
|
||||
require "logstash/agent"
|
||||
require "logstash/config/defaults"
|
||||
require "logstash/shutdown_watcher"
|
||||
require "logstash/patches/clamp"
|
||||
require "logstash/settings"
|
||||
require "logstash/version"
|
||||
|
|
|
@ -1,120 +1 @@
|
|||
# encoding: utf-8
|
||||
require "concurrent/atomic/atomic_fixnum"
|
||||
require "concurrent/atomic/atomic_boolean"
|
||||
|
||||
module LogStash
|
||||
class ShutdownWatcher
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
CHECK_EVERY = 1 # second
|
||||
REPORT_EVERY = 5 # checks
|
||||
ABORT_AFTER = 3 # stalled reports
|
||||
|
||||
attr_reader :cycle_period, :report_every, :abort_threshold
|
||||
|
||||
def initialize(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
|
||||
@pipeline = pipeline
|
||||
@cycle_period = cycle_period
|
||||
@report_every = report_every
|
||||
@abort_threshold = abort_threshold
|
||||
@reports = []
|
||||
@attempts_count = Concurrent::AtomicFixnum.new(0)
|
||||
@running = Concurrent::AtomicBoolean.new(false)
|
||||
end
|
||||
|
||||
def self.unsafe_shutdown=(boolean)
|
||||
@unsafe_shutdown = boolean
|
||||
end
|
||||
|
||||
def self.unsafe_shutdown?
|
||||
@unsafe_shutdown
|
||||
end
|
||||
|
||||
def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
|
||||
controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
|
||||
Thread.new(controller) { |controller| controller.start }
|
||||
end
|
||||
|
||||
def logger
|
||||
self.class.logger
|
||||
end
|
||||
|
||||
def attempts_count
|
||||
@attempts_count.value
|
||||
end
|
||||
|
||||
def stop!
|
||||
@running.make_false
|
||||
end
|
||||
|
||||
def stopped?
|
||||
@running.false?
|
||||
end
|
||||
|
||||
def start
|
||||
sleep(@cycle_period)
|
||||
cycle_number = 0
|
||||
stalled_count = 0
|
||||
running!
|
||||
Stud.interval(@cycle_period) do
|
||||
@attempts_count.increment
|
||||
break if stopped?
|
||||
break unless @pipeline.thread.alive?
|
||||
@reports << pipeline_report_snapshot
|
||||
@reports.delete_at(0) if @reports.size > @report_every # expire old report
|
||||
if cycle_number == (@report_every - 1) # it's report time!
|
||||
logger.warn(@reports.last.to_s)
|
||||
|
||||
if shutdown_stalled?
|
||||
logger.error("The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.") if stalled_count == 0
|
||||
stalled_count += 1
|
||||
|
||||
if self.class.unsafe_shutdown? && @abort_threshold == stalled_count
|
||||
logger.fatal("Forcefully quitting logstash..")
|
||||
force_exit()
|
||||
break
|
||||
end
|
||||
else
|
||||
stalled_count = 0
|
||||
end
|
||||
end
|
||||
cycle_number = (cycle_number + 1) % @report_every
|
||||
end
|
||||
ensure
|
||||
stop!
|
||||
end
|
||||
|
||||
def pipeline_report_snapshot
|
||||
@pipeline.reporter.snapshot
|
||||
end
|
||||
|
||||
# A pipeline shutdown is stalled if
|
||||
# * at least REPORT_EVERY reports have been created
|
||||
# * the inflight event count is in monotonically increasing
|
||||
# * there are worker threads running which aren't blocked on SizedQueue pop/push
|
||||
# * the stalled thread list is constant in the previous REPORT_EVERY reports
|
||||
def shutdown_stalled?
|
||||
return false unless @reports.size == @report_every #
|
||||
# is stalled if inflight count is either constant or increasing
|
||||
stalled_event_count = @reports.each_cons(2).all? do |prev_report, next_report|
|
||||
prev_report.inflight_count <= next_report.inflight_count
|
||||
end
|
||||
if stalled_event_count
|
||||
@reports.each_cons(2).all? do |prev_report, next_report|
|
||||
prev_report.stalling_threads == next_report.stalling_threads
|
||||
end
|
||||
else
|
||||
false
|
||||
end
|
||||
end
|
||||
|
||||
def force_exit
|
||||
exit(-1)
|
||||
end
|
||||
|
||||
private
|
||||
def running!
|
||||
@running.make_true
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
# encoding: utf-8
|
||||
require "spec_helper"
|
||||
require "logstash/shutdown_watcher"
|
||||
|
||||
describe LogStash::ShutdownWatcher do
|
||||
let(:check_every) { 0.01 }
|
||||
|
@ -24,32 +23,6 @@ describe LogStash::ShutdownWatcher do
|
|||
allow(reporter_snapshot).to receive(:stalling_threads) { { } }
|
||||
end
|
||||
|
||||
describe ".unsafe_shutdown = true" do
|
||||
let(:abort_threshold) { subject.abort_threshold }
|
||||
let(:report_every) { subject.report_every }
|
||||
|
||||
before :each do
|
||||
subject.class.unsafe_shutdown = true
|
||||
end
|
||||
|
||||
it "should force the shutdown" do
|
||||
expect(subject).to receive(:force_exit).once
|
||||
subject.start
|
||||
end
|
||||
|
||||
it "should do exactly \"abort_threshold\" stall checks" do
|
||||
allow(subject).to receive(:force_exit)
|
||||
expect(subject).to receive(:shutdown_stalled?).exactly(abort_threshold).times.and_call_original
|
||||
subject.start
|
||||
end
|
||||
|
||||
it "should do exactly \"abort_threshold\"*\"report_every\" stall checks" do
|
||||
allow(subject).to receive(:force_exit)
|
||||
expect(subject).to receive(:pipeline_report_snapshot).exactly(abort_threshold*report_every).times.and_call_original
|
||||
subject.start
|
||||
end
|
||||
end
|
||||
|
||||
describe ".unsafe_shutdown = false" do
|
||||
|
||||
before :each do
|
||||
|
|
|
@ -19,6 +19,7 @@ import org.logstash.execution.EventDispatcherExt;
|
|||
import org.logstash.execution.ExecutionContextExt;
|
||||
import org.logstash.execution.PipelineReporterExt;
|
||||
import org.logstash.execution.QueueReadClientBase;
|
||||
import org.logstash.execution.ShutdownWatcherExt;
|
||||
import org.logstash.ext.JRubyLogstashErrorsExt;
|
||||
import org.logstash.ext.JRubyWrappedWriteClientExt;
|
||||
import org.logstash.ext.JrubyAckedReadClientExt;
|
||||
|
@ -163,6 +164,8 @@ public final class RubyUtil {
|
|||
|
||||
public static final RubyClass PIPELINE_REPORTER_CLASS;
|
||||
|
||||
public static final RubyClass SHUTDOWN_WATCHER_CLASS;
|
||||
|
||||
public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS;
|
||||
|
||||
public static final RubyClass HOOKS_REGISTRY_CLASS;
|
||||
|
@ -196,6 +199,8 @@ public final class RubyUtil {
|
|||
PLUGIN_METRIC_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
|
||||
"PluginMetricFactory", RUBY.getObject(), PluginFactoryExt.Metrics::new
|
||||
);
|
||||
SHUTDOWN_WATCHER_CLASS =
|
||||
setupLogstashClass(ShutdownWatcherExt::new, ShutdownWatcherExt.class);
|
||||
PLUGIN_METRIC_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Metrics.class);
|
||||
EXECUTION_CONTEXT_FACTORY_CLASS.defineAnnotatedMethods(
|
||||
PluginFactoryExt.ExecutionContext.class
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
package org.logstash.execution;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyBasicObject;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyThread;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
|
||||
@JRubyClass(name = "ShutdownWatcher")
|
||||
public final class ShutdownWatcherExt extends RubyBasicObject {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(ShutdownWatcherExt.class);
|
||||
|
||||
private static final AtomicBoolean unsafeShutdown = new AtomicBoolean(false);
|
||||
|
||||
private final List<IRubyObject> reports = new ArrayList<>();
|
||||
|
||||
private final AtomicInteger attemptsCount = new AtomicInteger(0);
|
||||
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
|
||||
private long cyclePeriod = 1L;
|
||||
|
||||
private int reportEvery = 5;
|
||||
|
||||
private int abortThreshold = 3;
|
||||
|
||||
private IRubyObject pipeline;
|
||||
|
||||
@JRubyMethod(meta = true, required = 1, optional = 3)
|
||||
public static RubyThread start(final ThreadContext context, final IRubyObject recv, final IRubyObject[] args) {
|
||||
return new RubyThread(context.runtime, context.runtime.getThread(), () -> {
|
||||
try {
|
||||
new ShutdownWatcherExt(context.runtime, RubyUtil.SHUTDOWN_WATCHER_CLASS)
|
||||
.initialize(context, args).start(context);
|
||||
} catch (final InterruptedException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "unsafe_shutdown?", meta = true)
|
||||
public static IRubyObject isUnsafeShutdown(final ThreadContext context,
|
||||
final IRubyObject recv) {
|
||||
return unsafeShutdown.get() ? context.tru : context.fals;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "unsafe_shutdown=", meta = true)
|
||||
public static IRubyObject setUnsafeShutdown(final ThreadContext context,
|
||||
final IRubyObject recv, final IRubyObject arg) {
|
||||
unsafeShutdown.set(arg.isTrue());
|
||||
return context.nil;
|
||||
}
|
||||
|
||||
public ShutdownWatcherExt(final Ruby runtime, final RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod(required = 1, optional = 3)
|
||||
public ShutdownWatcherExt initialize(final ThreadContext context, final IRubyObject[] args) {
|
||||
pipeline = args[0];
|
||||
if (args.length >= 2) {
|
||||
cyclePeriod = args[1].convertToInteger().getLongValue();
|
||||
if (args.length >= 3) {
|
||||
reportEvery = args[2].convertToInteger().getIntValue();
|
||||
if (args.length >= 4) {
|
||||
abortThreshold = args[3].convertToInteger().getIntValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "pipeline_report_snapshot")
|
||||
public IRubyObject pipelineReportSnapshot(final ThreadContext context) {
|
||||
return pipeline.callMethod(context, "reporter").callMethod(context, "snapshot");
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "shutdown_stalled?")
|
||||
public IRubyObject shutdownStalled(final ThreadContext context) {
|
||||
if (reports.size() != reportEvery) {
|
||||
return context.fals;
|
||||
}
|
||||
final int[] inflightCounts = reports.stream().mapToInt(
|
||||
obj -> obj.callMethod(context, "inflight_count").convertToInteger().getIntValue()
|
||||
).toArray();
|
||||
boolean stalled = true;
|
||||
for (int i = 0; i < inflightCounts.length - 1; ++i) {
|
||||
if (inflightCounts[i] > inflightCounts[i + 1]) {
|
||||
stalled = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (stalled) {
|
||||
final IRubyObject[] stallingThreads = reports.stream().map(
|
||||
obj -> obj.callMethod(context, "stalling_threads")
|
||||
).toArray(IRubyObject[]::new);
|
||||
for (int i = 0; i < stallingThreads.length - 1; ++i) {
|
||||
if (!stallingThreads[i].op_equal(context, stallingThreads[i + 1]).isTrue()) {
|
||||
stalled = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return stalled ? context.tru : context.fals;
|
||||
}
|
||||
return context.fals;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "stop!")
|
||||
public IRubyObject stop(final ThreadContext context) {
|
||||
return running.compareAndSet(true, false) ? context.tru : context.fals;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "stopped?")
|
||||
public IRubyObject stopped(final ThreadContext context) {
|
||||
return running.get() ? context.fals : context.tru;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "attempts_count")
|
||||
public IRubyObject attemptsCount(final ThreadContext context) {
|
||||
return context.runtime.newFixnum(attemptsCount.get());
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject start(final ThreadContext context) throws InterruptedException {
|
||||
int cycleNumber = 0;
|
||||
int stalledCount = 0;
|
||||
running.set(true);
|
||||
try {
|
||||
while (true) {
|
||||
TimeUnit.SECONDS.sleep(cyclePeriod);
|
||||
attemptsCount.incrementAndGet();
|
||||
if (stopped(context).isTrue() ||
|
||||
!pipeline.callMethod(context, "thread")
|
||||
.callMethod(context, "alive?").isTrue()) {
|
||||
break;
|
||||
}
|
||||
reports.add(pipelineReportSnapshot(context));
|
||||
if (reports.size() > reportEvery) {
|
||||
reports.remove(0);
|
||||
}
|
||||
if (cycleNumber == reportEvery - 1) {
|
||||
LOGGER.warn(reports.get(reports.size() - 1).anyToString().asJavaString());
|
||||
if (shutdownStalled(context).isTrue()) {
|
||||
if (stalledCount == 0) {
|
||||
LOGGER.error(
|
||||
"The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information."
|
||||
);
|
||||
}
|
||||
++stalledCount;
|
||||
if (isUnsafeShutdown(context, null).isTrue() &&
|
||||
abortThreshold == stalledCount) {
|
||||
LOGGER.fatal("Forcefully quitting Logstash ...");
|
||||
forceExit(context);
|
||||
}
|
||||
} else {
|
||||
stalledCount = 0;
|
||||
}
|
||||
}
|
||||
cycleNumber = (cycleNumber + 1) % reportEvery;
|
||||
}
|
||||
return context.nil;
|
||||
} finally {
|
||||
stop(context);
|
||||
}
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "cycle_period")
|
||||
public IRubyObject cyclePeriod(final ThreadContext context) {
|
||||
return context.runtime.newFixnum(cyclePeriod);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "report_every")
|
||||
public IRubyObject reportEvery(final ThreadContext context) {
|
||||
return context.runtime.newFixnum(reportEvery);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "abort_threshold")
|
||||
public IRubyObject abortThreshold(final ThreadContext context) {
|
||||
return context.runtime.newFixnum(abortThreshold);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "force_exit")
|
||||
public IRubyObject forceExit(final ThreadContext context) {
|
||||
throw context.runtime.newSystemExit(-1);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package org.logstash.execution;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.concurrent.NotThreadSafe;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.jruby.RubySystemExit;
|
||||
import org.jruby.exceptions.RaiseException;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.junit.Test;
|
||||
import org.logstash.RubyUtil;
|
||||
|
||||
/**
|
||||
* Tests for {@link ShutdownWatcherExt}.
|
||||
*/
|
||||
@NotThreadSafe
|
||||
public final class ShutdownWatcherExtTest {
|
||||
|
||||
@Test
|
||||
public void testShouldForceShutdown() throws InterruptedException {
|
||||
final ExecutorService exec = Executors.newSingleThreadExecutor();
|
||||
try {
|
||||
final Future<IRubyObject> res = exec.submit(() -> {
|
||||
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
|
||||
ShutdownWatcherExt.setUnsafeShutdown(context, null, context.tru);
|
||||
return new ShutdownWatcherExt(context.runtime, RubyUtil.SHUTDOWN_WATCHER_CLASS)
|
||||
.initialize(
|
||||
context, new IRubyObject[]{
|
||||
RubyUtil.RUBY.evalScriptlet(
|
||||
String.join(
|
||||
"\n",
|
||||
"pipeline = Object.new",
|
||||
"reporter = Object.new",
|
||||
"snapshot = Object.new",
|
||||
"inflight_count = java.util.concurrent.atomic.AtomicInteger.new",
|
||||
"snapshot.define_singleton_method(:inflight_count) do",
|
||||
"inflight_count.increment_and_get + 1",
|
||||
"end",
|
||||
"threads = {}",
|
||||
"snapshot.define_singleton_method(:stalling_threads) do",
|
||||
"threads",
|
||||
"end",
|
||||
"reporter.define_singleton_method(:snapshot) do",
|
||||
"snapshot",
|
||||
"end",
|
||||
"pipeline.define_singleton_method(:thread) do",
|
||||
"Thread.current",
|
||||
"end",
|
||||
"pipeline.define_singleton_method(:reporter) do",
|
||||
"reporter",
|
||||
"end",
|
||||
"pipeline"
|
||||
)
|
||||
),
|
||||
context.runtime.newFloat(0.01)
|
||||
}
|
||||
).start(context);
|
||||
});
|
||||
res.get();
|
||||
Assertions.fail("Shutdown watcher did not invoke system exit(-1)");
|
||||
} catch (final ExecutionException ex) {
|
||||
final RaiseException cause = (RaiseException) ex.getCause();
|
||||
Assertions.assertThat(cause.getException()).isInstanceOf(RubySystemExit.class);
|
||||
} finally {
|
||||
exec.shutdownNow();
|
||||
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
|
||||
ShutdownWatcherExt.setUnsafeShutdown(context, null, context.fals);
|
||||
if (!exec.awaitTermination(30L, TimeUnit.SECONDS)) {
|
||||
Assertions.fail("Failed to shut down shutdown watcher thread");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue