Fix non derterministic test when accessing the metric store

This commit introduce a mutex around the structured hash for the metric,
this hash is not updated frequently and its used mostly for the api to
be able to do search on the collected metric. Adding a mutex make sure
the changes are visible accross thread.

Fixes #5152

Fixes #5178
This commit is contained in:
Pier-Hugues Pellerin 2016-04-21 16:45:34 -04:00
parent d44032c721
commit 2a7b5668ca
2 changed files with 21 additions and 13 deletions

View file

@ -2,6 +2,7 @@
require "concurrent"
require "logstash/event"
require "logstash/instrument/metric_type"
require "thread"
module LogStash module Instrument
# The Metric store the data structure that make sure the data is
@ -25,6 +26,12 @@ module LogStash module Instrument
# This hash has only one dimension
# and allow fast retrieval of the metrics
@fast_lookup = Concurrent::Map.new
# This Mutex block the critical section for the
# structured hash, it block the zone when we first insert a metric
# in the structured hash or when we query it for search or to make
# the result available in the API.
@structured_lookup_mutex = Mutex.new
end
# This method use the namespace and key to search the corresponding value of
@ -46,16 +53,13 @@ module LogStash module Instrument
# BUT. If the value is not present in the `@fast_lookup` the value will be inserted and
# `#puf_if_absent` will return nil. With this returned value of nil we assume that we don't
# have it in the `@metric_store` for structured search so we add it there too.
#
# The problem with only using the `@metric_store` directly all the time would require us
# to use the mutex around the structure since its a multi-level hash, without that it wont
# return a consistent value of the metric and this would slow down the code and would
# complixity the code.
if found_value = @fast_lookup.put_if_absent([namespaces, key], provided_value)
return found_value
else
# If we cannot find the value this mean we need to save it in the store.
fetch_or_store_namespaces(namespaces).fetch_or_store(key, provided_value)
@structured_lookup_mutex.synchronize do
# If we cannot find the value this mean we need to save it in the store.
fetch_or_store_namespaces(namespaces).fetch_or_store(key, provided_value)
end
return provided_value
end
end
@ -89,7 +93,9 @@ module LogStash module Instrument
key_paths.map(&:to_sym)
new_hash = Hash.new
get_recursively(key_paths, @store, new_hash)
@structured_lookup_mutex.synchronize do
get_recursively(key_paths, @store, new_hash)
end
new_hash
end

View file

@ -357,7 +357,7 @@ describe LogStash::Agent do
end
it "resets the metric collector" do
# We know that the store has more events that the next expect
# We know that the store has more events coming in.
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
snapshot = LogStash::Instrument::Collector.instance.snapshot_metric
expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to be > new_config_generator_counter
@ -367,11 +367,13 @@ describe LogStash::Agent do
sleep(interval * 3) # Give time to reload the config
# Since thre is multiple threads involved and with the configuration reload,
# It can take some time to the states be visible in the store
# Since there is multiple threads involved with the configuration reload,
# It can take some time to the stats be visible in the store but it will
# be eventually consistent.
sleep(0.01) while dummy_output.events.size < new_config_generator_counter
snapshot = LogStash::Instrument::Collector.instance.snapshot_metric
expect(snapshot.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value).to eq(new_config_generator_counter)
value = LogStash::Instrument::Collector.instance.snapshot_metric.metric_store.get_with_path("/stats/events")[:stats][:events][:in].value
expect(value).to eq(new_config_generator_counter)
end
end
end