mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
This commit adds null guard to get the native thread when constructing pipeline report
Fix: #15300
(cherry picked from commit cd78558121
)
Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
This commit is contained in:
parent
d040e88653
commit
745225ec79
1 changed files with 11 additions and 3 deletions
|
@ -37,6 +37,7 @@ import org.logstash.RubyUtil;
|
||||||
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
|
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JRuby extension
|
* JRuby extension
|
||||||
|
@ -161,7 +162,7 @@ public final class PipelineReporterExt extends RubyBasicObject {
|
||||||
final RubyArray result = context.runtime.newArray();
|
final RubyArray result = context.runtime.newArray();
|
||||||
((Iterable<IRubyObject>) pipeline.callMethod(context, "worker_threads"))
|
((Iterable<IRubyObject>) pipeline.callMethod(context, "worker_threads"))
|
||||||
.forEach(thread -> {
|
.forEach(thread -> {
|
||||||
final long nativeThreadId = ((RubyThread) thread).getNativeThread().getId();
|
|
||||||
final RubyHash hash = RubyHash.newHash(context.runtime);
|
final RubyHash hash = RubyHash.newHash(context.runtime);
|
||||||
IRubyObject status = thread.callMethod(context, "status");
|
IRubyObject status = thread.callMethod(context, "status");
|
||||||
if (status.isNil()) {
|
if (status.isNil()) {
|
||||||
|
@ -170,8 +171,15 @@ public final class PipelineReporterExt extends RubyBasicObject {
|
||||||
hash.op_aset(context, STATUS_KEY, status);
|
hash.op_aset(context, STATUS_KEY, status);
|
||||||
hash.op_aset(context, ALIVE_KEY, thread.callMethod(context, "alive?"));
|
hash.op_aset(context, ALIVE_KEY, thread.callMethod(context, "alive?"));
|
||||||
hash.op_aset(context, INDEX_KEY, context.runtime.newFixnum(result.size()));
|
hash.op_aset(context, INDEX_KEY, context.runtime.newFixnum(result.size()));
|
||||||
final IRubyObject batch = batchMap.op_aref(context, context.runtime.newFixnum(nativeThreadId));
|
|
||||||
hash.op_aset(context, INFLIGHT_COUNT_KEY, extractBatchSize(context, batch));
|
IRubyObject batchSize = Optional.of((RubyThread) thread)
|
||||||
|
.map(RubyThread::getNativeThread)
|
||||||
|
.map(Thread::getId)
|
||||||
|
.map(id -> batchMap.op_aref(context, context.runtime.newFixnum(id)))
|
||||||
|
.map(batch -> extractBatchSize(context, batch))
|
||||||
|
.orElse(context.runtime.newFixnum(0L));
|
||||||
|
|
||||||
|
hash.op_aset(context, INFLIGHT_COUNT_KEY, batchSize);
|
||||||
result.add(hash);
|
result.add(hash);
|
||||||
});
|
});
|
||||||
return result;
|
return result;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue