From fe5172bf205e0bee648f335042607fec0d7a08c5 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 27 Apr 2022 16:30:35 +0100 Subject: [PATCH] Fix stopped pipeline unable to be deleted in registry (#14018) (#14033) Prior to the change, pipeline `stop` and `delete` happen in two converge cycles, which has a gap letting the stopped pipeline compare with the same pipeline definition in central pipeline management, hence Logstash see the stopped pipeline as graceful finish and not to delete in registry This commit creates StopAndDelete action to delete running pipeline in one converge cycle Fixed: #14017 (cherry picked from commit e8cd0d3039399fbdd05287421688cc9f5ea5a599) Co-authored-by: kaisecheng <69120390+kaisecheng@users.noreply.github.com> --- logstash-core/lib/logstash/pipeline_action.rb | 2 + .../pipeline_action/stop_and_delete.rb | 42 +++++++++++++++ logstash-core/lib/logstash/state_resolver.rb | 6 +-- .../spec/logstash/agent/converge_spec.rb | 2 +- .../spec/logstash/state_resolver_spec.rb | 52 +++++++++---------- logstash-core/spec/support/matchers.rb | 16 +++++- 6 files changed, 89 insertions(+), 31 deletions(-) create mode 100644 logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb diff --git a/logstash-core/lib/logstash/pipeline_action.rb b/logstash-core/lib/logstash/pipeline_action.rb index 910ce66bf..3ae612ec0 100644 --- a/logstash-core/lib/logstash/pipeline_action.rb +++ b/logstash-core/lib/logstash/pipeline_action.rb @@ -20,12 +20,14 @@ require "logstash/pipeline_action/create" require "logstash/pipeline_action/stop" require "logstash/pipeline_action/reload" require "logstash/pipeline_action/delete" +require "logstash/pipeline_action/stop_and_delete" module LogStash module PipelineAction ORDERING = { LogStash::PipelineAction::Create => 100, LogStash::PipelineAction::Reload => 200, LogStash::PipelineAction::Stop => 300, + LogStash::PipelineAction::StopAndDelete => 350, LogStash::PipelineAction::Delete => 400 } end end diff --git a/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb new file mode 100644 index 000000000..c627087ed --- /dev/null +++ b/logstash-core/lib/logstash/pipeline_action/stop_and_delete.rb @@ -0,0 +1,42 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/pipeline_action/base" + +module LogStash module PipelineAction + class StopAndDelete < Base + attr_reader :pipeline_id + + def initialize(pipeline_id) + @pipeline_id = pipeline_id + end + + def execute(agent, pipelines_registry) + pipelines_registry.terminate_pipeline(pipeline_id) do |pipeline| + pipeline.shutdown + end + + success = pipelines_registry.delete_pipeline(@pipeline_id) + + LogStash::ConvergeResult::ActionResult.create(self, success) + end + + def to_s + "PipelineAction::StopAndDelete<#{pipeline_id}>" + end + end +end end diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb index 8045c4b0b..efa6e44a6 100644 --- a/logstash-core/lib/logstash/state_resolver.rb +++ b/logstash-core/lib/logstash/state_resolver.rb @@ -44,10 +44,10 @@ module LogStash configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) } # If one of the running pipeline is not in the pipeline_configs, we assume that we need to - # stop it. - pipelines_registry.running_pipelines.keys + # stop it and delete it in registry. + pipelines_registry.running_pipelines(include_loading: true).keys .select { |pipeline_id| !configured_pipelines.include?(pipeline_id) } - .each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) } + .each { |pipeline_id| actions << LogStash::PipelineAction::StopAndDelete.new(pipeline_id) } # If one of the terminated pipeline is not in the pipeline_configs, delete it in registry. pipelines_registry.non_running_pipelines.keys diff --git a/logstash-core/spec/logstash/agent/converge_spec.rb b/logstash-core/spec/logstash/agent/converge_spec.rb index 482fc0611..52815d013 100644 --- a/logstash-core/spec/logstash/agent/converge_spec.rb +++ b/logstash-core/spec/logstash/agent/converge_spec.rb @@ -289,7 +289,7 @@ describe LogStash::Agent do expect(subject.converge_state_and_update).to be_a_successful_converge }.not_to change { subject.running_pipelines_count } expect(subject).to have_running_pipeline?(modified_pipeline_config) - expect(subject).not_to have_pipeline?(pipeline_config) + expect(subject).to have_stopped_pipeline?(pipeline_config) end end diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb index 350832797..4ba7ec0e2 100644 --- a/logstash-core/spec/logstash/state_resolver_spec.rb +++ b/logstash-core/spec/logstash/state_resolver_spec.rb @@ -52,7 +52,7 @@ describe LogStash::StateResolver do it "returns some actions" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:create, :hello_world], + [:Create, :hello_world], ) end end @@ -73,17 +73,17 @@ describe LogStash::StateResolver do it "creates the new one and keep the other one" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:create, :hello_world], + [:Create, :hello_world], ) end context "when the pipeline config contains only the new one" do let(:pipeline_configs) { [mock_pipeline_config(:hello_world)] } - it "creates the new one and stop the old one one" do + it "creates the new one and stop and delete the old one one" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:create, :hello_world], - [:stop, :main] + [:Create, :hello_world], + [:StopAndDelete, :main] ) end end @@ -91,9 +91,9 @@ describe LogStash::StateResolver do context "when the pipeline config contains no pipeline" do let(:pipeline_configs) { [] } - it "stops the old one one" do + it "stops and delete the old one one" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:stop, :main] + [:StopAndDelete, :main] ) end end @@ -103,7 +103,7 @@ describe LogStash::StateResolver do it "reloads the old one one" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:reload, :main] + [:Reload, :main] ) end end @@ -135,13 +135,13 @@ describe LogStash::StateResolver do it "generates actions required to converge" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:create, :main7], - [:create, :main9], - [:reload, :main3], - [:reload, :main5], - [:stop, :main2], - [:stop, :main4], - [:stop, :main6] + [:Create, :main7], + [:Create, :main9], + [:Reload, :main3], + [:Reload, :main5], + [:StopAndDelete, :main2], + [:StopAndDelete, :main4], + [:StopAndDelete, :main6] ) end end @@ -160,14 +160,14 @@ describe LogStash::StateResolver do it "creates the system pipeline before user defined pipelines" do expect(subject.resolve(pipelines, pipeline_configs)).to have_actions( - [:create, :monitoring], - [:create, :main7], - [:create, :main9], - [:reload, :main3], - [:reload, :main5], - [:stop, :main2], - [:stop, :main4], - [:stop, :main6] + [:Create, :monitoring], + [:Create, :main7], + [:Create, :main9], + [:Reload, :main3], + [:Reload, :main5], + [:StopAndDelete, :main2], + [:StopAndDelete, :main4], + [:StopAndDelete, :main6] ) end end @@ -190,7 +190,7 @@ describe LogStash::StateResolver do let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] } it "creates the new one and keep the other one stop" do - expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world]) + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Create, :hello_world]) expect(pipelines.non_running_pipelines.size).to eq(1) end end @@ -199,7 +199,7 @@ describe LogStash::StateResolver do let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] } it "should reload the stopped pipeline" do - expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main]) + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Reload, :main]) end end @@ -207,7 +207,7 @@ describe LogStash::StateResolver do let(:pipeline_configs) { [] } it "should delete the stopped one" do - expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main]) + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:Delete, :main]) end end end diff --git a/logstash-core/spec/support/matchers.rb b/logstash-core/spec/support/matchers.rb index 52d78de05..a55ba16c2 100644 --- a/logstash-core/spec/support/matchers.rb +++ b/logstash-core/spec/support/matchers.rb @@ -51,7 +51,7 @@ RSpec::Matchers.define :have_actions do |*expected| expect(actual.size).to eq(expected.size) expected_values = expected.each_with_object([]) do |i, obj| - klass_name = "LogStash::PipelineAction::#{i.first.capitalize}" + klass_name = "LogStash::PipelineAction::#{i.first}" obj << [klass_name, i.last] end @@ -76,6 +76,16 @@ RSpec::Matchers.define :have_pipeline? do |pipeline_config| end match_when_negated do |agent| + pipeline = nil + try(30) do + pipeline = agent.get_pipeline(pipeline_config.pipeline_id) + expect(pipeline).to be_nil + end + end +end + +RSpec::Matchers.define :have_stopped_pipeline? do |pipeline_config| + match do |agent| pipeline = nil try(30) do pipeline = agent.get_pipeline(pipeline_config.pipeline_id) @@ -84,6 +94,10 @@ RSpec::Matchers.define :have_pipeline? do |pipeline_config| # either the pipeline_id is not in the running pipelines OR it is but have different configurations expect(!agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s) || pipeline.config_str != pipeline_config.config_string).to be_truthy end + + match_when_negated do + raise "Not implemented" + end end RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|