mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
refactor agent pipeline reloading to avoid double live pipelines with same settings
extracted BasePipeline class to support complete config validation minor review changes added comment
This commit is contained in:
parent
7fd2f28c42
commit
e503fcf60b
3 changed files with 201 additions and 149 deletions
|
@ -207,16 +207,13 @@ class LogStash::Agent
|
|||
@collector = LogStash::Instrument::Collector.new
|
||||
|
||||
@metric = if collect_metrics?
|
||||
@logger.debug("Agent: Configuring metric collection")
|
||||
LogStash::Instrument::Metric.new(@collector)
|
||||
else
|
||||
LogStash::Instrument::NullMetric.new(@collector)
|
||||
end
|
||||
@logger.debug("Agent: Configuring metric collection")
|
||||
LogStash::Instrument::Metric.new(@collector)
|
||||
else
|
||||
LogStash::Instrument::NullMetric.new(@collector)
|
||||
end
|
||||
|
||||
|
||||
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric,
|
||||
settings.get("queue.type"),
|
||||
self)
|
||||
@periodic_pollers = LogStash::Instrument::PeriodicPollers.new(@metric, settings.get("queue.type"), self)
|
||||
@periodic_pollers.start
|
||||
end
|
||||
|
||||
|
@ -232,31 +229,40 @@ class LogStash::Agent
|
|||
@collect_metric
|
||||
end
|
||||
|
||||
def create_pipeline(settings, config=nil)
|
||||
def increment_reload_failures_metrics(id, config, exception)
|
||||
@instance_reload_metric.increment(:failures)
|
||||
@pipeline_reload_metric.namespace([id.to_sym, :reloads]).tap do |n|
|
||||
n.increment(:failures)
|
||||
n.gauge(:last_error, { :message => exception.message, :backtrace => exception.backtrace})
|
||||
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
if @logger.debug?
|
||||
@logger.error("fetched an invalid config", :config => config, :reason => exception.message, :backtrace => exception.backtrace)
|
||||
else
|
||||
@logger.error("fetched an invalid config", :config => config, :reason => exception.message)
|
||||
end
|
||||
end
|
||||
|
||||
# create a new pipeline with the given settings and config, if the pipeline initialization failed
|
||||
# increment the failures metrics
|
||||
# @param settings [Settings] the setting for the new pipelines
|
||||
# @param config [String] the configuration string or nil to fetch the configuration per settings
|
||||
# @return [Pipeline] the new pipeline or nil if it failed
|
||||
def create_pipeline(settings, config = nil)
|
||||
if config.nil?
|
||||
begin
|
||||
config = fetch_config(settings)
|
||||
rescue => e
|
||||
@logger.error("failed to fetch pipeline configuration", :message => e.message)
|
||||
return
|
||||
return nil
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
LogStash::Pipeline.new(config, settings, metric)
|
||||
rescue => e
|
||||
@instance_reload_metric.increment(:failures)
|
||||
@pipeline_reload_metric.namespace([settings.get("pipeline.id").to_sym, :reloads]).tap do |n|
|
||||
n.increment(:failures)
|
||||
n.gauge(:last_error, { :message => e.message, :backtrace => e.backtrace})
|
||||
n.gauge(:last_failure_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
if @logger.debug?
|
||||
@logger.error("fetched an invalid config", :config => config, :reason => e.message, :backtrace => e.backtrace)
|
||||
else
|
||||
@logger.error("fetched an invalid config", :config => config, :reason => e.message)
|
||||
end
|
||||
return
|
||||
increment_reload_failures_metrics(settings.get("pipeline.id"), config, e)
|
||||
return nil
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -264,30 +270,89 @@ class LogStash::Agent
|
|||
@config_loader.format_config(settings.get("path.config"), settings.get("config.string"))
|
||||
end
|
||||
|
||||
# since this method modifies the @pipelines hash it is
|
||||
# wrapped in @upgrade_mutex in the parent call `reload_state!`
|
||||
# reload_pipeline trys to reloads the pipeline with id using a potential new configuration if it changed
|
||||
# since this method modifies the @pipelines hash it is wrapped in @upgrade_mutex in the parent call `reload_state!`
|
||||
# @param id [String] the pipeline id to reload
|
||||
def reload_pipeline!(id)
|
||||
old_pipeline = @pipelines[id]
|
||||
new_config = fetch_config(old_pipeline.settings)
|
||||
|
||||
if old_pipeline.config_str == new_config
|
||||
@logger.debug("no configuration change for pipeline",
|
||||
:pipeline => id, :config => new_config)
|
||||
@logger.debug("no configuration change for pipeline", :pipeline => id, :config => new_config)
|
||||
return
|
||||
end
|
||||
|
||||
new_pipeline = create_pipeline(old_pipeline.settings, new_config)
|
||||
|
||||
return if new_pipeline.nil?
|
||||
|
||||
if !new_pipeline.reloadable?
|
||||
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"),
|
||||
:pipeline_id => id,
|
||||
:plugins => new_pipeline.non_reloadable_plugins.map(&:class))
|
||||
# check if this pipeline is not reloadable. it should not happen as per the check below
|
||||
# but keep it here as a safety net if a reloadable pipeline was releoaded with a non reloadable pipeline
|
||||
if !old_pipeline.reloadable?
|
||||
@logger.error("pipeline is not reloadable", :pipeline => id)
|
||||
return
|
||||
else
|
||||
@logger.warn("fetched new config for pipeline. upgrading..",
|
||||
:pipeline => id, :config => new_pipeline.config_str)
|
||||
upgrade_pipeline(id, new_pipeline)
|
||||
end
|
||||
|
||||
# BasePipeline#initialize will compile the config, and load all plugins and raise an exception
|
||||
# on an invalid configuration
|
||||
begin
|
||||
pipeline_validator = LogStash::BasePipeline.new(new_config, old_pipeline.settings)
|
||||
rescue => e
|
||||
increment_reload_failures_metrics(id, new_config, e)
|
||||
return
|
||||
end
|
||||
|
||||
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort
|
||||
if !pipeline_validator.reloadable?
|
||||
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => id, :plugins => pipeline_validator.non_reloadable_plugins.map(&:class))
|
||||
# TODO: in the original code the failure metrics were not incremented, should we do it here?
|
||||
# increment_reload_failures_metrics(id, new_config, e)
|
||||
return
|
||||
end
|
||||
|
||||
# we know configis valid so we are fairly comfortable to first stop old pipeline and then start new one
|
||||
upgrade_pipeline(id, old_pipeline.settings, new_config)
|
||||
end
|
||||
|
||||
# upgrade_pipeline first stops the old pipeline and starts the new one
|
||||
# this method exists only for specs to be able to expects this to be executed
|
||||
# @params pipeline_id [String] the pipeline id to upgrade
|
||||
# @params settings [Settings] the settings for the new pipeline
|
||||
# @params new_config [String] the new pipeline config
|
||||
def upgrade_pipeline(pipeline_id, settings, new_config)
|
||||
@logger.warn("fetched new config for pipeline. upgrading..", :pipeline => pipeline_id, :config => new_config)
|
||||
|
||||
# first step: stop the old pipeline.
|
||||
# IMPORTANT: a new pipeline with same settings should not be instantiated before the previous one is shutdown
|
||||
|
||||
stop_pipeline(pipeline_id)
|
||||
reset_pipeline_metrics(pipeline_id)
|
||||
|
||||
# second step create and start a new pipeline now that the old one is shutdown
|
||||
|
||||
new_pipeline = create_pipeline(settings, new_config)
|
||||
if new_pipeline.nil?
|
||||
# this is a scenario where the configuration is valid (compilable) but the new pipeline refused to start
|
||||
# and at this point NO pipeline is running
|
||||
@logger.error("failed to create the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
|
||||
return
|
||||
end
|
||||
|
||||
# check if the new pipeline will be reloadable in which case we want to log that as an error and abort. this should normally not
|
||||
# happen since the check should be done in reload_pipeline! prior to get here.
|
||||
if !new_pipeline.reloadable?
|
||||
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => pipeline_id, :plugins => new_pipeline.non_reloadable_plugins.map(&:class))
|
||||
return
|
||||
end
|
||||
|
||||
@pipelines[pipeline_id] = new_pipeline
|
||||
|
||||
if !start_pipeline(pipeline_id)
|
||||
@logger.error("failed to start the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id)
|
||||
return
|
||||
end
|
||||
|
||||
# pipeline started successfuly, update reload success metrics
|
||||
@instance_reload_metric.increment(:successes)
|
||||
@pipeline_reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n|
|
||||
n.increment(:successes)
|
||||
n.gauge(:last_success_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -349,20 +414,6 @@ class LogStash::Agent
|
|||
thread.is_a?(Thread) && thread.alive?
|
||||
end
|
||||
|
||||
def upgrade_pipeline(pipeline_id, new_pipeline)
|
||||
stop_pipeline(pipeline_id)
|
||||
reset_pipeline_metrics(pipeline_id)
|
||||
@pipelines[pipeline_id] = new_pipeline
|
||||
if start_pipeline(pipeline_id) # pipeline started successfuly
|
||||
@instance_reload_metric.increment(:successes)
|
||||
@pipeline_reload_metric.namespace([pipeline_id.to_sym, :reloads]).tap do |n|
|
||||
n.increment(:successes)
|
||||
n.gauge(:last_success_timestamp, LogStash::Timestamp.now)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
def clean_state?
|
||||
@pipelines.empty?
|
||||
end
|
||||
|
|
|
@ -21,21 +21,101 @@ require "logstash/output_delegator"
|
|||
require "logstash/filter_delegator"
|
||||
require "logstash/queue_factory"
|
||||
|
||||
module LogStash; class Pipeline
|
||||
module LogStash; class BasePipeline
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
attr_reader :inputs,
|
||||
:filters,
|
||||
:outputs,
|
||||
attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id
|
||||
|
||||
def initialize(config_str, settings)
|
||||
@logger = self.logger
|
||||
@config_str = config_str
|
||||
@config_hash = Digest::SHA1.hexdigest(@config_str)
|
||||
# Every time #plugin is invoked this is incremented to give each plugin
|
||||
# a unique id when auto-generating plugin ids
|
||||
@plugin_counter ||= 0
|
||||
|
||||
@pipeline_id = settings.get_value("pipeline.id") || self.object_id
|
||||
|
||||
# A list of plugins indexed by id
|
||||
@plugins_by_id = {}
|
||||
@inputs = nil
|
||||
@filters = nil
|
||||
@outputs = nil
|
||||
|
||||
grammar = LogStashConfigParser.new
|
||||
parsed_config = grammar.parse(config_str)
|
||||
raise(ConfigurationError, grammar.failure_reason) if parsed_config.nil?
|
||||
|
||||
config_code = parsed_config.compile
|
||||
|
||||
# config_code = BasePipeline.compileConfig(config_str)
|
||||
|
||||
if settings.get_value("config.debug") && @logger.debug?
|
||||
@logger.debug("Compiled pipeline code", :code => config_code)
|
||||
end
|
||||
|
||||
# Evaluate the config compiled code that will initialize all the plugins and define the
|
||||
# filter and output methods.
|
||||
begin
|
||||
eval(config_code)
|
||||
rescue => e
|
||||
# TODO: the original code rescue e but does nothing with it, should we re-raise to have original exception details!?
|
||||
raise
|
||||
end
|
||||
end
|
||||
|
||||
def plugin(plugin_type, name, *args)
|
||||
@plugin_counter += 1
|
||||
|
||||
# Collapse the array of arguments into a single merged hash
|
||||
args = args.reduce({}, &:merge)
|
||||
|
||||
id = if args["id"].nil? || args["id"].empty?
|
||||
args["id"] = "#{@config_hash}-#{@plugin_counter}"
|
||||
else
|
||||
args["id"]
|
||||
end
|
||||
|
||||
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
||||
@plugins_by_id[id] = true
|
||||
|
||||
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
|
||||
metric = @metric || Instrument::NullMetric.new
|
||||
|
||||
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
|
||||
# Scope plugins of type 'input' to 'inputs'
|
||||
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
|
||||
|
||||
klass = Plugin.lookup(plugin_type, name)
|
||||
|
||||
if plugin_type == "output"
|
||||
OutputDelegator.new(@logger, klass, type_scoped_metric, OutputDelegatorStrategyRegistry.instance, args)
|
||||
elsif plugin_type == "filter"
|
||||
FilterDelegator.new(@logger, klass, type_scoped_metric, args)
|
||||
else # input
|
||||
input_plugin = klass.new(args)
|
||||
input_plugin.metric = type_scoped_metric.namespace(id)
|
||||
input_plugin
|
||||
end
|
||||
end
|
||||
|
||||
def reloadable?
|
||||
non_reloadable_plugins.empty?
|
||||
end
|
||||
|
||||
def non_reloadable_plugins
|
||||
(inputs + filters + outputs).select { |plugin| !plugin.reloadable? }
|
||||
end
|
||||
end; end
|
||||
|
||||
module LogStash; class Pipeline < BasePipeline
|
||||
attr_reader \
|
||||
:worker_threads,
|
||||
:events_consumed,
|
||||
:events_filtered,
|
||||
:reporter,
|
||||
:pipeline_id,
|
||||
:started_at,
|
||||
:thread,
|
||||
:config_str,
|
||||
:config_hash,
|
||||
:settings,
|
||||
:metric,
|
||||
:filter_queue_client,
|
||||
|
@ -45,55 +125,19 @@ module LogStash; class Pipeline
|
|||
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
|
||||
|
||||
def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
|
||||
@logger = self.logger
|
||||
@config_str = config_str
|
||||
@config_hash = Digest::SHA1.hexdigest(@config_str)
|
||||
# Every time #plugin is invoked this is incremented to give each plugin
|
||||
# a unique id when auto-generating plugin ids
|
||||
@plugin_counter ||= 0
|
||||
@settings = settings
|
||||
@pipeline_id = @settings.get_value("pipeline.id") || self.object_id
|
||||
@reporter = PipelineReporter.new(@logger, self)
|
||||
|
||||
# A list of plugins indexed by id
|
||||
@plugins_by_id = {}
|
||||
@inputs = nil
|
||||
@filters = nil
|
||||
@outputs = nil
|
||||
|
||||
@worker_threads = []
|
||||
|
||||
# This needs to be configured before we evaluate the code to make
|
||||
# This needs to be configured before we call super which will evaluate the code to make
|
||||
# sure the metric instance is correctly send to the plugins to make the namespace scoping work
|
||||
@metric = if namespaced_metric
|
||||
settings.get("metric.collect") ? namespaced_metric : Instrument::NullMetric.new(namespaced_metric.collector)
|
||||
else
|
||||
Instrument::NullMetric.new
|
||||
end
|
||||
|
||||
grammar = LogStashConfigParser.new
|
||||
@config = grammar.parse(config_str)
|
||||
if @config.nil?
|
||||
raise ConfigurationError, grammar.failure_reason
|
||||
end
|
||||
# This will compile the config to ruby and evaluate the resulting code.
|
||||
# The code will initialize all the plugins and define the
|
||||
# filter and output methods.
|
||||
code = @config.compile
|
||||
@code = code
|
||||
|
||||
# The config code is hard to represent as a log message...
|
||||
# So just print it.
|
||||
|
||||
if @settings.get_value("config.debug") && @logger.debug?
|
||||
@logger.debug("Compiled pipeline code", :code => code)
|
||||
settings.get("metric.collect") ? namespaced_metric : Instrument::NullMetric.new(namespaced_metric.collector)
|
||||
else
|
||||
Instrument::NullMetric.new
|
||||
end
|
||||
|
||||
begin
|
||||
eval(code)
|
||||
rescue => e
|
||||
raise
|
||||
end
|
||||
@settings = settings
|
||||
@reporter = PipelineReporter.new(@logger, self)
|
||||
@worker_threads = []
|
||||
|
||||
super(config_str, settings)
|
||||
|
||||
@queue = LogStash::QueueFactory.create(settings)
|
||||
@input_queue_client = @queue.write_client
|
||||
|
@ -403,41 +447,6 @@ module LogStash; class Pipeline
|
|||
@outputs.each(&:do_close)
|
||||
end
|
||||
|
||||
def plugin(plugin_type, name, *args)
|
||||
@plugin_counter += 1
|
||||
|
||||
# Collapse the array of arguments into a single merged hash
|
||||
args = args.reduce({}, &:merge)
|
||||
|
||||
id = if args["id"].nil? || args["id"].empty?
|
||||
args["id"] = "#{@config_hash}-#{@plugin_counter}"
|
||||
else
|
||||
args["id"]
|
||||
end
|
||||
|
||||
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
|
||||
|
||||
pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])
|
||||
|
||||
klass = Plugin.lookup(plugin_type, name)
|
||||
|
||||
# Scope plugins of type 'input' to 'inputs'
|
||||
type_scoped_metric = pipeline_scoped_metric.namespace("#{plugin_type}s".to_sym)
|
||||
plugin = if plugin_type == "output"
|
||||
OutputDelegator.new(@logger, klass, type_scoped_metric,
|
||||
OutputDelegatorStrategyRegistry.instance,
|
||||
args)
|
||||
elsif plugin_type == "filter"
|
||||
FilterDelegator.new(@logger, klass, type_scoped_metric, args)
|
||||
else # input
|
||||
input_plugin = klass.new(args)
|
||||
input_plugin.metric = type_scoped_metric.namespace(id)
|
||||
input_plugin
|
||||
end
|
||||
|
||||
@plugins_by_id[id] = plugin
|
||||
end
|
||||
|
||||
# for backward compatibility in devutils for the rspec helpers, this method is not used
|
||||
# in the pipeline anymore.
|
||||
def filter(event, &block)
|
||||
|
@ -518,14 +527,6 @@ module LogStash; class Pipeline
|
|||
.each {|t| t.delete("status") }
|
||||
end
|
||||
|
||||
def reloadable?
|
||||
non_reloadable_plugins.empty?
|
||||
end
|
||||
|
||||
def non_reloadable_plugins
|
||||
(inputs + filters + outputs).select { |plugin| !plugin.reloadable? }
|
||||
end
|
||||
|
||||
def collect_stats
|
||||
pipeline_metric = @metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
|
||||
pipeline_metric.gauge(:type, settings.get("queue.type"))
|
||||
|
|
|
@ -262,7 +262,7 @@ describe LogStash::Agent do
|
|||
context "when fetching a new state" do
|
||||
it "upgrades the state" do
|
||||
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
|
||||
expect(subject).to receive(:upgrade_pipeline).with(default_pipeline_id, kind_of(LogStash::Pipeline))
|
||||
expect(subject).to receive(:upgrade_pipeline).with(default_pipeline_id, kind_of(LogStash::Settings), second_pipeline_config)
|
||||
subject.reload_state!
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue