Initialize global events and pipeline events related metrics with default values

The metric store has no concept of whether a metric needs to exist, so as a rule
of thumb we need to define them with 0 values and send them to the
store when we initialize something.

This PR makes sure the batch object records the right default values.
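For context, this is roughly the behaviour the change enables (the identifiers come from the spec further down; treat the snippet as illustrative rather than an exact API transcript):

    # Once the read client has been handed its metric namespaces, the counters
    # already exist in the store, even before a single event is processed:
    store = collector.snapshot_metric.metric_store
    store.get_shallow(:events, :in).value                 # => 0 instead of a missing entry
    store.get_shallow(:events, :duration_in_millis).value # => 0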

Fixes: #6449

Fixes #6450
Pier-Hugues Pellerin 2016-12-20 14:44:39 -05:00
parent 08cebd5c16
commit 13599ca64a
3 changed files with 37 additions and 4 deletions


@@ -132,10 +132,19 @@ module LogStash; module Util
      def set_events_metric(metric)
        @event_metric = metric
        define_initial_metrics_values(@event_metric)
      end

      def set_pipeline_metric(metric)
        @pipeline_metric = metric
        define_initial_metrics_values(@pipeline_metric)
      end

      def define_initial_metrics_values(namespaced_metric)
        namespaced_metric.gauge(:duration_in_millis, 0)
        namespaced_metric.increment(:filtered, 0)
        namespaced_metric.increment(:in, 0)
        namespaced_metric.increment(:out, 0)
      end

      def inflight_batches
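For reference, the pipeline is expected to hand namespaced metrics to these setters roughly as sketched below; only set_events_metric and set_pipeline_metric come from this change, while metric.namespace, the namespace paths, and pipeline_id are assumptions used for illustration:

    # hypothetical wiring on the pipeline side:
    read_client.set_events_metric(metric.namespace([:stats, :events]))
    read_client.set_pipeline_metric(metric.namespace([:stats, :pipelines, pipeline_id, :events]))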


@@ -79,10 +79,19 @@ module LogStash; module Util
      def set_events_metric(metric)
        @event_metric = metric
        define_initial_metrics_values(@event_metric)
      end

      def set_pipeline_metric(metric)
        @pipeline_metric = metric
        define_initial_metrics_values(@pipeline_metric)
      end

      def define_initial_metrics_values(namespaced_metric)
        namespaced_metric.gauge(:duration_in_millis, 0)
        namespaced_metric.increment(:filtered, 0)
        namespaced_metric.increment(:in, 0)
        namespaced_metric.increment(:out, 0)
      end

      def inflight_batches


@@ -63,7 +63,15 @@ describe LogStash::Util::WrappedSynchronousQueue do
        batch = read_client.take_batch
        read_client.close_batch(batch)
        store = collector.snapshot_metric.metric_store
        expect(store.size).to eq(0)
        expect(store.get_shallow(:events, :in).value).to eq(0)
        expect(store.get_shallow(:events, :out).value).to eq(0)
        expect(store.get_shallow(:events, :filtered).value).to eq(0)
        expect(store.get_shallow(:events, :duration_in_millis).value).to eq(0)
        expect(store.get_shallow(:pipeline, :in).value).to eq(0)
        expect(store.get_shallow(:pipeline, :duration_in_millis).value).to eq(0)
        expect(store.get_shallow(:pipeline, :out).value).to eq(0)
        expect(store.get_shallow(:pipeline, :filtered).value).to eq(0)
      end
    end
@@ -73,15 +81,22 @@ describe LogStash::Util::WrappedSynchronousQueue do
        5.times {|i| batch.push("value-#{i}")}
        write_client.push_batch(batch)
        read_batch = read_client.take_batch
        sleep(0.1) # simulate some work?
        read_client.close_batch(batch)
        sleep(0.1) # simulate some work for the `duration_in_millis`
        # TODO: this interaction should be cleaned in an upcoming PR,
        # This is what the current pipeline does.
        read_client.add_filtered_metrics(read_batch)
        read_client.add_output_metrics(read_batch)
        read_client.close_batch(read_batch)
        store = collector.snapshot_metric.metric_store
        expect(store.size).to eq(4)
        expect(store.get_shallow(:events, :in).value).to eq(5)
        expect(store.get_shallow(:events, :out).value).to eq(5)
        expect(store.get_shallow(:events, :filtered).value).to eq(5)
        expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
        expect(store.get_shallow(:pipeline, :in).value).to eq(5)
        expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
        expect(store.get_shallow(:pipeline, :out).value).to eq(5)
        expect(store.get_shallow(:pipeline, :filtered).value).to eq(5)
      end
    end
  end
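The spec above relies on add_filtered_metrics and add_output_metrics from the read client; a rough sketch of what those helpers are assumed to do, with batch.filtered_size and batch.output_size used purely as illustrative names:

    # assumed shape of the helpers, for illustration only:
    def add_filtered_metrics(batch)
      @event_metric.increment(:filtered, batch.filtered_size)
      @pipeline_metric.increment(:filtered, batch.filtered_size)
    end

    def add_output_metrics(batch)
      @event_metric.increment(:out, batch.output_size)
      @pipeline_metric.increment(:out, batch.output_size)
    end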