Correctly calculate the time spend in the filter + output block

This PR fix an issue where the time was calculated but no work was done
on the event. This code make sure we have at least one event to start
recording the time spend.

This was causing the `events/duratin_in_millis` to not be in sync with
the time spend on the filtera, since `take_batch` was called in a tight
loop and could return an empty array this made the duration was way off.

Fixes: #5952

Fixes #5953
This commit is contained in:
Pier-Hugues Pellerin 2016-09-20 15:11:40 -04:00 committed by Suyog Rao
parent 789970d5ab
commit b040bf9470
4 changed files with 62 additions and 8 deletions

View file

@ -188,6 +188,10 @@ module LogStash module Instrument
end end
end end
def size
@fast_lookup.size
end
private private
def get_all def get_all
@fast_lookup.values @fast_lookup.values

View file

@ -90,9 +90,14 @@ module LogStash; module Util
def take_batch def take_batch
@mutex.synchronize do @mutex.synchronize do
batch = ReadBatch.new(@queue, @batch_size, @wait_for) batch = ReadBatch.new(@queue, @batch_size, @wait_for)
add_starting_metrics(batch)
set_current_thread_inflight_batch(batch) set_current_thread_inflight_batch(batch)
start_clock
# We dont actually have any events to work on so lets
# not bother with recording metrics for them
if batch.size > 0
add_starting_metrics(batch)
start_clock
end
batch batch
end end
end end
@ -116,8 +121,10 @@ module LogStash; module Util
end end
def stop_clock def stop_clock
@inflight_clocks[Thread.current].each(&:stop) unless @inflight_clocks[Thread.current].nil?
@inflight_clocks.delete(Thread.current) @inflight_clocks[Thread.current].each(&:stop)
@inflight_clocks.delete(Thread.current)
end
end end
def add_starting_metrics(batch) def add_starting_metrics(batch)

View file

@ -203,6 +203,12 @@ describe LogStash::Instrument::MetricStore do
end end
end end
describe "#size" do
it "returns the number of unique metrics" do
expect(subject.size).to eq(metric_events.size)
end
end
describe "#each" do describe "#each" do
it "retrieves all the metric" do it "retrieves all the metric" do
expect(subject.each.size).to eq(metric_events.size) expect(subject.each.size).to eq(metric_events.size)

View file

@ -1,6 +1,7 @@
# encoding: utf-8 # encoding: utf-8
require "spec_helper" require "spec_helper"
require "logstash/util/wrapped_synchronous_queue" require "logstash/util/wrapped_synchronous_queue"
require "logstash/instrument/collector"
describe LogStash::Util::WrappedSynchronousQueue do describe LogStash::Util::WrappedSynchronousQueue do
context "#offer" do context "#offer" do
@ -45,11 +46,47 @@ describe LogStash::Util::WrappedSynchronousQueue do
end end
describe "WriteClient | ReadClient" do describe "WriteClient | ReadClient" do
context "when writing to the queue" do let(:queue) { DummyQueue.new }
let(:queue) { DummyQueue.new } let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)}
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)} let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)}
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)}
context "when reading from the queue" do
let(:collector) { LogStash::Instrument::Collector.new }
before do
read_client.set_events_metric(LogStash::Instrument::Metric.new(collector).namespace(:events))
read_client.set_pipeline_metric(LogStash::Instrument::Metric.new(collector).namespace(:pipeline))
end
context "when the queue is empty" do
it "doesnt record the `duration_in_millis`" do
batch = read_client.take_batch
read_client.close_batch(batch)
store = collector.snapshot_metric.metric_store
expect(store.size).to eq(0)
end
end
context "when we have item in the queue" do
it "records the `duration_in_millis`" do
batch = write_client.get_new_batch
5.times {|i| batch.push("value-#{i}")}
write_client.push_batch(batch)
read_batch = read_client.take_batch
sleep(0.1) # simulate some work?
read_client.close_batch(batch)
store = collector.snapshot_metric.metric_store
expect(store.size).to eq(4)
expect(store.get_shallow(:events, :in).value).to eq(5)
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
expect(store.get_shallow(:pipeline, :in).value).to eq(5)
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
end
end
end
context "when writing to the queue" do
before :each do before :each do
read_client.set_events_metric(LogStash::Instrument::NullMetric.new) read_client.set_events_metric(LogStash::Instrument::NullMetric.new)
read_client.set_pipeline_metric(LogStash::Instrument::NullMetric.new) read_client.set_pipeline_metric(LogStash::Instrument::NullMetric.new)