mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
prevent concurrent convergence (e.g., SIGHUP during in-flight convergence)
There are several scenarios in which we can trigger concurrent convergence in the agent, resulting in two or more threads working to perform interleaved and potentially conflicting or overlapping pipeline actions. Notably, our trap on `SIGHUP` will be resolved in its own thread, so if we are sent `SIGHUP` while in the process of converging, the second in-flight convergence may get its starting state before, during, or after the effects of the first convergence. By mutually excluding execution of the convergence cycle, we eliminate the class of bugs in which one convergence acquires actions that cannot succeed due to the prior success of actions given to the other convergence. Fixes #10537
This commit is contained in:
parent
3c3e769bb0
commit
80cf579e59
1 changed files with 15 additions and 8 deletions
|
@ -38,6 +38,8 @@ class LogStash::Agent
|
|||
# Initial usage for the Ruby pipeline initialization which is not thread safe
|
||||
@webserver_control_lock = Mutex.new
|
||||
|
||||
@convergence_lock = Mutex.new
|
||||
|
||||
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
|
||||
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
|
||||
|
||||
|
@ -154,12 +156,7 @@ class LogStash::Agent
|
|||
end
|
||||
end
|
||||
|
||||
# We Lock any access on the pipelines, since the actions will modify the
|
||||
# content of it.
|
||||
converge_result = nil
|
||||
|
||||
pipeline_actions = resolve_actions(results.response)
|
||||
converge_result = converge_state(pipeline_actions)
|
||||
converge_result = resolve_actions_and_converge_state(results.response)
|
||||
update_metrics(converge_result)
|
||||
|
||||
logger.info(
|
||||
|
@ -283,6 +280,15 @@ class LogStash::Agent
|
|||
@running.make_true
|
||||
end
|
||||
|
||||
# @param pipeline_configs [Array<Config::PipelineConfig>]
|
||||
# @return [ConvergeResult]
|
||||
def resolve_actions_and_converge_state(pipeline_configs)
|
||||
@convergence_lock.synchronize do
|
||||
pipeline_actions = resolve_actions(pipeline_configs)
|
||||
converge_state(pipeline_actions)
|
||||
end
|
||||
end
|
||||
|
||||
# We depends on a series of task derived from the internal state and what
|
||||
# need to be run, theses actions are applied to the current pipelines to converge to
|
||||
# the desired state.
|
||||
|
@ -295,6 +301,7 @@ class LogStash::Agent
|
|||
#
|
||||
def converge_state(pipeline_actions)
|
||||
logger.debug("Converging pipelines state", :actions_count => pipeline_actions.size)
|
||||
fail("Illegal access to `LogStash::Agent#converge_state()` without exclusive lock at #{caller[1]}") unless @convergence_lock.owned?
|
||||
|
||||
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
|
||||
|
||||
|
@ -343,6 +350,7 @@ class LogStash::Agent
|
|||
end
|
||||
|
||||
def resolve_actions(pipeline_configs)
|
||||
fail("Illegal access to `LogStash::Agent#resolve_actions()` without exclusive lock at #{caller[1]}") unless @convergence_lock.owned?
|
||||
@state_resolver.resolve(@pipelines_registry, pipeline_configs)
|
||||
end
|
||||
|
||||
|
@ -410,8 +418,7 @@ class LogStash::Agent
|
|||
# In this context I could just call shutdown, but I've decided to
|
||||
# use the stop action implementation for that so we have the same code.
|
||||
# This also give us some context into why a shutdown is failing
|
||||
pipeline_actions = resolve_actions([]) # We stop all the pipeline, so we converge to a empty state
|
||||
converge_state(pipeline_actions)
|
||||
resolve_actions_and_converge_state([]) # We stop all the pipeline, so we converge to a empty state
|
||||
end
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue