From 1ea4837b71858f8706c353c9f1fbd5e05caa798c Mon Sep 17 00:00:00 2001 From: kaisecheng <69120390+kaisecheng@users.noreply.github.com> Date: Mon, 9 Nov 2020 13:21:40 +0100 Subject: [PATCH] [backport 7x] delete pipeline in registry (#12419) deletes the pipeline in the pipelines_registry if it is terminated and is removed in the source Fixed: #12414 --- logstash-core/lib/logstash/pipeline_action.rb | 4 +- .../lib/logstash/pipeline_action/delete.rb | 38 +++++++++++++++++ .../lib/logstash/pipelines_registry.rb | 27 +++++++++++- logstash-core/lib/logstash/state_resolver.rb | 7 +++- .../spec/logstash/pipelines_registry_spec.rb | 42 +++++++++++++++++++ .../spec/logstash/state_resolver_spec.rb | 40 ++++++++++++++++++ 6 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 logstash-core/lib/logstash/pipeline_action/delete.rb diff --git a/logstash-core/lib/logstash/pipeline_action.rb b/logstash-core/lib/logstash/pipeline_action.rb index f4c1ba9d9..910ce66bf 100644 --- a/logstash-core/lib/logstash/pipeline_action.rb +++ b/logstash-core/lib/logstash/pipeline_action.rb @@ -19,11 +19,13 @@ require "logstash/pipeline_action/base" require "logstash/pipeline_action/create" require "logstash/pipeline_action/stop" require "logstash/pipeline_action/reload" +require "logstash/pipeline_action/delete" module LogStash module PipelineAction ORDERING = { LogStash::PipelineAction::Create => 100, LogStash::PipelineAction::Reload => 200, - LogStash::PipelineAction::Stop => 300 + LogStash::PipelineAction::Stop => 300, + LogStash::PipelineAction::Delete => 400 } end end diff --git a/logstash-core/lib/logstash/pipeline_action/delete.rb b/logstash-core/lib/logstash/pipeline_action/delete.rb new file mode 100644 index 000000000..1a19509ba --- /dev/null +++ b/logstash-core/lib/logstash/pipeline_action/delete.rb @@ -0,0 +1,38 @@ +# Licensed to Elasticsearch B.V. under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Elasticsearch B.V. licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require "logstash/pipeline_action/base" + +module LogStash module PipelineAction + class Delete < Base + attr_reader :pipeline_id + + def initialize(pipeline_id) + @pipeline_id = pipeline_id + end + + def execute(agent, pipelines_registry) + success = pipelines_registry.delete_pipeline(@pipeline_id) + + LogStash::ConvergeResult::ActionResult.create(self, success) + end + + def to_s + "PipelineAction::Delete<#{pipeline_id}>" + end + end +end end diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 6143f3ee3..5fc67e086 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -76,7 +76,6 @@ module LogStash def remove(pipeline_id) @lock.synchronize do @states.delete(pipeline_id) - @locks.delete(pipeline_id) end end @@ -209,6 +208,32 @@ module LogStash lock.unlock end + # Delete the pipeline that is terminated + # @param pipeline_id [String, Symbol] the pipeline id + # @return [Boolean] pipeline delete success + def delete_pipeline(pipeline_id) + lock = @states.get_lock(pipeline_id) + lock.lock + + state = @states.get(pipeline_id) + + if state.nil? + logger.error("Attempted to delete a pipeline that does not exists", :pipeline_id => pipeline_id) + return false + end + + if state.terminated? + @states.remove(pipeline_id) + logger.info("Removed pipeline from registry successfully", :pipeline_id => pipeline_id) + return true + else + logger.info("Attempted to delete a pipeline that is not terminated", :pipeline_id => pipeline_id) + return false + end + ensure + lock.unlock + end + # @param pipeline_id [String, Symbol] the pipeline id # @return [Pipeline] the pipeline object or nil if none for pipeline_id def get_pipeline(pipeline_id) diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb index 386921b1f..8045c4b0b 100644 --- a/logstash-core/lib/logstash/state_resolver.rb +++ b/logstash-core/lib/logstash/state_resolver.rb @@ -41,7 +41,7 @@ module LogStash end end - configured_pipelines = pipeline_configs.map { |config| config.pipeline_id.to_sym } + configured_pipelines = pipeline_configs.each_with_object(Set.new) { |config, set| set.add(config.pipeline_id.to_sym) } # If one of the running pipeline is not in the pipeline_configs, we assume that we need to # stop it. @@ -49,6 +49,11 @@ module LogStash .select { |pipeline_id| !configured_pipelines.include?(pipeline_id) } .each { |pipeline_id| actions << LogStash::PipelineAction::Stop.new(pipeline_id) } + # If one of the terminated pipeline is not in the pipeline_configs, delete it in registry. + pipelines_registry.non_running_pipelines.keys + .select { |pipeline_id| !configured_pipelines.include?(pipeline_id) } + .each { |pipeline_id| actions << LogStash::PipelineAction::Delete.new(pipeline_id)} + actions.sort # See logstash/pipeline_action.rb end end diff --git a/logstash-core/spec/logstash/pipelines_registry_spec.rb b/logstash-core/spec/logstash/pipelines_registry_spec.rb index 3af1ff78a..b068a4e5a 100644 --- a/logstash-core/spec/logstash/pipelines_registry_spec.rb +++ b/logstash-core/spec/logstash/pipelines_registry_spec.rb @@ -228,6 +228,48 @@ describe LogStash::PipelinesRegistry do end end + context "deleting a pipeline" do + context "when pipeline is in registry" do + before :each do + subject.create_pipeline(pipeline_id, pipeline) { true } + end + + it "should not delete pipeline if pipeline is not terminated" do + expect(pipeline).to receive(:finished_execution?).and_return(false) + expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger) + expect(logger).to receive(:info) + expect(subject.delete_pipeline(pipeline_id)).to be_falsey + expect(subject.get_pipeline(pipeline_id)).not_to be_nil + end + + it "should delete pipeline if pipeline is terminated" do + expect(pipeline).to receive(:finished_execution?).and_return(true) + expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger) + expect(logger).to receive(:info) + expect(subject.delete_pipeline(pipeline_id)).to be_truthy + expect(subject.get_pipeline(pipeline_id)).to be_nil + end + + it "should recreate pipeline if pipeline is delete and create again" do + expect(pipeline).to receive(:finished_execution?).and_return(true) + expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger) + expect(logger).to receive(:info) + expect(subject.delete_pipeline(pipeline_id)).to be_truthy + expect(subject.get_pipeline(pipeline_id)).to be_nil + subject.create_pipeline(pipeline_id, pipeline) { true } + expect(subject.get_pipeline(pipeline_id)).not_to be_nil + end + end + + context "when pipeline is not in registry" do + it "should log error" do + expect(LogStash::PipelinesRegistry).to receive(:logger).and_return(logger) + expect(logger).to receive(:error) + expect(subject.delete_pipeline(pipeline_id)).to be_falsey + end + end + end + context "pipelines collections" do context "with a non terminated pipelines" do before :each do diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb index 8775bc3ac..350832797 100644 --- a/logstash-core/spec/logstash/state_resolver_spec.rb +++ b/logstash-core/spec/logstash/state_resolver_spec.rb @@ -172,5 +172,45 @@ describe LogStash::StateResolver do end end end + + context "when a pipeline stops" do + let(:main_pipeline) { mock_pipeline(:main) } + let(:main_pipeline_config) { main_pipeline.pipeline_config } + let(:pipelines) do + r = LogStash::PipelinesRegistry.new + r.create_pipeline(:main, main_pipeline) { true } + r + end + + before do + expect(main_pipeline).to receive(:finished_execution?).at_least(:once).and_return(true) + end + + context "when pipeline config contains a new one and the existing" do + let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] } + + it "creates the new one and keep the other one stop" do + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:create, :hello_world]) + expect(pipelines.non_running_pipelines.size).to eq(1) + end + end + + context "when pipeline config contains an updated pipeline" do + let(:pipeline_configs) { [mock_pipeline_config(:main, "input { generator {}}")] } + + it "should reload the stopped pipeline" do + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:reload, :main]) + end + end + + context "when pipeline config contains no pipeline" do + let(:pipeline_configs) { [] } + + it "should delete the stopped one" do + expect(subject.resolve(pipelines, pipeline_configs)).to have_actions([:delete, :main]) + end + end + end + end end