Make sure we start system pipeline before normal pipeline

PipelineAction now have an `execution_priority`, we use this method to change the priority of a create action when we are creating a system pipeline

Fixes #6885
This commit is contained in:
Pier-Hugues Pellerin 2017-04-20 16:21:29 -04:00
parent 094fe10c6a
commit a2bfcd85d3
8 changed files with 105 additions and 32 deletions

View file

@ -23,6 +23,10 @@ module LogStash module Config
@config_string = config_parts.collect(&:config_string).join("\n")
end
def system?
@settings.get("pipeline.system")
end
def ==(other)
config_hash == other.config_hash && pipeline_id == other.pipeline_id
end

View file

@ -5,9 +5,9 @@ require "logstash/pipeline_action/stop"
require "logstash/pipeline_action/reload"
module LogStash module PipelineAction
ORDERING = [
LogStash::PipelineAction::Create,
LogStash::PipelineAction::Reload,
LogStash::PipelineAction::Stop
]
ORDERING = {
LogStash::PipelineAction::Create => 100,
LogStash::PipelineAction::Reload => 200,
LogStash::PipelineAction::Stop => 300
}
end end

View file

@ -6,7 +6,6 @@
# Some actions could be retryable, or have a delay or timeout.
module LogStash module PipelineAction
class Base
# Only used for debugging purpose and in the logger statement.
def inspect
"#{self.class.name}/pipeline_id:#{pipeline_id}"
@ -17,8 +16,13 @@ module LogStash module PipelineAction
raise "`#execute` Not implemented!"
end
# See the definition in `logstash/pipeline_action.rb` for the default ordering
def execution_priority
ORDERING.fetch(self.class)
end
def <=>(other)
order = ORDERING.index(self.class) <=> ORDERING.index(other.class)
order = self.execution_priority <=> other.execution_priority
order.nonzero? ? order : self.pipeline_id <=> other.pipeline_id
end
end

View file

@ -21,6 +21,14 @@ module LogStash module PipelineAction
@pipeline_config.pipeline_id
end
# Make sure we execution system pipeline like the monitoring
# before any user defined pipelines, system pipeline register hooks into the system that will be
# triggered by the user defined pipeline.
def execution_priority
default_priority = super
@pipeline_config.system? ? default_priority * -1 : default_priority
end
# The execute assume that the thread safety access of the pipeline
# is managed by the caller.
def execute(agent, pipelines)

View file

@ -57,4 +57,20 @@ describe LogStash::Config::PipelineConfig do
not_same_pipeline_id = described_class.new(source, :another_pipeline, unordered_config_parts, settings)
expect(subject).not_to eq(not_same_pipeline_id)
end
describe "#system?" do
context "when the pipeline is a system pipeline" do
let(:settings) { mock_settings({ "pipeline.system" => true })}
it "returns true if the pipeline is a system pipeline" do
expect(subject.system?).to be_truthy
end
end
context "when is not a system pipeline" do
it "returns false if the pipeline is not a system pipeline" do
expect(subject.system?).to be_falsey
end
end
end
end

View file

@ -1,6 +1,7 @@
# encoding: utf-8
require "spec_helper"
require_relative "../../support/helpers"
require_relative "../../support/matchers"
require "logstash/pipeline_action/create"
require "logstash/instrument/null_metric"
require "logstash/inputs/generator"
@ -66,4 +67,17 @@ describe LogStash::PipelineAction::Create do
end
end
end
context "when sorting create action" do
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { id => '123' } } output { null {} }") }
let(:system_pipeline_config) { mock_pipeline_config(:main_2, "input { generator { id => '123' } } output { null {} }", { "pipeline.system" => true }) }
it "should give higher priority to system pipeline" do
action_user_pipeline = described_class.new(pipeline_config, metric)
action_system_pipeline = described_class.new(system_pipeline_config, metric)
sorted = [action_user_pipeline, action_system_pipeline].sort
expect(sorted).to eq([action_system_pipeline, action_user_pipeline])
end
end
end

View file

@ -18,6 +18,11 @@ describe LogStash::StateResolver do
clear_data_dir
end
after do
# ensure that the the created pipeline are closed
running_pipelines.each { |_, pipeline| pipeline.close }
end
context "when no pipeline is running" do
let(:running_pipelines) { {} }
@ -44,12 +49,6 @@ describe LogStash::StateResolver do
context "when a pipeline is running" do
let(:running_pipelines) { { :main => mock_pipeline(:main) } }
after do
# ensure that the the created pipeline are closed
running_pipelines.each { |_, pipeline| pipeline.close }
end
context "when the pipeline config contains a new one and the existing" do
let(:pipeline_configs) { [mock_pipeline_config(:hello_world), mock_pipeline_config(:main)] }
@ -104,6 +103,7 @@ describe LogStash::StateResolver do
}
end
context "without system pipeline" do
let(:pipeline_configs) do
[
mock_pipeline_config(:main1),
@ -126,5 +126,32 @@ describe LogStash::StateResolver do
)
end
end
context "with system pipeline" do
let(:pipeline_configs) do
[
mock_pipeline_config(:main1),
mock_pipeline_config(:main9),
mock_pipeline_config(:main5, "input { generator {}}"),
mock_pipeline_config(:main3, "input { generator {}}"),
mock_pipeline_config(:main7),
mock_pipeline_config(:monitoring, "input { generator {}}", { "pipeline.system" => true }),
]
end
it "creates the system pipeline before user defined pipelines" do
expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions(
[:create, :monitoring],
[:create, :main7],
[:create, :main9],
[:reload, :main3],
[:reload, :main5],
[:stop, :main2],
[:stop, :main4],
[:stop, :main6]
)
end
end
end
end
end