mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
PERFORMANCE: Don't use slow metric clocks where avoidable
Fixes #7919 Fixes #7946
This commit is contained in:
parent
d3083de43c
commit
a1457df649
6 changed files with 35 additions and 29 deletions
|
@ -41,9 +41,11 @@ module LogStash
|
|||
def multi_filter(events)
|
||||
@metric_events_in.increment(events.size)
|
||||
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
start_time = java.lang.System.current_time_millis
|
||||
new_events = @filter.multi_filter(events)
|
||||
clock.stop
|
||||
@metric_events.report_time(
|
||||
:duration_in_millis, java.lang.System.current_time_millis - start_time
|
||||
)
|
||||
|
||||
# There is no guarantee in the context of filter
|
||||
# that EVENTS_INT == EVENTS_OUT, see the aggregates and
|
||||
|
|
|
@ -21,31 +21,37 @@ module LogStash module Instrument
|
|||
end
|
||||
|
||||
def push(event)
|
||||
record_metric { @write_client.push(event) }
|
||||
increment_counters(1)
|
||||
start_time = java.lang.System.current_time_millis
|
||||
result = @write_client.push(event)
|
||||
report_execution_time(start_time)
|
||||
result
|
||||
end
|
||||
|
||||
alias_method(:<<, :push)
|
||||
|
||||
def push_batch(batch)
|
||||
record_metric(batch.size) { @write_client.push_batch(batch) }
|
||||
increment_counters(batch.size)
|
||||
start_time = java.lang.System.current_time_millis
|
||||
result = @write_client.push_batch(batch)
|
||||
report_execution_time(start_time)
|
||||
result
|
||||
end
|
||||
|
||||
private
|
||||
def record_metric(size = 1)
|
||||
|
||||
def increment_counters(size)
|
||||
@events_metrics_counter.increment(size)
|
||||
@pipeline_metrics_counter.increment(size)
|
||||
@plugin_events_metrics_counter.increment(size)
|
||||
end
|
||||
|
||||
clock = @events_metrics.time(:queue_push_duration_in_millis)
|
||||
|
||||
result = yield
|
||||
|
||||
def report_execution_time(start_time)
|
||||
execution_time = java.lang.System.current_time_millis - start_time
|
||||
@events_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
# Reuse the same values for all the endpoints to make sure we don't have skew in times.
|
||||
execution_time = clock.stop
|
||||
|
||||
@pipeline_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
@plugin_events_metrics.report_time(:queue_push_duration_in_millis, execution_time)
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
def define_initial_metrics_values
|
||||
|
|
|
@ -44,9 +44,11 @@ module LogStash class OutputDelegator
|
|||
|
||||
def multi_receive(events)
|
||||
@in_counter.increment(events.length)
|
||||
clock = @metric_events.time(:duration_in_millis)
|
||||
start_time = java.lang.System.current_time_millis
|
||||
@strategy.multi_receive(events)
|
||||
clock.stop
|
||||
@metric_events.report_time(
|
||||
:duration_in_millis, java.lang.System.current_time_millis - start_time
|
||||
)
|
||||
@out_counter.increment(events.length)
|
||||
end
|
||||
|
||||
|
|
|
@ -205,19 +205,18 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def start_clock
|
||||
@inflight_clocks[Thread.current] = [
|
||||
@event_metric.time(:duration_in_millis),
|
||||
@pipeline_metric.time(:duration_in_millis)
|
||||
]
|
||||
@inflight_clocks[Thread.current] = java.lang.System.current_time_millis
|
||||
end
|
||||
|
||||
def stop_clock(batch)
|
||||
unless @inflight_clocks[Thread.current].nil?
|
||||
if batch.size > 0
|
||||
# onl/y stop (which also records) the metrics if the batch is non-empty.
|
||||
# only stop (which also records) the metrics if the batch is non-empty.
|
||||
# start_clock is now called at empty batch creation and an empty batch could
|
||||
# stay empty all the way down to the close_batch call.
|
||||
@inflight_clocks[Thread.current].each(&:stop)
|
||||
time_taken = java.lang.System.current_time_millis - @inflight_clocks[Thread.current]
|
||||
@event_metric.report_time(:duration_in_millis, time_taken)
|
||||
@pipeline_metric.report_time(:duration_in_millis, time_taken)
|
||||
end
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
|
|
|
@ -144,10 +144,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def start_clock
|
||||
@inflight_clocks[Thread.current] = [
|
||||
@event_metric.time(:duration_in_millis),
|
||||
@pipeline_metric.time(:duration_in_millis)
|
||||
]
|
||||
@inflight_clocks[Thread.current] = java.lang.System.current_time_millis
|
||||
end
|
||||
|
||||
def stop_clock(batch)
|
||||
|
@ -156,7 +153,9 @@ module LogStash; module Util
|
|||
# only stop (which also records) the metrics if the batch is non-empty.
|
||||
# start_clock is now called at empty batch creation and an empty batch could
|
||||
# stay empty all the way down to the close_batch call.
|
||||
@inflight_clocks[Thread.current].each(&:stop)
|
||||
time_taken = java.lang.System.current_time_millis - @inflight_clocks[Thread.current]
|
||||
@event_metric.report_time(:duration_in_millis, time_taken)
|
||||
@pipeline_metric.report_time(:duration_in_millis, time_taken)
|
||||
end
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
|
|
|
@ -64,9 +64,7 @@ describe LogStash::OutputDelegator do
|
|||
end
|
||||
|
||||
it "should record the `duration_in_millis`" do
|
||||
clock = spy("clock")
|
||||
expect(subject.metric_events).to receive(:time).with(:duration_in_millis).and_return(clock)
|
||||
expect(clock).to receive(:stop)
|
||||
expect(subject.metric_events).to receive(:report_time).with(:duration_in_millis, Integer)
|
||||
subject.multi_receive(events)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue