trigger input plugin shutdown using out of band call

Currently, during shutdown, the pipeline loops through input workers
and calls Thread.raise on each input thread. Plugins rescue that
exception to exit the `run` loop. Afterwards, `teardown` is called for
any additional bookkeeping.

This proposal exposes a `stop` call on the input plugin class to alert
an input that it should shutdown. It is the plugin job to frequently
poll an internal `stop?` method to know if it's time to exit and
terminate if so.

The `teardown` method remains to do the additional bookkeeping, and it
will be called by the inputworker when `run` terminates.

add concurrent-ruby as logstash-core dependency

make Plugins::Input#stop? public

In some scenarios it's necessary for the inputworker or the pipeline
to know that the plugin is in a shutdown mode

document why inputworker sleeps on StandardError

better debug message when input plugin raises exception during shutdown
This commit is contained in:
Joao Duarte 2015-08-28 10:51:01 +01:00 committed by Colin Surprenant
parent 186806d64c
commit 3bdac41a6c
4 changed files with 37 additions and 100 deletions

View file

@ -67,6 +67,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
def initialize(params={})
super
@threadable = false
@stop_called = Concurrent::AtomicBoolean.new(false)
config_init(params)
@tags ||= []
@ -100,6 +101,20 @@ class LogStash::Inputs::Base < LogStash::Plugin
@tags << newtag
end # def tag
# if you override stop, don't forget to call super
# as the first action
public
def stop
@logger.debug("stopping", :plugin => self)
@stop_called.make_true
end
# stop? should never be overriden
public
def stop?
@stop_called.value
end
protected
def to_event(raw, source)
raise LogStash::ThisMethodWasRemoved("LogStash::Inputs::Base#to_event - you should use codecs now instead of to_event. Not sure what this means? Get help on https://discuss.elastic.co/c/logstash")

View file

@ -175,9 +175,16 @@ class LogStash::Pipeline
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
begin
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
# ignore and quit
rescue => e
# if plugin is stopping, ignore uncatched exceptions and exit worker
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
:plugin => plugin.class.config_name, :exception => e,
:backtrace => e.backtrace)
return
end
# otherwise, report error and restart
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e.to_s,
@ -187,23 +194,13 @@ class LogStash::Pipeline
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
end
puts e.backtrace if @logger.debug?
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
sleep 1
retry
end
ensure
begin
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
rescue LogStash::ShutdownSignal
# teardown could receive the ShutdownSignal, retry it
# Assuming the failure that caused this exception is transient,
# let's sleep for a bit and execute #run again
sleep(1)
retry
ensure
plugin.teardown
end
end # def inputworker
@ -242,8 +239,8 @@ class LogStash::Pipeline
event = @filter_to_output.pop
break if event == LogStash::SHUTDOWN
output_func(event)
end # while true
end
ensure
@outputs.each do |output|
output.worker_plugins.each(&:teardown)
end
@ -255,21 +252,6 @@ class LogStash::Pipeline
def shutdown
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)
@input_threads.each do |thread|
# Interrupt all inputs
@logger.info("Sending shutdown signal to input thread", :thread => thread)
# synchronize both ShutdownSignal and teardown below. by synchronizing both
# we will avoid potentially sending a shutdown signal when the inputworker is
# executing the teardown method.
@run_mutex.synchronize do
thread.raise(LogStash::ShutdownSignal)
begin
thread.wakeup # in case it's in blocked IO or sleeping
rescue ThreadError
end
end
end
# sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
@inputs.each do |input|
@ -277,15 +259,12 @@ class LogStash::Pipeline
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{input.teardown}
@run_mutex.synchronize{input.stop}
rescue LogStash::ShutdownSignal
# teardown could receive the ShutdownSignal, retry it
retry
end
end
# No need to send the ShutdownEvent to the filters/outputs nor to wait for
# the inputs to finish, because in the #run method we wait for that anyway.
end # def shutdown
def plugin(plugin_type, name, *args)

View file

@ -3,6 +3,7 @@ require "logstash/namespace"
require "logstash/logging"
require "logstash/config/mixin"
require "cabin"
require "concurrent"
class LogStash::Plugin
attr_accessor :params
@ -27,72 +28,14 @@ class LogStash::Plugin
@logger = Cabin::Channel.get(LogStash)
end
# This method is called when someone or something wants this plugin to shut
# down. When you successfully shutdown, you must call 'finished'
# You must also call 'super' in any subclasses.
public
def shutdown(queue)
# By default, shutdown is assumed a no-op for all plugins.
# If you need to take special efforts to shutdown (like waiting for
# an operation to complete, etc)
teardown
@logger.info("Received shutdown signal", :plugin => self)
@shutdown_queue = queue
if @plugin_state == :finished
finished
else
@plugin_state = :terminating
end
end # def shutdown
# You should call this method when you (the plugin) are done with work
# forever.
public
def finished
# TODO(sissel): I'm not sure what I had planned for this shutdown_queue
# thing
if @shutdown_queue
@logger.info("Sending shutdown event to agent queue", :plugin => self)
@shutdown_queue << self
end
if @plugin_state != :finished
@logger.info("Plugin is finished", :plugin => self)
@plugin_state = :finished
end
end # def finished
# Subclasses should implement this teardown method if you need to perform any
# special tasks during shutdown (like flushing, etc.)
# if you override teardown, don't forget to call super
public
def teardown
# nothing by default
finished
@logger.debug("closing", :plugin => self)
end
# This method is called when a SIGHUP triggers a reload operation
public
def reload
# Do nothing by default
end
public
def finished?
return @plugin_state == :finished
end # def finished?
public
def running?
return @plugin_state != :finished
end # def finished?
public
def terminating?
return @plugin_state == :terminating
end # def terminating?
public
def to_s
return "#{self.class.name}: #{@params}"
end

View file

@ -23,7 +23,7 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency "clamp", "~> 0.6.5" #(MIT license) for command line args/flags
gem.add_runtime_dependency "filesize", "0.0.4" #(MIT license) for :bytes config validator
gem.add_runtime_dependency "gems", "~> 0.8.3" #(MIT license)
gem.add_runtime_dependency "concurrent-ruby", "0.9.1"
gem.add_runtime_dependency "concurrent-ruby", "~> 0.9.1"
# TODO(sissel): Treetop 1.5.x doesn't seem to work well, but I haven't
# investigated what the cause might be. -Jordan