correctly handle pipeline actions (#10331)

This commit is contained in:
Colin Surprenant 2019-01-16 14:42:26 -05:00 committed by GitHub
parent ce80262d02
commit 528112c67e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 25 deletions

View file

@ -31,29 +31,30 @@ module LogStash module PipelineAction
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines)
pipeline =
new_pipeline =
if @pipeline_config.settings.get_value("pipeline.java_execution")
LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchonize this initialization.
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::Pipeline.new(@pipeline_config, @metric, agent)
end
end
status = nil
pipelines.compute(pipeline_id) do |id,value|
if value
LogStash::ConvergeResult::ActionResult.create(self, true)
result = nil
pipelines.compute(pipeline_id) do |_, current_pipeline|
if current_pipeline
result = LogStash::ConvergeResult::FailedAction.new("Attempted to create a pipeline that already exists")
current_pipeline
else
result = new_pipeline.start # block until the pipeline is correctly started or crashed
result ? new_pipeline : nil
end
status = pipeline.start # block until the pipeline is correctly started or crashed
pipeline # The pipeline is successfully started we can add it to the map
end
LogStash::ConvergeResult::ActionResult.create(self, status)
LogStash::ConvergeResult::ActionResult.create(self, result)
end

View file

@ -34,8 +34,8 @@ module LogStash module PipelineAction
else
agent.exclusive do
# The Ruby pipeline initialization is not thread safe because of the module level
# shared state in LogsStash::Config::AST. When using multiple pipelines this can gets
# executed simultaneously in different threads and we need to synchonize this initialization.
# shared state in LogsStash::Config::AST. When using multiple pipelines this gets
# executed simultaneously in different threads and we need to synchronize this initialization.
LogStash::BasePipeline.new(@pipeline_config)
end
end
@ -49,15 +49,12 @@ module LogStash module PipelineAction
logger.info("Reloading pipeline", "pipeline.id" => pipeline_id)
pipelines.compute(pipeline_id) do |_,pipeline|
status = Stop.new(pipeline_id).execute(agent, pipelines)
stop_result = Stop.new(pipeline_id).execute(agent, pipelines)
if status
return Create.new(@pipeline_config, @metric).execute(agent, pipelines)
else
return status
end
pipeline
if stop_result.successful?
Create.new(@pipeline_config, @metric).execute(agent, pipelines)
else
stop_result
end
end
end

View file

@ -10,14 +10,13 @@ module LogStash module PipelineAction
end
def execute(agent, pipelines)
pipelines.compute(pipeline_id) do |_,pipeline|
pipelines.compute(pipeline_id) do |_, pipeline|
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
pipeline.thread.join
nil # delete the pipeline
nil # remove pipeline from pipelines
end
# If we reach this part of the code we have succeeded because
# the shutdown call will block.
return LogStash::ConvergeResult::SuccessfulAction.new
LogStash::ConvergeResult::SuccessfulAction.new
end
def to_s