add before_stop shutdown hook and refactor inflight reporter to use it in agent

Fixes #4024
This commit is contained in:
Colin Surprenant 2015-10-13 18:07:23 -04:00 committed by Jordan Sissel
parent e2e717c99e
commit b21331bbcd
2 changed files with 22 additions and 9 deletions

View file

@ -128,14 +128,14 @@ class LogStash::Agent < Clamp::Command
@logger.warn(I18n.t("logstash.agent.sigint"))
Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) }
@interrupted_once = true
pipeline.shutdown
shutdown(pipeline)
end
end
# Make SIGTERM shutdown the pipeline.
sigterm_id = Stud::trap("TERM") do
@logger.warn(I18n.t("logstash.agent.sigterm"))
pipeline.shutdown
shutdown(pipeline)
end
Stud::trap("HUP") do
@ -174,6 +174,13 @@ class LogStash::Agent < Clamp::Command
Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil?
end # def execute
def shutdown(pipeline)
pipeline.shutdown do
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(pipeline.input_to_filter, pipeline.filter_to_output, pipeline.outputs)
end
end
def show_version
show_version_logstash

View file

@ -14,15 +14,20 @@ require "logstash/config/cpu_core_strategy"
require "logstash/util/defaults_printer"
class LogStash::Pipeline
attr_reader :inputs, :filters, :outputs, :input_to_filter, :filter_to_output
def initialize(configstr)
@logger = Cabin::Channel.get(LogStash)
@inputs = nil
@filters = nil
@outputs = nil
grammar = LogStashConfigParser.new
@config = grammar.parse(configstr)
if @config.nil?
raise LogStash::ConfigurationError, grammar.failure_reason
end
# This will compile the config to ruby and evaluate the resulting code.
# The code will initialize all the plugins and define the
# filter and output methods.
@ -39,6 +44,7 @@ class LogStash::Pipeline
@input_to_filter = SizedQueue.new(20)
# if no filters, pipe inputs directly to outputs
@filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter
@settings = {
"filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent
}
@ -222,9 +228,9 @@ class LogStash::Pipeline
end
end
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code.
# But if an exception is raised up to the worker thread their are mostly
# fatal and logstash can't recover from this situation.
# Plugins authors should manage their own exceptions in the plugin code
# but if an exception is raised up to the worker thread they are considered
# fatal and logstash will not recover from this situation.
#
# Users need to check their configuration or see if there is a bug in the
# plugin.
@ -253,7 +259,8 @@ class LogStash::Pipeline
# initiate the pipeline shutdown sequence
# this method is intended to be called from outside the pipeline thread
def shutdown
# @param before_stop [Proc] code block called before performing stop operation on input plugins
def shutdown(&before_stop)
# shutdown can only start once the pipeline has completed its startup.
# avoid potential race conditoon between the startup sequence and this
# shutdown method which can be called from another thread at any time
@ -261,8 +268,7 @@ class LogStash::Pipeline
# TODO: should we also check against calling shutdown multiple times concurently?
InflightEventsReporter.logger = @logger
InflightEventsReporter.start(@input_to_filter, @filter_to_output, @outputs)
before_stop.call if block_given?
@inputs.each(&:do_stop)
end # def shutdown