mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
remove exclusive lock for Ruby pipeline initialization (#10462)
7.0 clean backport of #10431
This commit is contained in:
parent
0efa0f2eba
commit
268040c760
3 changed files with 6 additions and 41 deletions
|
@ -36,7 +36,6 @@ class LogStash::Agent
|
|||
|
||||
# Mutex to synchonize in the exclusive method
|
||||
# Initial usage for the Ruby pipeline initialization which is not thread safe
|
||||
@exclusive_lock = Mutex.new
|
||||
@webserver_control_lock = Mutex.new
|
||||
|
||||
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
|
||||
|
@ -86,10 +85,6 @@ class LogStash::Agent
|
|||
@running = Concurrent::AtomicBoolean.new(false)
|
||||
end
|
||||
|
||||
def exclusive(&block)
|
||||
@exclusive_lock.synchronize { block.call }
|
||||
end
|
||||
|
||||
def execute
|
||||
@thread = Thread.current # this var is implicitly used by Stud.stop?
|
||||
logger.debug("Starting agent")
|
||||
|
|
|
@ -32,22 +32,11 @@ module LogStash module PipelineAction
|
|||
# The execute assume that the thread safety access of the pipeline
|
||||
# is managed by the caller.
|
||||
def execute(agent, pipelines_registry)
|
||||
new_pipeline =
|
||||
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
|
||||
else
|
||||
agent.exclusive do
|
||||
# The Ruby pipeline initialization is not thread safe because of the module level
|
||||
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
|
||||
# executed simultaneously in different threads and we need to synchronize this initialization.
|
||||
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
|
||||
end
|
||||
end
|
||||
|
||||
pipeline_class = @pipeline_config.settings.get_value("pipeline.java_execution") ? LogStash::JavaPipeline : LogStash::Pipeline
|
||||
new_pipeline = pipeline_class.new(@pipeline_config, @metric, agent)
|
||||
success = pipelines_registry.create_pipeline(pipeline_id, new_pipeline) do
|
||||
new_pipeline.start # block until the pipeline is correctly started or crashed
|
||||
end
|
||||
|
||||
LogStash::ConvergeResult::ActionResult.create(self, success)
|
||||
end
|
||||
|
||||
|
|
|
@ -31,18 +31,10 @@ module LogStash module PipelineAction
|
|||
return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the existing pipeline is not reloadable")
|
||||
end
|
||||
|
||||
java_exec = @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||
|
||||
begin
|
||||
pipeline_validator =
|
||||
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||
LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
|
||||
else
|
||||
agent.exclusive do
|
||||
# The Ruby pipeline initialization is not thread safe because of the module level
|
||||
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
|
||||
# executed simultaneously in different threads and we need to synchronize this initialization.
|
||||
LogStash::BasePipeline.new(@pipeline_config)
|
||||
end
|
||||
end
|
||||
pipeline_validator = java_exec ? LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil) : LogStash::BasePipeline.new(@pipeline_config)
|
||||
rescue => e
|
||||
return LogStash::ConvergeResult::FailedAction.from_exception(e)
|
||||
end
|
||||
|
@ -62,18 +54,7 @@ module LogStash module PipelineAction
|
|||
old_pipeline.thread.join
|
||||
|
||||
# Then create a new pipeline
|
||||
new_pipeline =
|
||||
if @pipeline_config.settings.get_value("pipeline.java_execution")
|
||||
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
|
||||
else
|
||||
agent.exclusive do
|
||||
# The Ruby pipeline initialization is not thread safe because of the module level
|
||||
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
|
||||
# executed simultaneously in different threads and we need to synchronize this initialization.
|
||||
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
|
||||
end
|
||||
end
|
||||
|
||||
new_pipeline = java_exec ? LogStash::JavaPipeline.new(@pipeline_config, @metric, agent) : LogStash::Pipeline.new(@pipeline_config, @metric, agent)
|
||||
success = new_pipeline.start # block until the pipeline is correctly started or crashed
|
||||
|
||||
# return success and new_pipeline to registry reload_pipeline
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue