mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
Correctly calculate the time spend in the filter + output block
This PR fix an issue where the time was calculated but no work was done on the event. This code make sure we have at least one event to start recording the time spend. This was causing the `events/duratin_in_millis` to not be in sync with the time spend on the filtera, since `take_batch` was called in a tight loop and could return an empty array this made the duration was way off. Fixes: #5952 Fixes #5953
This commit is contained in:
parent
5fddbce449
commit
cfd39c377e
4 changed files with 62 additions and 8 deletions
|
@ -188,6 +188,10 @@ module LogStash module Instrument
|
|||
end
|
||||
end
|
||||
|
||||
def size
|
||||
@fast_lookup.size
|
||||
end
|
||||
|
||||
private
|
||||
def get_all
|
||||
@fast_lookup.values
|
||||
|
|
|
@ -90,9 +90,14 @@ module LogStash; module Util
|
|||
def take_batch
|
||||
@mutex.synchronize do
|
||||
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
|
||||
add_starting_metrics(batch)
|
||||
set_current_thread_inflight_batch(batch)
|
||||
start_clock
|
||||
|
||||
# We dont actually have any events to work on so lets
|
||||
# not bother with recording metrics for them
|
||||
if batch.size > 0
|
||||
add_starting_metrics(batch)
|
||||
start_clock
|
||||
end
|
||||
batch
|
||||
end
|
||||
end
|
||||
|
@ -116,8 +121,10 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def stop_clock
|
||||
@inflight_clocks[Thread.current].each(&:stop)
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
unless @inflight_clocks[Thread.current].nil?
|
||||
@inflight_clocks[Thread.current].each(&:stop)
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
end
|
||||
|
||||
def add_starting_metrics(batch)
|
||||
|
|
|
@ -203,6 +203,12 @@ describe LogStash::Instrument::MetricStore do
|
|||
end
|
||||
end
|
||||
|
||||
describe "#size" do
|
||||
it "returns the number of unique metrics" do
|
||||
expect(subject.size).to eq(metric_events.size)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#each" do
|
||||
it "retrieves all the metric" do
|
||||
expect(subject.each.size).to eq(metric_events.size)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# encoding: utf-8
|
||||
require "spec_helper"
|
||||
require "logstash/util/wrapped_synchronous_queue"
|
||||
require "logstash/instrument/collector"
|
||||
|
||||
describe LogStash::Util::WrappedSynchronousQueue do
|
||||
context "#offer" do
|
||||
|
@ -45,11 +46,47 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
|||
end
|
||||
|
||||
describe "WriteClient | ReadClient" do
|
||||
context "when writing to the queue" do
|
||||
let(:queue) { DummyQueue.new }
|
||||
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)}
|
||||
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)}
|
||||
let(:queue) { DummyQueue.new }
|
||||
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(queue)}
|
||||
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(queue)}
|
||||
|
||||
context "when reading from the queue" do
|
||||
let(:collector) { LogStash::Instrument::Collector.new }
|
||||
|
||||
before do
|
||||
read_client.set_events_metric(LogStash::Instrument::Metric.new(collector).namespace(:events))
|
||||
read_client.set_pipeline_metric(LogStash::Instrument::Metric.new(collector).namespace(:pipeline))
|
||||
end
|
||||
|
||||
context "when the queue is empty" do
|
||||
it "doesnt record the `duration_in_millis`" do
|
||||
batch = read_client.take_batch
|
||||
read_client.close_batch(batch)
|
||||
store = collector.snapshot_metric.metric_store
|
||||
expect(store.size).to eq(0)
|
||||
end
|
||||
end
|
||||
|
||||
context "when we have item in the queue" do
|
||||
it "records the `duration_in_millis`" do
|
||||
batch = write_client.get_new_batch
|
||||
5.times {|i| batch.push("value-#{i}")}
|
||||
write_client.push_batch(batch)
|
||||
read_batch = read_client.take_batch
|
||||
sleep(0.1) # simulate some work?
|
||||
read_client.close_batch(batch)
|
||||
store = collector.snapshot_metric.metric_store
|
||||
|
||||
expect(store.size).to eq(4)
|
||||
expect(store.get_shallow(:events, :in).value).to eq(5)
|
||||
expect(store.get_shallow(:events, :duration_in_millis).value).to be > 0
|
||||
expect(store.get_shallow(:pipeline, :in).value).to eq(5)
|
||||
expect(store.get_shallow(:pipeline, :duration_in_millis).value).to be > 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when writing to the queue" do
|
||||
before :each do
|
||||
read_client.set_events_metric(LogStash::Instrument::NullMetric.new)
|
||||
read_client.set_pipeline_metric(LogStash::Instrument::NullMetric.new)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue