cleanup partially initialized pipeline

DRY the plugin registration

typo

log exception message
This commit is contained in:
Colin Surprenant 2017-02-10 18:23:09 -05:00
parent e784f0c8aa
commit 3885fc5a9a

View file

@ -141,7 +141,7 @@ module LogStash; class Pipeline < BasePipeline
begin begin
@queue = LogStash::QueueFactory.create(settings) @queue = LogStash::QueueFactory.create(settings)
rescue => e rescue => e
@logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace) @logger.error("Logstash failed to create queue", "exception" => e.message, "backtrace" => e.backtrace)
raise e raise e
end end
@ -250,12 +250,32 @@ module LogStash; class Pipeline < BasePipeline
@running.false? @running.false?
end end
# register_plugin simply calls the plugin #register method and catches & logs any error
# @param plugin [Plugin] the plugin to register
# @return [Plugin] the registered plugin
def register_plugin(plugin)
plugin.register
plugin
rescue => e
@logger.error("Error registering plugin", :plugin => plugin.inspect, :error => e.message)
raise e
end
# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
registered = []
plugins.each { |plugin| registered << register_plugin(plugin) }
rescue => e
registered.each(&:do_close)
raise e
end
def start_workers def start_workers
@worker_threads.clear # In case we're restarting the pipeline @worker_threads.clear # In case we're restarting the pipeline
begin begin
start_inputs register_plugins(@outputs)
@outputs.each {|o| o.register } register_plugins(@filters)
@filters.each {|f| f.register }
pipeline_workers = safe_pipeline_worker_count pipeline_workers = safe_pipeline_worker_count
batch_size = @settings.get("pipeline.batch.size") batch_size = @settings.get("pipeline.batch.size")
@ -286,6 +306,16 @@ module LogStash; class Pipeline < BasePipeline
worker_loop(batch_size, batch_delay) worker_loop(batch_size, batch_delay)
end end
end end
# inputs should be started last, after all workers
begin
start_inputs
rescue => e
# if there is any exception in starting inputs, make sure we shutdown workers.
# exception will already by logged in start_inputs
shutdown_workers
raise e
end
ensure ensure
# it is important to guarantee @ready to be true after the startup sequence has been completed # it is important to guarantee @ready to be true after the startup sequence has been completed
# to potentially unblock the shutdown method which may be waiting on @ready to proceed # to potentially unblock the shutdown method which may be waiting on @ready to proceed
@ -335,7 +365,7 @@ module LogStash; class Pipeline < BasePipeline
# Users need to check their configuration or see if there is a bug in the # Users need to check their configuration or see if there is a bug in the
# plugin. # plugin.
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
"exception" => e, "backtrace" => e.backtrace) "exception" => e.message, "backtrace" => e.backtrace)
raise e raise e
end end
@ -378,10 +408,11 @@ module LogStash; class Pipeline < BasePipeline
end end
@inputs += moreinputs @inputs += moreinputs
@inputs.each do |input| # first make sure we can register all input plugins
input.register register_plugins(@inputs)
start_input(input)
end # then after all input plugins are sucessfully registered, start them
@inputs.each { |input| start_input(input) }
end end
def start_input(plugin) def start_input(plugin)
@ -395,7 +426,7 @@ module LogStash; class Pipeline < BasePipeline
rescue => e rescue => e
if plugin.stop? if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.", @logger.debug("Input plugin raised exception during shutdown, ignoring it.",
:plugin => plugin.class.config_name, :exception => e, :plugin => plugin.class.config_name, :exception => e.message,
:backtrace => e.backtrace) :backtrace => e.backtrace)
return return
end end
@ -403,12 +434,12 @@ module LogStash; class Pipeline < BasePipeline
# otherwise, report error and restart # otherwise, report error and restart
if @logger.debug? if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug", @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e.to_s, :plugin => plugin.inspect, :error => e.message,
:exception => e.class, :exception => e.class,
:stacktrace => e.backtrace.join("\n"))) :stacktrace => e.backtrace.join("\n")))
else else
@logger.error(I18n.t("logstash.pipeline.worker-error", @logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e)) :plugin => plugin.inspect, :error => e.message))
end end
# Assuming the failure that caused this exception is transient, # Assuming the failure that caused this exception is transient,