use method-level ensure blocks

Fixes #10872
This commit is contained in:
Dan Hermann 2019-06-28 07:12:15 -05:00
parent 1802869d80
commit 46c1611dff

View file

@ -48,29 +48,27 @@ module LogStash
# @return [Boolean] new pipeline creation success # @return [Boolean] new pipeline creation success
def create_pipeline(pipeline_id, pipeline, &create_block) def create_pipeline(pipeline_id, pipeline, &create_block)
lock = get_lock(pipeline_id) lock = get_lock(pipeline_id)
begin lock.lock
lock.lock
success = false success = false
state = @states.get(pipeline_id) state = @states.get(pipeline_id)
if state if state
if state.terminated? if state.terminated?
success = yield
state.set_pipeline(pipeline)
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
end
@states.put(pipeline_id, state)
else
success = yield success = yield
@states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success state.set_pipeline(pipeline)
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
end end
@states.put(pipeline_id, state)
success else
ensure success = yield
lock.unlock @states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
end end
success
ensure
lock.unlock
end end
# Execute the passed termination logic block # Execute the passed termination logic block
@ -80,20 +78,18 @@ module LogStash
# @yieldparam [Pipeline] the pipeline to terminate # @yieldparam [Pipeline] the pipeline to terminate
def terminate_pipeline(pipeline_id, &stop_block) def terminate_pipeline(pipeline_id, &stop_block)
lock = get_lock(pipeline_id) lock = get_lock(pipeline_id)
begin lock.lock
lock.lock
state = @states.get(pipeline_id) state = @states.get(pipeline_id)
if state.nil? if state.nil?
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id) logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id) @states.remove(pipeline_id)
else else
yield(state.pipeline) yield(state.pipeline)
@states.put(pipeline_id, state) @states.put(pipeline_id, state)
end
ensure
lock.unlock
end end
ensure
lock.unlock
end end
# Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state # Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
@ -105,29 +101,27 @@ module LogStash
# @return [Boolean] new pipeline creation success # @return [Boolean] new pipeline creation success
def reload_pipeline(pipeline_id, &reload_block) def reload_pipeline(pipeline_id, &reload_block)
lock = get_lock(pipeline_id) lock = get_lock(pipeline_id)
begin lock.lock
lock.lock success = false
success = false
state = @states.get(pipeline_id) state = @states.get(pipeline_id)
if state.nil? if state.nil?
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id) logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id) @states.remove(pipeline_id)
else else
state.set_reloading(true) state.set_reloading(true)
begin begin
success, new_pipeline = yield success, new_pipeline = yield
state.set_pipeline(new_pipeline) state.set_pipeline(new_pipeline)
ensure ensure
state.set_reloading(false) state.set_reloading(false)
end
@states.put(pipeline_id, state)
end end
@states.put(pipeline_id, state)
success
ensure
lock.unlock
end end
success
ensure
lock.unlock
end end
# @param pipeline_id [String, Symbol] the pipeline id # @param pipeline_id [String, Symbol] the pipeline id