diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb index e73e1a314..2753be480 100644 --- a/logstash-core/spec/logstash/pipeline_reporter_spec.rb +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -6,12 +6,12 @@ require_relative "../support/mocks_classes" #TODO: Figure out how to add more tests that actually cover inflight events #This will require some janky multithreading stuff -describe LogStash::PipelineReporter do +shared_examples "a pipeline reporter" do |pipeline_setup| let(:generator_count) { 5 } let(:config) do "input { generator { count => #{generator_count} } } output { dummyoutput {} } " end - let(:pipeline) { mock_pipeline_from_string(config)} + let(:pipeline) { Kernel.send(pipeline_setup, config)} let(:reporter) { pipeline.reporter } before do @@ -29,6 +29,16 @@ describe LogStash::PipelineReporter do pipeline.shutdown end + describe "stalling threads info" do + it "should start with no stalled threads" do + expect(@pre_snapshot.stalling_threads_info).to eql([]) + end + + it "should end with no stalled threads" do + expect(@pre_snapshot.stalling_threads_info).to eql([]) + end + end + describe "events filtered" do it "should start at zero" do expect(@pre_snapshot.events_filtered).to eql(0) @@ -59,3 +69,8 @@ describe LogStash::PipelineReporter do end end end + +describe LogStash::PipelineReporter do + it_behaves_like "a pipeline reporter", :mock_pipeline_from_string + it_behaves_like "a pipeline reporter", :mock_java_pipeline_from_string +end diff --git a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java index 45240e4de..9ef3b0e7d 100644 --- a/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/PipelineReporterExt.java @@ -155,7 +155,14 @@ public final class PipelineReporterExt extends RubyBasicObject { @SuppressWarnings("unchecked") private RubyArray outputInfo(final ThreadContext context) { final RubyArray result = context.runtime.newArray(); - ((Iterable) pipeline.callMethod(context, "outputs")).forEach(output -> { + final IRubyObject outputs = pipeline.callMethod(context, "outputs"); + final Iterable outputIterable; + if (outputs instanceof Iterable) { + outputIterable = (Iterable) outputs; + } else { + outputIterable = (Iterable) outputs.toJava(Iterable.class); + } + outputIterable.forEach(output -> { final OutputDelegatorExt delegator = (OutputDelegatorExt) output; final RubyHash hash = RubyHash.newHash(context.runtime); hash.op_aset(context, TYPE_KEY, delegator.configName(context)); @@ -186,7 +193,7 @@ public final class PipelineReporterExt extends RubyBasicObject { RubyUtil.RUBY.newString("inflight_count").newFrozen(); private static final RubyString STALLING_THREADS_KEY = - RubyUtil.RUBY.newString("stalling_thread_info").newFrozen(); + RubyUtil.RUBY.newString("stalling_threads_info").newFrozen(); private static final RubyString PLUGIN_KEY = RubyUtil.RUBY.newString("plugin").newFrozen();