diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index f05134748..d8ffa82bb 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -38,6 +38,8 @@ class LogStash::Agent # Initial usage for the Ruby pipeline initialization which is not thread safe @webserver_control_lock = Mutex.new + @convergence_lock = Mutex.new + # Special bus object for inter-pipelines communications. Used by the `pipeline` input/output @pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new @@ -154,12 +156,7 @@ class LogStash::Agent end end - # We Lock any access on the pipelines, since the actions will modify the - # content of it. - converge_result = nil - - pipeline_actions = resolve_actions(results.response) - converge_result = converge_state(pipeline_actions) + converge_result = resolve_actions_and_converge_state(results.response) update_metrics(converge_result) logger.info( @@ -283,6 +280,15 @@ class LogStash::Agent @running.make_true end + # @param pipeline_configs [Array] + # @return [ConvergeResult] + def resolve_actions_and_converge_state(pipeline_configs) + @convergence_lock.synchronize do + pipeline_actions = resolve_actions(pipeline_configs) + converge_state(pipeline_actions) + end + end + # We depends on a series of task derived from the internal state and what # need to be run, theses actions are applied to the current pipelines to converge to # the desired state. @@ -295,6 +301,7 @@ class LogStash::Agent # def converge_state(pipeline_actions) logger.debug("Converging pipelines state", :actions_count => pipeline_actions.size) + fail("Illegal access to `LogStash::Agent#converge_state()` without exclusive lock at #{caller[1]}") unless @convergence_lock.owned? converge_result = LogStash::ConvergeResult.new(pipeline_actions.size) @@ -343,6 +350,7 @@ class LogStash::Agent end def resolve_actions(pipeline_configs) + fail("Illegal access to `LogStash::Agent#resolve_actions()` without exclusive lock at #{caller[1]}") unless @convergence_lock.owned? @state_resolver.resolve(@pipelines_registry, pipeline_configs) end @@ -410,8 +418,7 @@ class LogStash::Agent # In this context I could just call shutdown, but I've decided to # use the stop action implementation for that so we have the same code. # This also give us some context into why a shutdown is failing - pipeline_actions = resolve_actions([]) # We stop all the pipeline, so we converge to a empty state - converge_state(pipeline_actions) + resolve_actions_and_converge_state([]) # We stop all the pipeline, so we converge to a empty state end