mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Initialize with default values global events and pipeline events related metric
The metric store has no concept is a metric need to exist so as a rule of thumb we need to defined them with 0 values and send them to the store when we initialize something. This PR make sure the batch object is recording the right default values Fixes: #6449 Fixes #6450
This commit is contained in:
parent
90c364e903
commit
ad763c81a3
3 changed files with 37 additions and 4 deletions
|
@ -132,10 +132,19 @@ module LogStash; module Util
|
||||||
|
|
||||||
def set_events_metric(metric)
|
def set_events_metric(metric)
|
||||||
@event_metric = metric
|
@event_metric = metric
|
||||||
|
define_initial_metrics_values(@event_metric)
|
||||||
end
|
end
|
||||||
|
|
||||||
def set_pipeline_metric(metric)
|
def set_pipeline_metric(metric)
|
||||||
@pipeline_metric = metric
|
@pipeline_metric = metric
|
||||||
|
define_initial_metrics_values(@pipeline_metric)
|
||||||
|
end
|
||||||
|
|
||||||
|
def define_initial_metrics_values(namespaced_metric)
|
||||||
|
namespaced_metric.gauge(:duration_in_millis, 0)
|
||||||
|
namespaced_metric.increment(:filtered, 0)
|
||||||
|
namespaced_metric.increment(:in, 0)
|
||||||
|
namespaced_metric.increment(:out, 0)
|
||||||
end
|
end
|
||||||
|
|
||||||
def inflight_batches
|
def inflight_batches
|
||||||
|
|
|
@ -79,10 +79,19 @@ module LogStash; module Util
|
||||||
|
|
||||||
def set_events_metric(metric)
|
def set_events_metric(metric)
|
||||||
@event_metric = metric
|
@event_metric = metric
|
||||||
|
define_initial_metrics_values(@event_metric)
|
||||||
end
|
end
|
||||||
|
|
||||||
def set_pipeline_metric(metric)
|
def set_pipeline_metric(metric)
|
||||||
@pipeline_metric = metric
|
@pipeline_metric = metric
|
||||||
|
define_initial_metrics_values(@pipeline_metric)
|
||||||
|
end
|
||||||
|
|
||||||
|
def define_initial_metrics_values(namespaced_metric)
|
||||||
|
namespaced_metric.gauge(:duration_in_millis, 0)
|
||||||
|
namespaced_metric.increment(:filtered, 0)
|
||||||
|
namespaced_metric.increment(:in, 0)
|
||||||
|
namespaced_metric.increment(:out, 0)
|
||||||
end
|
end
|
||||||
|
|
||||||
def inflight_batches
|
def inflight_batches
|
||||||
|
|
|
@ -63,7 +63,15 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
||||||
batch = read_client.take_batch
|
batch = read_client.take_batch
|
||||||
read_client.close_batch(batch)
|
read_client.close_batch(batch)
|
||||||
store = collector.snapshot_metric.metric_store
|
store = collector.snapshot_metric.metric_store
|
||||||
expect(store.size).to eq(0)
|
|
||||||
|
expect(store.get_shallow(:events, :in).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:events, :out).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:events, :filtered).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:events, :duration_in_millis).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:pipeline, :in).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:pipeline, :out).value).to eq(0)
|
||||||
|
expect(store.get_shallow(:pipeline, :filtered).value).to eq(0)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -73,15 +81,22 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
||||||
5.times {|i| batch.push("value-#{i}")}
|
5.times {|i| batch.push("value-#{i}")}
|
||||||
write_client.push_batch(batch)
|
write_client.push_batch(batch)
|
||||||
read_batch = read_client.take_batch
|
read_batch = read_client.take_batch
|
||||||
sleep(0.1) # simulate some work?
|
sleep(0.1) # simulate some work for the `duration_in_millis`
|
||||||
read_client.close_batch(batch)
|
# TODO: this interaction should be cleaned in an upcoming PR,
|
||||||
|
# This is what the current pipeline does.
|
||||||
|
read_client.add_filtered_metrics(read_batch)
|
||||||
|
read_client.add_output_metrics(read_batch)
|
||||||
|
read_client.close_batch(read_batch)
|
||||||
store = collector.snapshot_metric.metric_store
|
store = collector.snapshot_metric.metric_store
|
||||||
|
|
||||||
expect(store.size).to eq(4)
|
|
||||||
expect(store.get_shallow(:events, :in).value).to eq(5)
|
expect(store.get_shallow(:events, :in).value).to eq(5)
|
||||||
|
expect(store.get_shallow(:events, :out).value).to eq(5)
|
||||||
|
expect(store.get_shallow(:events, :filtered).value).to eq(5)
|
||||||
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
|
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
|
||||||
expect(store.get_shallow(:pipeline, :in).value).to eq(5)
|
expect(store.get_shallow(:pipeline, :in).value).to eq(5)
|
||||||
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
|
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
|
||||||
|
expect(store.get_shallow(:pipeline, :out).value).to eq(5)
|
||||||
|
expect(store.get_shallow(:pipeline, :filtered).value).to eq(5)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue