Record the events.in related statistic in the right place

This PR changes where the `events.in` are calculated, previously the
values were calculated in the `ReadClient` which was fine before the
addition of the PQ, but this make the stats not accurate when the PQ was
enabled and the producer are a lot faster than the consumer.

These commits change the collection of the metric inside an
instrumented `WriteClient` so both implementation of the client queues will use
the same code.

This also make possible to record `events.out` for every inputs and the
time waiting to push to the queue.

The API is now exposing theses values for each plugins, the events level
and and the pipeline.

Using a pipeline with a sleep filter and PQ we will see this kind of
response from the API.

```json
{
  "duration_in_millis": 438624,
  "in": 3011436,
  "filtered": 2189,
  "out": 2189,
  "queue_push_duration_in_millis": 49845
}
```

Fixes: #6512

Fixes #6532
This commit is contained in:
Pier-Hugues Pellerin 2017-01-13 10:01:48 -05:00
parent 88eb425b00
commit aa2a97f6f6
12 changed files with 271 additions and 84 deletions

View file

@ -96,7 +96,9 @@ module LogStash module Instrument
end
def stop
@metric.report_time(@namespace, @key, (MILLISECONDS * (Time.now - @start_time)).to_i)
execution_time = (MILLISECONDS * (Time.now - @start_time)).to_i
@metric.report_time(@namespace, @key, execution_time)
execution_time
end
end
end

View file

@ -54,6 +54,7 @@ module LogStash module Instrument
# @see LogStash::Instrument::TimedExecution`
class NullTimedExecution
def self.stop
0
end
end
end

View file

@ -0,0 +1,59 @@
# encoding: utf-8
module LogStash module Instrument
class WrappedWriteClient
def initialize(write_client, pipeline, metric, plugin)
@write_client = write_client
pipeline_id = pipeline.pipeline_id.to_s.to_sym
plugin_type = "#{plugin.class.plugin_type}s".to_sym
@events_metrics = metric.namespace([:stats, :events])
@pipeline_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :events])
@plugin_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :plugins, plugin_type, plugin.id.to_sym])
define_initial_metrics_values
end
def get_new_batch
@write_client.get_new_batch
end
def push(event)
record_metric { @write_client.push(event) }
end
alias_method(:<<, :push)
def push_batch(batch)
record_metric(batch.size) { @write_client.push_batch(batch) }
end
private
def record_metric(size = 1)
@events_metrics.increment(:in, size)
@pipeline_metrics.increment(:in, size)
@plugin_metrics.increment(:out, size)
clock = @events_metrics.time(:queue_push_duration_in_millis)
result = yield
# Reuse the same values for all the endpoints to make sure we don't have skew in times.
execution_time = clock.stop
@pipeline_metrics.report_time(:queue_push_duration_in_millis, execution_time)
@plugin_metrics.report_time(:queue_push_duration_in_millis, execution_time)
result
end
def define_initial_metrics_values
@events_metrics.increment(:in, 0)
@pipeline_metrics.increment(:in, 0)
@plugin_metrics.increment(:out, 0)
@events_metrics.report_time(:queue_push_duration_in_millis, 0)
@pipeline_metrics.report_time(:queue_push_duration_in_millis, 0)
@plugin_metrics.report_time(:queue_push_duration_in_millis, 0)
end
end
end end

View file

@ -17,6 +17,7 @@ require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/instrument/wrapped_write_client"
require "logstash/output_delegator"
require "logstash/filter_delegator"
require "logstash/queue_factory"
@ -429,7 +430,8 @@ module LogStash; class Pipeline < BasePipeline
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
begin
plugin.run(@input_queue_client)
input_queue_client = wrapped_write_client(plugin)
plugin.run(input_queue_client)
rescue => e
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
@ -617,4 +619,8 @@ module LogStash; class Pipeline < BasePipeline
def draining_queue?
@drain_queue ? !@filter_queue_client.empty? : false
end
def wrapped_write_client(plugin)
LogStash::Instrument::WrappedWriteClient.new(@input_queue_client, self, metric, plugin)
end
end; end

View file

@ -148,7 +148,6 @@ module LogStash; module Util
def define_initial_metrics_values(namespaced_metric)
namespaced_metric.report_time(:duration_in_millis, 0)
namespaced_metric.increment(:filtered, 0)
namespaced_metric.increment(:in, 0)
namespaced_metric.increment(:out, 0)
end
@ -182,7 +181,6 @@ module LogStash; module Util
def start_metrics(batch)
@mutex.synchronize do
# there seems to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)
start_clock
end
@ -372,6 +370,10 @@ module LogStash; module Util
@events = []
end
def size
@events.size
end
def push(event)
@events.push(event)
end

View file

@ -94,7 +94,6 @@ module LogStash; module Util
def define_initial_metrics_values(namespaced_metric)
namespaced_metric.report_time(:duration_in_millis, 0)
namespaced_metric.increment(:filtered, 0)
namespaced_metric.increment(:in, 0)
namespaced_metric.increment(:out, 0)
end
@ -124,7 +123,6 @@ module LogStash; module Util
def start_metrics(batch)
@mutex.synchronize do
# there seems to be concurrency issues with metrics, keep it in the mutex
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch)
start_clock
end
@ -161,11 +159,6 @@ module LogStash; module Util
end
end
def add_starting_metrics(batch)
@event_metric.increment(:in, batch.starting_size)
@pipeline_metric.increment(:in, batch.starting_size)
end
def add_filtered_metrics(batch)
@event_metric.increment(:filtered, batch.filtered_size)
@pipeline_metric.increment(:filtered, batch.filtered_size)
@ -293,6 +286,10 @@ module LogStash; module Util
@events = []
end
def size
@events.size
end
def push(event)
@events.push(event)
end

View file

@ -78,7 +78,8 @@ describe LogStash::Api::Modules::NodeStats do
"duration_in_millis" => Numeric,
"in" => Numeric,
"filtered" => Numeric,
"out" => Numeric
"out" => Numeric,
"queue_push_duration_in_millis" => Numeric
}
},
"reloads" => {

View file

@ -85,8 +85,9 @@ describe LogStash::Instrument::Metric do
it "return a TimedExecution" do
execution = subject.time(:root, :duration_ms)
sleep(sleep_time)
execution.stop
execution_time = execution.stop
expect(execution_time).to eq(collector.last)
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 0.1)
expect(collector[0]).to match(:root)
expect(collector[1]).to be(:duration_ms)

View file

@ -78,8 +78,9 @@ describe LogStash::Instrument::NamespacedMetric do
it "return a TimedExecution" do
execution = subject.time(:duration_ms)
sleep(sleep_time)
execution.stop
execution_time = execution.stop
expect(execution_time).to eq(collector.last)
expect(collector.last).to be_within(sleep_time_ms).of(sleep_time_ms + 0.1)
expect(collector[0]).to match([:root])
expect(collector[1]).to be(:duration_ms)

View file

@ -0,0 +1,113 @@
# encoding: utf-8
require "logstash/instrument/metric"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/event"
require_relative "../../support/mocks_classes"
require "spec_helper"
describe LogStash::Instrument::WrappedWriteClient do
let(:write_client) { queue.write_client }
let(:read_client) { queue.read_client }
let(:pipeline) { double("pipeline", :pipeline_id => :main) }
let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
let(:plugin) { LogStash::Inputs::DummyInput.new({ "id" => myid }) }
let(:event) { LogStash::Event.new }
let(:myid) { "1234myid" }
subject { described_class.new(write_client, pipeline, metric, plugin) }
shared_examples "queue tests" do
it "pushes single event to the `WriteClient`" do
t = Thread.new do
subject.push(event)
end
sleep(0.01) while !t.status
expect(read_client.read_batch.size).to eq(1)
t.kill rescue nil
end
it "pushes batch to the `WriteClient`" do
batch = write_client.get_new_batch
batch << event
t = Thread.new do
subject.push_batch(batch)
end
sleep(0.01) while !t.status
expect(read_client.read_batch.size).to eq(1)
t.kill rescue nil
end
context "recorded metrics" do
before do
t = Thread.new do
subject.push(event)
end
sleep(0.01) while !t.status
sleep(0.250) # make it block for some time, so duration isn't 0
read_client.read_batch.size
t.kill rescue nil
end
let(:snapshot_store) { collector.snapshot_metric.metric_store }
let(:snapshot_metric) { snapshot_store.get_shallow(:stats) }
it "records instance level events `in`" do
expect(snapshot_metric[:events][:in].value).to eq(1)
end
it "records pipeline level `in`" do
expect(snapshot_metric[:pipelines][:main][:events][:in].value).to eq(1)
end
it "record input `out`" do
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid.to_sym][:out].value).to eq(1)
end
context "recording of the duration of pushing to the queue" do
it "records at the `global events` level" do
expect(snapshot_metric[:events][:queue_push_duration_in_millis].value).to be_kind_of(Integer)
end
it "records at the `pipeline` level" do
expect(snapshot_metric[:pipelines][:main][:events][:queue_push_duration_in_millis].value).to be_kind_of(Integer)
end
it "records at the `plugin level" do
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid.to_sym][:queue_push_duration_in_millis].value).to be_kind_of(Integer)
end
end
end
end
context "WrappedSynchronousQueue" do
let(:queue) { LogStash::Util::WrappedSynchronousQueue.new }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
read_client.set_pipeline_metric(metric.namespace([:stats, :pipelines, :main, :events]))
end
include_examples "queue tests"
end
context "AckedMemoryQueue" do
let(:queue) { LogStash::Util::WrappedAckedQueue.create_memory_based("", 1024, 10, 1024) }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))
read_client.set_pipeline_metric(metric.namespace([:stats, :pipelines, :main, :events]))
end
after do
queue.close
end
include_examples "queue tests"
end
end

View file

@ -64,9 +64,6 @@ describe LogStash::Util::WrappedSynchronousQueue do
read_client.close_batch(batch)
store = collector.snapshot_metric.metric_store
expect(store.get_shallow(:events, :in).value).to eq(0)
expect(store.get_shallow(:events, :in)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:events, :out).value).to eq(0)
expect(store.get_shallow(:events, :out)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
@ -76,9 +73,6 @@ describe LogStash::Util::WrappedSynchronousQueue do
expect(store.get_shallow(:events, :duration_in_millis).value).to eq(0)
expect(store.get_shallow(:events, :duration_in_millis)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:pipeline, :in).value).to eq(0)
expect(store.get_shallow(:pipeline, :in)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to eq(0)
expect(store.get_shallow(:pipeline, :duration_in_millis)).to be_kind_of(LogStash::Instrument::MetricType::Counter)
@ -105,11 +99,9 @@ describe LogStash::Util::WrappedSynchronousQueue do
read_client.close_batch(read_batch)
store = collector.snapshot_metric.metric_store
expect(store.get_shallow(:events, :in).value).to eq(5)
expect(store.get_shallow(:events, :out).value).to eq(5)
expect(store.get_shallow(:events, :filtered).value).to eq(5)
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
expect(store.get_shallow(:pipeline, :in).value).to eq(5)
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
expect(store.get_shallow(:pipeline, :out).value).to eq(5)
expect(store.get_shallow(:pipeline, :filtered).value).to eq(5)

View file

@ -1,79 +1,91 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/inputs/base"
require "thread"
module LogStash module Outputs
class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
milestone 2
module LogStash
module Inputs
class DummyInput < LogStash::Inputs::Base
config_name "dummyinput"
attr_reader :num_closes, :events
def initialize(params={})
super
@num_closes = 0
@events = []
end
def register
end
def receive(event)
@events << event
end
def close
@num_closes += 1
def run(queue)
# noop
end
end
end
module Outputs
class DummyOutput < LogStash::Outputs::Base
config_name "dummyoutput"
milestone 2
class DummyOutputWithEventsArray < LogStash::Outputs::Base
config_name "dummyoutput2"
milestone 2
attr_reader :num_closes, :events
attr_reader :events
def initialize(params={})
super
@num_closes = 0
@events = []
end
def initialize(params={})
super
@events = []
def register
end
def receive(event)
@events << event
end
def close
@num_closes += 1
end
end
def register
class DummyOutputWithEventsArray < LogStash::Outputs::Base
config_name "dummyoutput2"
milestone 2
attr_reader :events
def initialize(params={})
super
@events = []
end
def register
end
def receive(event)
@events << event
end
def close
end
end
def receive(event)
@events << event
end
class DroppingDummyOutput < LogStash::Outputs::Base
config_name "droppingdummyoutput"
milestone 2
def close
attr_reader :num_closes
def initialize(params={})
super
@num_closes = 0
@events_received = Concurrent::AtomicFixnum.new(0)
end
def register
end
def receive(event)
@events_received.increment
end
def events_received
@events_received.value
end
def close
@num_closes = 1
end
end
end
class DroppingDummyOutput < LogStash::Outputs::Base
config_name "droppingdummyoutput"
milestone 2
attr_reader :num_closes
def initialize(params={})
super
@num_closes = 0
@events_received = Concurrent::AtomicFixnum.new(0)
end
def register
end
def receive(event)
@events_received.increment
end
def events_received
@events_received.value
end
def close
@num_closes = 1
end
end
end end
end