diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb index e9c9e817d..e59d5c8e1 100644 --- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb @@ -132,10 +132,19 @@ module LogStash; module Util def set_events_metric(metric) @event_metric = metric + define_initial_metrics_values(@event_metric) end def set_pipeline_metric(metric) @pipeline_metric = metric + define_initial_metrics_values(@pipeline_metric) + end + + def define_initial_metrics_values(namespaced_metric) + namespaced_metric.gauge(:duration_in_millis, 0) + namespaced_metric.increment(:filtered, 0) + namespaced_metric.increment(:in, 0) + namespaced_metric.increment(:out, 0) end def inflight_batches diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb index 98503d960..af32b9207 100644 --- a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -79,10 +79,19 @@ module LogStash; module Util def set_events_metric(metric) @event_metric = metric + define_initial_metrics_values(@event_metric) end def set_pipeline_metric(metric) @pipeline_metric = metric + define_initial_metrics_values(@pipeline_metric) + end + + def define_initial_metrics_values(namespaced_metric) + namespaced_metric.gauge(:duration_in_millis, 0) + namespaced_metric.increment(:filtered, 0) + namespaced_metric.increment(:in, 0) + namespaced_metric.increment(:out, 0) end def inflight_batches diff --git a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb index d7b403ed0..8571fc673 100644 --- a/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb +++ b/logstash-core/spec/logstash/util/wrapped_synchronous_queue_spec.rb @@ -63,7 +63,15 @@ describe LogStash::Util::WrappedSynchronousQueue do batch = read_client.take_batch read_client.close_batch(batch) store = collector.snapshot_metric.metric_store - expect(store.size).to eq(0) + + expect(store.get_shallow(:events, :in).value).to eq(0) + expect(store.get_shallow(:events, :out).value).to eq(0) + expect(store.get_shallow(:events, :filtered).value).to eq(0) + expect(store.get_shallow(:events, :duration_in_millis).value).to eq(0) + expect(store.get_shallow(:pipeline, :in).value).to eq(0) + expect(store.get_shallow(:pipeline, :duration_in_millis).value).to eq(0) + expect(store.get_shallow(:pipeline, :out).value).to eq(0) + expect(store.get_shallow(:pipeline, :filtered).value).to eq(0) end end @@ -73,15 +81,22 @@ describe LogStash::Util::WrappedSynchronousQueue do 5.times {|i| batch.push("value-#{i}")} write_client.push_batch(batch) read_batch = read_client.take_batch - sleep(0.1) # simulate some work? - read_client.close_batch(batch) + sleep(0.1) # simulate some work for the `duration_in_millis` + # TODO: this interaction should be cleaned in an upcoming PR, + # This is what the current pipeline does. + read_client.add_filtered_metrics(read_batch) + read_client.add_output_metrics(read_batch) + read_client.close_batch(read_batch) store = collector.snapshot_metric.metric_store - expect(store.size).to eq(4) expect(store.get_shallow(:events, :in).value).to eq(5) + expect(store.get_shallow(:events, :out).value).to eq(5) + expect(store.get_shallow(:events, :filtered).value).to eq(5) expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0 expect(store.get_shallow(:pipeline, :in).value).to eq(5) expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0 + expect(store.get_shallow(:pipeline, :out).value).to eq(5) + expect(store.get_shallow(:pipeline, :filtered).value).to eq(5) end end end