synchronize ruby pipeline initialization to fix concurrency bug (#10113)

This commit is contained in:
Colin Surprenant 2018-11-02 19:19:31 -04:00 committed by GitHub
parent 095fa9aa95
commit cc2d54bc16
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 2 deletions

View file

@ -33,6 +33,10 @@ class LogStash::Agent
@auto_reload = setting("config.reload.automatic")
@ephemeral_id = SecureRandom.uuid
# Mutex to synchonize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
@exclusive_lock = Mutex.new
# Special bus object for inter-pipelines communications. Used by the `pipeline` input/output
@pipeline_bus = org.logstash.plugins.pipeline.PipelineBus.new
@ -80,6 +84,10 @@ class LogStash::Agent
@running = Concurrent::AtomicBoolean.new(false)
end
def exclusive(&block)
@exclusive_lock.synchronize { block.call }
end
def execute
@thread = Thread.current # this var is implicitly used by Stud.stop?
logger.debug("Starting agent")

View file

@ -35,7 +35,12 @@ module LogStash module PipelineAction
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchonize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end
status = nil

View file

@ -32,7 +32,12 @@ module LogStash module PipelineAction
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaBasePipeline.new(@pipeline_config, nil, logger, nil)
else
LogStash::BasePipeline.new(@pipeline_config)
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this can gets
# executed simultaneously in different threads and we need to synchonize this initialization.
LogStash::BasePipeline.new(@pipeline_config)
end
end
rescue => e
return LogStash::ConvergeResult::FailedAction.from_exception(e)