mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
e6424f3e97
commit
324da31837
8 changed files with 67 additions and 28 deletions
|
@ -28,6 +28,7 @@ module LogStash
|
|||
@metric_events = namespaced_metric.namespace(:events)
|
||||
@metric_events_in = @metric_events.counter(:in)
|
||||
@metric_events_out = @metric_events.counter(:out)
|
||||
@metric_events_time = @metric_events.counter(:duration_in_millis)
|
||||
namespaced_metric.gauge(:name, config_name)
|
||||
|
||||
# Not all the filters will do bufferings
|
||||
|
@ -43,15 +44,13 @@ module LogStash
|
|||
|
||||
start_time = java.lang.System.current_time_millis
|
||||
new_events = @filter.multi_filter(events)
|
||||
@metric_events.report_time(
|
||||
:duration_in_millis, java.lang.System.current_time_millis - start_time
|
||||
)
|
||||
@metric_events_time.increment(java.lang.System.current_time_millis - start_time)
|
||||
|
||||
# There is no guarantee in the context of filter
|
||||
# that EVENTS_IN == EVENTS_OUT, see the aggregates and
|
||||
# the split filter
|
||||
c = new_events.count { |event| !event.cancelled? }
|
||||
@metric_events_out.increment(:out) if c > 0
|
||||
@metric_events_out.increment(c) if c > 0
|
||||
new_events
|
||||
end
|
||||
|
||||
|
@ -64,7 +63,7 @@ module LogStash
|
|||
|
||||
# Filter plugins that does buffering or spooling of events like the
|
||||
# `Logstash-filter-aggregates` can return `NIL` and will flush on the next flush ticks.
|
||||
@metric_events.increment(:out, new_events.size) if new_events && new_events.size > 0
|
||||
@metric_events_out.increment(new_events.size) if new_events && new_events.size > 0
|
||||
new_events
|
||||
end
|
||||
end
|
||||
|
|
|
@ -44,6 +44,10 @@ module LogStash module Instrument
|
|||
@metric.collector
|
||||
end
|
||||
|
||||
def counter(_)
|
||||
::LogStash::Instrument::NullMetric::NullGauge
|
||||
end
|
||||
|
||||
def namespace(name)
|
||||
NamespacedNullMetric.new(metric, namespace_name + Array(name))
|
||||
end
|
||||
|
|
|
@ -39,6 +39,10 @@ module LogStash module Instrument
|
|||
end
|
||||
end
|
||||
|
||||
def counter(_)
|
||||
NullGauge
|
||||
end
|
||||
|
||||
def namespace(name)
|
||||
raise MetricNoNamespaceProvided if name.nil? || name.empty?
|
||||
NamespacedNullMetric.new(self, name)
|
||||
|
@ -49,6 +53,12 @@ module LogStash module Instrument
|
|||
end
|
||||
|
||||
private
|
||||
|
||||
class NullGauge
|
||||
def self.increment(_)
|
||||
end
|
||||
end
|
||||
|
||||
# Null implementation of the internal timer class
|
||||
#
|
||||
# @see LogStash::Instrument::TimedExecution`
|
||||
|
|
|
@ -11,8 +11,11 @@ module LogStash module Instrument
|
|||
@pipeline_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :events])
|
||||
@plugin_events_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :plugins, plugin_type, plugin.id.to_sym, :events])
|
||||
@events_metrics_counter = @events_metrics.counter(:in)
|
||||
@events_metrics_time = @events_metrics.counter(:queue_push_duration_in_millis)
|
||||
@pipeline_metrics_counter = @pipeline_metrics.counter(:in)
|
||||
@pipeline_metrics_time = @pipeline_metrics.counter(:queue_push_duration_in_millis)
|
||||
@plugin_events_metrics_counter = @plugin_events_metrics.counter(:out)
|
||||
@plugin_events_metrics_time = @plugin_events_metrics.counter(:queue_push_duration_in_millis)
|
||||
define_initial_metrics_values
|
||||
end
|
||||
|
||||
|
@ -48,20 +51,18 @@ module LogStash module Instrument
|
|||
|
||||
def report_execution_time(start_time)
|
||||
execution_time = java.lang.System.current_time_millis - start_time
|
||||
@events_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
# Reuse the same values for all the endpoints to make sure we don't have skew in times.
|
||||
@pipeline_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
@plugin_events_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
@events_metrics_time.increment(execution_time)
|
||||
@pipeline_metrics_time.increment(execution_time)
|
||||
@plugin_events_metrics_time.increment(execution_time)
|
||||
end
|
||||
|
||||
def define_initial_metrics_values
|
||||
@events_metrics_counter.increment(0)
|
||||
@pipeline_metrics_counter.increment(0)
|
||||
@plugin_events_metrics_counter.increment(0)
|
||||
|
||||
@events_metrics.report_time(:queue_push_duration_in_millis, 0)
|
||||
@pipeline_metrics.report_time(:queue_push_duration_in_millis, 0)
|
||||
@plugin_events_metrics.report_time(:queue_push_duration_in_millis, 0)
|
||||
@events_metrics_time.increment(0)
|
||||
@pipeline_metrics_time.increment(0)
|
||||
@plugin_events_metrics_time.increment(0)
|
||||
end
|
||||
end
|
||||
end end
|
||||
|
|
|
@ -21,6 +21,7 @@ module LogStash class OutputDelegator
|
|||
@metric_events = @namespaced_metric.namespace(:events)
|
||||
@in_counter = @metric_events.counter(:in)
|
||||
@out_counter = @metric_events.counter(:out)
|
||||
@time_metric = @metric_events.counter(:duration_in_millis)
|
||||
@strategy = strategy_registry.
|
||||
class_for(self.concurrency).
|
||||
new(@logger, @output_class, @namespaced_metric, execution_context, plugin_args)
|
||||
|
@ -46,9 +47,7 @@ module LogStash class OutputDelegator
|
|||
@in_counter.increment(events.length)
|
||||
start_time = java.lang.System.current_time_millis
|
||||
@strategy.multi_receive(events)
|
||||
@metric_events.report_time(
|
||||
:duration_in_millis, java.lang.System.current_time_millis - start_time
|
||||
)
|
||||
@time_metric.increment(java.lang.System.current_time_millis - start_time)
|
||||
@out_counter.increment(events.length)
|
||||
end
|
||||
|
||||
|
|
|
@ -73,6 +73,7 @@ module LogStash; module Util
|
|||
@event_metric = metric
|
||||
@event_metric_out = @event_metric.counter(:out)
|
||||
@event_metric_filtered = @event_metric.counter(:filtered)
|
||||
@event_metric_time = @event_metric.counter(:duration_in_millis)
|
||||
define_initial_metrics_values(@event_metric)
|
||||
end
|
||||
|
||||
|
@ -80,6 +81,7 @@ module LogStash; module Util
|
|||
@pipeline_metric = metric
|
||||
@pipeline_metric_out = @pipeline_metric.counter(:out)
|
||||
@pipeline_metric_filtered = @pipeline_metric.counter(:filtered)
|
||||
@pipeline_metric_time = @pipeline_metric.counter(:duration_in_millis)
|
||||
define_initial_metrics_values(@pipeline_metric)
|
||||
end
|
||||
|
||||
|
@ -152,8 +154,8 @@ module LogStash; module Util
|
|||
# start_clock is now called at empty batch creation and an empty batch could
|
||||
# stay empty all the way down to the close_batch call.
|
||||
time_taken = java.lang.System.current_time_millis - @inflight_clocks[Thread.current]
|
||||
@event_metric.report_time(:duration_in_millis, time_taken)
|
||||
@pipeline_metric.report_time(:duration_in_millis, time_taken)
|
||||
@event_metric_time.increment(time_taken)
|
||||
@pipeline_metric_time.increment(time_taken)
|
||||
end
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
|
|
|
@ -7,6 +7,12 @@ require "logstash/execution_context"
|
|||
require "support/shared_contexts"
|
||||
|
||||
describe LogStash::FilterDelegator do
|
||||
|
||||
class MockGauge
|
||||
def increment(_)
|
||||
end
|
||||
end
|
||||
|
||||
include_context "execution_context"
|
||||
|
||||
let(:logger) { double(:logger) }
|
||||
|
@ -15,12 +21,18 @@ describe LogStash::FilterDelegator do
|
|||
{ "host" => "127.0.0.1", "id" => filter_id }
|
||||
end
|
||||
let(:collector) { [] }
|
||||
let(:counter_in) { MockGauge.new }
|
||||
let(:counter_out) { MockGauge.new }
|
||||
let(:counter_time) { MockGauge.new }
|
||||
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) }
|
||||
let(:events) { [LogStash::Event.new, LogStash::Event.new] }
|
||||
|
||||
before :each do
|
||||
allow(pipeline).to receive(:id).and_return(pipeline_id)
|
||||
allow(metric).to receive(:namespace).with(anything).and_return(metric)
|
||||
allow(metric).to receive(:counter).with(:in).and_return(counter_in)
|
||||
allow(metric).to receive(:counter).with(:out).and_return(counter_out)
|
||||
allow(metric).to receive(:counter).with(:duration_in_millis).and_return(counter_time)
|
||||
end
|
||||
|
||||
let(:plugin_klass) do
|
||||
|
@ -60,7 +72,7 @@ describe LogStash::FilterDelegator do
|
|||
context "when the flush return events" do
|
||||
it "increments the out" do
|
||||
subject.multi_filter([LogStash::Event.new])
|
||||
expect(metric).to receive(:increment).with(:out, 1)
|
||||
expect(counter_out).to receive(:increment).with(1)
|
||||
subject.flush({})
|
||||
end
|
||||
end
|
||||
|
@ -78,12 +90,12 @@ describe LogStash::FilterDelegator do
|
|||
end
|
||||
|
||||
it "has incremented :in" do
|
||||
expect(metric).to receive(:increment).with(:in, events.size)
|
||||
expect(counter_in).to receive(:increment).with(events.size)
|
||||
subject.multi_filter(events)
|
||||
end
|
||||
|
||||
it "has not incremented :out" do
|
||||
expect(metric).not_to receive(:increment).with(:out, anything)
|
||||
expect(counter_out).not_to receive(:increment).with(anything)
|
||||
subject.multi_filter(events)
|
||||
end
|
||||
end
|
||||
|
@ -109,8 +121,8 @@ describe LogStash::FilterDelegator do
|
|||
end
|
||||
|
||||
it "increments the in/out of the metric" do
|
||||
expect(metric).to receive(:increment).with(:in, events.size)
|
||||
expect(metric).to receive(:increment).with(:out, events.size * 2)
|
||||
expect(counter_in).to receive(:increment).with(events.size)
|
||||
expect(counter_out).to receive(:increment).with(events.size * 2)
|
||||
|
||||
subject.multi_filter(events)
|
||||
end
|
||||
|
@ -138,8 +150,8 @@ describe LogStash::FilterDelegator do
|
|||
end
|
||||
|
||||
it "increments the in/out of the metric" do
|
||||
expect(metric).to receive(:increment).with(:in, events.size)
|
||||
expect(metric).to receive(:increment).with(:out, events.size)
|
||||
expect(counter_in).to receive(:increment).with(events.size)
|
||||
expect(counter_out).to receive(:increment).with(events.size)
|
||||
|
||||
subject.multi_filter(events)
|
||||
end
|
||||
|
|
|
@ -5,10 +5,19 @@ require "spec_helper"
|
|||
require "support/shared_contexts"
|
||||
|
||||
describe LogStash::OutputDelegator do
|
||||
|
||||
class MockGauge
|
||||
def increment(_)
|
||||
end
|
||||
end
|
||||
|
||||
let(:logger) { double("logger") }
|
||||
let(:events) { 7.times.map { LogStash::Event.new }}
|
||||
let(:plugin_args) { {"id" => "foo", "arg1" => "val1"} }
|
||||
let(:collector) { [] }
|
||||
let(:counter_in) { MockGauge.new }
|
||||
let(:counter_out) { MockGauge.new }
|
||||
let(:counter_time) { MockGauge.new }
|
||||
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) }
|
||||
|
||||
include_context "execution_context"
|
||||
|
@ -23,6 +32,9 @@ describe LogStash::OutputDelegator do
|
|||
before(:each) do
|
||||
# use the same metric instance
|
||||
allow(metric).to receive(:namespace).with(any_args).and_return(metric)
|
||||
allow(metric).to receive(:counter).with(:in).and_return(counter_in)
|
||||
allow(metric).to receive(:counter).with(:out).and_return(counter_out)
|
||||
allow(metric).to receive(:counter).with(:duration_in_millis).and_return(counter_time)
|
||||
|
||||
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
|
||||
allow(out_klass).to receive(:name).and_return("example")
|
||||
|
@ -58,13 +70,13 @@ describe LogStash::OutputDelegator do
|
|||
end
|
||||
|
||||
it "should increment the number of events received" do
|
||||
expect(subject.metric_events).to receive(:increment).with(:in, events.length)
|
||||
expect(subject.metric_events).to receive(:increment).with(:out, events.length)
|
||||
expect(counter_in).to receive(:increment).with(events.length)
|
||||
expect(counter_out).to receive(:increment).with(events.length)
|
||||
subject.multi_receive(events)
|
||||
end
|
||||
|
||||
it "should record the `duration_in_millis`" do
|
||||
expect(subject.metric_events).to receive(:report_time).with(:duration_in_millis, Integer)
|
||||
expect(counter_time).to receive(:increment).with(Integer)
|
||||
subject.multi_receive(events)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue