From 8724563699eab5d75fd322e6e407124352078892 Mon Sep 17 00:00:00 2001 From: Juarez Bochi Date: Tue, 9 Jul 2013 17:14:09 -0300 Subject: [PATCH 1/4] Add clear_interval and flush_interval options for metrics filter --- lib/logstash/filters/metrics.rb | 55 ++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index 7ab0d0963..ac64ee9c0 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -3,7 +3,7 @@ require "logstash/namespace" # The metrics filter is useful for aggregating metrics. # -# For example, if you have a field 'response' that is +# For example, if you have a field 'response' that is # a http response code, and you want to count each # kind of response, you can do this: # @@ -14,7 +14,8 @@ require "logstash/namespace" # } # } # -# Metrics are flushed every 5 seconds. Metrics appear as +# Metrics are flushed every 5 seconds by default or according to +# 'flush_interval'. Metrics appear as # new events in the event stream and go through any filters # that occur after as well as outputs. # @@ -101,12 +102,12 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base # syntax: `timer => [ "name of metric", "%{time_value}" ]` config :timer, :validate => :hash, :default => {} - # Don't track events that have @timestamp older than some number of seconds. + # Don't track events that have @timestamp older than some number of seconds. # # This is useful if you want to only include events that are near real-time # in your metrics. # - # Example, to only count events that are within 10 seconds of real-time, you + # Example, to only count events that are within 10 seconds of real-time, you # would do this: # # filter { @@ -117,12 +118,20 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base # } config :ignore_older_than, :validate => :number, :default => 0 + # The flush interval, when the metrics event is created + config :flush_interval, :validate => :number, :default => 5 + + # The clear interval, when all counter are reset. + # + # If set to -1, the default value, the metrics will never be cleared. + config :clear_interval, :validate => :number, :default => -1 + def register require "metriks" require "socket" - - @metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) } - @metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) } + @last_flush = 0 # how many seconds ago the metrics where flushed. + @last_clear = 0 # how many seconds ago the metrics where cleared. + initialize_metrics end # def register def filter(event) @@ -144,8 +153,13 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base end # def filter def flush + # Add 5 seconds to @last_flush and @last_clear counters + # since this method is called every 5 seconds. + @last_flush += 5 + @last_clear += 5 + # Do nothing if there's nothing to do ;) - return if @metric_meters.empty? && @metric_timers.empty? + return unless should_flush? event = LogStash::Event.new event["message"] = Socket.gethostname @@ -154,6 +168,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base event["#{name}.rate_1m"] = metric.one_minute_rate event["#{name}.rate_5m"] = metric.five_minute_rate event["#{name}.rate_15m"] = metric.fifteen_minute_rate + metric.clear if should_clear? end @metric_timers.each do |name, metric| @@ -177,9 +192,33 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base event["#{name}.p90"] = metric.snapshot.value(0.90) event["#{name}.p95"] = metric.snapshot.value(0.95) event["#{name}.p99"] = metric.snapshot.value(0.99) + metric.clear if should_clear? + end + + # Reset counter since metrics were flushed + @last_flush = 0 + + if should_clear? + #Reset counter since metrics were cleared + @last_clear = 0 + initialize_metrics end filter_matched(event) return [event] end + + private + def initialize_metrics + @metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) } + @metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) } + end + + def should_flush? + @last_flush >= @flush_interval && (@metric_meters.any? || @metric_timers.any?) + end + + def should_clear? + @clear_interval > 0 && @last_clear >= @clear_interval + end end # class LogStash::Filters::Metrics From c211a47314711d374e72ee0a74b1b2735e0ae74c Mon Sep 17 00:00:00 2001 From: Juarez Bochi Date: Tue, 9 Jul 2013 17:19:38 -0300 Subject: [PATCH 2/4] Add observation that intervals should be a multiple of 5s --- lib/logstash/filters/metrics.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index ac64ee9c0..8784c82f9 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -118,12 +118,13 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base # } config :ignore_older_than, :validate => :number, :default => 0 - # The flush interval, when the metrics event is created + # The flush interval, when the metrics event is created. Must be a multiple of 5s. config :flush_interval, :validate => :number, :default => 5 # The clear interval, when all counter are reset. # # If set to -1, the default value, the metrics will never be cleared. + # Otherwise, should be a multiple of 5s. config :clear_interval, :validate => :number, :default => -1 def register From de0bdacbf70ad9394adfce9d1e96ee63c118b8ca Mon Sep 17 00:00:00 2001 From: Juarez Bochi Date: Fri, 12 Jul 2013 15:37:51 -0300 Subject: [PATCH 3/4] Adds tests for metrics filter and fixes key collision When using multiple instances of the metrics filter, the keys collided, making the counters interfere with each other. --- lib/logstash/filters/metrics.rb | 10 ++++- spec/filters/metrics.rb | 77 +++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 2 deletions(-) create mode 100644 spec/filters/metrics.rb diff --git a/lib/logstash/filters/metrics.rb b/lib/logstash/filters/metrics.rb index 8784c82f9..6069c2190 100644 --- a/lib/logstash/filters/metrics.rb +++ b/lib/logstash/filters/metrics.rb @@ -1,3 +1,4 @@ +require "securerandom" require "logstash/filters/base" require "logstash/namespace" @@ -132,6 +133,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base require "socket" @last_flush = 0 # how many seconds ago the metrics where flushed. @last_clear = 0 # how many seconds ago the metrics where cleared. + @random_key_preffix = SecureRandom.hex initialize_metrics end # def register @@ -211,8 +213,12 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base private def initialize_metrics - @metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) } - @metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) } + @metric_meters = Hash.new { |h,k| h[k] = Metriks.meter metric_key(k) } + @metric_timers = Hash.new { |h,k| h[k] = Metriks.timer metric_key(k) } + end + + def metric_key(key) + "#{@random_key_preffix}_#{key}" end def should_flush? diff --git a/spec/filters/metrics.rb b/spec/filters/metrics.rb new file mode 100644 index 000000000..863027c7b --- /dev/null +++ b/spec/filters/metrics.rb @@ -0,0 +1,77 @@ +require "logstash/filters/metrics" + +describe LogStash::Filters::Metrics do + + context "with basic config" do + context "when no events were received" do + it "should not flush" do + config = {"meter" => ["http.%{response}"]} + filter = LogStash::Filters::Metrics.new config + filter.register + + events = filter.flush + insist { events }.nil? + end + end + + context "when events are received" do + context "on the first flush" do + it "should flush counts" do + config = {"meter" => ["http.%{response}"]} + filter = LogStash::Filters::Metrics.new config + filter.register + filter.filter LogStash::Event.new({"response" => 200}) + filter.filter LogStash::Event.new({"response" => 200}) + filter.filter LogStash::Event.new({"response" => 404}) + + events = filter.flush + insist { events.length } == 1 + insist { events.first["http.200.count"] } == 2 + insist { events.first["http.404.count"] } == 1 + end + end + + context "on the second flush" do + it "should not reset counts" do + config = {"meter" => ["http.%{response}"]} + filter = LogStash::Filters::Metrics.new config + filter.register + filter.filter LogStash::Event.new({"response" => 200}) + filter.filter LogStash::Event.new({"response" => 200}) + filter.filter LogStash::Event.new({"response" => 404}) + + events = filter.flush + events = filter.flush + insist { events.length } == 1 + insist { events.first["http.200.count"] } == 2 + insist { events.first["http.404.count"] } == 1 + end + end + end + end + + context "with multiple instances" do + it "counts should be independent" do + config_tag1 = {"meter" => ["http.%{response}"], "tags" => ["tag1"]} + config_tag2 = {"meter" => ["http.%{response}"], "tags" => ["tag2"]} + filter_tag1 = LogStash::Filters::Metrics.new config_tag1 + filter_tag2 = LogStash::Filters::Metrics.new config_tag2 + event_tag1 = LogStash::Event.new({"response" => 200, "tags" => [ "tag1" ]}) + event_tag2 = LogStash::Event.new({"response" => 200, "tags" => [ "tag2" ]}) + event2_tag2 = LogStash::Event.new({"response" => 200, "tags" => [ "tag2" ]}) + filter_tag1.register + filter_tag2.register + + [event_tag1, event_tag2, event2_tag2].each do |event| + filter_tag1.filter event + filter_tag2.filter event + end + + events_tag1 = filter_tag1.flush + events_tag2 = filter_tag2.flush + + insist { events_tag1.first["http.200.count"] } == 1 + insist { events_tag2.first["http.200.count"] } == 2 + end + end +end From 313923e7e64c89bce8a2e917207172b1e0d07ec9 Mon Sep 17 00:00:00 2001 From: Juarez Bochi Date: Fri, 12 Jul 2013 16:06:32 -0300 Subject: [PATCH 4/4] Add metrics filter tests for clear and flush intervals --- spec/filters/metrics.rb | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/spec/filters/metrics.rb b/spec/filters/metrics.rb index 863027c7b..eb38658a4 100644 --- a/spec/filters/metrics.rb +++ b/spec/filters/metrics.rb @@ -74,4 +74,35 @@ describe LogStash::Filters::Metrics do insist { events_tag2.first["http.200.count"] } == 2 end end + + context "when a custom flush_interval is set" do + it "should flush only when required" do + config = {"meter" => ["http.%{response}"], "flush_interval" => 15} + filter = LogStash::Filters::Metrics.new config + filter.register + filter.filter LogStash::Event.new({"response" => 200}) + + insist { filter.flush }.nil? # 5s + insist { filter.flush }.nil? # 10s + insist { filter.flush.length } == 1 # 15s + insist { filter.flush }.nil? # 20s + insist { filter.flush }.nil? # 25s + insist { filter.flush.length } == 1 # 30s + end + end + + context "when a custom clear_interval is set" do + it "should clear the metrics after interval has passed" do + config = {"meter" => ["http.%{response}"], "clear_interval" => 15} + filter = LogStash::Filters::Metrics.new config + filter.register + filter.filter LogStash::Event.new({"response" => 200}) + + insist { filter.flush.first["http.200.count"] } == 1 # 5s + insist { filter.flush.first["http.200.count"] } == 1 # 10s + insist { filter.flush.first["http.200.count"] } == 1 # 15s + insist { filter.flush }.nil? # 20s + end + end + end