partially reset counters during pipeline reload

Fixes #5819
Joao Duarte 2016-08-24 12:16:16 +01:00 committed by João Duarte
parent bc080ca7b5
commit 9fd35ad4bb
5 changed files with 94 additions and 15 deletions


@@ -162,9 +162,8 @@ class LogStash::Agent
     @periodic_pollers.start
   end

-  def reset_metrics_collectors
-    stop_collecting_metrics
-    configure_metrics_collectors
+  def reset_pipeline_metrics(id)
+    @collector.clear("stats/pipelines/#{id}")
   end

   def collect_metrics?
@@ -208,11 +207,6 @@ class LogStash::Agent
       return
     end

-    # Reset the current collected stats,
-    # starting a pipeline with a new configuration should be the same as restarting
-    # logstash.
-    reset_metrics_collectors
-
     new_pipeline = create_pipeline(old_pipeline.settings, new_config)
     return if new_pipeline.nil?
@@ -268,6 +262,7 @@ class LogStash::Agent
   def upgrade_pipeline(pipeline_id, new_pipeline)
     stop_pipeline(pipeline_id)
+    reset_pipeline_metrics(pipeline_id)
     @pipelines[pipeline_id] = new_pipeline
     start_pipeline(pipeline_id)
   end
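The net effect of the agent changes above: instead of stopping and reconfiguring the whole metrics collector on every reload, `upgrade_pipeline` now clears only the `stats/pipelines/<id>` subtree of the pipeline being swapped. A minimal runnable sketch of that flow, using a toy collector (the class and names below are illustrative stand-ins, not the real LogStash ones):

# Toy stand-in for the instrument collector: records which keypaths were cleared.
class RecordingCollector
  attr_reader :cleared

  def initialize
    @cleared = []
  end

  def clear(keypath)
    @cleared << keypath
  end
end

collector = RecordingCollector.new

# Mirrors Agent#reset_pipeline_metrics from the diff above: the reset is
# scoped to a single pipeline's namespace rather than the whole store.
id = "main"
collector.clear("stats/pipelines/#{id}")

p collector.cleared # => ["stats/pipelines/main"]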


@@ -104,5 +104,9 @@ module LogStash module Instrument
       logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug?
       notify_observers(snapshot_metric)
     end
+
+    def clear(keypath)
+      @metric_store.prune(keypath)
+    end
   end
 end; end
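The new `Collector#clear` is a thin delegation: it forwards a slash-separated keypath to `MetricStore#prune` (shown in the next file). A self-contained sketch of that shape, with toy classes standing in for the real ones:

class ToyMetricStore
  # Stub standing in for MetricStore#prune from the following file.
  def prune(keypath)
    puts "pruning #{keypath.inspect}"
  end
end

class ToyCollector
  def initialize(metric_store)
    @metric_store = metric_store
  end

  # Same one-line delegation as the Collector#clear added above.
  def clear(keypath)
    @metric_store.prune(keypath)
  end
end

ToyCollector.new(ToyMetricStore.new).clear("stats/pipelines/main")
# prints: pruning "stats/pipelines/main"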


@@ -80,8 +80,7 @@ module LogStash module Instrument
     # @param [Array] The path where values should be located
     # @return [Hash]
     def get_with_path(path)
-      key_paths = path.gsub(/^#{KEY_PATH_SEPARATOR}+/, "").split(KEY_PATH_SEPARATOR)
-      get(*key_paths)
+      get(*key_paths(path))
     end

     # Similar to `get_with_path` but use symbols instead of string
@@ -180,11 +179,24 @@ module LogStash module Instrument
     end
     alias_method :all, :each

+    def prune(path)
+      key_paths = key_paths(path).map {|k| k.to_sym }
+      @structured_lookup_mutex.synchronize do
+        keys_to_delete = @fast_lookup.keys.select {|namespace, _| (key_paths - namespace).empty? }
+        keys_to_delete.each {|k| @fast_lookup.delete(k) }
+        delete_from_map(@store, key_paths)
+      end
+    end
+
     private

     def get_all
       @fast_lookup.values
     end

+    def key_paths(path)
+      path.gsub(/^#{KEY_PATH_SEPARATOR}+/, "").split(KEY_PATH_SEPARATOR)
+    end
+
     # This method takes an array of keys and recursively searches the metric store structure,
     # returning a filtered hash of the structure. This method also takes into consideration
     # getting two different branches.
@@ -294,5 +306,14 @@ module LogStash module Instrument
       new_map = map.fetch_or_store(current) { Concurrent::Map.new }
       return fetch_or_store_namespace_recursively(new_map, namespaces_path, idx + 1)
     end
+
+    def delete_from_map(map, keys)
+      key = keys.first
+      if keys.size == 1
+        map.delete(key)
+      else
+        delete_from_map(map[key], keys[1..-1]) unless map[key].nil?
+      end
+    end
   end
 end; end
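`prune` performs two deletions inside one synchronized block: it drops matching entries from the flat `@fast_lookup` map, then walks the nested `@store` tree via the recursive `delete_from_map` added above. A runnable sketch of that recursion, using plain Hashes in place of `Concurrent::Map` (an assumption made for brevity; `Concurrent::Map` exposes the same `delete` interface):

def delete_from_map(map, keys)
  key = keys.first
  if keys.size == 1
    map.delete(key) # last path segment: drop the whole subtree
  else
    delete_from_map(map[key], keys[1..-1]) unless map[key].nil?
  end
end

store = {
  stats: {
    events:    { in: 100 },          # global counters, untouched by the prune
    pipelines: {
      main:  { events: { in: 42 } }, # subtree being pruned
      other: { events: { in: 7 } }
    }
  }
}

delete_from_map(store, [:stats, :pipelines, :main])
p store
# => {:stats=>{:events=>{:in=>100}, :pipelines=>{:other=>{:events=>{:in=>7}}}}}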


@@ -47,7 +47,7 @@ describe LogStash::Agent do
   let(:pipeline_id) { "main" }
   let(:config_string) { "input { } filter { } output { }" }
   let(:agent_args) do
-      {
+    {
       "config.string" => config_string,
       "config.reload.automatic" => true,
       "config.reload.interval" => 0.01,
@@ -351,7 +351,7 @@ describe LogStash::Agent do
       super.merge({ "config.reload.automatic" => true,
                     "config.reload.interval" => interval,
                     "metric.collect" => true })
-      end
+    end

     # We need to create these dummy classes to know how many
     # events were actually generated by the pipeline and successfully sent to the output.
@@ -390,7 +390,35 @@ describe LogStash::Agent do
       end
     end

-    it "resets the metric collector" do
+    it "resets the pipeline metric collector" do
+      # We know that the store has more events coming in.
+      i = 0
+      while dummy_output.events.size <= new_config_generator_counter
+        i += 1
+        raise "Waiting too long!" if i > 20
+        sleep(0.1)
+      end
+
+      snapshot = subject.metric.collector.snapshot_metric
+      expect(snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:events][:in].value).to be > new_config_generator_counter
+
+      # Update the configuration and give Logstash some time to pick it up and do the work.
+      # Also force a flush to disk to make sure Ruby reloads it.
+      File.open(config_path, "w") do |f|
+        f.write(new_config)
+        f.fsync
+      end
+
+      sleep(interval * 3) # Give time to reload the config
+      # Be eventually consistent.
+      sleep(0.01) while dummy_output2.events.size < new_config_generator_counter
+
+      snapshot = subject.metric.collector.snapshot_metric
+      value = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:events][:in].value
+      expect(value).to eq(new_config_generator_counter)
+    end
+
+    it "does not reset the global event count" do
       # We know that the store has more events coming in.
       i = 0
       while dummy_output.events.size <= new_config_generator_counter
@@ -410,13 +438,13 @@ describe LogStash::Agent do
       end

       sleep(interval * 3) # Give time to reload the config
       # Be eventually consistent.
       sleep(0.01) while dummy_output2.events.size < new_config_generator_counter

       snapshot = subject.metric.collector.snapshot_metric
       value = snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value
-      expect(value).to eq(new_config_generator_counter)
+      expect(value).to be > new_config_generator_counter
     end
   end
 end


@@ -221,4 +221,35 @@ describe LogStash::Instrument::MetricStore do
       end
     end
   end
+
+  describe "#prune" do
+    let(:metric_events) {
+      [
+        [[:node, :sashimi, :pipelines, :pipeline01, :plugins, :"logstash-output-elasticsearch"], :event_in, :increment],
+        [[:node, :sashimi, :pipelines, :pipeline01], :processed_events_in, :increment],
+        [[:node, :sashimi, :pipelines, :pipeline01], :processed_events_out, :increment],
+        [[:node, :sashimi, :pipelines, :pipeline02], :processed_events_out, :increment],
+      ]
+    }
+
+    before :each do
+      # Let's add a few metrics to the store before trying to prune them
+      metric_events.each do |namespaces, metric_key, action|
+        metric = subject.fetch_or_store(namespaces, metric_key, LogStash::Instrument::MetricType::Counter.new(namespaces, metric_key))
+        metric.execute(action)
+      end
+    end
+
+    it "should remove all keys with the same starting path as the argument" do
+      expect(subject.get(:node, :sashimi, :pipelines, :pipeline01)).to be_a(Hash)
+      subject.prune("/node/sashimi/pipelines/pipeline01")
+      expect { subject.get(:node, :sashimi, :pipelines, :pipeline01) }.to raise_error LogStash::Instrument::MetricStore::MetricNotFound
+    end
+
+    it "should keep other metrics on different path branches" do
+      expect(subject.get(:node, :sashimi, :pipelines, :pipeline02)).to be_a(Hash)
+      subject.prune("/node/sashimi/pipelines/pipeline01")
+      expect { subject.get(:node, :sashimi, :pipelines, :pipeline02) }.to_not raise_error
+    end
+  end
 end
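One subtlety the specs above exercise only indirectly: the fast-lookup filter in `prune` uses `(key_paths - namespace).empty?`, i.e. Array difference. That is a containment check rather than a strict prefix match: an entry is deleted when every segment of the requested path appears somewhere in its namespace tuple. A small sketch of the predicate in isolation:

key_paths = [:node, :sashimi, :pipelines, :pipeline01]

namespaces = [
  [:node, :sashimi, :pipelines, :pipeline01, :plugins], # contains every requested segment -> deleted
  [:node, :sashimi, :pipelines, :pipeline02],           # missing :pipeline01 -> kept
]

p namespaces.select { |namespace| (key_paths - namespace).empty? }
# => [[:node, :sashimi, :pipelines, :pipeline01, :plugins]]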