mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
5d35a8f315
commit
c3d5332d7d
2 changed files with 12 additions and 16 deletions
|
@ -103,7 +103,7 @@ module LogStash; module Util
|
|||
# batches will fail if we have multiple read clients in the pipeline.
|
||||
@inflight_batches = {}
|
||||
# allow the worker thread to report the execution time of the filter + output
|
||||
@inflight_clocks = {}
|
||||
@inflight_clocks = Concurrent::Map.new
|
||||
@batch_size = batch_size
|
||||
@wait_for = wait_for
|
||||
end
|
||||
|
@ -180,12 +180,11 @@ module LogStash; module Util
|
|||
def start_metrics(batch)
|
||||
@mutex.lock
|
||||
begin
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
set_current_thread_inflight_batch(batch)
|
||||
start_clock
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
start_clock
|
||||
end
|
||||
|
||||
def set_current_thread_inflight_batch(batch)
|
||||
|
@ -196,12 +195,11 @@ module LogStash; module Util
|
|||
@mutex.lock
|
||||
begin
|
||||
batch.close
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
@inflight_batches.delete(Thread.current)
|
||||
stop_clock(batch)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
stop_clock(batch)
|
||||
end
|
||||
|
||||
def start_clock
|
||||
|
@ -209,16 +207,16 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def stop_clock(batch)
|
||||
unless @inflight_clocks[Thread.current].nil?
|
||||
start_time = @inflight_clocks.get_and_set(Thread.current, nil)
|
||||
unless start_time.nil?
|
||||
if batch.size > 0
|
||||
# only stop (which also records) the metrics if the batch is non-empty.
|
||||
# start_clock is now called at empty batch creation and an empty batch could
|
||||
# stay empty all the way down to the close_batch call.
|
||||
time_taken = (java.lang.System.nano_time - @inflight_clocks[Thread.current]) / 1_000_000
|
||||
time_taken = (java.lang.System.nano_time - start_time) / 1_000_000
|
||||
@event_metric.report_time(:duration_in_millis, time_taken)
|
||||
@pipeline_metric.report_time(:duration_in_millis, time_taken)
|
||||
end
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ module LogStash; module Util
|
|||
@inflight_batches = {}
|
||||
|
||||
# allow the worker thread to report the execution time of the filter + output
|
||||
@inflight_clocks = {}
|
||||
@inflight_clocks = Concurrent::Map.new
|
||||
@batch_size = batch_size
|
||||
@wait_for = TimeUnit::NANOSECONDS.convert(wait_for, TimeUnit::MILLISECONDS)
|
||||
end
|
||||
|
@ -119,13 +119,12 @@ module LogStash; module Util
|
|||
|
||||
def start_metrics(batch)
|
||||
@mutex.lock
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
begin
|
||||
set_current_thread_inflight_batch(batch)
|
||||
start_clock
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
start_clock
|
||||
end
|
||||
|
||||
def set_current_thread_inflight_batch(batch)
|
||||
|
@ -135,12 +134,11 @@ module LogStash; module Util
|
|||
def close_batch(batch)
|
||||
@mutex.lock
|
||||
begin
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
@inflight_batches.delete(Thread.current)
|
||||
stop_clock(batch)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
stop_clock(batch)
|
||||
end
|
||||
|
||||
def start_clock
|
||||
|
@ -148,16 +146,16 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def stop_clock(batch)
|
||||
unless @inflight_clocks[Thread.current].nil?
|
||||
start_time = @inflight_clocks.get_and_set(Thread.current, nil)
|
||||
unless start_time.nil?
|
||||
if batch.size > 0
|
||||
# only stop (which also records) the metrics if the batch is non-empty.
|
||||
# start_clock is now called at empty batch creation and an empty batch could
|
||||
# stay empty all the way down to the close_batch call.
|
||||
time_taken = (java.lang.System.nano_time - @inflight_clocks[Thread.current]) / 1_000_000
|
||||
time_taken = (java.lang.System.nano_time - start_time) / 1_000_000
|
||||
@event_metric_time.increment(time_taken)
|
||||
@pipeline_metric_time.increment(time_taken)
|
||||
end
|
||||
@inflight_clocks.delete(Thread.current)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue