mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
give multiple pipelines all the settings
Previously we'd only give a pipeline the settings related to pipelines The PipelineSettings class was used for this. However a pipeline may need other settings like the keystore location. For this we instead clone the settings object and merge all the pipeline specific settings. This is accomplished with a new method that ensures that only pipeline level settings are overwritten in the clone. Fixes #11076
This commit is contained in:
parent
9bedd9e097
commit
cd91f9b2b3
3 changed files with 45 additions and 57 deletions
|
@ -1,6 +1,6 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/config/source/local"
|
||||
require "logstash/pipeline_settings"
|
||||
require "logstash/settings"
|
||||
|
||||
module LogStash module Config module Source
|
||||
class MultiLocal < Local
|
||||
|
@ -15,7 +15,8 @@ module LogStash module Config module Source
|
|||
def pipeline_configs
|
||||
pipelines = retrieve_yaml_pipelines()
|
||||
pipelines_settings = pipelines.map do |pipeline_settings|
|
||||
::LogStash::PipelineSettings.from_settings(@original_settings.clone).merge(pipeline_settings)
|
||||
clone = @original_settings.clone
|
||||
clone.merge_pipeline_settings(pipeline_settings)
|
||||
end
|
||||
detect_duplicate_pipelines(pipelines_settings)
|
||||
pipeline_configs = pipelines_settings.map do |pipeline_settings|
|
||||
|
|
|
@ -1,55 +0,0 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/settings"
|
||||
|
||||
module LogStash
|
||||
class PipelineSettings < Settings
|
||||
|
||||
# there are settings that the pipeline uses and can be changed per pipeline instance
|
||||
SETTINGS_WHITE_LIST = [
|
||||
"config.debug",
|
||||
"config.support_escapes",
|
||||
"config.reload.automatic",
|
||||
"config.reload.interval",
|
||||
"config.string",
|
||||
"dead_letter_queue.enable",
|
||||
"dead_letter_queue.max_bytes",
|
||||
"metric.collect",
|
||||
"pipeline.java_execution",
|
||||
"pipeline.plugin_classloaders",
|
||||
"path.config",
|
||||
"path.dead_letter_queue",
|
||||
"path.queue",
|
||||
"pipeline.batch.delay",
|
||||
"pipeline.batch.size",
|
||||
"pipeline.id",
|
||||
"pipeline.reloadable",
|
||||
"pipeline.system",
|
||||
"pipeline.workers",
|
||||
"queue.checkpoint.acks",
|
||||
"queue.checkpoint.interval",
|
||||
"queue.checkpoint.writes",
|
||||
"queue.checkpoint.retry",
|
||||
"queue.drain",
|
||||
"queue.max_bytes",
|
||||
"queue.max_events",
|
||||
"queue.page_capacity",
|
||||
"queue.type",
|
||||
]
|
||||
|
||||
# register a set of settings that is used as the default set of pipelines settings
|
||||
def self.from_settings(settings)
|
||||
pipeline_settings = self.new
|
||||
SETTINGS_WHITE_LIST.each do |setting|
|
||||
pipeline_settings.register(settings.get_setting(setting).clone)
|
||||
end
|
||||
pipeline_settings
|
||||
end
|
||||
|
||||
def register(setting)
|
||||
unless SETTINGS_WHITE_LIST.include?(setting.name)
|
||||
raise ArgumentError.new("Only pipeline related settings can be registered in a PipelineSettings object. Received \"#{setting.name}\". Allowed settings: #{SETTINGS_WHITE_LIST}")
|
||||
end
|
||||
super(setting)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -10,6 +10,39 @@ module LogStash
|
|||
include LogStash::Util::SubstitutionVariables
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
# there are settings that the pipeline uses and can be changed per pipeline instance
|
||||
PIPELINE_SETTINGS_WHITE_LIST = [
|
||||
"config.debug",
|
||||
"config.support_escapes",
|
||||
"config.reload.automatic",
|
||||
"config.reload.interval",
|
||||
"config.string",
|
||||
"dead_letter_queue.enable",
|
||||
"dead_letter_queue.max_bytes",
|
||||
"metric.collect",
|
||||
"pipeline.java_execution",
|
||||
"pipeline.plugin_classloaders",
|
||||
"path.config",
|
||||
"path.dead_letter_queue",
|
||||
"path.queue",
|
||||
"pipeline.batch.delay",
|
||||
"pipeline.batch.size",
|
||||
"pipeline.id",
|
||||
"pipeline.reloadable",
|
||||
"pipeline.system",
|
||||
"pipeline.workers",
|
||||
"queue.checkpoint.acks",
|
||||
"queue.checkpoint.interval",
|
||||
"queue.checkpoint.writes",
|
||||
"queue.checkpoint.retry",
|
||||
"queue.drain",
|
||||
"queue.max_bytes",
|
||||
"queue.max_events",
|
||||
"queue.page_capacity",
|
||||
"queue.type",
|
||||
]
|
||||
|
||||
|
||||
def initialize
|
||||
@settings = {}
|
||||
# Theses settings were loaded from the yaml file
|
||||
|
@ -89,6 +122,15 @@ module LogStash
|
|||
self
|
||||
end
|
||||
|
||||
def merge_pipeline_settings(hash, graceful = false)
|
||||
hash.each do |key, _|
|
||||
unless PIPELINE_SETTINGS_WHITE_LIST.include?(key)
|
||||
raise ArgumentError.new("Only pipeline related settings are expected. Received \"#{key}\". Allowed settings: #{PIPELINE_SETTINGS_WHITE_LIST}")
|
||||
end
|
||||
end
|
||||
merge(hash, graceful)
|
||||
end
|
||||
|
||||
def format_settings
|
||||
output = []
|
||||
output << "-------- Logstash Settings (* means modified) ---------"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue