Fix a thread safety issue with the agent.

Make sure we pass the metric when we create a pipeline; this makes sure
the metric is correctly visible in the pipeline and removes the need to
lock the variables.

Make the expectation more robust in the metric reload scenario.

Fixes #5400 for the greater good.

Fixes #5446
This commit is contained in:
Pier-Hugues Pellerin 2016-06-02 15:16:00 -04:00
parent 21a47b8db9
commit d81ad6172f
6 changed files with 79 additions and 45 deletions

View file

@ -45,8 +45,10 @@ class LogStash::Agent
@upgrade_mutex = Mutex.new
@collect_metric = setting("metric.collect")
@metric = create_metric_collector
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(create_metric_collector)
# Create the collector and configure it with the library
configure_metrics_collectors
end
def execute
@ -156,14 +158,25 @@ class LogStash::Agent
end
end
def create_metric_collector
if collect_metrics?
def configure_metrics_collectors
@collector = LogStash::Instrument::Collector.new
@metric = if collect_metrics?
@logger.debug("Agent: Configuring metric collection")
LogStash::Instrument::Collector.instance.agent = self
LogStash::Instrument::Metric.new
LogStash::Instrument::Metric.new(@collector)
else
LogStash::Instrument::NullMetric.new
end
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric)
end
def reset_metrics_collectors
@periodic_pollers.stop
@collector.stop
configure_metrics_collectors
@periodic_pollers.start
end
def collect_metrics?
@ -171,7 +184,6 @@ class LogStash::Agent
end
def create_pipeline(settings, config=nil)
if config.nil?
begin
config = fetch_config(settings)
@ -182,7 +194,7 @@ class LogStash::Agent
end
begin
LogStash::Pipeline.new(config, settings)
LogStash::Pipeline.new(config, settings, metric)
rescue => e
@logger.error("fetched an invalid config", :config => config, :reason => e.message)
return
@ -204,6 +216,11 @@ class LogStash::Agent
return
end
# Reset the current collected stats,
# starting a pipeline with a new configuration should be the same as restarting
# logstash.
reset_metrics_collectors
new_pipeline = create_pipeline(old_pipeline.settings, new_config)
return if new_pipeline.nil?
@ -225,12 +242,6 @@ class LogStash::Agent
return unless pipeline.is_a?(LogStash::Pipeline)
return if pipeline.ready?
@logger.info("starting pipeline", :id => id)
# Reset the current collected stats,
# starting a pipeline with a new configuration should be the same as restarting
# logstash.
reset_collector
Thread.new do
LogStash::Util.set_thread_name("pipeline.#{id}")
begin
@ -266,6 +277,8 @@ class LogStash::Agent
def upgrade_pipeline(pipeline_id, new_pipeline)
stop_pipeline(pipeline_id)
@pipelines[pipeline_id] = new_pipeline
new_pipeline = metric
start_pipeline(pipeline_id)
end
@ -273,10 +286,6 @@ class LogStash::Agent
@pipelines.empty?
end
def reset_collector
LogStash::Instrument::Collector.instance.clear
end
def setting(key)
@settings.get(key)
end

View file

@ -8,7 +8,7 @@ require "singleton"
require "thread"
module LogStash module Instrument
# The Collector singleton is the single point of reference for all
# The Collector is the single point of reference for all
# the metrics collection inside logstash, the metrics library will make
# direct calls to this class.
#
@ -91,6 +91,10 @@ module LogStash module Instrument
@snapshot_task.execute
end
def stop
@snapshot_task.shutdown
end
# Create a snapshot of the MetricStore and send it to the registered observers
# The observer will receive the following signature in the update method.
#

View file

@ -32,8 +32,8 @@ module LogStash; class Pipeline
:started_at,
:thread,
:config_str,
:settings
attr_accessor :metric
:settings,
:metric
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
@ -41,7 +41,7 @@ module LogStash; class Pipeline
"LogStash::Inputs::Stdin"
]
def initialize(config_str, settings = LogStash::SETTINGS)
def initialize(config_str, settings = LogStash::SETTINGS, provided_metric = nil)
@config_str = config_str
@logger = Cabin::Channel.get(LogStash)
@settings = settings
@ -56,7 +56,7 @@ module LogStash; class Pipeline
# This needs to be configured before we evaluate the code to make
# sure the metric instance is correctly sent to the plugins to make the namespace scoping work
@metric = settings.get_value("metric.collect") ? Instrument::Metric.new : Instrument::NullMetric.new
@metric = provided_metric.nil? ? LogStash::Instrument::NullMetric.new : provided_metric
grammar = LogStashConfigParser.new
@config = grammar.parse(config_str)

View file

@ -328,11 +328,11 @@ describe LogStash::Agent do
end
end
context "metrics after config reloading" do
let(:dummy_output) { DummyOutput.new }
let(:config) { "input { generator { } } output { dummyoutput { } }" }
let(:new_config_generator_counter) { 50 }
let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput {} }" }
let(:new_config_generator_counter) { 500 }
let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput2 {} }" }
let(:config_path) do
f = Stud::Temporary.file
f.write(config)
@ -353,11 +353,24 @@ describe LogStash::Agent do
"metric.collect" => true })
end
# We need to create these dummy classes to know how many
# events were actually generated by the pipeline and successfully sent to the output.
# These values are compared with what we store in the metric store.
let!(:dummy_output) { DummyOutput.new }
let!(:dummy_output2) { DummyOutput.new }
class DummyOutput2 < LogStash::Outputs::Base; end
before :each do
allow(DummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output)
allow(DummyOutput2).to receive(:new).at_least(:once).with(anything).and_return(dummy_output2)
allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(LogStash::Inputs::Generator)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(LogStash::Codecs::Plain)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput2").and_return(DummyOutput2)
@abort_on_exception = Thread.abort_on_exception
Thread.abort_on_exception = true
@t = Thread.new do
subject.register_pipeline("main", pipeline_settings)
@ -371,25 +384,32 @@ describe LogStash::Agent do
subject.shutdown
Stud.stop!(@t)
@t.join
Thread.abort_on_exception = @abort_on_exception
end
it "resets the metric collector" do
# We know that the store has more events coming in.
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
snapshot = LogStash::Instrument::Collector.instance.snapshot_metric
while dummy_output.events.size <= new_config_generator_counter
sleep(0.1)
end
snapshot = subject.metric.collector.snapshot_metric
expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to be > new_config_generator_counter
# update the configuration and give some time to logstash to pick it up and do the work
IO.write(config_path, new_config)
# Also force a flush to disk to make sure ruby reloads it.
File.open(config_path, "w") do |f|
f.write(new_config)
f.fsync
end
sleep(interval * 3) # Give time to reload the config
# Since there are multiple threads involved in the configuration reload,
# it can take some time for the stats to be visible in the store, but they
# will be eventually consistent.
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
sleep(0.01) while dummy_output2.events.size < new_config_generator_counter
value = LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value
snapshot = subject.metric.collector.snapshot_metric
value = snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value
expect(value).to eq(new_config_generator_counter)
end
end

View file

@ -554,10 +554,12 @@ describe LogStash::Pipeline do
end
context "when collecting metrics in the pipeline" do
let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) }
subject { described_class.new(config, pipeline_settings_obj, metric) }
let(:pipeline_settings) { { "pipeline.id" => pipeline_id } }
subject { described_class.new(config, pipeline_settings_obj) }
let(:pipeline_id) { "main" }
let(:metric) { LogStash::Instrument::Metric.new }
let(:number_of_events) { 1000 }
let(:multiline_id) { "my-multiline" }
let(:multiline_id_other) { "my-multiline_other" }
@ -591,6 +593,7 @@ describe LogStash::Pipeline do
EOS
end
let(:dummyoutput) { DummyOutput.new({ "id" => dummy_output_id }) }
let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store }
before :each do
allow(DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput)
@ -599,9 +602,6 @@ describe LogStash::Pipeline do
allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
# Reset the metric store
LogStash::Instrument::Collector.instance.clear
Thread.new { subject.run }
# make sure we have received all the generated events
sleep 1 while dummyoutput.events.size < number_of_events
@ -612,7 +612,7 @@ describe LogStash::Pipeline do
end
context "global metric" do
let(:collected_metric) { LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("stats/events") }
let(:collected_metric) { metric_store.get_with_path("stats/events") }
it "populates the differents" do
expect(collected_metric[:stats][:events][:in].value).to eq(number_of_events)
@ -622,7 +622,7 @@ describe LogStash::Pipeline do
end
context "pipelines" do
let(:collected_metric) { LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("stats/pipelines/") }
let(:collected_metric) { metric_store.get_with_path("stats/pipelines/") }
it "populates the pipelines core metrics" do
expect(collected_metric[:stats][:pipelines][:main][:events][:in].value).to eq(number_of_events)

View file

@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/outputs/base"
require "thread"
class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
@ -10,7 +11,7 @@ class DummyOutput < LogStash::Outputs::Base
def initialize(params={})
super
@num_closes = 0
@events = []
@events = Queue.new
end
def register