mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
support live reloading of multiple pipelines
NOTE: this is a backport from master * reload config from files * add reload config flag and allow reload through sighup * move signal handling to runner * add "-r" flag to detect changes in config and reload * make SIGHUP reload config file * make agent execute either loop to detect changes or sleep forever * sigint/sigterm shut down the agent.execute task * stop agent if pipeline terminates * Better shutdown logic for controller and pipeline * support multiple pipeline reloading and other cleanups * serialize upgrade_state and running_pipelines? * remove useless @thread from pipeline * remove agent registry and cleanup * reduce reload_interval from 5 to 3 * remove useless MissingAgentError class * fix exit codes for reload_state failures * explain why thread var exists in agent.rb * properly pass backtrace exceptions to oops * remove is_config_test from loader * better documentation on agent/runner/pipeline methods Fixes #4643
This commit is contained in:
parent
e9cd1682f9
commit
0099537868
12 changed files with 652 additions and 200 deletions
|
@ -3,12 +3,16 @@ require "clamp" # gem 'clamp'
|
|||
require "logstash/environment"
|
||||
require "logstash/errors"
|
||||
require "logstash/config/cpu_core_strategy"
|
||||
require "stud/trap"
|
||||
require "logstash/config/loader"
|
||||
require "uri"
|
||||
require "net/http"
|
||||
require "logstash/pipeline"
|
||||
LogStash::Environment.load_locale!
|
||||
|
||||
class LogStash::Agent < Clamp::Command
|
||||
|
||||
attr_reader :pipelines
|
||||
|
||||
DEFAULT_INPUT = "input { stdin { type => stdin } }"
|
||||
DEFAULT_OUTPUT = "output { stdout { codec => rubydebug } }"
|
||||
|
||||
|
@ -22,18 +26,18 @@ class LogStash::Agent < Clamp::Command
|
|||
:default => "", :attribute_name => :config_string
|
||||
|
||||
option ["-w", "--pipeline-workers"], "COUNT",
|
||||
I18n.t("logstash.runner.flag.pipeline-workers"),
|
||||
I18n.t("logstash.agent.flag.pipeline-workers"),
|
||||
:attribute_name => :pipeline_workers,
|
||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers]
|
||||
|
||||
|
||||
option ["-b", "--pipeline-batch-size"], "SIZE",
|
||||
I18n.t("logstash.runner.flag.pipeline-batch-size"),
|
||||
I18n.t("logstash.agent.flag.pipeline-batch-size"),
|
||||
:attribute_name => :pipeline_batch_size,
|
||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size]
|
||||
|
||||
option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
|
||||
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
|
||||
I18n.t("logstash.agent.flag.pipeline-batch-delay"),
|
||||
:attribute_name => :pipeline_batch_delay,
|
||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay]
|
||||
|
||||
|
@ -71,9 +75,25 @@ class LogStash::Agent < Clamp::Command
|
|||
:attribute_name => :unsafe_shutdown,
|
||||
:default => false
|
||||
|
||||
def initialize(*args)
|
||||
super(*args)
|
||||
@pipeline_settings ||= { :pipeline_id => "base" }
|
||||
option ["-r", "--[no-]auto-reload"], :flag,
|
||||
I18n.t("logstash.agent.flag.auto_reload"),
|
||||
:attribute_name => :auto_reload, :default => false
|
||||
|
||||
option ["--reload-interval"], "RELOAD_INTERVAL",
|
||||
I18n.t("logstash.agent.flag.reload_interval"),
|
||||
:attribute_name => :reload_interval, :default => 3, &:to_i
|
||||
|
||||
option ["-n", "--node-name"], "NAME",
|
||||
I18n.t("logstash.runner.flag.node_name"),
|
||||
:attribute_name => :node_name, :default => Socket.gethostname
|
||||
|
||||
def initialize(*params)
|
||||
super(*params)
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
@pipelines = {}
|
||||
@pipeline_settings ||= { :pipeline_id => "main" }
|
||||
@upgrade_mutex = Mutex.new
|
||||
@config_loader = LogStash::Config::Loader.new(@logger)
|
||||
end
|
||||
|
||||
def pipeline_workers=(pipeline_workers_value)
|
||||
|
@ -103,7 +123,6 @@ class LogStash::Agent < Clamp::Command
|
|||
raise LogStash::ConfigurationError, message
|
||||
end # def warn
|
||||
|
||||
# Emit a failure message and abort.
|
||||
def fail(message)
|
||||
raise LogStash::ConfigurationError, message
|
||||
end # def fail
|
||||
|
@ -114,7 +133,6 @@ class LogStash::Agent < Clamp::Command
|
|||
require "logstash/pipeline"
|
||||
require "cabin" # gem 'cabin'
|
||||
require "logstash/plugin"
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
|
||||
LogStash::ShutdownWatcher.unsafe_shutdown = unsafe_shutdown?
|
||||
LogStash::ShutdownWatcher.logger = @logger
|
||||
|
@ -140,70 +158,63 @@ class LogStash::Agent < Clamp::Command
|
|||
end
|
||||
|
||||
# You must specify a config_string or config_path
|
||||
if @config_string.nil? && @config_path.nil?
|
||||
fail(help + "\n" + I18n.t("logstash.agent.missing-configuration"))
|
||||
if config_string.nil? && config_path.nil?
|
||||
fail(I18n.t("logstash.agent.missing-configuration"))
|
||||
end
|
||||
|
||||
@config_string = @config_string.to_s
|
||||
|
||||
if @config_path
|
||||
# Append the config string.
|
||||
# This allows users to provide both -f and -e flags. The combination
|
||||
# is rare, but useful for debugging.
|
||||
@config_string = @config_string + load_config(@config_path)
|
||||
else
|
||||
# include a default stdin input if no inputs given
|
||||
if @config_string !~ /input *{/
|
||||
@config_string += DEFAULT_INPUT
|
||||
end
|
||||
# include a default stdout output if no outputs given
|
||||
if @config_string !~ /output *{/
|
||||
@config_string += DEFAULT_OUTPUT
|
||||
end
|
||||
if auto_reload? && config_path.nil?
|
||||
# there's nothing to reload
|
||||
fail(I18n.t("logstash.agent.reload-without-config-path"))
|
||||
end
|
||||
|
||||
|
||||
begin
|
||||
pipeline = LogStash::Pipeline.new(@config_string, @pipeline_settings)
|
||||
rescue LoadError => e
|
||||
fail("Configuration problem.")
|
||||
end
|
||||
|
||||
# Make SIGINT shutdown the pipeline.
|
||||
sigint_id = Stud::trap("INT") do
|
||||
|
||||
if @interrupted_once
|
||||
@logger.fatal(I18n.t("logstash.agent.forced_sigint"))
|
||||
exit
|
||||
else
|
||||
@logger.warn(I18n.t("logstash.agent.sigint"))
|
||||
Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) }
|
||||
@interrupted_once = true
|
||||
shutdown(pipeline)
|
||||
end
|
||||
end
|
||||
|
||||
# Make SIGTERM shutdown the pipeline.
|
||||
sigterm_id = Stud::trap("TERM") do
|
||||
@logger.warn(I18n.t("logstash.agent.sigterm"))
|
||||
shutdown(pipeline)
|
||||
end
|
||||
|
||||
Stud::trap("HUP") do
|
||||
@logger.info(I18n.t("logstash.agent.sighup"))
|
||||
configure_logging(log_file)
|
||||
end
|
||||
|
||||
# Stop now if we are only asking for a config test.
|
||||
if config_test?
|
||||
@logger.terminal "Configuration OK"
|
||||
return
|
||||
config_loader = LogStash::Config::Loader.new(@logger)
|
||||
config_str = config_loader.format_config(config_path, config_string)
|
||||
config_error = LogStash::Pipeline.config_valid?(config_str)
|
||||
if config_error == true
|
||||
@logger.terminal "Configuration OK"
|
||||
return 0
|
||||
else
|
||||
@logger.fatal I18n.t("logstash.error", :error => config_error)
|
||||
return 1
|
||||
end
|
||||
end
|
||||
|
||||
register_pipeline("main", @pipeline_settings.merge({
|
||||
:config_string => config_string,
|
||||
:config_path => config_path
|
||||
}))
|
||||
|
||||
sigint_id = trap_sigint()
|
||||
sigterm_id = trap_sigterm()
|
||||
sighup_id = trap_sighup()
|
||||
|
||||
@logger.unsubscribe(stdout_logs) if show_startup_errors
|
||||
|
||||
# TODO(sissel): Get pipeline completion status.
|
||||
pipeline.run
|
||||
@logger.info("starting agent")
|
||||
|
||||
start_pipelines
|
||||
|
||||
return 1 if clean_state?
|
||||
|
||||
@thread = Thread.current # this var is implicilty used by Stud.stop?
|
||||
|
||||
Stud.stoppable_sleep(reload_interval) # sleep before looping
|
||||
|
||||
if auto_reload?
|
||||
Stud.interval(reload_interval) { reload_state! }
|
||||
else
|
||||
while !Stud.stop?
|
||||
if clean_state? || running_pipelines?
|
||||
sleep 0.5
|
||||
else
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
shutdown
|
||||
|
||||
return 0
|
||||
rescue LogStash::ConfigurationError => e
|
||||
@logger.unsubscribe(stdout_logs) if show_startup_errors
|
||||
|
@ -220,45 +231,9 @@ class LogStash::Agent < Clamp::Command
|
|||
@log_fd.close if @log_fd
|
||||
Stud::untrap("INT", sigint_id) unless sigint_id.nil?
|
||||
Stud::untrap("TERM", sigterm_id) unless sigterm_id.nil?
|
||||
Stud::untrap("HUP", sighup_id) unless sighup_id.nil?
|
||||
end # def execute
|
||||
|
||||
def shutdown(pipeline)
|
||||
pipeline.shutdown do
|
||||
::LogStash::ShutdownWatcher.start(pipeline)
|
||||
end
|
||||
end
|
||||
|
||||
def show_version
|
||||
show_version_logstash
|
||||
|
||||
if [:info, :debug].include?(verbosity?) || debug? || verbose?
|
||||
show_version_ruby
|
||||
show_version_java if LogStash::Environment.jruby?
|
||||
show_gems if [:debug].include?(verbosity?) || debug?
|
||||
end
|
||||
end # def show_version
|
||||
|
||||
def show_version_logstash
|
||||
require "logstash/version"
|
||||
puts "logstash #{LOGSTASH_VERSION}"
|
||||
end # def show_version_logstash
|
||||
|
||||
def show_version_ruby
|
||||
puts RUBY_DESCRIPTION
|
||||
end # def show_version_ruby
|
||||
|
||||
def show_version_java
|
||||
properties = java.lang.System.getProperties
|
||||
puts "java #{properties["java.version"]} (#{properties["java.vendor"]})"
|
||||
puts "jvm #{properties["java.vm.name"]} / #{properties["java.vm.version"]}"
|
||||
end # def show_version_java
|
||||
|
||||
def show_gems
|
||||
require "rubygems"
|
||||
Gem::Specification.each do |spec|
|
||||
puts "gem #{spec.name} #{spec.version}"
|
||||
end
|
||||
end # def show_gems
|
||||
|
||||
# Do any start-time configuration.
|
||||
#
|
||||
|
@ -324,63 +299,195 @@ class LogStash::Agent < Clamp::Command
|
|||
end
|
||||
end
|
||||
|
||||
def load_config(path)
|
||||
begin
|
||||
uri = URI.parse(path)
|
||||
|
||||
case uri.scheme
|
||||
when nil then
|
||||
local_config(path)
|
||||
when /http/ then
|
||||
fetch_config(uri)
|
||||
when "file" then
|
||||
local_config(uri.path)
|
||||
## Signal Trapping ##
|
||||
def trap_sigint
|
||||
Stud::trap("INT") do
|
||||
if @interrupted_once
|
||||
@logger.fatal(I18n.t("logstash.agent.forced_sigint"))
|
||||
exit
|
||||
else
|
||||
fail(I18n.t("logstash.agent.configuration.scheme-not-supported", :path => path))
|
||||
@logger.warn(I18n.t("logstash.agent.sigint"))
|
||||
Thread.new(@logger) {|logger| sleep 5; logger.warn(I18n.t("logstash.agent.slow_shutdown")) }
|
||||
@interrupted_once = true
|
||||
Stud.stop!(@thread)
|
||||
end
|
||||
rescue URI::InvalidURIError
|
||||
# fallback for windows.
|
||||
# if the parsing of the file failed we assume we can reach it locally.
|
||||
# some relative path on windows arent parsed correctly (.\logstash.conf)
|
||||
local_config(path)
|
||||
end
|
||||
end
|
||||
|
||||
def local_config(path)
|
||||
path = File.expand_path(path)
|
||||
path = File.join(path, "*") if File.directory?(path)
|
||||
|
||||
if Dir.glob(path).length == 0
|
||||
fail(I18n.t("logstash.agent.configuration.file-not-found", :path => path))
|
||||
def trap_sigterm
|
||||
Stud::trap("TERM") do
|
||||
@logger.warn(I18n.t("logstash.agent.sigterm"))
|
||||
Stud.stop!(@thread)
|
||||
end
|
||||
end
|
||||
|
||||
config = ""
|
||||
encoding_issue_files = []
|
||||
Dir.glob(path).sort.each do |file|
|
||||
next unless File.file?(file)
|
||||
if file.match(/~$/)
|
||||
@logger.debug("NOT reading config file because it is a temp file", :file => file)
|
||||
next
|
||||
def trap_sighup
|
||||
Stud::trap("HUP") do
|
||||
@logger.warn(I18n.t("logstash.agent.sighup"))
|
||||
reload_state!
|
||||
end
|
||||
end
|
||||
|
||||
## Pipeline CRUD ##
|
||||
def shutdown(pipeline)
|
||||
pipeline.shutdown do
|
||||
::LogStash::ShutdownWatcher.start(pipeline)
|
||||
end
|
||||
end
|
||||
#
|
||||
# register_pipeline - adds a pipeline to the agent's state
|
||||
# @param pipeline_id [String] pipeline string identifier
|
||||
# @param settings [Hash] settings that will be passed when creating the pipeline.
|
||||
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
|
||||
def register_pipeline(pipeline_id, settings)
|
||||
pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id))
|
||||
return unless pipeline.is_a?(LogStash::Pipeline)
|
||||
@pipelines[pipeline_id] = pipeline
|
||||
end
|
||||
|
||||
def reload_state!
|
||||
@upgrade_mutex.synchronize do
|
||||
@pipelines.each do |pipeline_id, _|
|
||||
begin
|
||||
reload_pipeline!(pipeline_id)
|
||||
rescue => e
|
||||
@logger.error(I18n.t("oops"), :error => e, :backtrace => e.backtrace)
|
||||
end
|
||||
end
|
||||
@logger.debug("Reading config file", :file => file)
|
||||
cfg = File.read(file)
|
||||
if !cfg.ascii_only? && !cfg.valid_encoding?
|
||||
encoding_issue_files << file
|
||||
end
|
||||
config << cfg + "\n"
|
||||
end
|
||||
if (encoding_issue_files.any?)
|
||||
fail("The following config files contains non-ascii characters but are not UTF-8 encoded #{encoding_issue_files}")
|
||||
end
|
||||
return config
|
||||
end # def load_config
|
||||
end
|
||||
|
||||
def fetch_config(uri)
|
||||
def create_pipeline(settings)
|
||||
begin
|
||||
Net::HTTP.get(uri) + "\n"
|
||||
rescue Exception => e
|
||||
fail(I18n.t("logstash.agent.configuration.fetch-failed", :path => uri.to_s, :message => e.message))
|
||||
config = fetch_config(settings)
|
||||
rescue => e
|
||||
@logger.error("failed to fetch pipeline configuration", :message => e.message)
|
||||
return
|
||||
end
|
||||
|
||||
begin
|
||||
LogStash::Pipeline.new(config, settings)
|
||||
rescue => e
|
||||
@logger.error("fetched an invalid config", :config => config, :reason => e.message)
|
||||
return
|
||||
end
|
||||
end
|
||||
|
||||
def start_pipelines
|
||||
@pipelines.each { |id, _| start_pipeline(id) }
|
||||
end
|
||||
|
||||
def shutdown
|
||||
shutdown_pipelines
|
||||
end
|
||||
|
||||
def shutdown_pipelines
|
||||
@pipelines.each { |id, _| stop_pipeline(id) }
|
||||
end
|
||||
|
||||
def stop_pipeline(id)
|
||||
pipeline = @pipelines[id]
|
||||
return unless pipeline
|
||||
@logger.log("stopping pipeline", :id => id)
|
||||
pipeline.shutdown { LogStash::ShutdownWatcher.start(pipeline) }
|
||||
@pipelines[id].thread.join
|
||||
end
|
||||
|
||||
def running_pipelines?
|
||||
@upgrade_mutex.synchronize do
|
||||
@pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }.any?
|
||||
end
|
||||
end
|
||||
|
||||
def running_pipeline?(pipeline_id)
|
||||
thread = @pipelines[pipeline_id].thread
|
||||
thread.is_a?(Thread) && thread.alive?
|
||||
end
|
||||
|
||||
def upgrade_pipeline(pipeline_id, new_pipeline)
|
||||
stop_pipeline(pipeline_id)
|
||||
@pipelines[pipeline_id] = new_pipeline
|
||||
start_pipeline(pipeline_id)
|
||||
end
|
||||
|
||||
def clean_state?
|
||||
@pipelines.empty?
|
||||
end
|
||||
|
||||
# since this method modifies the @pipelines hash it is
|
||||
# wrapped in @upgrade_mutex in the parent call `reload_state!`
|
||||
def reload_pipeline!(id)
|
||||
old_pipeline = @pipelines[id]
|
||||
new_pipeline = create_pipeline(old_pipeline.original_settings)
|
||||
return if new_pipeline.nil?
|
||||
|
||||
if old_pipeline.config_str == new_pipeline.config_str
|
||||
@logger.debug("no configuration change for pipeline",
|
||||
:pipeline => id, :config => old_pipeline.config_str)
|
||||
else
|
||||
@logger.log("fetched new config for pipeline. upgrading..",
|
||||
:pipeline => id, :config => new_pipeline.config_str)
|
||||
upgrade_pipeline(id, new_pipeline)
|
||||
end
|
||||
end
|
||||
|
||||
def start_pipeline(id)
|
||||
pipeline = @pipelines[id]
|
||||
return unless pipeline.is_a?(LogStash::Pipeline)
|
||||
return if pipeline.ready?
|
||||
@logger.info("starting pipeline", :id => id)
|
||||
Thread.new do
|
||||
LogStash::Util.set_thread_name("pipeline.#{id}")
|
||||
begin
|
||||
pipeline.run
|
||||
rescue => e
|
||||
@logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace)
|
||||
end
|
||||
end
|
||||
sleep 0.01 until pipeline.ready?
|
||||
end
|
||||
|
||||
## Pipeline Aux methods ##
|
||||
def fetch_config(settings)
|
||||
@config_loader.format_config(settings[:config_path], settings[:config_string])
|
||||
end
|
||||
|
||||
private
|
||||
def node_uuid
|
||||
@node_uuid ||= SecureRandom.uuid
|
||||
end
|
||||
|
||||
### Version actions ###
|
||||
def show_version
|
||||
show_version_logstash
|
||||
|
||||
if [:info, :debug].include?(verbosity?) || debug? || verbose?
|
||||
show_version_ruby
|
||||
show_version_java if LogStash::Environment.jruby?
|
||||
show_gems if [:debug].include?(verbosity?) || debug?
|
||||
end
|
||||
end # def show_version
|
||||
|
||||
def show_version_logstash
|
||||
require "logstash/version"
|
||||
puts "logstash #{LOGSTASH_VERSION}"
|
||||
end # def show_version_logstash
|
||||
|
||||
def show_version_ruby
|
||||
puts RUBY_DESCRIPTION
|
||||
end # def show_version_ruby
|
||||
|
||||
def show_version_java
|
||||
properties = java.lang.System.getProperties
|
||||
puts "java #{properties["java.version"]} (#{properties["java.vendor"]})"
|
||||
puts "jvm #{properties["java.vm.name"]} / #{properties["java.vm.version"]}"
|
||||
end # def show_version_java
|
||||
|
||||
def show_gems
|
||||
require "rubygems"
|
||||
Gem::Specification.each do |spec|
|
||||
puts "gem #{spec.name} #{spec.version}"
|
||||
end
|
||||
end # def show_gems
|
||||
|
||||
end # class LogStash::Agent
|
||||
|
|
|
@ -6,6 +6,14 @@ module LogStash module Config module Defaults
|
|||
|
||||
extend self
|
||||
|
||||
def input
|
||||
"input { stdin { type => stdin } }"
|
||||
end
|
||||
|
||||
def output
|
||||
"output { stdout { codec => rubydebug } }"
|
||||
end
|
||||
|
||||
def cpu_cores
|
||||
Concurrent.processor_count
|
||||
end
|
||||
|
|
90
logstash-core/lib/logstash/config/loader.rb
Normal file
90
logstash-core/lib/logstash/config/loader.rb
Normal file
|
@ -0,0 +1,90 @@
|
|||
require "logstash/config/defaults"
|
||||
|
||||
module LogStash; module Config; class Loader
|
||||
def initialize(logger)
|
||||
@logger = logger
|
||||
end
|
||||
|
||||
def format_config(config_path, config_string)
|
||||
config_string = config_string.to_s
|
||||
if config_path
|
||||
# Append the config string.
|
||||
# This allows users to provide both -f and -e flags. The combination
|
||||
# is rare, but useful for debugging.
|
||||
config_string = config_string + load_config(config_path)
|
||||
else
|
||||
# include a default stdin input if no inputs given
|
||||
if config_string !~ /input *{/
|
||||
config_string += LogStash::Config::Defaults.input
|
||||
end
|
||||
# include a default stdout output if no outputs given
|
||||
if config_string !~ /output *{/
|
||||
config_string += LogStash::Config::Defaults.output
|
||||
end
|
||||
end
|
||||
config_string
|
||||
end
|
||||
|
||||
def load_config(path)
|
||||
begin
|
||||
uri = URI.parse(path)
|
||||
|
||||
case uri.scheme
|
||||
when nil then
|
||||
local_config(path)
|
||||
when /http/ then
|
||||
fetch_config(uri)
|
||||
when "file" then
|
||||
local_config(uri.path)
|
||||
else
|
||||
fail(I18n.t("logstash.runner.configuration.scheme-not-supported", :path => path))
|
||||
end
|
||||
rescue URI::InvalidURIError
|
||||
# fallback for windows.
|
||||
# if the parsing of the file failed we assume we can reach it locally.
|
||||
# some relative path on windows arent parsed correctly (.\logstash.conf)
|
||||
local_config(path)
|
||||
end
|
||||
end
|
||||
|
||||
def local_config(path)
|
||||
path = ::File.expand_path(path)
|
||||
path = ::File.join(path, "*") if ::File.directory?(path)
|
||||
|
||||
if Dir.glob(path).length == 0
|
||||
fail(I18n.t("logstash.runner.configuration.file-not-found", :path => path))
|
||||
end
|
||||
|
||||
config = ""
|
||||
encoding_issue_files = []
|
||||
Dir.glob(path).sort.each do |file|
|
||||
next unless ::File.file?(file)
|
||||
if file.match(/~$/)
|
||||
@logger.debug("NOT reading config file because it is a temp file", :config_file => file)
|
||||
next
|
||||
end
|
||||
@logger.debug("Reading config file", :config_file => file)
|
||||
cfg = ::File.read(file)
|
||||
if !cfg.ascii_only? && !cfg.valid_encoding?
|
||||
encoding_issue_files << file
|
||||
end
|
||||
config << cfg + "\n"
|
||||
@logger.debug? && @logger.debug("\nThe following is the content of a file", :config_file => file.to_s)
|
||||
@logger.debug? && @logger.debug("\n" + cfg + "\n\n")
|
||||
end
|
||||
if encoding_issue_files.any?
|
||||
fail("The following config files contains non-ascii characters but are not UTF-8 encoded #{encoding_issue_files}")
|
||||
end
|
||||
@logger.debug? && @logger.debug("\nThe following is the merged configuration")
|
||||
@logger.debug? && @logger.debug("\n" + config + "\n\n")
|
||||
return config
|
||||
end # def load_config
|
||||
|
||||
def fetch_config(uri)
|
||||
begin
|
||||
Net::HTTP.get(uri) + "\n"
|
||||
rescue Exception => e
|
||||
fail(I18n.t("logstash.runner.configuration.fetch-failed", :path => uri.to_s, :message => e.message))
|
||||
end
|
||||
end
|
||||
end end end
|
|
@ -17,7 +17,7 @@ require "logstash/pipeline_reporter"
|
|||
require "logstash/output_delegator"
|
||||
|
||||
module LogStash; class Pipeline
|
||||
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger
|
||||
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger, :thread, :config_str, :original_settings
|
||||
|
||||
DEFAULT_SETTINGS = {
|
||||
:default_pipeline_workers => LogStash::Config::CpuCoreStrategy.maximum,
|
||||
|
@ -28,9 +28,20 @@ module LogStash; class Pipeline
|
|||
}
|
||||
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
|
||||
|
||||
def self.validate_config(config_str, settings = {})
|
||||
begin
|
||||
# There should be a better way to test this
|
||||
self.new(config_str, settings)
|
||||
rescue => e
|
||||
e.message
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(config_str, settings = {})
|
||||
@pipeline_id = settings[:pipeline_id] || self.object_id
|
||||
@config_str = config_str
|
||||
@original_settings = settings
|
||||
@logger = Cabin::Channel.get(LogStash)
|
||||
@pipeline_id = settings[:pipeline_id] || self.object_id
|
||||
@settings = DEFAULT_SETTINGS.clone
|
||||
settings.each {|setting, value| configure(setting, value) }
|
||||
@reporter = LogStash::PipelineReporter.new(@logger, self)
|
||||
|
@ -117,13 +128,12 @@ module LogStash; class Pipeline
|
|||
end
|
||||
|
||||
def run
|
||||
LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")
|
||||
@logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))
|
||||
@thread = Thread.current
|
||||
|
||||
start_workers
|
||||
|
||||
@logger.info("Pipeline started")
|
||||
@logger.terminal("Logstash startup completed")
|
||||
@logger.log("Pipeline #{@pipeline_id} started")
|
||||
|
||||
# Block until all inputs have stopped
|
||||
# Generally this happens if SIGINT is sent and `shutdown` is called from an external thread
|
||||
|
@ -138,8 +148,7 @@ module LogStash; class Pipeline
|
|||
shutdown_flusher
|
||||
shutdown_workers
|
||||
|
||||
@logger.info("Pipeline shutdown complete.")
|
||||
@logger.terminal("Logstash shutdown completed")
|
||||
@logger.log("Pipeline #{@pipeline_id} has been shutdown")
|
||||
|
||||
# exit code
|
||||
return 0
|
||||
|
|
|
@ -9,11 +9,15 @@ LogStash::Environment.load_locale!
|
|||
|
||||
require "logstash/namespace"
|
||||
require "logstash/program"
|
||||
require "logstash/config/defaults"
|
||||
|
||||
class LogStash::Runner
|
||||
include LogStash::Program
|
||||
|
||||
attr_reader :agent
|
||||
|
||||
def main(args)
|
||||
|
||||
require "logstash/util"
|
||||
require "logstash/util/java_version"
|
||||
require "stud/trap"
|
||||
|
@ -121,4 +125,5 @@ Available commands:
|
|||
def show_help(command)
|
||||
puts command.help
|
||||
end
|
||||
|
||||
end # class LogStash::Runner
|
||||
|
|
|
@ -34,8 +34,8 @@ module LogStash
|
|||
end
|
||||
|
||||
def self.start(pipeline, cycle_period=CHECK_EVERY, report_every=REPORT_EVERY, abort_threshold=ABORT_AFTER)
|
||||
watcher = self.new(pipeline, cycle_period, report_every, abort_threshold)
|
||||
Thread.new(watcher) { |watcher| watcher.start }
|
||||
controller = self.new(pipeline, cycle_period, report_every, abort_threshold)
|
||||
Thread.new(controller) { |controller| controller.start }
|
||||
end
|
||||
|
||||
def logger
|
||||
|
@ -47,6 +47,7 @@ module LogStash
|
|||
cycle_number = 0
|
||||
stalled_count = 0
|
||||
Stud.interval(@cycle_period) do
|
||||
break unless @pipeline.thread.alive?
|
||||
@reports << pipeline_report_snapshot
|
||||
@reports.delete_at(0) if @reports.size > @report_every # expire old report
|
||||
if cycle_number == (@report_every - 1) # it's report time!
|
||||
|
|
8
logstash-core/lib/logstash/special_agent.rb
Normal file
8
logstash-core/lib/logstash/special_agent.rb
Normal file
|
@ -0,0 +1,8 @@
|
|||
# encoding: utf-8
|
||||
require "logstash/agent"
|
||||
|
||||
class LogStash::SpecialAgent < LogStash::Agent
|
||||
def fetch_config(settings)
|
||||
Net::HTTP.get(settings["remote.url"])
|
||||
end
|
||||
end
|
|
@ -59,12 +59,14 @@ en:
|
|||
missing-configuration: >-
|
||||
No configuration file was specified. Perhaps you forgot to provide
|
||||
the '-f yourlogstash.conf' flag?
|
||||
reload-without-config-path: >-
|
||||
Configuration reloading also requires passing a configuration path with '-f yourlogstash.conf'
|
||||
error: >-
|
||||
Error: %{error}
|
||||
sigint: >-
|
||||
SIGINT received. Shutting down the pipeline.
|
||||
SIGINT received. Shutting down the agent.
|
||||
sigterm: >-
|
||||
SIGTERM received. Shutting down the pipeline.
|
||||
SIGTERM received. Shutting down the agent.
|
||||
slow_shutdown: |-
|
||||
Received shutdown signal, but pipeline is still waiting for in-flight events
|
||||
to be processed. Sending another ^C will force quit Logstash, but this may cause
|
||||
|
@ -163,6 +165,10 @@ en:
|
|||
pipeline-batch-delay: |+
|
||||
When creating pipeline batches, how long to wait while polling
|
||||
for the next event.
|
||||
auto_reload: |+
|
||||
Monitor configuration changes and reload
|
||||
whenever it is changed.
|
||||
NOTE: use SIGHUP to manually reload the config
|
||||
log: |+
|
||||
Write logstash internal logs to the given
|
||||
file. Without this flag, logstash will emit
|
||||
|
|
|
@ -1,41 +1,169 @@
|
|||
# encoding: utf-8
|
||||
require 'spec_helper'
|
||||
require 'stud/temporary'
|
||||
require 'stud/task'
|
||||
|
||||
describe LogStash::Agent do
|
||||
subject { LogStash::Agent.new('') }
|
||||
let(:dummy_config) { 'input {}' }
|
||||
|
||||
context "when loading the configuration" do
|
||||
context "when local" do
|
||||
before { expect(subject).to receive(:local_config).with(path) }
|
||||
let(:logger) { double("logger") }
|
||||
let(:agent_args) { [] }
|
||||
subject { LogStash::Agent.new("", "") }
|
||||
|
||||
context "unix" do
|
||||
let(:path) { './test.conf' }
|
||||
it 'works with relative path' do
|
||||
subject.load_config(path)
|
||||
end
|
||||
before :each do
|
||||
[:log, :info, :warn, :error, :fatal, :debug].each do |level|
|
||||
allow(logger).to receive(level)
|
||||
end
|
||||
[:info?, :warn?, :error?, :fatal?, :debug?].each do |level|
|
||||
allow(logger).to receive(level)
|
||||
end
|
||||
allow(logger).to receive(:level=)
|
||||
allow(logger).to receive(:subscribe)
|
||||
subject.parse(agent_args)
|
||||
subject.instance_variable_set("@reload_interval", 0.01)
|
||||
subject.instance_variable_set("@logger", logger)
|
||||
end
|
||||
|
||||
describe "register_pipeline" do
|
||||
let(:pipeline_id) { "main" }
|
||||
let(:settings) { {
|
||||
:config_string => "input { } filter { } output { }",
|
||||
:pipeline_workers => 4
|
||||
} }
|
||||
|
||||
it "should delegate settings to new pipeline" do
|
||||
expect(LogStash::Pipeline).to receive(:new).with(settings[:config_string], hash_including(settings))
|
||||
subject.register_pipeline(pipeline_id, settings)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#execute" do
|
||||
let(:sample_config) { "input { generator { count => 100000 } } output { }" }
|
||||
let(:config_file) { Stud::Temporary.pathname }
|
||||
|
||||
before :each do
|
||||
File.open(config_file, "w") {|f| f.puts sample_config }
|
||||
end
|
||||
|
||||
after :each do
|
||||
File.unlink(config_file)
|
||||
end
|
||||
|
||||
context "when auto_reload is false" do
|
||||
let(:agent_args) { [ "--config", config_file] } #reload_interval => 0.01, :config_path => } }
|
||||
|
||||
before :each do
|
||||
allow(subject).to receive(:sleep)
|
||||
allow(subject).to receive(:clean_state?).and_return(false)
|
||||
allow(subject).to receive(:running_pipelines?).and_return(true)
|
||||
end
|
||||
|
||||
context "windows" do
|
||||
let(:path) { '.\test.conf' }
|
||||
it 'work with relative windows path' do
|
||||
subject.load_config(path)
|
||||
context "if state is clean" do
|
||||
it "should not reload_state!" do
|
||||
expect(subject).to_not receive(:reload_state!)
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when remote" do
|
||||
context 'supported scheme' do
|
||||
let(:path) { "http://test.local/superconfig.conf" }
|
||||
|
||||
before { expect(Net::HTTP).to receive(:get) { dummy_config } }
|
||||
it 'works with http' do
|
||||
expect(subject.load_config(path)).to eq("#{dummy_config}\n")
|
||||
context "when auto_reload is true" do
|
||||
let(:agent_args) { [ "--auto-reload", "--config", config_file] } #reload_interval => 0.01, :config_path => } }
|
||||
#let(:agent_args) { { :logger => logger, :auto_reload => false, :reload_interval => 0.01, :config_path => config_file } }
|
||||
context "if state is clean" do
|
||||
it "should periodically reload_state" do
|
||||
allow(subject).to receive(:clean_state?).and_return(false)
|
||||
expect(subject).to receive(:reload_state!).at_least(3).times
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#reload_state!" do
|
||||
let(:pipeline_id) { "main" }
|
||||
let(:first_pipeline_config) { "input { } filter { } output { }" }
|
||||
let(:second_pipeline_config) { "input { generator {} } filter { } output { }" }
|
||||
let(:pipeline_settings) { {
|
||||
:config_string => first_pipeline_config,
|
||||
:pipeline_workers => 4
|
||||
} }
|
||||
|
||||
before(:each) do
|
||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
||||
end
|
||||
|
||||
context "when fetching a new state" do
|
||||
it "upgrades the state" do
|
||||
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
|
||||
expect(subject).to receive(:upgrade_pipeline).with(pipeline_id, kind_of(LogStash::Pipeline))
|
||||
subject.send(:reload_state!)
|
||||
end
|
||||
end
|
||||
context "when fetching the same state" do
|
||||
it "doesn't upgrade the state" do
|
||||
expect(subject).to receive(:fetch_config).and_return(first_pipeline_config)
|
||||
expect(subject).to_not receive(:upgrade_pipeline)
|
||||
subject.send(:reload_state!)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "#upgrade_pipeline" do
|
||||
let(:pipeline_id) { "main" }
|
||||
let(:pipeline_config) { "input { } filter { } output { }" }
|
||||
let(:pipeline_settings) { {
|
||||
:config_string => pipeline_config,
|
||||
:pipeline_workers => 4
|
||||
} }
|
||||
let(:new_pipeline_config) { "input { generator {} } output { }" }
|
||||
|
||||
before(:each) do
|
||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
||||
end
|
||||
|
||||
context "when the upgrade fails" do
|
||||
before :each do
|
||||
allow(subject).to receive(:fetch_config).and_return(new_pipeline_config)
|
||||
allow(subject).to receive(:create_pipeline).and_return(nil)
|
||||
allow(subject).to receive(:stop_pipeline)
|
||||
end
|
||||
|
||||
it "leaves the state untouched" do
|
||||
subject.send(:reload_state!)
|
||||
expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config)
|
||||
end
|
||||
|
||||
context "and current state is empty" do
|
||||
it "should not start a pipeline" do
|
||||
expect(subject).to_not receive(:start_pipeline)
|
||||
subject.send(:reload_state!)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when the upgrade succeeds" do
|
||||
let(:new_config) { "input { generator { count => 1 } } output { }" }
|
||||
before :each do
|
||||
allow(subject).to receive(:fetch_config).and_return(new_config)
|
||||
allow(subject).to receive(:stop_pipeline)
|
||||
end
|
||||
it "updates the state" do
|
||||
subject.send(:reload_state!)
|
||||
expect(subject.pipelines[pipeline_id].config_str).to eq(new_config)
|
||||
end
|
||||
it "starts the pipeline" do
|
||||
expect(subject).to receive(:stop_pipeline)
|
||||
expect(subject).to receive(:start_pipeline)
|
||||
subject.send(:reload_state!)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "--pluginpath" do
|
||||
let(:single_path) { "/some/path" }
|
||||
let(:multiple_paths) { ["/some/path1", "/some/path2"] }
|
||||
|
@ -58,5 +186,67 @@ describe LogStash::Agent do
|
|||
subject.configure_plugin_paths(multiple_paths)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#fetch_config" do
|
||||
let(:file_config) { "input { generator { count => 100 } } output { }" }
|
||||
let(:cli_config) { "filter { drop { } } " }
|
||||
let(:tmp_config_path) { Stud::Temporary.pathname }
|
||||
let(:agent_args) { [ "-e", "filter { drop { } } ", "-f", tmp_config_path ] }
|
||||
|
||||
before :each do
|
||||
IO.write(tmp_config_path, file_config)
|
||||
end
|
||||
|
||||
after :each do
|
||||
File.unlink(tmp_config_path)
|
||||
end
|
||||
|
||||
it "should join the config string and config path content" do
|
||||
settings = { :config_path => tmp_config_path, :config_string => cli_config }
|
||||
fetched_config = subject.send(:fetch_config, settings)
|
||||
expect(fetched_config.strip).to eq(cli_config + IO.read(tmp_config_path))
|
||||
end
|
||||
end
|
||||
|
||||
context "--pluginpath" do
|
||||
let(:single_path) { "/some/path" }
|
||||
let(:multiple_paths) { ["/some/path1", "/some/path2"] }
|
||||
|
||||
it "should fail with single invalid dir path" do
|
||||
expect(File).to receive(:directory?).and_return(false)
|
||||
expect(LogStash::Environment).not_to receive(:add_plugin_path)
|
||||
expect{subject.configure_plugin_paths(single_path)}.to raise_error(LogStash::ConfigurationError)
|
||||
end
|
||||
end
|
||||
|
||||
describe "pipeline settings" do
|
||||
let(:pipeline_string) { "input { stdin {} } output { stdout {} }" }
|
||||
let(:main_pipeline_settings) { { :pipeline_id => "main" } }
|
||||
let(:pipeline) { double("pipeline") }
|
||||
|
||||
before(:each) do
|
||||
task = Stud::Task.new { 1 }
|
||||
allow(pipeline).to receive(:run).and_return(task)
|
||||
allow(pipeline).to receive(:shutdown)
|
||||
end
|
||||
|
||||
context "when :pipeline_workers is not defined by the user" do
|
||||
it "should not pass the value to the pipeline" do
|
||||
expect(LogStash::Pipeline).to receive(:new).once.with(pipeline_string, hash_excluding(:pipeline_workers)).and_return(pipeline)
|
||||
args = ["-e", pipeline_string]
|
||||
subject.run(args)
|
||||
end
|
||||
end
|
||||
|
||||
context "when :pipeline_workers is defined by the user" do
|
||||
it "should pass the value to the pipeline" do
|
||||
main_pipeline_settings[:pipeline_workers] = 2
|
||||
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, hash_including(main_pipeline_settings)).and_return(pipeline)
|
||||
args = ["-w", "2", "-e", pipeline_string]
|
||||
subject.run(args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
|
36
logstash-core/spec/logstash/config/loader_spec.rb
Normal file
36
logstash-core/spec/logstash/config/loader_spec.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
# encoding: utf-8
|
||||
require "spec_helper"
|
||||
require "logstash/config/loader"
|
||||
|
||||
describe LogStash::Config::Loader do
|
||||
subject { described_class.new(Cabin::Channel.get) }
|
||||
context "when local" do
|
||||
before { expect(subject).to receive(:local_config).with(path) }
|
||||
|
||||
context "unix" do
|
||||
let(:path) { './test.conf' }
|
||||
it 'works with relative path' do
|
||||
subject.load_config(path)
|
||||
end
|
||||
end
|
||||
|
||||
context "windows" do
|
||||
let(:path) { '.\test.conf' }
|
||||
it 'work with relative windows path' do
|
||||
subject.load_config(path)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "when remote" do
|
||||
context 'supported scheme' do
|
||||
let(:path) { "http://test.local/superconfig.conf" }
|
||||
let(:dummy_config) { 'input {}' }
|
||||
|
||||
before { expect(Net::HTTP).to receive(:get) { dummy_config } }
|
||||
it 'works with http' do
|
||||
expect(subject.load_config(path)).to eq("#{dummy_config}\n")
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -10,6 +10,13 @@ end
|
|||
|
||||
describe LogStash::Runner do
|
||||
|
||||
let(:channel) { Cabin::Channel.new }
|
||||
|
||||
before :each do
|
||||
allow(Cabin::Channel).to receive(:get).with(LogStash).and_return(channel)
|
||||
end
|
||||
|
||||
|
||||
context "argument parsing" do
|
||||
it "should run agent" do
|
||||
expect(Stud::Task).to receive(:new).once.and_return(nil)
|
||||
|
@ -38,31 +45,15 @@ describe LogStash::Runner do
|
|||
end
|
||||
end
|
||||
|
||||
describe "pipeline settings" do
|
||||
let(:pipeline_string) { "input { stdin {} } output { stdout {} }" }
|
||||
let(:base_pipeline_settings) { { :pipeline_id => "base" } }
|
||||
let(:pipeline) { double("pipeline") }
|
||||
context "--auto-reload" do
|
||||
context "when -f is not given" do
|
||||
|
||||
before(:each) do
|
||||
task = Stud::Task.new { 1 }
|
||||
allow(pipeline).to receive(:run).and_return(task)
|
||||
end
|
||||
let(:args) { ["agent", "-r", "-e", "input {} output {}"] }
|
||||
|
||||
context "when pipeline workers is not defined by the user" do
|
||||
it "should not pass the value to the pipeline" do
|
||||
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline)
|
||||
args = ["agent", "-e", pipeline_string]
|
||||
subject.run(args).wait
|
||||
end
|
||||
end
|
||||
|
||||
context "when pipeline workers is defined by the user" do
|
||||
it "should pass the value to the pipeline" do
|
||||
base_pipeline_settings[:pipeline_workers] = 2
|
||||
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline)
|
||||
args = ["agent", "-w", "2", "-e", pipeline_string]
|
||||
subject.run(args).wait
|
||||
it "should exit immediately" do
|
||||
expect(subject.run(args).wait).to eq(1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -17,6 +17,7 @@ describe LogStash::ShutdownWatcher do
|
|||
LogStash::ShutdownWatcher.logger = channel
|
||||
|
||||
allow(pipeline).to receive(:reporter).and_return(reporter)
|
||||
allow(pipeline).to receive(:thread).and_return(Thread.current)
|
||||
allow(reporter).to receive(:snapshot).and_return(reporter_snapshot)
|
||||
allow(reporter_snapshot).to receive(:o_simple_hash).and_return({})
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue