diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 8e73fd461..16ba2e583 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -130,7 +130,6 @@ class LogStash::Agent end def stop_collecting_metrics - @collector.stop @periodic_pollers.stop end diff --git a/logstash-core/lib/logstash/instrument/collector.rb b/logstash-core/lib/logstash/instrument/collector.rb index c6946781f..25ee3b7e7 100644 --- a/logstash-core/lib/logstash/instrument/collector.rb +++ b/logstash-core/lib/logstash/instrument/collector.rb @@ -11,12 +11,8 @@ module LogStash module Instrument # The Collector is the single point of reference for all # the metrics collection inside logstash, the metrics library will make # direct calls to this class. - # - # This class is an observable responsable of periodically emitting view of the system - # to other components like the internal metrics pipelines. class Collector include LogStash::Util::Loggable - include Observable SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds @@ -26,7 +22,6 @@ module LogStash module Instrument def initialize @metric_store = MetricStore.new @agent = nil - start_periodic_snapshotting end # The metric library will call this unique interface @@ -43,8 +38,6 @@ module LogStash module Instrument end metric.execute(*metric_type_params) - - changed # we had changes coming in so we can notify the observers rescue MetricStore::NamespacesExpectedError => e logger.error("Collector: Cannot record metric", :exception => e) rescue NameError => e @@ -58,51 +51,13 @@ module LogStash module Instrument end end - # Monitor the `Concurrent::TimerTask` this update is triggered on every successful or not - # run of the task, TimerTask implement Observable and the collector acts as - # the observer and will keep track if something went wrong in the execution. - # - # @param [Time] Time of execution - # @param [result] Result of the execution - # @param [Exception] Exception - def update(time_of_execution, result, exception) - return true if exception.nil? - logger.error("Collector: Something went wrong went sending data to the observers", - :execution_time => time_of_execution, - :result => result, - :exception => exception.class.name) - end - # Snapshot the current Metric Store and return it immediately, # This is useful if you want to get access to the current metric store without # waiting for a periodic call. # # @return [LogStash::Instrument::MetricStore] def snapshot_metric - Snapshot.new(@metric_store) - end - - # Configure and start the periodic task for snapshotting the `MetricStore` - def start_periodic_snapshotting - @snapshot_task = Concurrent::TimerTask.new { publish_snapshot } - @snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS - @snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS - @snapshot_task.add_observer(self) - @snapshot_task.execute - end - - def stop - @snapshot_task.shutdown - end - - # Create a snapshot of the MetricStore and send it to to the registered observers - # The observer will receive the following signature in the update methode. - # - # `#update(created_at, metric_store)` - def publish_snapshot - created_at = Time.now - logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug? - notify_observers(snapshot_metric) + Snapshot.new(@metric_store.dup) end def clear(keypath)