diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 8bb3649e9..855223230 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -45,8 +45,10 @@ class LogStash::Agent @upgrade_mutex = Mutex.new @collect_metric = setting("metric.collect") - @metric = create_metric_collector - @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(create_metric_collector) + + + # Create the collectors and configured it with the library + configure_metrics_collectors end def execute @@ -156,14 +158,25 @@ class LogStash::Agent end end - def create_metric_collector - if collect_metrics? - @logger.debug("Agent: Configuring metric collection") - LogStash::Instrument::Collector.instance.agent = self - LogStash::Instrument::Metric.new - else - LogStash::Instrument::NullMetric.new - end + def configure_metrics_collectors + @collector = LogStash::Instrument::Collector.new + + @metric = if collect_metrics? + @logger.debug("Agent: Configuring metric collection") + LogStash::Instrument::Metric.new(@collector) + else + LogStash::Instrument::NullMetric.new + end + + + @periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric) + end + + def reset_metrics_collectors + @periodic_pollers.stop + @collector.stop + configure_metrics_collectors + @periodic_pollers.start end def collect_metrics? @@ -171,7 +184,6 @@ class LogStash::Agent end def create_pipeline(settings, config=nil) - if config.nil? begin config = fetch_config(settings) @@ -182,7 +194,7 @@ class LogStash::Agent end begin - LogStash::Pipeline.new(config, settings) + LogStash::Pipeline.new(config, settings, metric) rescue => e @logger.error("fetched an invalid config", :config => config, :reason => e.message) return @@ -204,6 +216,11 @@ class LogStash::Agent return end + # Reset the current collected stats, + # starting a pipeline with a new configuration should be the same as restarting + # logstash. + reset_metrics_collectors + new_pipeline = create_pipeline(old_pipeline.settings, new_config) return if new_pipeline.nil? @@ -225,12 +242,6 @@ class LogStash::Agent return unless pipeline.is_a?(LogStash::Pipeline) return if pipeline.ready? @logger.info("starting pipeline", :id => id) - - # Reset the current collected stats, - # starting a pipeline with a new configuration should be the same as restarting - # logstash. - reset_collector - Thread.new do LogStash::Util.set_thread_name("pipeline.#{id}") begin @@ -266,6 +277,8 @@ class LogStash::Agent def upgrade_pipeline(pipeline_id, new_pipeline) stop_pipeline(pipeline_id) @pipelines[pipeline_id] = new_pipeline + + new_pipeline = metric start_pipeline(pipeline_id) end @@ -273,10 +286,6 @@ class LogStash::Agent @pipelines.empty? end - def reset_collector - LogStash::Instrument::Collector.instance.clear - end - def setting(key) @settings.get(key) end diff --git a/logstash-core/lib/logstash/instrument/collector.rb b/logstash-core/lib/logstash/instrument/collector.rb index 2059ea222..ff4c658bc 100644 --- a/logstash-core/lib/logstash/instrument/collector.rb +++ b/logstash-core/lib/logstash/instrument/collector.rb @@ -8,7 +8,7 @@ require "singleton" require "thread" module LogStash module Instrument - # The Collector singleton is the single point of reference for all + # The Collector is the single point of reference for all # the metrics collection inside logstash, the metrics library will make # direct calls to this class. # @@ -91,6 +91,10 @@ module LogStash module Instrument @snapshot_task.execute end + def stop + @snapshot_task.shutdown + end + # Create a snapshot of the MetricStore and send it to to the registered observers # The observer will receive the following signature in the update methode. # diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index a8c142147..46c3e4318 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -32,8 +32,8 @@ module LogStash; class Pipeline :started_at, :thread, :config_str, - :settings - attr_accessor :metric + :settings, + :metric MAX_INFLIGHT_WARN_THRESHOLD = 10_000 @@ -41,7 +41,7 @@ module LogStash; class Pipeline "LogStash::Inputs::Stdin" ] - def initialize(config_str, settings = LogStash::SETTINGS) + def initialize(config_str, settings = LogStash::SETTINGS, provided_metric = nil) @config_str = config_str @logger = Cabin::Channel.get(LogStash) @settings = settings @@ -56,7 +56,7 @@ module LogStash; class Pipeline # This needs to be configured before we evaluate the code to make # sure the metric instance is correctly send to the plugins to make the namespace scoping work - @metric = settings.get_value("metric.collect") ? Instrument::Metric.new : Instrument::NullMetric.new + @metric = provided_metric.nil? ? LogStash::Instrument::NullMetric.new : provided_metric grammar = LogStashConfigParser.new @config = grammar.parse(config_str) diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 5a40ee9c0..3bc59233b 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -328,11 +328,11 @@ describe LogStash::Agent do end end + context "metrics after config reloading" do - let(:dummy_output) { DummyOutput.new } let(:config) { "input { generator { } } output { dummyoutput { } }" } - let(:new_config_generator_counter) { 50 } - let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput {} }" } + let(:new_config_generator_counter) { 500 } + let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput2 {} }" } let(:config_path) do f = Stud::Temporary.file f.write(config) @@ -353,11 +353,24 @@ describe LogStash::Agent do "metric.collect" => true }) end + # We need to create theses dummy classes to know how many + # events where actually generated by the pipeline and successfully send to the output. + # Theses values are compared with what we store in the metric store. + let!(:dummy_output) { DummyOutput.new } + let!(:dummy_output2) { DummyOutput.new } + class DummyOutput2 < LogStash::Outputs::Base; end + before :each do allow(DummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output) + allow(DummyOutput2).to receive(:new).at_least(:once).with(anything).and_return(dummy_output2) + allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator) allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput2").and_return(DummyOutput2) + + @abort_on_exception = Thread.abort_on_exception + Thread.abort_on_exception = true @t = Thread.new do subject.register_pipeline("main", pipeline_settings) @@ -371,25 +384,32 @@ describe LogStash::Agent do subject.shutdown Stud.stop!(@t) @t.join + Thread.abort_on_exception = @abort_on_exception end it "resets the metric collector" do # We know that the store has more events coming in. - sleep(0.01) while dummy_output.events.size < new_config_generator_counter - snapshot = LogStash::Instrument::Collector.instance.snapshot_metric + while dummy_output.events.size <= new_config_generator_counter + sleep(0.1) + end + + snapshot = subject.metric.collector.snapshot_metric expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to be > new_config_generator_counter # update the configuration and give some time to logstash to pick it up and do the work - IO.write(config_path, new_config) + # Also force a flush to disk to make sure ruby reload it. + File.open(config_path, "w") do |f| + f.write(new_config) + f.fsync + end sleep(interval * 3) # Give time to reload the config - # Since there is multiple threads involved with the configuration reload, - # It can take some time to the stats be visible in the store but it will # be eventually consistent. - sleep(0.01) while dummy_output.events.size < new_config_generator_counter + sleep(0.01) while dummy_output2.events.size < new_config_generator_counter - value = LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value + snapshot = subject.metric.collector.snapshot_metric + value = snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value expect(value).to eq(new_config_generator_counter) end end diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 0fb6be836..506e12167 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -554,10 +554,12 @@ describe LogStash::Pipeline do end context "when collecting metrics in the pipeline" do + let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) } + + subject { described_class.new(config, pipeline_settings_obj, metric) } + let(:pipeline_settings) { { "pipeline.id" => pipeline_id } } - subject { described_class.new(config, pipeline_settings_obj) } let(:pipeline_id) { "main" } - let(:metric) { LogStash::Instrument::Metric.new } let(:number_of_events) { 1000 } let(:multiline_id) { "my-multiline" } let(:multiline_id_other) { "my-multiline_other" } @@ -591,6 +593,7 @@ describe LogStash::Pipeline do EOS end let(:dummyoutput) { DummyOutput.new({ "id" => dummy_output_id }) } + let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store } before :each do allow(DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput) @@ -599,9 +602,6 @@ describe LogStash::Pipeline do allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) - # Reset the metric store - LogStash::Instrument::Collector.instance.clear - Thread.new { subject.run } # make sure we have received all the generated events sleep 1 while dummyoutput.events.size < number_of_events @@ -612,7 +612,7 @@ describe LogStash::Pipeline do end context "global metric" do - let(:collected_metric) { LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("stats/events") } + let(:collected_metric) { metric_store.get_with_path("stats/events") } it "populates the differents" do expect(collected_metric[:stats][:events][:in].value).to eq(number_of_events) @@ -622,7 +622,7 @@ describe LogStash::Pipeline do end context "pipelines" do - let(:collected_metric) { LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("stats/pipelines/") } + let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") } it "populates the pipelines core metrics" do expect(collected_metric[:stats][:pipelines][:main][:events][:in].value).to eq(number_of_events) diff --git a/logstash-core/spec/support/mocks_classes.rb b/logstash-core/spec/support/mocks_classes.rb index c481e8be2..3d4ed28f4 100644 --- a/logstash-core/spec/support/mocks_classes.rb +++ b/logstash-core/spec/support/mocks_classes.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/outputs/base" +require "thread" class DummyOutput < LogStash::Outputs::Base config_name "dummyoutput" @@ -10,7 +11,7 @@ class DummyOutput < LogStash::Outputs::Base def initialize(params={}) super @num_closes = 0 - @events = [] + @events = Queue.new end def register