don't perform long-running pipeline actions inside calls to ConcurrentHashMap.compute to avoid deadlocks

Fixes #10872
This commit is contained in:
Dan Hermann 2019-06-27 17:54:35 -05:00
parent d1e92862c3
commit 0ff6d11344

View file

@ -35,6 +35,7 @@ module LogStash
# will block until the other compute finishes so no mutex is necessary
# for synchronizing compute calls
@states = java.util.concurrent.ConcurrentHashMap.new
@locks = java.util.concurrent.ConcurrentHashMap.new
end
# Execute the passed creation logic block and create a new state upon success
@ -46,9 +47,13 @@ module LogStash
#
# @return [Boolean] new pipeline creation success
def create_pipeline(pipeline_id, pipeline, &create_block)
success = false
lock = get_lock(pipeline_id)
begin
lock.lock
@states.compute(pipeline_id) do |_, state|
success = false
state = @states.get(pipeline_id)
if state
if state.terminated?
success = yield
@ -56,14 +61,16 @@ module LogStash
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
end
state
@states.put(pipeline_id, state)
else
success = yield
success ? PipelineState.new(pipeline_id, pipeline) : nil
@states.put(pipeline_id, success ? PipelineState.new(pipeline_id, pipeline) : nil)
end
end
success
success
ensure
lock.unlock
end
end
# Execute the passed termination logic block
@ -72,14 +79,20 @@ module LogStash
#
# @yieldparam [Pipeline] the pipeline to terminate
def terminate_pipeline(pipeline_id, &stop_block)
@states.compute(pipeline_id) do |_, state|
lock = get_lock(pipeline_id)
begin
lock.lock
state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
nil
@states.put(pipeline_id, nil)
else
yield(state.pipeline)
state
@states.put(pipeline_id, state)
end
ensure
lock.unlock
end
end
@ -91,12 +104,14 @@ module LogStash
#
# @return [Boolean] new pipeline creation success
def reload_pipeline(pipeline_id, &reload_block)
success = false
lock = get_lock(pipeline_id)
begin
success = false
@states.compute(pipeline_id) do |_, state|
state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
nil
@states.put(pipeline_id, nil)
else
state.set_reloading(true)
begin
@ -105,11 +120,13 @@ module LogStash
ensure
state.set_reloading(false)
end
state
@states.put(pipeline_id, state)
end
end
success
ensure
lock.unlock
end
end
# @param pipeline_id [String, Symbol] the pipeline id
@ -162,5 +179,11 @@ module LogStash
end
end
end
def get_lock(pipeline_id)
@locks.compute_if_absent(pipeline_id) do |k|
java.util.concurrent.locks.ReentrantLock.new
end
end
end
end