fix: calculate `inflight_count` from in-flight batches (#14760) (#14766)

During stalled shutdowns while waiting for in-flight batches to complete,
our shutdown watcher emits helpful information about what work is in flight,
including the actual threads and plugins that are still executing.

Since ~6.3.0, the `inflight_count` metric in this log message has always
been `0`, in part because of two somewhat-overlapping bugs:

 - elastic/logstash#8987 and elastic/logstash#9056 (7.0, 6.3) changed
   the `inflight_batches` map provided by the queue read clients to index
   batches by native thread id, but pipeline reporter continued to
   attempt to extract by ruby thread object. Because it does not find
   the thread in the "batch map", it reports zero.
 - elastic/logstash#9111 (7.0, 6.3) changed the _value_ stored in
   the `inflight_batches` map provided by a new common queue read client
   from an object responding to `#size` to a java `QueueBatch` which
   does not respond to `size`. If our pipeline reporter had been able to
   look up the queue batch, it would have failed with a `NoMethodError`.

We resolve the issue by (1) looking up the batch in our "batch map" using
the native thread id and (2) safely extracting the size from a `QueueBatch`
(via `filteredSize`) before falling through to `Object#size` or 0.

(cherry picked from commit 4941c25f32)

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
This commit is contained in:
github-actions[bot] 2022-11-18 06:36:36 -08:00 committed by GitHub
parent 69ce6ebdb8
commit 3822b60890
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 83 additions and 8 deletions

View file

@ -30,10 +30,14 @@ shared_examples "a pipeline reporter" do |pipeline_setup|
let(:pipeline) { Kernel.send(pipeline_setup, config)}
let(:reporter) { pipeline.reporter }
before do
let(:do_setup_plugin_registry) do
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_call_original
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_call_original
end
before do
do_setup_plugin_registry
@pre_snapshot = reporter.snapshot
@ -50,7 +54,7 @@ shared_examples "a pipeline reporter" do |pipeline_setup|
end
it "should end with no stalled threads" do
expect(@pre_snapshot.stalling_threads_info).to eql([])
expect(@post_snapshot.stalling_threads_info).to eql([])
end
end
@ -82,6 +86,40 @@ shared_examples "a pipeline reporter" do |pipeline_setup|
it "should be zero after running" do
expect(@post_snapshot.inflight_count).to eql(0)
end
# We provide a hooked filter that captures a new reporter snapshot with each event.
# Since the event being processed is by definition part of a batch that is in flight,
# we expect every resulting reporter snapshot to report a non-zero `inflight_count`.
context "while running" do
# Queue that collects the reporter snapshots taken by the hooked filter.
let!(:report_queue) { Queue.new }
# One-off DummyFilter subclass whose hook pushes a fresh reporter snapshot
# onto the queue for each event it receives.
let(:hooked_dummy_filter_class) do
::LogStash::Filters::DummyFilter.with_hook do |event|
report_queue << reporter.snapshot
end
end
# Config name of the hooked filter class, used in both the pipeline config and the registry stub.
let(:hooked_dummy_filter_name) { hooked_dummy_filter_class.config_name }
# Pipeline definition: generator input -> hooked filter -> dummy output.
let(:config) do
<<~EOCONFIG
input { generator { count => #{generator_count} } }
filter { #{hooked_dummy_filter_name} {} }
output { dummyoutput {} }
EOCONFIG
end
# Extends the inherited plugin-registry stubs so that looking up the hooked
# filter's name returns the hooked class.
let(:do_setup_plugin_registry) do
super()
allow(LogStash::Plugin).to receive(:lookup).with("filter", hooked_dummy_filter_name)
.and_return(hooked_dummy_filter_class)
end
it 'captures inflight counts that are non-zero ' do
# Drain exactly as many snapshots as are queued at this moment.
inflight_reports = Array.new(report_queue.size) { report_queue.pop }
expect(inflight_reports).to_not be_empty
expect(inflight_reports).to all(have_attributes(inflight_count: (a_value > 0)))
end
end
end
end

View file

@ -59,6 +59,18 @@ module LogStash
def filter(event)
# noop
end
##
# Builds an anonymous, single-use subclass of the DummyFilter whose `filter`
# method invokes the given hook with every event it is handed.
#
# @yieldparam event the event passed to the generated `filter` method
# @return [Class] the new subclass; its config_name embeds the subclass's own `__id__`
def self.with_hook(&hook)
  Class.new(self) do
    config_name "dummyfilter_#{__id__}"
    # Invoke the hook through `call` (instead of handing the block itself to
    # define_method) so that it keeps the `self` it was created with.
    define_method(:filter) { |event| hook.call(event) }
  end
end
end
end

View file

@ -27,6 +27,7 @@ import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.RubyThread;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
@ -160,6 +161,7 @@ public final class PipelineReporterExt extends RubyBasicObject {
final RubyArray result = context.runtime.newArray();
((Iterable<IRubyObject>) pipeline.callMethod(context, "worker_threads"))
.forEach(thread -> {
final long nativeThreadId = ((RubyThread) thread).getNativeThread().getId();
final RubyHash hash = RubyHash.newHash(context.runtime);
IRubyObject status = thread.callMethod(context, "status");
if (status.isNil()) {
@ -168,17 +170,35 @@ public final class PipelineReporterExt extends RubyBasicObject {
hash.op_aset(context, STATUS_KEY, status);
hash.op_aset(context, ALIVE_KEY, thread.callMethod(context, "alive?"));
hash.op_aset(context, INDEX_KEY, context.runtime.newFixnum(result.size()));
final IRubyObject batch = batchMap.op_aref(context, thread);
hash.op_aset(
context, INFLIGHT_COUNT_KEY,
batch.isNil() ?
context.runtime.newFixnum(0) : batch.callMethod(context, "size")
);
final IRubyObject batch = batchMap.op_aref(context, context.runtime.newFixnum(nativeThreadId));
hash.op_aset(context, INFLIGHT_COUNT_KEY, extractBatchSize(context, batch));
result.add(hash);
});
return result;
}
/**
 * Determines how many events a worker's in-flight batch holds, tolerating the
 * different shapes a batch-map value may take.
 *
 * <ul>
 *   <li>nil: zero</li>
 *   <li>a wrapped {@link QueueBatch}: its {@code filteredSize()}</li>
 *   <li>any other ruby object responding to {@code size}: the result of calling {@code size}</li>
 *   <li>anything else: zero</li>
 * </ul>
 *
 * @param context The Ruby {@code ThreadContext}
 * @param batch the value found in the batch map; may be nil, a wrapped
 *              {@link QueueBatch}, or a ruby object that responds to {@code size}
 * @return the detected size as a ruby object, or a zero fixnum when no size can be determined
 */
private IRubyObject extractBatchSize(final ThreadContext context, final IRubyObject batch) {
    // No batch registered for this worker: nothing is in flight.
    if (batch.isNil()) {
        return context.runtime.newFixnum(0L);
    }

    // Java QueueBatch does not respond to `size`, so unwrap it and ask it directly.
    final boolean wrapsQueueBatch = QueueBatch.class.isAssignableFrom(batch.getJavaClass());
    if (wrapsQueueBatch) {
        final QueueBatch queueBatch = batch.toJava(QueueBatch.class);
        final int eventCount = queueBatch.filteredSize();
        return getRuntime().newFixnum(eventCount);
    }

    // Fall back to duck typing for any ruby-side batch implementation.
    return batch.respondsTo("size")
            ? batch.callMethod(context, "size")
            : context.runtime.newFixnum(0L);
}
@SuppressWarnings({"unchecked","rawtypes"})
private RubyArray outputInfo(final ThreadContext context) {
final RubyArray result = context.runtime.newArray();
@ -267,6 +287,11 @@ public final class PipelineReporterExt extends RubyBasicObject {
return data.op_aref(context, method);
}
/**
 * Ruby-visible {@code respond_to_missing?} hook.
 *
 * Unconditionally answers true; neither argument is inspected.
 * NOTE(review): presumably paired with a {@code method_missing} implementation on the
 * same class so that {@code respond_to?} agrees with dynamic lookups; confirm against
 * the enclosing class.
 *
 * @param context the Ruby {@code ThreadContext}
 * @param method the method name being queried (unused)
 * @param includePrivate whether private methods should be considered (unused)
 * @return the Ruby {@code true} value held by {@code context}
 */
@JRubyMethod(name = "respond_to_missing?")
public IRubyObject isRespondToMissing(final ThreadContext context, final IRubyObject method, final IRubyObject includePrivate) {
return context.tru;
}
@JRubyMethod(name = "format_threads_by_plugin")
@SuppressWarnings("unchecked")
public RubyHash formatThreadsByPlugin(final ThreadContext context) {