diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index a802371a1..b8ec438be 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -141,7 +141,7 @@ module LogStash; class Pipeline < BasePipeline begin @queue = LogStash::QueueFactory.create(settings) rescue => e - @logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace) + @logger.error("Logstash failed to create queue", "exception" => e.message, "backtrace" => e.backtrace) raise e end @@ -250,12 +250,32 @@ module LogStash; class Pipeline < BasePipeline @running.false? end + # register_plugin simply calls the plugin #register method and catches & logs any error + # @param plugin [Plugin] the plugin to register + # @return [Plugin] the registered plugin + def register_plugin(plugin) + plugin.register + plugin + rescue => e + @logger.error("Error registering plugin", :plugin => plugin.inspect, :error => e.message) + raise e + end + + # register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins + # @param plugins [Array[Plugin]] the list of plugins to register + def register_plugins(plugins) + registered = [] + plugins.each { |plugin| registered << register_plugin(plugin) } + rescue => e + registered.each(&:do_close) + raise e + end + def start_workers @worker_threads.clear # In case we're restarting the pipeline begin - start_inputs - @outputs.each {|o| o.register } - @filters.each {|f| f.register } + register_plugins(@outputs) + register_plugins(@filters) pipeline_workers = safe_pipeline_worker_count batch_size = @settings.get("pipeline.batch.size") @@ -286,6 +306,16 @@ module LogStash; class Pipeline < BasePipeline worker_loop(batch_size, batch_delay) end end + + # inputs should be started last, after all workers + begin + start_inputs + rescue => e + # if there is any exception in starting inputs, make sure we shutdown workers. + # exception will already by logged in start_inputs + shutdown_workers + raise e + end ensure # it is important to guarantee @ready to be true after the startup sequence has been completed # to potentially unblock the shutdown method which may be waiting on @ready to proceed @@ -335,7 +365,7 @@ module LogStash; class Pipeline < BasePipeline # Users need to check their configuration or see if there is a bug in the # plugin. @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", - "exception" => e, "backtrace" => e.backtrace) + "exception" => e.message, "backtrace" => e.backtrace) raise e end @@ -378,10 +408,11 @@ module LogStash; class Pipeline < BasePipeline end @inputs += moreinputs - @inputs.each do |input| - input.register - start_input(input) - end + # first make sure we can register all input plugins + register_plugins(@inputs) + + # then after all input plugins are sucessfully registered, start them + @inputs.each { |input| start_input(input) } end def start_input(plugin) @@ -395,7 +426,7 @@ module LogStash; class Pipeline < BasePipeline rescue => e if plugin.stop? @logger.debug("Input plugin raised exception during shutdown, ignoring it.", - :plugin => plugin.class.config_name, :exception => e, + :plugin => plugin.class.config_name, :exception => e.message, :backtrace => e.backtrace) return end @@ -403,12 +434,12 @@ module LogStash; class Pipeline < BasePipeline # otherwise, report error and restart if @logger.debug? @logger.error(I18n.t("logstash.pipeline.worker-error-debug", - :plugin => plugin.inspect, :error => e.to_s, + :plugin => plugin.inspect, :error => e.message, :exception => e.class, :stacktrace => e.backtrace.join("\n"))) else @logger.error(I18n.t("logstash.pipeline.worker-error", - :plugin => plugin.inspect, :error => e)) + :plugin => plugin.inspect, :error => e.message)) end # Assuming the failure that caused this exception is transient,