mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
BUG: Fix incorrect type handling between Java pipeline and Ruby pipeline
Fixes #9671
This commit is contained in:
parent
9e4c8799df
commit
6352001177
2 changed files with 26 additions and 4 deletions
|
@ -6,12 +6,12 @@ require_relative "../support/mocks_classes"
|
||||||
|
|
||||||
#TODO: Figure out how to add more tests that actually cover inflight events
|
#TODO: Figure out how to add more tests that actually cover inflight events
|
||||||
#This will require some janky multithreading stuff
|
#This will require some janky multithreading stuff
|
||||||
describe LogStash::PipelineReporter do
|
shared_examples "a pipeline reporter" do |pipeline_setup|
|
||||||
let(:generator_count) { 5 }
|
let(:generator_count) { 5 }
|
||||||
let(:config) do
|
let(:config) do
|
||||||
"input { generator { count => #{generator_count} } } output { dummyoutput {} } "
|
"input { generator { count => #{generator_count} } } output { dummyoutput {} } "
|
||||||
end
|
end
|
||||||
let(:pipeline) { mock_pipeline_from_string(config)}
|
let(:pipeline) { Kernel.send(pipeline_setup, config)}
|
||||||
let(:reporter) { pipeline.reporter }
|
let(:reporter) { pipeline.reporter }
|
||||||
|
|
||||||
before do
|
before do
|
||||||
|
@ -29,6 +29,16 @@ describe LogStash::PipelineReporter do
|
||||||
pipeline.shutdown
|
pipeline.shutdown
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "stalling threads info" do
|
||||||
|
it "should start with no stalled threads" do
|
||||||
|
expect(@pre_snapshot.stalling_threads_info).to eql([])
|
||||||
|
end
|
||||||
|
|
||||||
|
it "should end with no stalled threads" do
|
||||||
|
expect(@pre_snapshot.stalling_threads_info).to eql([])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "events filtered" do
|
describe "events filtered" do
|
||||||
it "should start at zero" do
|
it "should start at zero" do
|
||||||
expect(@pre_snapshot.events_filtered).to eql(0)
|
expect(@pre_snapshot.events_filtered).to eql(0)
|
||||||
|
@ -59,3 +69,8 @@ describe LogStash::PipelineReporter do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe LogStash::PipelineReporter do
|
||||||
|
it_behaves_like "a pipeline reporter", :mock_pipeline_from_string
|
||||||
|
it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string
|
||||||
|
end
|
||||||
|
|
|
@ -155,7 +155,14 @@ public final class PipelineReporterExt extends RubyBasicObject {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
private RubyArray outputInfo(final ThreadContext context) {
|
private RubyArray outputInfo(final ThreadContext context) {
|
||||||
final RubyArray result = context.runtime.newArray();
|
final RubyArray result = context.runtime.newArray();
|
||||||
((Iterable<?>) pipeline.callMethod(context, "outputs")).forEach(output -> {
|
final IRubyObject outputs = pipeline.callMethod(context, "outputs");
|
||||||
|
final Iterable<IRubyObject> outputIterable;
|
||||||
|
if (outputs instanceof Iterable) {
|
||||||
|
outputIterable = (Iterable<IRubyObject>) outputs;
|
||||||
|
} else {
|
||||||
|
outputIterable = (Iterable<IRubyObject>) outputs.toJava(Iterable.class);
|
||||||
|
}
|
||||||
|
outputIterable.forEach(output -> {
|
||||||
final OutputDelegatorExt delegator = (OutputDelegatorExt) output;
|
final OutputDelegatorExt delegator = (OutputDelegatorExt) output;
|
||||||
final RubyHash hash = RubyHash.newHash(context.runtime);
|
final RubyHash hash = RubyHash.newHash(context.runtime);
|
||||||
hash.op_aset(context, TYPE_KEY, delegator.configName(context));
|
hash.op_aset(context, TYPE_KEY, delegator.configName(context));
|
||||||
|
@ -186,7 +193,7 @@ public final class PipelineReporterExt extends RubyBasicObject {
|
||||||
RubyUtil.RUBY.newString("inflight_count").newFrozen();
|
RubyUtil.RUBY.newString("inflight_count").newFrozen();
|
||||||
|
|
||||||
private static final RubyString STALLING_THREADS_KEY =
|
private static final RubyString STALLING_THREADS_KEY =
|
||||||
RubyUtil.RUBY.newString("stalling_thread_info").newFrozen();
|
RubyUtil.RUBY.newString("stalling_threads_info").newFrozen();
|
||||||
|
|
||||||
private static final RubyString PLUGIN_KEY =
|
private static final RubyString PLUGIN_KEY =
|
||||||
RubyUtil.RUBY.newString("plugin").newFrozen();
|
RubyUtil.RUBY.newString("plugin").newFrozen();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue