From 6250974508343f1fb8779badf233ac3afef5913c Mon Sep 17 00:00:00 2001 From: Pier-Hugues Pellerin Date: Wed, 21 Dec 2016 10:46:54 -0500 Subject: [PATCH] Code cleanup for the collector observer We have more the responsability of watching the collector inside the input itself, this feature might come back when we have a new execution model that can be improved in watching metrics. But this would require more granular watchers. No tests were affected by this changes since the code that required that features was already removed. Fixes: #6447 Fixes #6456 --- logstash-core/lib/logstash/agent.rb | 1 - .../lib/logstash/instrument/collector.rb | 47 +------------------ 2 files changed, 1 insertion(+), 47 deletions(-) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 8e73fd461..16ba2e583 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -130,7 +130,6 @@ class LogStash::Agent end def stop_collecting_metrics - @collector.stop @periodic_pollers.stop end diff --git a/logstash-core/lib/logstash/instrument/collector.rb b/logstash-core/lib/logstash/instrument/collector.rb index c6946781f..25ee3b7e7 100644 --- a/logstash-core/lib/logstash/instrument/collector.rb +++ b/logstash-core/lib/logstash/instrument/collector.rb @@ -11,12 +11,8 @@ module LogStash module Instrument # The Collector is the single point of reference for all # the metrics collection inside logstash, the metrics library will make # direct calls to this class. - # - # This class is an observable responsable of periodically emitting view of the system - # to other components like the internal metrics pipelines. class Collector include LogStash::Util::Loggable - include Observable SNAPSHOT_ROTATION_TIME_SECS = 1 # seconds SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS = 10 * 60 # seconds @@ -26,7 +22,6 @@ module LogStash module Instrument def initialize @metric_store = MetricStore.new @agent = nil - start_periodic_snapshotting end # The metric library will call this unique interface @@ -43,8 +38,6 @@ module LogStash module Instrument end metric.execute(*metric_type_params) - - changed # we had changes coming in so we can notify the observers rescue MetricStore::NamespacesExpectedError => e logger.error("Collector: Cannot record metric", :exception => e) rescue NameError => e @@ -58,51 +51,13 @@ module LogStash module Instrument end end - # Monitor the `Concurrent::TimerTask` this update is triggered on every successful or not - # run of the task, TimerTask implement Observable and the collector acts as - # the observer and will keep track if something went wrong in the execution. - # - # @param [Time] Time of execution - # @param [result] Result of the execution - # @param [Exception] Exception - def update(time_of_execution, result, exception) - return true if exception.nil? - logger.error("Collector: Something went wrong went sending data to the observers", - :execution_time => time_of_execution, - :result => result, - :exception => exception.class.name) - end - # Snapshot the current Metric Store and return it immediately, # This is useful if you want to get access to the current metric store without # waiting for a periodic call. # # @return [LogStash::Instrument::MetricStore] def snapshot_metric - Snapshot.new(@metric_store) - end - - # Configure and start the periodic task for snapshotting the `MetricStore` - def start_periodic_snapshotting - @snapshot_task = Concurrent::TimerTask.new { publish_snapshot } - @snapshot_task.execution_interval = SNAPSHOT_ROTATION_TIME_SECS - @snapshot_task.timeout_interval = SNAPSHOT_ROTATION_TIMEOUT_INTERVAL_SECS - @snapshot_task.add_observer(self) - @snapshot_task.execute - end - - def stop - @snapshot_task.shutdown - end - - # Create a snapshot of the MetricStore and send it to to the registered observers - # The observer will receive the following signature in the update methode. - # - # `#update(created_at, metric_store)` - def publish_snapshot - created_at = Time.now - logger.debug("Collector: Sending snapshot to observers", :created_at => created_at) if logger.debug? - notify_observers(snapshot_metric) + Snapshot.new(@metric_store.dup) end def clear(keypath)