diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 0153bf3ad..0e356ec76 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -45,7 +45,7 @@ class LogStash::Agent @collect_metric = setting("metric.collect") @metric = create_metric_collector - @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(metric) + @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(create_metric_collector) end def execute @@ -83,7 +83,6 @@ class LogStash::Agent pipeline = create_pipeline(pipeline_settings) return unless pipeline.is_a?(LogStash::Pipeline) - pipeline.metric = @metric if @auto_reload && pipeline.non_reloadable_plugins.any? @logger.error(I18n.t("logstash.agent.non_reloadable_config_register"), :pipeline_id => pipeline_id, diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 3114cdb7c..a8c142147 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -54,17 +54,9 @@ module LogStash; class Pipeline @worker_threads = [] - # Metric object should be passed upstream, multiple pipeline share the same metric - # and collector only the namespace will changes. - # If no metric is given, we use a `NullMetric` for all internal calls. - # We also do this to make the changes backward compatible with previous testing of the - # pipeline. - # # This needs to be configured before we evaluate the code to make - # sure the metric instance is correctly send to the plugin. - # NOTE: It is the responsibility of the Agent to set this externally with a setter - # if there's an intent of this not being a NullMetric - @metric = Instrument::NullMetric.new + # sure the metric instance is correctly send to the plugins to make the namespace scoping work + @metric = settings.get_value("metric.collect") ? Instrument::Metric.new : Instrument::NullMetric.new grammar = LogStashConfigParser.new @config = grammar.parse(config_str) @@ -80,7 +72,7 @@ module LogStash; class Pipeline # The config code is hard to represent as a log message... # So just print it. - if @settings.get("config.debug") && logger.debug? + if @settings.get_value("config.debug") && logger.debug? logger.debug("Compiled pipeline code", :code => code) end diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 711a4b37d..5a40ee9c0 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -98,6 +98,7 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + subject.shutdown end end @@ -114,6 +115,7 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + subject.shutdown end end @@ -129,6 +131,8 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + + subject.shutdown end end end @@ -157,6 +161,7 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + subject.shutdown end end @@ -172,6 +177,7 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + subject.shutdown end end @@ -186,6 +192,7 @@ describe LogStash::Agent do sleep 0.1 Stud.stop!(t) t.join + subject.shutdown end end end @@ -361,6 +368,7 @@ describe LogStash::Agent do end after :each do + subject.shutdown Stud.stop!(@t) @t.join end diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb index bdd83d4ff..1aeef0e73 100644 --- a/logstash-core/spec/logstash/pipeline_reporter_spec.rb +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -47,6 +47,10 @@ describe LogStash::PipelineReporter do @post_snapshot = reporter.snapshot end + after do + pipeline.shutdown + end + describe "events filtered" do it "should start at zero" do expect(@pre_snapshot.events_filtered).to eql(0) @@ -82,4 +86,4 @@ describe LogStash::PipelineReporter do expect(@post_snapshot.output_info.first[:events_received]).to eql(generator_count) end end -end \ No newline at end of file +end diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index a9f2abfb0..0fb6be836 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -155,6 +155,7 @@ describe LogStash::Pipeline do {:count_was=>worker_thread_count, :filters=>["dummyfilter"]}) pipeline.run expect(pipeline.worker_threads.size).to eq(safe_thread_count) + pipeline.shutdown end end @@ -168,6 +169,7 @@ describe LogStash::Pipeline do {:worker_threads=> override_thread_count, :filters=>["dummyfilter"]}) pipeline.run expect(pipeline.worker_threads.size).to eq(override_thread_count) + pipeline.shutdown end end end @@ -193,6 +195,7 @@ describe LogStash::Pipeline do pipeline = TestPipeline.new(test_config_with_filters) pipeline.run expect(pipeline.worker_threads.size).to eq(worker_thread_count) + pipeline.shutdown end end end @@ -239,6 +242,7 @@ describe LogStash::Pipeline do expect(pipeline.outputs.size ).to eq(1) expect(pipeline.outputs.first.workers.size ).to eq(::LogStash::SETTINGS.get("pipeline.output.workers")) expect(pipeline.outputs.first.workers.first.num_closes ).to eq(1) + pipeline.shutdown end it "should call output close correctly with output workers" do @@ -255,6 +259,7 @@ describe LogStash::Pipeline do output_delegator.workers.each do |plugin| expect(plugin.num_closes ).to eq(1) end + pipeline.shutdown end end end @@ -276,6 +281,7 @@ describe LogStash::Pipeline do expect(pipeline).to receive(:start_flusher).ordered.and_call_original pipeline.run + pipeline.shutdown end end @@ -392,8 +398,14 @@ describe LogStash::Pipeline do output { } CONFIG - it "uses a `NullMetric` object if no metric is given" do - pipeline = LogStash::Pipeline.new(config) + it "uses a `NullMetric` object if `metric.collect` is set to false" do + settings = double("LogStash::SETTINGS") + + allow(settings).to receive(:get_value).with("pipeline.id").and_return("main") + allow(settings).to receive(:get_value).with("metric.collect").and_return(false) + allow(settings).to receive(:get_value).with("config.debug").and_return(false) + + pipeline = LogStash::Pipeline.new(config, settings) expect(pipeline.metric).to be_kind_of(LogStash::Instrument::NullMetric) end end @@ -511,7 +523,7 @@ describe LogStash::Pipeline do t = Thread.new { subject.run } sleep(0.1) expect(subject.started_at).to be < Time.now - t.kill rescue nil + subject.shutdown end end @@ -536,7 +548,7 @@ describe LogStash::Pipeline do t = Thread.new { subject.run } sleep(0.1) expect(subject.uptime).to be > 0 - t.kill rescue nil + subject.shutdown end end end @@ -590,7 +602,6 @@ describe LogStash::Pipeline do # Reset the metric store LogStash::Instrument::Collector.instance.clear - subject.metric = metric Thread.new { subject.run } # make sure we have received all the generated events sleep 1 while dummyoutput.events.size < number_of_events