diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 51ec9b944..6ba13679f 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -48,29 +48,27 @@ module LogStash # @return [Boolean] new pipeline creation success def create_pipeline(pipeline_id, pipeline, &create_block) lock = get_lock(pipeline_id) - begin - lock.lock + lock.lock - success = false + success = false - state = @states.get(pipeline_id) - if state - if state.terminated? - success = yield - state.set_pipeline(pipeline) - else - logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id) - end - @states.put(pipeline_id, state) - else + state = @states.get(pipeline_id) + if state + if state.terminated? success = yield - @states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success + state.set_pipeline(pipeline) + else + logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id) end - - success - ensure - lock.unlock + @states.put(pipeline_id, state) + else + success = yield + @states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success end + + success + ensure + lock.unlock end # Execute the passed termination logic block @@ -80,20 +78,18 @@ module LogStash # @yieldparam [Pipeline] the pipeline to terminate def terminate_pipeline(pipeline_id, &stop_block) lock = get_lock(pipeline_id) - begin - lock.lock + lock.lock - state = @states.get(pipeline_id) - if state.nil? - logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id) - @states.remove(pipeline_id) - else - yield(state.pipeline) - @states.put(pipeline_id, state) - end - ensure - lock.unlock + state = @states.get(pipeline_id) + if state.nil? + logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id) + @states.remove(pipeline_id) + else + yield(state.pipeline) + @states.put(pipeline_id, state) end + ensure + lock.unlock end # Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state @@ -105,29 +101,27 @@ module LogStash # @return [Boolean] new pipeline creation success def reload_pipeline(pipeline_id, &reload_block) lock = get_lock(pipeline_id) - begin - lock.lock - success = false + lock.lock + success = false - state = @states.get(pipeline_id) - if state.nil? - logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id) - @states.remove(pipeline_id) - else - state.set_reloading(true) - begin - success, new_pipeline = yield - state.set_pipeline(new_pipeline) - ensure - state.set_reloading(false) - end - @states.put(pipeline_id, state) + state = @states.get(pipeline_id) + if state.nil? + logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id) + @states.remove(pipeline_id) + else + state.set_reloading(true) + begin + success, new_pipeline = yield + state.set_pipeline(new_pipeline) + ensure + state.set_reloading(false) end - - success - ensure - lock.unlock + @states.put(pipeline_id, state) end + + success + ensure + lock.unlock end # @param pipeline_id [String, Symbol] the pipeline id