mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Code cleanup for the collector observer
We have more the responsability of watching the collector inside the input itself, this feature might come back when we have a new execution model that can be improved in watching metrics. But this would require more granular watchers. No tests were affected by this changes since the code that required that features was already removed. Fixes: #6447 Fixes #6456
This commit is contained in:
parent
23e9d910d1
commit
6250974508
2 changed files with 1 additions and 47 deletions
|
@ -130,7 +130,6 @@ class LogStash::Agent
|
||||||
end
|
end
|
||||||
|
|
||||||
def stop_collecting_metrics
|
def stop_collecting_metrics
|
||||||
@collector.stop
|
|
||||||
@periodic_pollers.stop
|
@periodic_pollers.stop
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -11,12 +11,8 @@ module LogStash module Instrument
|
||||||
# The Collector is the single point of reference for all
|
# The Collector is the single point of reference for all
|
||||||
# the metrics collection inside logstash, the metrics library will make
|
# the metrics collection inside logstash, the metrics library will make
|
||||||
# direct calls to this class.
|
# direct calls to this class.
|
||||||
#
|
|
||||||
# This class is an observable responsable of periodically emitting view of the system
|
|
||||||
# to other components like the internal metrics pipelines.
|
|
||||||
class Collector
|
class Collector
|
||||||
include LogStash::Util::Loggable
|
include LogStash::Util::Loggable
|
||||||
include Observable
|
|
||||||
|
|
||||||
SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
|
SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds
|
||||||
SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds
|
SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds
|
||||||
|
@ -26,7 +22,6 @@ module LogStash module Instrument
|
||||||
def initialize
|
def initialize
|
||||||
@metric_store = MetricStore.new
|
@metric_store = MetricStore.new
|
||||||
@agent = nil
|
@agent = nil
|
||||||
start_periodic_snapshotting
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# The metric library will call this unique interface
|
# The metric library will call this unique interface
|
||||||
|
@ -43,8 +38,6 @@ module LogStash module Instrument
|
||||||
end
|
end
|
||||||
|
|
||||||
metric.execute(*metric_type_params)
|
metric.execute(*metric_type_params)
|
||||||
|
|
||||||
changed # we had changes coming in so we can notify the observers
|
|
||||||
rescue MetricStore::NamespacesExpectedError => e
|
rescue MetricStore::NamespacesExpectedError => e
|
||||||
logger.error("Collector: Cannot record metric", :exception => e)
|
logger.error("Collector: Cannot record metric", :exception => e)
|
||||||
rescue NameError => e
|
rescue NameError => e
|
||||||
|
@ -58,51 +51,13 @@ module LogStash module Instrument
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Monitor the `Concurrent::TimerTask` this update is triggered on every successful or not
|
|
||||||
# run of the task, TimerTask implement Observable and the collector acts as
|
|
||||||
# the observer and will keep track if something went wrong in the execution.
|
|
||||||
#
|
|
||||||
# @param [Time] Time of execution
|
|
||||||
# @param [result] Result of the execution
|
|
||||||
# @param [Exception] Exception
|
|
||||||
def update(time_of_execution, result, exception)
|
|
||||||
return true if exception.nil?
|
|
||||||
logger.error("Collector: Something went wrong went sending data to the observers",
|
|
||||||
:execution_time => time_of_execution,
|
|
||||||
:result => result,
|
|
||||||
:exception => exception.class.name)
|
|
||||||
end
|
|
||||||
|
|
||||||
# Snapshot the current Metric Store and return it immediately,
|
# Snapshot the current Metric Store and return it immediately,
|
||||||
# This is useful if you want to get access to the current metric store without
|
# This is useful if you want to get access to the current metric store without
|
||||||
# waiting for a periodic call.
|
# waiting for a periodic call.
|
||||||
#
|
#
|
||||||
# @return [LogStash::Instrument::MetricStore]
|
# @return [LogStash::Instrument::MetricStore]
|
||||||
def snapshot_metric
|
def snapshot_metric
|
||||||
Snapshot.new(@metric_store)
|
Snapshot.new(@metric_store.dup)
|
||||||
end
|
|
||||||
|
|
||||||
# Configure and start the periodic task for snapshotting the `MetricStore`
|
|
||||||
def start_periodic_snapshotting
|
|
||||||
@snapshot_task = Concurrent::TimerTask.new { publish_snapshot }
|
|
||||||
@snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS
|
|
||||||
@snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS
|
|
||||||
@snapshot_task.add_observer(self)
|
|
||||||
@snapshot_task.execute
|
|
||||||
end
|
|
||||||
|
|
||||||
def stop
|
|
||||||
@snapshot_task.shutdown
|
|
||||||
end
|
|
||||||
|
|
||||||
# Create a snapshot of the MetricStore and send it to to the registered observers
|
|
||||||
# The observer will receive the following signature in the update methode.
|
|
||||||
#
|
|
||||||
# `#update(created_at, metric_store)`
|
|
||||||
def publish_snapshot
|
|
||||||
created_at = Time.now
|
|
||||||
logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug?
|
|
||||||
notify_observers(snapshot_metric)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def clear(keypath)
|
def clear(keypath)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue