Fix stopped pipeline unable to be deleted in registry (#14018) (#14033)

Prior to the change, pipeline `stop` and `delete` happen in two converge cycles, which
has a gap letting the stopped pipeline compare with the same pipeline definition
in central pipeline management, hence Logstash see the stopped pipeline as graceful finish
and not to delete in registry

This commit creates StopAndDelete action to delete running pipeline in one converge cycle

Fixed: #14017
(cherry picked from commit e8cd0d3039)

Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
This commit is contained in:
github-actions[bot] 2022-04-27 16:30:35 +01:00 committed by GitHub
parent 788c72b67e
commit fe5172bf20
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 89 additions and 31 deletions

View file

@ -20,12 +20,14 @@ require "logstash/pipeline_action/create"
require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
require "logstash/pipeline_action/delete"
require "logstash/pipeline_action/stop_and_delete"
module LogStash module PipelineAction
ORDERING = {
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300,
LogStash::PipelineAction::StopAndDelete => 350,
LogStash::PipelineAction::Delete => 400
}
end end

View file

@ -0,0 +1,42 @@
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
require "logstash/pipeline_action/base"
module LogStash module PipelineAction
class StopAndDelete < Base
attr_reader :pipeline_id
def initialize(pipeline_id)
@pipeline_id = pipeline_id
end
def execute(agent, pipelines_registry)
pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline|
pipeline.shutdown
end
success = pipelines_registry.delete_pipeline(@pipeline_id)
LogStash::ConvergeResult::ActionResult.create(self, success)
end
def to_s
"PipelineAction::StopAndDelete<#{pipeline_id}>"
end
end
end end

View file

@ -44,10 +44,10 @@ module LogStash
configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) }
# If one of the running pipeline is not in the pipeline_configs, we assume that we need to
# stop it.
pipelines_registry.running_pipelines.keys
# stop it and delete it in registry.
pipelines_registry.running_pipelines(include_loading: true).keys
.select { |pipeline_id| !configured_pipelines.include?(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) }
.each { |pipeline_id| actions << LogStash::PipelineAction::StopAndDelete.new(pipeline_id) }
# If one of the terminated pipeline is not in the pipeline_configs, delete it in registry.
pipelines_registry.non_running_pipelines.keys

View file

@ -289,7 +289,7 @@ describe LogStash::Agent do
expect(subject.converge_state_and_update).to be_a_successful_converge
}.not_to change { subject.running_pipelines_count }
expect(subject).to have_running_pipeline?(modified_pipeline_config)
expect(subject).not_to have_pipeline?(pipeline_config)
expect(subject).to have_stopped_pipeline?(pipeline_config)
end
end

View file

@ -52,7 +52,7 @@ describe LogStash::StateResolver do
it "returns some actions" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:create, :hello_world],
[:Create, :hello_world],
)
end
end
@ -73,17 +73,17 @@ describe LogStash::StateResolver do
it "creates the new one and keep the other one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:create, :hello_world],
[:Create, :hello_world],
)
end
context "when the pipeline config contains only the new one" do
let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] }
it "creates the new one and stop the old one one" do
it "creates the new one and stop and delete the old one one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:create, :hello_world],
[:stop, :main]
[:Create, :hello_world],
[:StopAndDelete, :main]
)
end
end
@ -91,9 +91,9 @@ describe LogStash::StateResolver do
context "when the pipeline config contains no pipeline" do
let(:pipeline_configs) { [] }
it "stops the old one one" do
it "stops and delete the old one one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:stop, :main]
[:StopAndDelete, :main]
)
end
end
@ -103,7 +103,7 @@ describe LogStash::StateResolver do
it "reloads the old one one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:reload, :main]
[:Reload, :main]
)
end
end
@ -135,13 +135,13 @@ describe LogStash::StateResolver do
it "generates actions required to converge" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:create, :main7],
[:create, :main9],
[:reload, :main3],
[:reload, :main5],
[:stop, :main2],
[:stop, :main4],
[:stop, :main6]
[:Create, :main7],
[:Create, :main9],
[:Reload, :main3],
[:Reload, :main5],
[:StopAndDelete, :main2],
[:StopAndDelete, :main4],
[:StopAndDelete, :main6]
)
end
end
@ -160,14 +160,14 @@ describe LogStash::StateResolver do
it "creates the system pipeline before user defined pipelines" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions(
[:create, :monitoring],
[:create, :main7],
[:create, :main9],
[:reload, :main3],
[:reload, :main5],
[:stop, :main2],
[:stop, :main4],
[:stop, :main6]
[:Create, :monitoring],
[:Create, :main7],
[:Create, :main9],
[:Reload, :main3],
[:Reload, :main5],
[:StopAndDelete, :main2],
[:StopAndDelete, :main4],
[:StopAndDelete, :main6]
)
end
end
@ -190,7 +190,7 @@ describe LogStash::StateResolver do
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] }
it "creates the new one and keep the other one stop" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world])
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Create, :hello_world])
expect(pipelines.non_running_pipelines.size).to eq(1)
end
end
@ -199,7 +199,7 @@ describe LogStash::StateResolver do
let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] }
it "should reload the stopped pipeline" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main])
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Reload, :main])
end
end
@ -207,7 +207,7 @@ describe LogStash::StateResolver do
let(:pipeline_configs) { [] }
it "should delete the stopped one" do
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main])
expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Delete, :main])
end
end
end

View file

@ -51,7 +51,7 @@ RSpec::Matchers.define :have_actions do |*expected|
expect(actual.size).to eq(expected.size)
expected_values = expected.each_with_object([]) do |i, obj|
klass_name = "LogStash::PipelineAction::#{i.first.capitalize}"
klass_name = "LogStash::PipelineAction::#{i.first}"
obj << [klass_name, i.last]
end
@ -76,6 +76,16 @@ RSpec::Matchers.define :have_pipeline? do |pipeline_config|
end
match_when_negated do |agent|
pipeline = nil
try(30) do
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
expect(pipeline).to be_nil
end
end
end
RSpec::Matchers.define :have_stopped_pipeline? do |pipeline_config|
match do |agent|
pipeline = nil
try(30) do
pipeline = agent.get_pipeline(pipeline_config.pipeline_id)
@ -84,6 +94,10 @@ RSpec::Matchers.define :have_pipeline? do |pipeline_config|
# either the pipeline_id is not in the running pipelines OR it is but have different configurations
expect(!agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s) || pipeline.config_str != pipeline_config.config_string).to be_truthy
end
match_when_negated do
raise "Not implemented"
end
end
RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|