diff --git a/logstash-core/lib/logstash/config/pipeline_config.rb b/logstash-core/lib/logstash/config/pipeline_config.rb index e4bbcd53c..1713de900 100644 --- a/logstash-core/lib/logstash/config/pipeline_config.rb +++ b/logstash-core/lib/logstash/config/pipeline_config.rb @@ -23,6 +23,10 @@ module LogStash module Config @config_string = config_parts.collect(&:config_string).join("\n") end + def system? + @settings.get("pipeline.system") + end + def ==(other) config_hash == other.config_hash && pipeline_id == other.pipeline_id end diff --git a/logstash-core/lib/logstash/environment.rb b/logstash-core/lib/logstash/environment.rb index e5bee6c8a..708c18ef9 100644 --- a/logstash-core/lib/logstash/environment.rb +++ b/logstash-core/lib/logstash/environment.rb @@ -25,7 +25,7 @@ module LogStash Setting::Numeric.new("config.reload.interval", 3), # in seconds Setting::Boolean.new("metric.collect", true), Setting::String.new("pipeline.id", "main"), - Setting::Boolean.new("pipeline.system", false), + Setting::Boolean.new("pipeline.system", false), Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum), Setting::PositiveInteger.new("pipeline.output.workers", 1), Setting::PositiveInteger.new("pipeline.batch.size", 125), diff --git a/logstash-core/lib/logstash/pipeline_action.rb b/logstash-core/lib/logstash/pipeline_action.rb index 2cc6f2dc4..79ce2fb80 100644 --- a/logstash-core/lib/logstash/pipeline_action.rb +++ b/logstash-core/lib/logstash/pipeline_action.rb @@ -5,9 +5,9 @@ require "logstash/pipeline_action/stop" require "logstash/pipeline_action/reload" module LogStash module PipelineAction - ORDERING = [ - LogStash::PipelineAction::Create, - LogStash::PipelineAction::Reload, - LogStash::PipelineAction::Stop - ] + ORDERING = { + LogStash::PipelineAction::Create => 100, + LogStash::PipelineAction::Reload => 200, + LogStash::PipelineAction::Stop => 300 + } end end diff --git a/logstash-core/lib/logstash/pipeline_action/base.rb b/logstash-core/lib/logstash/pipeline_action/base.rb index 23c53bb38..81931da25 100644 --- a/logstash-core/lib/logstash/pipeline_action/base.rb +++ b/logstash-core/lib/logstash/pipeline_action/base.rb @@ -6,7 +6,6 @@ # Some actions could be retryable, or have a delay or timeout. module LogStash module PipelineAction class Base - # Only used for debugging purpose and in the logger statement. def inspect "#{self.class.name}/pipeline_id:#{pipeline_id}" @@ -17,8 +16,13 @@ module LogStash module PipelineAction raise "`#execute` Not implemented!" end + # See the definition in `logstash/pipeline_action.rb` for the default ordering + def execution_priority + ORDERING.fetch(self.class) + end + def <=>(other) - order = ORDERING.index(self.class) <=> ORDERING.index(other.class) + order = self.execution_priority <=> other.execution_priority order.nonzero? ? order : self.pipeline_id <=> other.pipeline_id end end diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index 2c7aea549..1e5628d0c 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -21,6 +21,14 @@ module LogStash module PipelineAction @pipeline_config.pipeline_id end + # Make sure we execution system pipeline like the monitoring + # before any user defined pipelines, system pipeline register hooks into the system that will be + # triggered by the user defined pipeline. + def execution_priority + default_priority = super + @pipeline_config.system? ? default_priority * -1 : default_priority + end + # The execute assume that the thread safety access of the pipeline # is managed by the caller. def execute(agent, pipelines) diff --git a/logstash-core/spec/logstash/config/pipeline_config_spec.rb b/logstash-core/spec/logstash/config/pipeline_config_spec.rb index abfa327b4..7e4bb78f9 100644 --- a/logstash-core/spec/logstash/config/pipeline_config_spec.rb +++ b/logstash-core/spec/logstash/config/pipeline_config_spec.rb @@ -57,4 +57,20 @@ describe LogStash::Config::PipelineConfig do not_same_pipeline_id = described_class.new(source, :another_pipeline, unordered_config_parts, settings) expect(subject).not_to eq(not_same_pipeline_id) end + + describe "#system?" do + context "when the pipeline is a system pipeline" do + let(:settings) { mock_settings({ "pipeline.system" => true })} + + it "returns true if the pipeline is a system pipeline" do + expect(subject.system?).to be_truthy + end + end + + context "when is not a system pipeline" do + it "returns false if the pipeline is not a system pipeline" do + expect(subject.system?).to be_falsey + end + end + end end diff --git a/logstash-core/spec/logstash/pipeline_action/create_spec.rb b/logstash-core/spec/logstash/pipeline_action/create_spec.rb index ce915cbc0..c67bc209c 100644 --- a/logstash-core/spec/logstash/pipeline_action/create_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/create_spec.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "spec_helper" require_relative "../../support/helpers" +require_relative "../../support/matchers" require "logstash/pipeline_action/create" require "logstash/instrument/null_metric" require "logstash/inputs/generator" @@ -66,4 +67,17 @@ describe LogStash::PipelineAction::Create do end end end + + context "when sorting create action" do + let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") } + let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { generator { id => '123' } } output { null {} }", { "pipeline.system" => true }) } + + it "should give higher priority to system pipeline" do + action_user_pipeline = described_class.new(pipeline_config, metric) + action_system_pipeline = described_class.new(system_pipeline_config, metric) + + sorted = [action_user_pipeline, action_system_pipeline].sort + expect(sorted).to eq([action_system_pipeline, action_user_pipeline]) + end + end end diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb index a6103d4b5..392d1c13b 100644 --- a/logstash-core/spec/logstash/state_resolver_spec.rb +++ b/logstash-core/spec/logstash/state_resolver_spec.rb @@ -18,6 +18,11 @@ describe LogStash::StateResolver do clear_data_dir end + after do + # ensure that the the created pipeline are closed + running_pipelines.each { |_, pipeline| pipeline.close } + end + context "when no pipeline is running" do let(:running_pipelines) { {} } @@ -44,12 +49,6 @@ describe LogStash::StateResolver do context "when a pipeline is running" do let(:running_pipelines) { { :main => mock_pipeline(:main) } } - - after do - # ensure that the the created pipeline are closed - running_pipelines.each { |_, pipeline| pipeline.close } - end - context "when the pipeline config contains a new one and the existing" do let(:pipeline_configs) { [mock_pipeline_config(:hello_world), mock_pipeline_config(:main)] } @@ -104,26 +103,54 @@ describe LogStash::StateResolver do } end - let(:pipeline_configs) do - [ - mock_pipeline_config(:main1), - mock_pipeline_config(:main9), - mock_pipeline_config(:main5, "input { generator {}}"), - mock_pipeline_config(:main3, "input { generator {}}"), - mock_pipeline_config(:main7) - ] + context "without system pipeline" do + let(:pipeline_configs) do + [ + mock_pipeline_config(:main1), + mock_pipeline_config(:main9), + mock_pipeline_config(:main5, "input { generator {}}"), + mock_pipeline_config(:main3, "input { generator {}}"), + mock_pipeline_config(:main7) + ] + end + + it "generates actions required to converge" do + expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions( + [:create, :main7], + [:create, :main9], + [:reload, :main3], + [:reload, :main5], + [:stop, :main2], + [:stop, :main4], + [:stop, :main6] + ) + end end - it "generates actions required to converge" do - expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions( - [:create, :main7], - [:create, :main9], - [:reload, :main3], - [:reload, :main5], - [:stop, :main2], - [:stop, :main4], - [:stop, :main6] - ) + context "with system pipeline" do + let(:pipeline_configs) do + [ + mock_pipeline_config(:main1), + mock_pipeline_config(:main9), + mock_pipeline_config(:main5, "input { generator {}}"), + mock_pipeline_config(:main3, "input { generator {}}"), + mock_pipeline_config(:main7), + mock_pipeline_config(:monitoring, "input { generator {}}", { "pipeline.system" => true }), + ] + end + + it "creates the system pipeline before user defined pipelines" do + expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions( + [:create, :monitoring], + [:create, :main7], + [:create, :main9], + [:reload, :main3], + [:reload, :main5], + [:stop, :main2], + [:stop, :main4], + [:stop, :main6] + ) + end end end end