mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
7733dd0f86
commit
842658f3a6
1 changed files with 20 additions and 6 deletions
|
@ -57,7 +57,7 @@ module LogStash; module Util
|
|||
|
||||
def initialize(queue, batch_size = 125, wait_for = 250)
|
||||
@queue = queue
|
||||
@mutex = Mutex.new
|
||||
@mutex = java.util.concurrent.locks.ReentrantLock.new
|
||||
# Note that @inflight_batches as a central mechanism for tracking inflight
|
||||
# batches will fail if we have multiple read clients in the pipeline.
|
||||
@inflight_batches = {}
|
||||
|
@ -98,8 +98,11 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def inflight_batches
|
||||
@mutex.synchronize do
|
||||
@mutex.lock
|
||||
begin
|
||||
yield(@inflight_batches)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -115,16 +118,24 @@ module LogStash; module Util
|
|||
|
||||
def read_batch
|
||||
batch = new_batch
|
||||
@mutex.synchronize { batch.read_next }
|
||||
@mutex.lock
|
||||
begin
|
||||
batch.read_next
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
start_metrics(batch)
|
||||
batch
|
||||
end
|
||||
|
||||
def start_metrics(batch)
|
||||
@mutex.synchronize do
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
@mutex.lock
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
begin
|
||||
set_current_thread_inflight_batch(batch)
|
||||
start_clock
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -133,10 +144,13 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def close_batch(batch)
|
||||
@mutex.synchronize do
|
||||
@mutex.lock
|
||||
begin
|
||||
# there seems to be concurrency issues with metrics, keep it in the mutex
|
||||
@inflight_batches.delete(Thread.current)
|
||||
stop_clock(batch)
|
||||
ensure
|
||||
@mutex.unlock
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue