diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb index 2ea98056c..2fb19df01 100644 --- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb @@ -149,10 +149,6 @@ module LogStash; module Util end end - def current_inflight_batch - @inflight_batches.fetch(Thread.current, []) - end - # create a new empty batch # @return [ReadBatch] a new empty read batch def new_batch @@ -171,36 +167,26 @@ module LogStash; module Util end def start_metrics(batch) + thread = Thread.current @mutex.lock begin - set_current_thread_inflight_batch(batch) + @inflight_batches[thread] = batch ensure @mutex.unlock end - start_clock - end - - def set_current_thread_inflight_batch(batch) - @inflight_batches[Thread.current] = batch + @inflight_clocks[thread] = java.lang.System.nano_time end def close_batch(batch) + thread = Thread.current @mutex.lock begin batch.close - @inflight_batches.delete(Thread.current) + @inflight_batches.delete(thread) ensure @mutex.unlock end - stop_clock(batch) - end - - def start_clock - @inflight_clocks[Thread.current] = java.lang.System.nano_time - end - - def stop_clock(batch) - start_time = @inflight_clocks.get_and_set(Thread.current, nil) + start_time = @inflight_clocks.get_and_set(thread, nil) unless start_time.nil? if batch.size > 0 # only stop (which also records) the metrics if the batch is non-empty. diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb index a77210cfa..bdaabd8ed 100644 --- a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -80,10 +80,6 @@ module LogStash; module Util yield(@inflight_batches) end - def current_inflight_batch - @inflight_batches.fetch(Thread.current, []) - end - # create a new empty batch # @return [ReadBatch] a new empty read batch def new_batch @@ -97,25 +93,15 @@ module LogStash; module Util end def start_metrics(batch) - set_current_thread_inflight_batch(batch) - start_clock - end - - def set_current_thread_inflight_batch(batch) - @inflight_batches[Thread.current] = batch + thread = Thread.current + @inflight_batches[thread] = batch + @inflight_clocks[thread] = java.lang.System.nano_time end def close_batch(batch) - @inflight_batches.delete(Thread.current) - stop_clock(batch) - end - - def start_clock - @inflight_clocks[Thread.current] = java.lang.System.nano_time - end - - def stop_clock(batch) - start_time = @inflight_clocks.get_and_set(Thread.current, nil) + thread = Thread.current + @inflight_batches.delete(thread) + start_time = @inflight_clocks.get_and_set(thread, nil) unless start_time.nil? if batch.size > 0 # only stop (which also records) the metrics if the batch is non-empty.