mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
PERFORMANCE: remove blocking on inflight batches map during batch reads
Fixes #8430
This commit is contained in:
parent
af0fbcfa46
commit
9a453abb54
1 changed files with 4 additions and 20 deletions
|
@ -46,10 +46,9 @@ module LogStash; module Util
|
|||
|
||||
def initialize(queue, batch_size = 125, wait_for = 250)
|
||||
@queue = queue
|
||||
@mutex = Mutex.new
|
||||
# Note that @inflight_batches as a central mechanism for tracking inflight
|
||||
# batches will fail if we have multiple read clients in the pipeline.
|
||||
@inflight_batches = {}
|
||||
@inflight_batches = Concurrent::Map.new
|
||||
|
||||
# allow the worker thread to report the execution time of the filter + output
|
||||
@inflight_clocks = Concurrent::Map.new
|
||||
|
@ -93,12 +92,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def inflight_batches
|
||||
@mutex.lock
|
||||
begin
|
||||
yield(@inflight_batches)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
yield(@inflight_batches)
|
||||
end
|
||||
|
||||
def current_inflight_batch
|
||||
|
@ -118,12 +112,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def start_metrics(batch)
|
||||
@mutex.lock
|
||||
begin
|
||||
set_current_thread_inflight_batch(batch)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
set_current_thread_inflight_batch(batch)
|
||||
start_clock
|
||||
end
|
||||
|
||||
|
@ -132,12 +121,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def close_batch(batch)
|
||||
@mutex.lock
|
||||
begin
|
||||
@inflight_batches.delete(Thread.current)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
@inflight_batches.delete(Thread.current)
|
||||
stop_clock(batch)
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue