mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
MODULES: Fix broken multi-pipelines because, you know, modules (#7466)
* Update runner, source/local, source/modules, source/multi_local improve startup config problem detection across all sources. * fix multi_local spec to use not `empty` values.
This commit is contained in:
parent
6e165a216c
commit
d13d54c377
9 changed files with 276 additions and 126 deletions
|
@ -4,84 +4,9 @@ require "logstash/logging"
|
|||
|
||||
module LogStash module BootstrapCheck
|
||||
class DefaultConfig
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
def initialize(settings)
|
||||
@settings = settings
|
||||
end
|
||||
|
||||
def config_reload?
|
||||
@settings.get("config.reload.automatic")
|
||||
end
|
||||
|
||||
def config_string?
|
||||
@settings.get("config.string")
|
||||
end
|
||||
|
||||
def path_config?
|
||||
@settings.get("path.config")
|
||||
end
|
||||
|
||||
def config_modules?
|
||||
# We want it to report true if not empty
|
||||
!@settings.get("modules").empty?
|
||||
end
|
||||
|
||||
def cli_modules?
|
||||
# We want it to report true if not empty
|
||||
!@settings.get("modules.cli").empty?
|
||||
end
|
||||
|
||||
def both_config_flags?
|
||||
config_string? && path_config?
|
||||
end
|
||||
|
||||
def both_module_configs?
|
||||
cli_modules? && config_modules?
|
||||
end
|
||||
|
||||
def config_defined?
|
||||
config_string? || path_config?
|
||||
end
|
||||
|
||||
def modules_defined?
|
||||
cli_modules? || config_modules?
|
||||
end
|
||||
|
||||
def any_config?
|
||||
config_defined? || modules_defined?
|
||||
end
|
||||
|
||||
def check
|
||||
# Check if both -f and -e are present
|
||||
if both_config_flags?
|
||||
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.config-string-path-exclusive")
|
||||
end
|
||||
|
||||
# Make note that if modules are configured in both cli and logstash.yml that cli module
|
||||
# settings will be used, and logstash.yml modules settings ignored
|
||||
if both_module_configs?
|
||||
logger.info(I18n.t("logstash.runner.cli-module-override"))
|
||||
end
|
||||
|
||||
# Check if both config (-f or -e) and modules are configured
|
||||
if config_defined? && modules_defined?
|
||||
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.config-module-exclusive")
|
||||
end
|
||||
|
||||
# Check for absence of any configuration
|
||||
if !any_config?
|
||||
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.missing-configuration")
|
||||
end
|
||||
|
||||
# Check to ensure that if configuration auto-reload is used that -f is specified
|
||||
if config_reload? && !path_config?
|
||||
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.reload-without-config-path")
|
||||
end
|
||||
end
|
||||
|
||||
def self.check(settings)
|
||||
DefaultConfig.new(settings).check
|
||||
# currently none of the checks applies if there are multiple pipelines
|
||||
# See LogStash::Config::Source::Base for any further settings conflict checks
|
||||
end
|
||||
end
|
||||
end end
|
||||
|
|
|
@ -1,8 +1,11 @@
|
|||
# encoding: utf-8
|
||||
module LogStash module Config module Source
|
||||
class Base
|
||||
attr_reader :conflict_messages
|
||||
|
||||
def initialize(settings)
|
||||
@settings = settings
|
||||
@conflict_messages = []
|
||||
end
|
||||
|
||||
def pipeline_configs
|
||||
|
@ -12,5 +15,77 @@ module LogStash module Config module Source
|
|||
def match?
|
||||
raise NotImplementedError, "`match?` must be implemented!"
|
||||
end
|
||||
|
||||
def config_conflict?
|
||||
raise NotImplementedError, "`config_conflict?` must be implemented!"
|
||||
end
|
||||
|
||||
def config_reload_automatic_setting
|
||||
@settings.get_setting("config.reload.automatic")
|
||||
end
|
||||
|
||||
def config_reload_automatic
|
||||
config_reload_automatic_setting.value
|
||||
end
|
||||
|
||||
def config_reload_automatic?
|
||||
config_reload_automatic_setting.set?
|
||||
end
|
||||
|
||||
def config_string_setting
|
||||
@settings.get_setting("config.string")
|
||||
end
|
||||
|
||||
def config_string
|
||||
config_string_setting.value
|
||||
end
|
||||
|
||||
def config_string?
|
||||
!(config_string.nil? || config_string.empty?)
|
||||
end
|
||||
|
||||
def config_path_setting
|
||||
@settings.get_setting("path.config")
|
||||
end
|
||||
|
||||
def config_path
|
||||
config_path_setting.value
|
||||
end
|
||||
|
||||
def config_path?
|
||||
!(config_path.nil? || config_path.empty?)
|
||||
end
|
||||
|
||||
def modules_cli_setting
|
||||
@settings.get_setting("modules.cli")
|
||||
end
|
||||
|
||||
def modules_cli
|
||||
modules_cli_setting.value
|
||||
end
|
||||
|
||||
def modules_cli?
|
||||
!(modules_cli.nil? || modules_cli.empty?)
|
||||
end
|
||||
|
||||
def modules_setting
|
||||
@settings.get_setting("modules")
|
||||
end
|
||||
|
||||
def modules
|
||||
modules_setting.value
|
||||
end
|
||||
|
||||
def modules?
|
||||
!(modules.nil? || modules.empty?)
|
||||
end
|
||||
|
||||
def both_module_configs?
|
||||
modules_cli? && modules?
|
||||
end
|
||||
|
||||
def modules_defined?
|
||||
modules_cli? || modules?
|
||||
end
|
||||
end
|
||||
end end end
|
||||
|
|
|
@ -143,13 +143,35 @@ module LogStash module Config module Source
|
|||
OUTPUT_BLOCK_RE = /output *{/
|
||||
|
||||
def pipeline_configs
|
||||
if config_conflict?
|
||||
raise ConfigurationError, @conflict_messages.join(", ")
|
||||
end
|
||||
local_pipeline_configs
|
||||
end
|
||||
|
||||
if config_path? && config_string?
|
||||
raise ConfigurationError.new("Settings 'config.string' and 'path.config' can't be used simultaneously.")
|
||||
elsif !config_path? && !config_string?
|
||||
raise ConfigurationError.new("Either 'config.string' or 'path.config' must be set.")
|
||||
def match?
|
||||
# see basic settings predicates and getters defined in the base class
|
||||
(config_string? || config_path?) && !(modules_cli? || modules?) && !automatic_reload_with_config_string?
|
||||
end
|
||||
|
||||
def config_conflict?
|
||||
@conflict_messages.clear
|
||||
|
||||
# Check if configuration auto-reload is used that -f is specified
|
||||
if automatic_reload_with_config_string?
|
||||
@conflict_messages << I18n.t("logstash.runner.reload-with-config-string")
|
||||
end
|
||||
# Check if both -f and -e are present
|
||||
if config_string? && config_path?
|
||||
@conflict_messages << I18n.t("logstash.runner.config-string-path-exclusive")
|
||||
end
|
||||
|
||||
@conflict_messages.any?
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def local_pipeline_configs
|
||||
config_parts = if config_string?
|
||||
ConfigStringLoader.read(config_string)
|
||||
elsif local_config?
|
||||
|
@ -167,11 +189,10 @@ module LogStash module Config module Source
|
|||
[PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)]
|
||||
end
|
||||
|
||||
def match?
|
||||
config_string? || config_path?
|
||||
def automatic_reload_with_config_string?
|
||||
config_reload_automatic? && !config_path? && config_string?
|
||||
end
|
||||
|
||||
private
|
||||
# Make sure we have an input and at least 1 output
|
||||
# if its not the case we will add stdin and stdout
|
||||
# this is for backward compatibility reason
|
||||
|
@ -186,22 +207,6 @@ module LogStash module Config module Source
|
|||
end
|
||||
end
|
||||
|
||||
def config_string
|
||||
@settings.get("config.string")
|
||||
end
|
||||
|
||||
def config_string?
|
||||
!config_string.nil?
|
||||
end
|
||||
|
||||
def config_path
|
||||
@settings.get("path.config")
|
||||
end
|
||||
|
||||
def config_path?
|
||||
!config_path.nil? && !config_path.empty?
|
||||
end
|
||||
|
||||
def local_config?
|
||||
return false unless config_path?
|
||||
|
||||
|
|
|
@ -11,6 +11,10 @@ module LogStash module Config module Source
|
|||
class Modules < Base
|
||||
include LogStash::Util::Loggable
|
||||
def pipeline_configs
|
||||
if config_conflict? # double check
|
||||
raise ConfigurationError, @conflict_messages.join(", ")
|
||||
end
|
||||
|
||||
pipelines = LogStash::Config::ModulesCommon.pipeline_configs(@settings)
|
||||
pipelines.map do |hash|
|
||||
PipelineConfig.new(self, hash["pipeline_id"].to_sym,
|
||||
|
@ -20,8 +24,34 @@ module LogStash module Config module Source
|
|||
end
|
||||
|
||||
def match?
|
||||
# will fill this later
|
||||
true
|
||||
# see basic settings predicates and getters defined in the base class
|
||||
(modules_cli? || modules?) && !(config_string? || config_path?) && !automatic_reload_with_modules?
|
||||
end
|
||||
|
||||
def config_conflict?
|
||||
@conflict_messages.clear
|
||||
# Make note that if modules are configured in both cli and logstash.yml that cli module
|
||||
# settings will be used, and logstash.yml modules settings ignored
|
||||
if modules_cli? && modules?
|
||||
logger.info(I18n.t("logstash.runner.cli-module-override"))
|
||||
end
|
||||
|
||||
if automatic_reload_with_modules?
|
||||
@conflict_messages << I18n.t("logstash.runner.reload-with-modules")
|
||||
end
|
||||
|
||||
# Check if config (-f or -e) and modules are configured
|
||||
if (modules_cli? || modules?) && (config_string? || config_path?)
|
||||
@conflict_messages << I18n.t("logstash.runner.config-module-exclusive")
|
||||
end
|
||||
|
||||
@conflict_messages.any?
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def automatic_reload_with_modules?
|
||||
(modules_cli? || modules?) && config_reload_automatic?
|
||||
end
|
||||
end
|
||||
end end end
|
||||
|
|
|
@ -10,6 +10,7 @@ module LogStash module Config module Source
|
|||
def initialize(settings)
|
||||
@original_settings = settings
|
||||
super(settings)
|
||||
@match_warning_done = false
|
||||
end
|
||||
|
||||
def pipeline_configs
|
||||
|
@ -23,29 +24,41 @@ module LogStash module Config module Source
|
|||
# this relies on instance variable @settings and the parent class' pipeline_configs
|
||||
# method. The alternative is to refactor most of the Local source methods to accept
|
||||
# a settings object instead of relying on @settings.
|
||||
super # create a PipelineConfig object based on @settings
|
||||
local_pipeline_configs # create a PipelineConfig object based on @settings
|
||||
end.flatten
|
||||
end
|
||||
|
||||
def match?
|
||||
uses_config_string = @original_settings.get_setting("config.string").set?
|
||||
uses_path_config = @original_settings.get_setting("path.config").set?
|
||||
uses_modules_cli = @original_settings.get_setting("modules.cli").set?
|
||||
uses_modules_yml = @original_settings.get_setting("modules").set?
|
||||
return true if !uses_config_string && !uses_path_config && !uses_modules_cli && !uses_modules_yml
|
||||
if uses_path_config
|
||||
logger.warn("Ignoring the 'pipelines.yml' file because 'path.config' (-f) is being used.")
|
||||
elsif uses_config_string
|
||||
logger.warn("Ignoring the 'pipelines.yml' file because 'config.string' (-e) is being used.")
|
||||
elsif uses_modules_cli
|
||||
logger.warn("Ignoring the 'pipelines.yml' file because 'modules.cli' (--modules) is being used.")
|
||||
elsif uses_modules_yml
|
||||
logger.warn("Ignoring the 'pipelines.yml' file because modules are defined in the 'logstash.yml' file.")
|
||||
detect_pipelines if !@detect_pipelines_called
|
||||
# see basic settings predicates and getters defined in the base class
|
||||
return !(invalid_pipelines_detected? || modules_cli? || modules? || config_string? || config_path?)
|
||||
end
|
||||
|
||||
def invalid_pipelines_detected?
|
||||
!@detected_marker || @detected_marker.is_a?(Class)
|
||||
end
|
||||
|
||||
def config_conflict?
|
||||
detect_pipelines if !@detect_pipelines_called
|
||||
@conflict_messages.clear
|
||||
# are there any auto-reload conflicts?
|
||||
if !(modules_cli? || modules? || config_string? || config_path?)
|
||||
if @detected_marker.nil?
|
||||
@conflict_messages << I18n.t("logstash.runner.config-pipelines-failed-read", :path => pipelines_yaml_location)
|
||||
elsif @detected_marker == false
|
||||
@conflict_messages << I18n.t("logstash.runner.config-pipelines-empty", :path => pipelines_yaml_location)
|
||||
elsif @detected_marker.is_a?(Class)
|
||||
@conflict_messages << I18n.t("logstash.runner.config-pipelines-invalid", :invalid_class => @detected_marker, :path => pipelines_yaml_location)
|
||||
end
|
||||
else
|
||||
do_warning? && logger.warn("Ignoring the 'pipelines.yml' file because modules or command line options are specified")
|
||||
end
|
||||
false
|
||||
@conflict_messages.any?
|
||||
end
|
||||
|
||||
def retrieve_yaml_pipelines
|
||||
# by now, either the config_conflict? or the match? should have ruled out any config problems
|
||||
# but we don't rely on this, we can still get IO errors or
|
||||
result = read_pipelines_from_yaml(pipelines_yaml_location)
|
||||
case result
|
||||
when Array
|
||||
|
@ -74,5 +87,28 @@ module LogStash module Config module Source
|
|||
raise ConfigurationError.new("Pipelines YAML file contains duplicate pipeline ids: #{duplicate_ids.inspect}. Location: #{pipelines_yaml_location}")
|
||||
end
|
||||
end
|
||||
|
||||
def detect_pipelines
|
||||
result = read_pipelines_from_yaml(pipelines_yaml_location) rescue nil
|
||||
if result.is_a?(Array)
|
||||
@detected_marker = true
|
||||
elsif result.nil?
|
||||
@detected_marker = nil
|
||||
elsif !result
|
||||
@detected_marker = false
|
||||
else
|
||||
@detected_marker = result.class
|
||||
end
|
||||
@detect_pipelines_called = true
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def do_warning?
|
||||
if !(done = true && @match_warning_done)
|
||||
@match_warning_done = true
|
||||
end
|
||||
!done
|
||||
end
|
||||
end
|
||||
end end end
|
||||
|
|
|
@ -297,6 +297,29 @@ class LogStash::Runner < Clamp::StrictCommand
|
|||
|
||||
@settings.format_settings.each {|line| logger.debug(line) }
|
||||
|
||||
# Check for absence of any configuration
|
||||
# not bulletproof because we don't know yet if there
|
||||
# are no pipelines from pipelines.yml
|
||||
sources_without_conflict = []
|
||||
unmatched_sources_conflict_messages = []
|
||||
@source_loader.sources do |source|
|
||||
if source.config_conflict?
|
||||
if source.conflict_messages.any?
|
||||
unmatched_sources_conflict_messages << source.conflict_messages.join(", ")
|
||||
end
|
||||
else
|
||||
sources_without_conflict << source
|
||||
end
|
||||
end
|
||||
if unmatched_sources_conflict_messages.any?
|
||||
# i18n should be done at the sources side
|
||||
signal_usage_error(unmatched_sources_conflict_messages.join(" "))
|
||||
return 1
|
||||
elsif sources_without_conflict.empty?
|
||||
signal_usage_error(I18n.t("logstash.runner.missing-configuration"))
|
||||
return 1
|
||||
end
|
||||
|
||||
if setting("config.test_and_exit")
|
||||
begin
|
||||
results = @source_loader.fetch
|
||||
|
|
|
@ -124,10 +124,18 @@ en:
|
|||
config-module-exclusive: >-
|
||||
Settings 'path.config' (-f) or 'config.string' (-e) can't be used in conjunction with
|
||||
(--modules) or the "modules:" block in the logstash.yml file.
|
||||
reload-with-modules: >-
|
||||
Configuration reloading can't be used with command-line or logstash.yml specified modules.
|
||||
cli-module-override: >-
|
||||
Both command-line and logstash.yml modules configurations detected.
|
||||
Using command-line module configuration and ignoring logstash.yml module
|
||||
configuration.
|
||||
config-pipelines-failed-read: >-
|
||||
Failed to read pipelines yaml file. Location: %{path}
|
||||
config-pipelines-empty: >-
|
||||
Pipelines YAML file is empty. Location: %{path}
|
||||
config-pipelines-invalid: >-
|
||||
Pipelines YAML file must contain an array of pipeline configs. Found "%{invalid_class}" in %{path}
|
||||
reload-without-config-path: >-
|
||||
Configuration reloading also requires passing a configuration path with '-f yourlogstash.conf'
|
||||
reload-with-config-string: >-
|
||||
|
|
|
@ -288,13 +288,15 @@ describe LogStash::Config::Source::Local do
|
|||
let(:settings) do
|
||||
mock_settings(
|
||||
"config.string" => "#{filter_block} #{output_block}",
|
||||
"path.config" => config_file
|
||||
"path.config" => config_file,
|
||||
"modules.cli" => [],
|
||||
"modules" => []
|
||||
)
|
||||
end
|
||||
|
||||
# this should be impossible as the bootstrap checks should catch this
|
||||
it "raises an exception" do
|
||||
expect { subject.pipeline_configs }.to raise_error
|
||||
expect { subject.pipeline_configs }.to raise_error(LogStash::ConfigurationError)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -18,10 +18,52 @@ describe LogStash::Config::Source::MultiLocal do
|
|||
allow(subject).to receive(:pipelines_yaml_location).and_return(pipelines_yaml_location)
|
||||
end
|
||||
|
||||
describe "#config_conflict?" do
|
||||
context "when `config.string` is set" do
|
||||
let(:settings) do
|
||||
mock_settings("config.string" => "input {} output {}")
|
||||
end
|
||||
it "returns false" do
|
||||
expect(subject.config_conflict?).to be_falsey
|
||||
expect(subject.conflict_messages).to be_empty
|
||||
end
|
||||
end
|
||||
|
||||
context "when `config.path` is set" do
|
||||
let(:config_file) { temporary_file("") }
|
||||
|
||||
let(:settings) do
|
||||
mock_settings("path.config" => config_file)
|
||||
end
|
||||
it "returns false" do
|
||||
expect(subject.config_conflict?).to be_falsey
|
||||
expect(subject.conflict_messages).to be_empty
|
||||
end
|
||||
end
|
||||
|
||||
context "when `pipelines.yml` is not set" do
|
||||
let(:pipelines_yaml_location) { ::File.join(Stud::Temporary.pathname, "pipelines.yml") }
|
||||
it "returns true with messages" do
|
||||
expect(subject.config_conflict?).to be_truthy
|
||||
expect(subject.conflict_messages).to include(/Failed to read pipelines yaml file. Location:/)
|
||||
end
|
||||
end
|
||||
|
||||
context "when `pipelines.yml` is only comments" do
|
||||
before(:each) do
|
||||
allow(subject).to receive(:read_pipelines_from_yaml).and_return(::YAML.load("# blah\n# blah\n# blah\n"))
|
||||
end
|
||||
it "returns true with messages" do
|
||||
expect(subject.config_conflict?).to be_truthy
|
||||
expect(subject.conflict_messages).to include(/Pipelines YAML file is empty. Location:/)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#match?" do
|
||||
context "when `config.string` is set" do
|
||||
let(:settings) do
|
||||
mock_settings("config.string" => "")
|
||||
mock_settings("config.string" => "input {} output {}")
|
||||
end
|
||||
it "returns false" do
|
||||
expect(subject.match?).to be_falsey
|
||||
|
@ -42,27 +84,31 @@ describe LogStash::Config::Source::MultiLocal do
|
|||
|
||||
context "when both `config.string` and `path.config` are set" do
|
||||
let(:settings) do
|
||||
mock_settings("config.string" => "", "path.config" => temporary_file(""))
|
||||
mock_settings("config.string" => "input {} output {}", "path.config" => temporary_file("input {} output {}"))
|
||||
end
|
||||
it "returns false" do
|
||||
expect(subject.match?).to be_falsey
|
||||
end
|
||||
end
|
||||
|
||||
context "when neither `config.path` nor `path.config` are set`" do
|
||||
context "when neither `config.path` nor `path.config` are set` and pipelines.yml has configs" do
|
||||
before do
|
||||
allow(subject).to receive(:invalid_pipelines_detected?).and_return(false)
|
||||
end
|
||||
it "returns true" do
|
||||
expect(subject.match?).to be_truthy
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#detect_duplicate_pipelines" do
|
||||
let(:retrieved_pipelines) { [{}] }
|
||||
let(:retrieved_pipelines_configs) { retrieved_pipelines.map {|h| mock_settings(h) } }
|
||||
context "when there are duplicate pipeline ids" do
|
||||
let(:retrieved_pipelines) do
|
||||
[
|
||||
{"pipeline.id" => "main", "config.string" => ""},
|
||||
{"pipeline.id" => "main", "config.string" => ""},
|
||||
{"pipeline.id" => "main", "config.string" => "input {} output {}"},
|
||||
{"pipeline.id" => "main", "config.string" => "input {} output {}"},
|
||||
]
|
||||
end
|
||||
it "should raise a ConfigurationError" do
|
||||
|
@ -72,8 +118,8 @@ describe LogStash::Config::Source::MultiLocal do
|
|||
context "when there are no duplicate pipeline ids" do
|
||||
let(:retrieved_pipelines) do
|
||||
[
|
||||
{"pipeline.id" => "main", "config.string" => ""},
|
||||
{"pipeline.id" => "backup", "config.string" => ""},
|
||||
{"pipeline.id" => "main", "config.string" => "input {} output {}"},
|
||||
{"pipeline.id" => "backup", "config.string" => "input {} output {}"},
|
||||
]
|
||||
end
|
||||
it "should not raise an error" do
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue