PERFORMANCE: Cache some metrics

Fixes #7919
Armin 2017-08-05 10:14:14 +02:00 committed by Armin Braun
parent 21c178cbe5
commit e10af362be
6 changed files with 36 additions and 21 deletions
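
Summary of the change: every delegator that previously recorded event counts with `metric.increment(key, n)` had to resolve the counter through the namespaced metric, and therefore through the metric store, on every batch. This commit resolves each counter once at construction time (via the new `Collector#get` and `NamespacedMetric#counter` helpers in the hunks below) and keeps the returned counter object in an instance variable, so the per-batch hot path only increments a cached object. The following is a minimal, self-contained sketch of that pattern; `Counter`, `MetricStore`, and `CachingFilterDelegator` here are simplified stand-ins for illustration, not the actual Logstash classes.

    # Simplified stand-ins for illustration; the real Logstash classes
    # (MetricStore, MetricType::Counter, the delegators) do more and use
    # concurrent data structures.
    class Counter
      attr_reader :value
      def initialize; @value = 0; end
      def increment(delta = 1); @value += delta; end
    end

    class MetricStore
      def initialize; @store = {}; end

      # Look the metric up by key, creating it on first use (memoized).
      def fetch_or_store(key)
        @store[key] ||= Counter.new
      end
    end

    class CachingFilterDelegator
      def initialize(store)
        # After this commit: resolve the counters once, at construction time...
        @events_in  = store.fetch_or_store(:in)
        @events_out = store.fetch_or_store(:out)
      end

      def multi_filter(events)
        # ...so the per-batch hot path only touches cached counter objects
        # instead of walking the store on every call.
        @events_in.increment(events.size)
        new_events = events # actual filtering elided
        @events_out.increment(new_events.size)
        new_events
      end
    end

    store = MetricStore.new
    delegator = CachingFilterDelegator.new(store)
    delegator.multi_filter([:e1, :e2, :e3])
    puts store.fetch_or_store(:in).value   # => 3
    puts store.fetch_or_store(:out).value  # => 3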

View file

@@ -26,6 +26,8 @@ module LogStash
       @filter.execution_context = execution_context
 
       @metric_events = namespaced_metric.namespace(:events)
+      @metric_events_in = @metric_events.counter(:in)
+      @metric_events_out = @metric_events.counter(:out)
       namespaced_metric.gauge(:name, config_name)
 
       # Not all the filters will do bufferings
@@ -37,7 +39,7 @@ module LogStash
     end
 
     def multi_filter(events)
-      @metric_events.increment(:in, events.size)
+      @metric_events_in.increment(events.size)
 
       clock = @metric_events.time(:duration_in_millis)
       new_events = @filter.multi_filter(events)
@@ -47,7 +49,7 @@ module LogStash
       # that EVENTS_IN == EVENTS_OUT, see the aggregates and
       # the split filter
       c = new_events.count { |event| !event.cancelled? }
-      @metric_events.increment(:out, c) if c > 0
+      @metric_events_out.increment(c) if c > 0
 
       new_events
     end

View file

@@ -33,11 +33,7 @@ module LogStash module Instrument
     #
     def push(namespaces_path, key, type, *metric_type_params)
       begin
-        metric = @metric_store.fetch_or_store(namespaces_path, key) do
-          LogStash::Instrument::MetricType.create(type, namespaces_path, key)
-        end
-
-        metric.execute(*metric_type_params)
+        get(namespaces_path, key, type).execute(*metric_type_params)
       rescue MetricStore::NamespacesExpectedError => e
         logger.error("Collector: Cannot record metric", :exception => e)
       rescue NameError => e
@@ -51,6 +47,12 @@ module LogStash module Instrument
       end
     end
 
+    def get(namespaces_path, key, type)
+      @metric_store.fetch_or_store(namespaces_path, key) do
+        LogStash::Instrument::MetricType.create(type, namespaces_path, key)
+      end
+    end
+
     # Snapshot the current Metric Store and return it immediately,
     # This is useful if you want to get access to the current metric store without
     # waiting for a periodic call.
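
The Collector hunk above splits lookup out of `push`: the new `get` does the fetch_or_store resolution and returns the live metric object, while `push` becomes `get(...).execute(...)`. That returned object is what callers can now cache. A rough, self-contained sketch of that shape, using stand-in store and counter classes rather than the real MetricStore and MetricType:

    # Minimal stand-ins so the sketch runs on its own; not the real
    # MetricStore or MetricType classes.
    FakeCounter = Struct.new(:value) do
      def execute(action, delta = 1)
        self.value += delta if action == :increment
      end
    end

    class FakeStore
      def initialize; @store = {}; end

      def fetch_or_store(namespaces, key)
        @store[[namespaces, key]] ||= yield
      end
    end

    class Collector
      def initialize(metric_store)
        @metric_store = metric_store
      end

      # One-shot recording (pre-existing entry point): resolve, then execute.
      def push(namespaces_path, key, type, *metric_type_params)
        get(namespaces_path, key, type).execute(*metric_type_params)
      end

      # New shape: resolve (and memoize) the metric object so callers can
      # hold on to it and skip the store lookup on later updates.
      def get(namespaces_path, key, type)
        @metric_store.fetch_or_store(namespaces_path, key) do
          FakeCounter.new(0) # the real code builds MetricType.create(type, ...)
        end
      end
    end

    collector = Collector.new(FakeStore.new)
    collector.push([:stats, :events], :in, :counter, :increment, 5) # one-shot path
    cached = collector.get([:stats, :events], :in, :counter)        # cached path
    cached.execute(:increment, 2)
    puts cached.value # => 7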

View file

@@ -43,6 +43,10 @@ module LogStash module Instrument
     def collector
       @metric.collector
     end
 
+    def counter(key)
+      collector.get(@namespace_name, key, :counter)
+    end
+
     def namespace(name)
       NamespacedMetric.new(metric, namespace_name + Array(name))
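
`NamespacedMetric#counter` is the public face of that split: it just asks the collector for the counter registered under this namespace and key, so a cached reference and the older `increment(key, n)` path both mutate the same underlying object and stay in sync. A small stand-alone illustration with stub collector/counter classes and a pared-down stand-in for the namespaced metric (hypothetical simplifications, not the Logstash implementations):

    class StubCounter
      attr_reader :value
      def initialize; @value = 0; end
      def increment(delta = 1); @value += delta; end
      def execute(action, delta = 1); increment(delta) if action == :increment; end
    end

    class StubCollector
      def initialize; @metrics = {}; end
      def get(namespace, key, _type); @metrics[[namespace, key]] ||= StubCounter.new; end
      def push(namespace, key, type, *params); get(namespace, key, type).execute(*params); end
    end

    # Pared-down stand-in for the namespaced metric, for illustration only.
    class NamespacedMetric
      def initialize(collector, namespace_name)
        @collector = collector
        @namespace_name = namespace_name
      end

      # New helper from this commit: hand back the live counter object so
      # callers can cache it in an instance variable.
      def counter(key)
        @collector.get(@namespace_name, key, :counter)
      end

      # Pre-existing style of recording still funnels through the collector.
      def increment(key, value = 1)
        @collector.push(@namespace_name, key, :counter, :increment, value)
      end
    end

    events = NamespacedMetric.new(StubCollector.new, [:stats, :events])
    cached_in = events.counter(:in)
    cached_in.increment(3)    # cached-counter path (new)
    events.increment(:in, 2)  # namespaced-metric path (old), same counter object
    puts cached_in.value      # => 5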

View file

@@ -10,7 +10,9 @@ module LogStash module Instrument
       @events_metrics = metric.namespace([:stats, :events])
       @pipeline_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :events])
       @plugin_events_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :plugins, plugin_type, plugin.id.to_sym, :events])
-
+      @events_metrics_counter = @events_metrics.counter(:in)
+      @pipeline_metrics_counter = @pipeline_metrics.counter(:in)
+      @plugin_events_metrics_counter = @plugin_events_metrics.counter(:out)
       define_initial_metrics_values
     end
@@ -29,9 +31,9 @@ module LogStash module Instrument
     private
 
     def record_metric(size = 1)
-      @events_metrics.increment(:in, size)
-      @pipeline_metrics.increment(:in, size)
-      @plugin_events_metrics.increment(:out, size)
+      @events_metrics_counter.increment(size)
+      @pipeline_metrics_counter.increment(size)
+      @plugin_events_metrics_counter.increment(size)
 
       clock = @events_metrics.time(:queue_push_duration_in_millis)
@@ -47,9 +49,9 @@ module LogStash module Instrument
     end
 
     def define_initial_metrics_values
-      @events_metrics.increment(:in, 0)
-      @pipeline_metrics.increment(:in, 0)
-      @plugin_events_metrics.increment(:out, 0)
+      @events_metrics_counter.increment(0)
+      @pipeline_metrics_counter.increment(0)
+      @plugin_events_metrics_counter.increment(0)
 
       @events_metrics.report_time(:queue_push_duration_in_millis, 0)
       @pipeline_metrics.report_time(:queue_push_duration_in_millis, 0)
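
One carry-over worth noting: `define_initial_metrics_values` still increments by zero. With the old push path that call is what created the metric, so an early snapshot reported 0 instead of omitting the key; with cached counters the counter is created when `counter(:in)` / `counter(:out)` is resolved in the constructor, and the zero increment remains a harmless way to keep the initial value explicit. A tiny stand-alone illustration (stub classes, not the real MetricStore):

    # Stand-in store/counter, for illustration only.
    class StubCounter
      attr_reader :value
      def initialize; @value = 0; end
      def increment(delta = 1); @value += delta; end
    end

    class StubStore
      def initialize; @metrics = {}; end
      def fetch_or_store(key); @metrics[key] ||= StubCounter.new; end
      def snapshot; @metrics.map { |key, counter| [key, counter.value] }.to_h; end
    end

    store = StubStore.new
    puts store.snapshot.inspect            # => {} -- nothing registered yet

    events_in = store.fetch_or_store(:in)  # resolving the counter registers it
    events_in.increment(0)                 # initial-value pattern: still reads 0
    puts store.snapshot.inspect            # => {:in=>0}

    events_in.increment(128)               # hot path during a run
    puts store.snapshot.inspect            # => {:in=>128}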

View file

@@ -19,7 +19,8 @@ module LogStash class OutputDelegator
     @namespaced_metric = metric.namespace(id.to_sym)
     @namespaced_metric.gauge(:name, config_name)
     @metric_events = @namespaced_metric.namespace(:events)
-
+    @in_counter = @metric_events.counter(:in)
+    @out_counter = @metric_events.counter(:out)
     @strategy = strategy_registry.
       class_for(self.concurrency).
       new(@logger, @output_class, @namespaced_metric, execution_context, plugin_args)
@@ -42,11 +43,11 @@ module LogStash class OutputDelegator
   end
 
   def multi_receive(events)
-    @metric_events.increment(:in, events.length)
+    @in_counter.increment(events.length)
     clock = @metric_events.time(:duration_in_millis)
     @strategy.multi_receive(events)
     clock.stop
-    @metric_events.increment(:out, events.length)
+    @out_counter.increment(events.length)
   end
 
   def do_close

View file

@@ -71,11 +71,15 @@ module LogStash; module Util
       def set_events_metric(metric)
         @event_metric = metric
+        @event_metric_out = @event_metric.counter(:out)
+        @event_metric_filtered = @event_metric.counter(:filtered)
         define_initial_metrics_values(@event_metric)
       end
 
       def set_pipeline_metric(metric)
         @pipeline_metric = metric
+        @pipeline_metric_out = @pipeline_metric.counter(:out)
+        @pipeline_metric_filtered = @pipeline_metric.counter(:filtered)
         define_initial_metrics_values(@pipeline_metric)
       end
 
@@ -157,13 +161,13 @@ module LogStash; module Util
       end
 
       def add_filtered_metrics(batch)
-        @event_metric.increment(:filtered, batch.filtered_size)
-        @pipeline_metric.increment(:filtered, batch.filtered_size)
+        @event_metric_filtered.increment(batch.filtered_size)
+        @pipeline_metric_filtered.increment(batch.filtered_size)
       end
 
       def add_output_metrics(batch)
-        @event_metric.increment(:out, batch.filtered_size)
-        @pipeline_metric.increment(:out, batch.filtered_size)
+        @event_metric_out.increment(batch.filtered_size)
+        @pipeline_metric_out.increment(batch.filtered_size)
       end
     end