mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
parent
2ba8bfd55b
commit
d3083de43c
6 changed files with 37 additions and 22 deletions
|
@ -26,6 +26,8 @@ module LogStash
|
|||
@filter.execution_context = execution_context
|
||||
|
||||
@metric_events = namespaced_metric.namespace(:events)
|
||||
@metric_events_in = @metric_events.counter(:in)
|
||||
@metric_events_out = @metric_events.counter(:out)
|
||||
namespaced_metric.gauge(:name, config_name)
|
||||
|
||||
# Not all the filters will do bufferings
|
||||
|
@ -37,7 +39,7 @@ module LogStash
|
|||
end
|
||||
|
||||
def multi_filter(events)
|
||||
@metric_events.increment(:in, events.size)
|
||||
@metric_events_in.increment(events.size)
|
||||
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
new_events = @filter.multi_filter(events)
|
||||
|
@ -47,9 +49,9 @@ module LogStash
|
|||
# that EVENTS_INT == EVENTS_OUT, see the aggregates and
|
||||
# the split filter
|
||||
c = new_events.count { |event| !event.cancelled? }
|
||||
@metric_events.increment(:out, c) if c > 0
|
||||
|
||||
return new_events
|
||||
@metric_events_out.increment(:out) if c > 0
|
||||
new_events
|
||||
end
|
||||
|
||||
private
|
||||
|
|
|
@ -33,11 +33,7 @@ module LogStash module Instrument
|
|||
#
|
||||
def push(namespaces_path, key, type, *metric_type_params)
|
||||
begin
|
||||
metric = @metric_store.fetch_or_store(namespaces_path, key) do
|
||||
LogStash::Instrument::MetricType.create(type, namespaces_path, key)
|
||||
end
|
||||
|
||||
metric.execute(*metric_type_params)
|
||||
get(namespaces_path, key, type).execute(*metric_type_params)
|
||||
rescue MetricStore::NamespacesExpectedError => e
|
||||
logger.error("Collector: Cannot record metric", :exception => e)
|
||||
rescue NameError => e
|
||||
|
@ -51,6 +47,12 @@ module LogStash module Instrument
|
|||
end
|
||||
end
|
||||
|
||||
def get(namespaces_path, key, type)
|
||||
@metric_store.fetch_or_store(namespaces_path, key) do
|
||||
LogStash::Instrument::MetricType.create(type, namespaces_path, key)
|
||||
end
|
||||
end
|
||||
|
||||
# Snapshot the current Metric Store and return it immediately,
|
||||
# This is useful if you want to get access to the current metric store without
|
||||
# waiting for a periodic call.
|
||||
|
|
|
@ -43,6 +43,10 @@ module LogStash module Instrument
|
|||
def collector
|
||||
@metric.collector
|
||||
end
|
||||
|
||||
def counter(key)
|
||||
collector.get(@namespace_name, key, :counter)
|
||||
end
|
||||
|
||||
def namespace(name)
|
||||
NamespacedMetric.new(metric, namespace_name + Array(name))
|
||||
|
|
|
@ -10,7 +10,9 @@ module LogStash module Instrument
|
|||
@events_metrics = metric.namespace([:stats, :events])
|
||||
@pipeline_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :events])
|
||||
@plugin_events_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :plugins, plugin_type, plugin.id.to_sym, :events])
|
||||
|
||||
@events_metrics_counter = @events_metrics.counter(:in)
|
||||
@pipeline_metrics_counter = @pipeline_metrics.counter(:in)
|
||||
@plugin_events_metrics_counter = @plugin_events_metrics.counter(:out)
|
||||
define_initial_metrics_values
|
||||
end
|
||||
|
||||
|
@ -29,9 +31,9 @@ module LogStash module Instrument
|
|||
|
||||
private
|
||||
def record_metric(size = 1)
|
||||
@events_metrics.increment(:in, size)
|
||||
@pipeline_metrics.increment(:in, size)
|
||||
@plugin_events_metrics.increment(:out, size)
|
||||
@events_metrics_counter.increment(size)
|
||||
@pipeline_metrics_counter.increment(size)
|
||||
@plugin_events_metrics_counter.increment(size)
|
||||
|
||||
clock = @events_metrics.time(:queue_push_duration_in_millis)
|
||||
|
||||
|
@ -47,9 +49,9 @@ module LogStash module Instrument
|
|||
end
|
||||
|
||||
def define_initial_metrics_values
|
||||
@events_metrics.increment(:in, 0)
|
||||
@pipeline_metrics.increment(:in, 0)
|
||||
@plugin_events_metrics.increment(:out, 0)
|
||||
@events_metrics_counter.increment(0)
|
||||
@pipeline_metrics_counter.increment(0)
|
||||
@plugin_events_metrics_counter.increment(0)
|
||||
|
||||
@events_metrics.report_time(:queue_push_duration_in_millis, 0)
|
||||
@pipeline_metrics.report_time(:queue_push_duration_in_millis, 0)
|
||||
|
|
|
@ -19,7 +19,8 @@ module LogStash class OutputDelegator
|
|||
@namespaced_metric = metric.namespace(id.to_sym)
|
||||
@namespaced_metric.gauge(:name, config_name)
|
||||
@metric_events = @namespaced_metric.namespace(:events)
|
||||
|
||||
@in_counter = @metric_events.counter(:in)
|
||||
@out_counter = @metric_events.counter(:out)
|
||||
@strategy = strategy_registry.
|
||||
class_for(self.concurrency).
|
||||
new(@logger, @output_class, @namespaced_metric, execution_context, plugin_args)
|
||||
|
@ -42,11 +43,11 @@ module LogStash class OutputDelegator
|
|||
end
|
||||
|
||||
def multi_receive(events)
|
||||
@metric_events.increment(:in, events.length)
|
||||
@in_counter.increment(events.length)
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
@strategy.multi_receive(events)
|
||||
clock.stop
|
||||
@metric_events.increment(:out, events.length)
|
||||
@out_counter.increment(events.length)
|
||||
end
|
||||
|
||||
def do_close
|
||||
|
|
|
@ -68,11 +68,15 @@ module LogStash; module Util
|
|||
|
||||
def set_events_metric(metric)
|
||||
@event_metric = metric
|
||||
@event_metric_out = @event_metric.counter(:out)
|
||||
@event_metric_filtered = @event_metric.counter(:filtered)
|
||||
define_initial_metrics_values(@event_metric)
|
||||
end
|
||||
|
||||
def set_pipeline_metric(metric)
|
||||
@pipeline_metric = metric
|
||||
@pipeline_metric_out = @pipeline_metric.counter(:out)
|
||||
@pipeline_metric_filtered = @pipeline_metric.counter(:filtered)
|
||||
define_initial_metrics_values(@pipeline_metric)
|
||||
end
|
||||
|
||||
|
@ -159,13 +163,13 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def add_filtered_metrics(batch)
|
||||
@event_metric.increment(:filtered, batch.filtered_size)
|
||||
@pipeline_metric.increment(:filtered, batch.filtered_size)
|
||||
@event_metric_filtered.increment(batch.filtered_size)
|
||||
@pipeline_metric_filtered.increment(batch.filtered_size)
|
||||
end
|
||||
|
||||
def add_output_metrics(batch)
|
||||
@event_metric.increment(:out, batch.filtered_size)
|
||||
@pipeline_metric.increment(:out, batch.filtered_size)
|
||||
@event_metric_out.increment(batch.filtered_size)
|
||||
@pipeline_metric_out.increment(batch.filtered_size)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue