mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
several improvements to logging messaging and ordering
* remove unused close_pipelines agent code Fixes #7956
This commit is contained in:
parent
703dfca10e
commit
a95bf6716e
7 changed files with 31 additions and 59 deletions
|
@ -81,7 +81,7 @@ class LogStash::Agent
|
|||
|
||||
def execute
|
||||
@thread = Thread.current # this var is implicitly used by Stud.stop?
|
||||
logger.debug("starting agent")
|
||||
logger.debug("Starting agent")
|
||||
|
||||
start_webserver
|
||||
|
||||
|
@ -272,24 +272,6 @@ class LogStash::Agent
|
|||
end
|
||||
end
|
||||
|
||||
def close_pipeline(id)
|
||||
with_pipelines do |pipelines|
|
||||
pipeline = pipelines[id]
|
||||
if pipeline
|
||||
@logger.warn("closing pipeline", :id => id)
|
||||
pipeline.close
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def close_pipelines
|
||||
with_pipelines do |pipelines|
|
||||
pipelines.each do |id, _|
|
||||
close_pipeline(id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def transition_to_stopped
|
||||
@running.make_false
|
||||
|
@ -310,12 +292,10 @@ class LogStash::Agent
|
|||
# for other tasks.
|
||||
#
|
||||
def converge_state(pipeline_actions)
|
||||
logger.debug("Converging pipelines")
|
||||
logger.debug("Converging pipelines state", :actions_count => pipeline_actions.size)
|
||||
|
||||
converge_result = LogStash::ConvergeResult.new(pipeline_actions.size)
|
||||
|
||||
logger.debug("Needed actions to converge", :actions_count => pipeline_actions.size) unless pipeline_actions.empty?
|
||||
|
||||
pipeline_actions.each do |action|
|
||||
# We execute every task we need to converge the current state of pipelines
|
||||
# for every task we will record the action result, that will help us
|
||||
|
@ -409,7 +389,7 @@ class LogStash::Agent
|
|||
@collector = LogStash::Instrument::Collector.new
|
||||
|
||||
@metric = if collect_metrics?
|
||||
@logger.debug("Agent: Configuring metric collection")
|
||||
@logger.debug("Setting up metric collection")
|
||||
LogStash::Instrument::Metric.new(@collector)
|
||||
else
|
||||
LogStash::Instrument::NullMetric.new(@collector)
|
||||
|
|
|
@ -31,9 +31,11 @@ module LogStash module Config module Source
|
|||
end
|
||||
|
||||
def match?
|
||||
if modules_cli? || modules? || config_string? || config_path?
|
||||
return false
|
||||
end
|
||||
detect_pipelines if !@detect_pipelines_called
|
||||
# see basic settings predicates and getters defined in the base class
|
||||
return !(invalid_pipelines_detected? || modules_cli? || modules? || config_string? || config_path?)
|
||||
return !(invalid_pipelines_detected?)
|
||||
end
|
||||
|
||||
def invalid_pipelines_detected?
|
||||
|
@ -41,10 +43,10 @@ module LogStash module Config module Source
|
|||
end
|
||||
|
||||
def config_conflict?
|
||||
detect_pipelines if !@detect_pipelines_called
|
||||
@conflict_messages.clear
|
||||
# are there any auto-reload conflicts?
|
||||
if !(modules_cli? || modules? || config_string? || config_path?)
|
||||
detect_pipelines if !@detect_pipelines_called
|
||||
if @detected_marker.nil?
|
||||
@conflict_messages << I18n.t("logstash.runner.config-pipelines-failed-read", :path => pipelines_yaml_location)
|
||||
elsif @detected_marker == false
|
||||
|
|
|
@ -84,7 +84,7 @@ class LogStash::Inputs::Base < LogStash::Plugin
|
|||
|
||||
public
|
||||
def do_stop
|
||||
@logger.debug("stopping", :plugin => self.class.name)
|
||||
@logger.debug("Stopping", :plugin => self.class.name)
|
||||
@stop_called.make_true
|
||||
stop
|
||||
end
|
||||
|
|
|
@ -27,7 +27,7 @@ module LogStash module Instrument module PeriodicPoller
|
|||
if exception.is_a?(Concurrent::TimeoutError)
|
||||
# On a busy system this can happen, we just log it as a debug
|
||||
# event instead of an error, Some of the JVM calls can take a long time or block.
|
||||
logger.debug("PeriodicPoller: Timeout exception",
|
||||
logger.debug("Timeout exception",
|
||||
:poller => self,
|
||||
:result => result,
|
||||
:polling_timeout => @options[:polling_timeout],
|
||||
|
@ -35,7 +35,7 @@ module LogStash module Instrument module PeriodicPoller
|
|||
:exception => exception.class,
|
||||
:executed_at => time)
|
||||
else
|
||||
logger.error("PeriodicPoller: exception",
|
||||
logger.error("Exception",
|
||||
:poller => self,
|
||||
:result => result,
|
||||
:exception => exception.class,
|
||||
|
@ -50,7 +50,7 @@ module LogStash module Instrument module PeriodicPoller
|
|||
end
|
||||
|
||||
def start
|
||||
logger.debug("PeriodicPoller: Starting",
|
||||
logger.debug("Starting",
|
||||
:polling_interval => @options[:polling_interval],
|
||||
:polling_timeout => @options[:polling_timeout]) if logger.debug?
|
||||
|
||||
|
@ -59,7 +59,7 @@ module LogStash module Instrument module PeriodicPoller
|
|||
end
|
||||
|
||||
def stop
|
||||
logger.debug("PeriodicPoller: Stopping")
|
||||
logger.debug("Stopping")
|
||||
@task.shutdown
|
||||
end
|
||||
|
||||
|
|
|
@ -74,9 +74,7 @@ module LogStash; class BasePipeline
|
|||
parsed_config.process_escape_sequences = settings.get_value("config.support_escapes")
|
||||
config_code = parsed_config.compile
|
||||
|
||||
# config_code = BasePipeline.compileConfig(config_str)
|
||||
|
||||
if settings.get_value("config.debug") && @logger.debug?
|
||||
if settings.get_value("config.debug")
|
||||
@logger.debug("Compiled pipeline code", default_logging_keys(:code => config_code))
|
||||
end
|
||||
|
||||
|
@ -238,7 +236,10 @@ module LogStash; class Pipeline < BasePipeline
|
|||
collect_stats
|
||||
collect_dlq_stats
|
||||
|
||||
@logger.debug("Starting pipeline", default_logging_keys)
|
||||
@logger.info("Starting pipeline", default_logging_keys(
|
||||
"pipeline.workers" => @settings.get("pipeline.workers"),
|
||||
"pipeline.batch.size" => @settings.get("pipeline.batch.size"),
|
||||
"pipeline.batch.delay" => @settings.get("pipeline.batch.delay")))
|
||||
|
||||
@finished_execution = Concurrent::AtomicBoolean.new(false)
|
||||
|
||||
|
@ -249,14 +250,14 @@ module LogStash; class Pipeline < BasePipeline
|
|||
@finished_execution.make_true
|
||||
rescue => e
|
||||
close
|
||||
logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
||||
@logger.error("Pipeline aborted due to error", default_logging_keys(:exception => e, :backtrace => e.backtrace))
|
||||
end
|
||||
end
|
||||
|
||||
status = wait_until_started
|
||||
|
||||
if status
|
||||
logger.debug("Pipeline started successfully", default_logging_keys(:pipeline_id => pipeline_id))
|
||||
@logger.info("Pipeline started succesfully", default_logging_keys)
|
||||
end
|
||||
|
||||
status
|
||||
|
@ -287,8 +288,6 @@ module LogStash; class Pipeline < BasePipeline
|
|||
|
||||
start_workers
|
||||
|
||||
@logger.info("Pipeline started", "pipeline.id" => @pipeline_id)
|
||||
|
||||
# Block until all inputs have stopped
|
||||
# Generally this happens if SIGINT is sent and `shutdown` is called from an external thread
|
||||
|
||||
|
@ -297,14 +296,13 @@ module LogStash; class Pipeline < BasePipeline
|
|||
wait_inputs
|
||||
transition_to_stopped
|
||||
|
||||
@logger.debug("Input plugins stopped! Will shutdown filter/output workers.", default_logging_keys)
|
||||
|
||||
shutdown_flusher
|
||||
@logger.debug("Shutting down filter/output workers", default_logging_keys)
|
||||
shutdown_workers
|
||||
|
||||
close
|
||||
|
||||
@logger.debug("Pipeline has been shutdown", default_logging_keys)
|
||||
@logger.info("Pipeline has terminated", default_logging_keys)
|
||||
|
||||
# exit code
|
||||
return 0
|
||||
|
@ -378,12 +376,6 @@ module LogStash; class Pipeline < BasePipeline
|
|||
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
|
||||
config_metric.gauge(:dead_letter_queue_path, @dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
|
||||
|
||||
|
||||
@logger.info("Starting pipeline", default_logging_keys(
|
||||
"pipeline.workers" => pipeline_workers,
|
||||
"pipeline.batch.size" => batch_size,
|
||||
"pipeline.batch.delay" => batch_delay,
|
||||
"pipeline.max_inflight" => max_inflight))
|
||||
if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
|
||||
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
|
||||
end
|
||||
|
@ -565,19 +557,19 @@ module LogStash; class Pipeline < BasePipeline
|
|||
# stopped
|
||||
wait_for_workers
|
||||
clear_pipeline_metrics
|
||||
@logger.info("Pipeline terminated", "pipeline.id" => @pipeline_id)
|
||||
end # def shutdown
|
||||
|
||||
def wait_for_workers
|
||||
@logger.debug("Closing inputs", default_logging_keys)
|
||||
@worker_threads.map(&:join)
|
||||
@logger.debug("Worker closed", default_logging_keys)
|
||||
@worker_threads.each do |t|
|
||||
t.join
|
||||
@logger.debug("Worker terminated", default_logging_keys(:thread => t.inspect))
|
||||
end
|
||||
end
|
||||
|
||||
def stop_inputs
|
||||
@logger.debug("Closing inputs", default_logging_keys)
|
||||
@logger.debug("Stopping inputs", default_logging_keys)
|
||||
@inputs.each(&:do_stop)
|
||||
@logger.debug("Closed inputs", default_logging_keys)
|
||||
@logger.debug("Stopped inputs", default_logging_keys)
|
||||
end
|
||||
|
||||
# After `shutdown` is called from an external thread this is called from the main thread to
|
||||
|
|
|
@ -73,7 +73,7 @@ class LogStash::Plugin
|
|||
# close is called during shutdown, after the plugin worker
|
||||
# main task terminates
|
||||
def do_close
|
||||
@logger.debug("closing", :plugin => self.class.name)
|
||||
@logger.debug("Closing", :plugin => self.class.name)
|
||||
close
|
||||
end
|
||||
|
||||
|
|
|
@ -58,9 +58,9 @@ en:
|
|||
sighup: >-
|
||||
SIGHUP received.
|
||||
sigint: >-
|
||||
SIGINT received. Shutting down the agent.
|
||||
SIGINT received. Shutting down.
|
||||
sigterm: >-
|
||||
SIGTERM received. Shutting down the agent.
|
||||
SIGTERM received. Shutting down.
|
||||
slow_shutdown: |-
|
||||
Received shutdown signal, but pipeline is still waiting for in-flight events
|
||||
to be processed. Sending another ^C will force quit Logstash, but this may cause
|
||||
|
@ -327,8 +327,6 @@ en:
|
|||
name: |+
|
||||
Specify the name of this logstash instance, if no value is given
|
||||
it will default to the current hostname.
|
||||
agent: |+
|
||||
Specify an alternate agent plugin name.
|
||||
config_debug: |+
|
||||
Print the compiled config ruby code out as a debug log (you must also have --log.level=debug enabled).
|
||||
WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue