mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge pull request #545 from jbochi/master
Add clear_interval and flush_interval options for metrics filter
This commit is contained in:
commit
45c7896fff
2 changed files with 162 additions and 8 deletions
|
@ -1,9 +1,10 @@
|
|||
require "securerandom"
|
||||
require "logstash/filters/base"
|
||||
require "logstash/namespace"
|
||||
|
||||
# The metrics filter is useful for aggregating metrics.
|
||||
#
|
||||
# For example, if you have a field 'response' that is
|
||||
# For example, if you have a field 'response' that is
|
||||
# a http response code, and you want to count each
|
||||
# kind of response, you can do this:
|
||||
#
|
||||
|
@ -14,7 +15,8 @@ require "logstash/namespace"
|
|||
# }
|
||||
# }
|
||||
#
|
||||
# Metrics are flushed every 5 seconds. Metrics appear as
|
||||
# Metrics are flushed every 5 seconds by default or according to
|
||||
# 'flush_interval'. Metrics appear as
|
||||
# new events in the event stream and go through any filters
|
||||
# that occur after as well as outputs.
|
||||
#
|
||||
|
@ -101,12 +103,12 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
|
|||
# syntax: `timer => [ "name of metric", "%{time_value}" ]`
|
||||
config :timer, :validate => :hash, :default => {}
|
||||
|
||||
# Don't track events that have @timestamp older than some number of seconds.
|
||||
# Don't track events that have @timestamp older than some number of seconds.
|
||||
#
|
||||
# This is useful if you want to only include events that are near real-time
|
||||
# in your metrics.
|
||||
#
|
||||
# Example, to only count events that are within 10 seconds of real-time, you
|
||||
# Example, to only count events that are within 10 seconds of real-time, you
|
||||
# would do this:
|
||||
#
|
||||
# filter {
|
||||
|
@ -117,12 +119,22 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
|
|||
# }
|
||||
config :ignore_older_than, :validate => :number, :default => 0
|
||||
|
||||
# The flush interval, when the metrics event is created. Must be a multiple of 5s.
|
||||
config :flush_interval, :validate => :number, :default => 5
|
||||
|
||||
# The clear interval, when all counter are reset.
|
||||
#
|
||||
# If set to -1, the default value, the metrics will never be cleared.
|
||||
# Otherwise, should be a multiple of 5s.
|
||||
config :clear_interval, :validate => :number, :default => -1
|
||||
|
||||
def register
|
||||
require "metriks"
|
||||
require "socket"
|
||||
|
||||
@metric_meters = Hash.new { |h,k| h[k] = Metriks.meter(k) }
|
||||
@metric_timers = Hash.new { |h,k| h[k] = Metriks.timer(k) }
|
||||
@last_flush = 0 # how many seconds ago the metrics where flushed.
|
||||
@last_clear = 0 # how many seconds ago the metrics where cleared.
|
||||
@random_key_preffix = SecureRandom.hex
|
||||
initialize_metrics
|
||||
end # def register
|
||||
|
||||
def filter(event)
|
||||
|
@ -144,8 +156,13 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
|
|||
end # def filter
|
||||
|
||||
def flush
|
||||
# Add 5 seconds to @last_flush and @last_clear counters
|
||||
# since this method is called every 5 seconds.
|
||||
@last_flush += 5
|
||||
@last_clear += 5
|
||||
|
||||
# Do nothing if there's nothing to do ;)
|
||||
return if @metric_meters.empty? && @metric_timers.empty?
|
||||
return unless should_flush?
|
||||
|
||||
event = LogStash::Event.new
|
||||
event["message"] = Socket.gethostname
|
||||
|
@ -154,6 +171,7 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
|
|||
event["#{name}.rate_1m"] = metric.one_minute_rate
|
||||
event["#{name}.rate_5m"] = metric.five_minute_rate
|
||||
event["#{name}.rate_15m"] = metric.fifteen_minute_rate
|
||||
metric.clear if should_clear?
|
||||
end
|
||||
|
||||
@metric_timers.each do |name, metric|
|
||||
|
@ -177,9 +195,37 @@ class LogStash::Filters::Metrics < LogStash::Filters::Base
|
|||
event["#{name}.p90"] = metric.snapshot.value(0.90)
|
||||
event["#{name}.p95"] = metric.snapshot.value(0.95)
|
||||
event["#{name}.p99"] = metric.snapshot.value(0.99)
|
||||
metric.clear if should_clear?
|
||||
end
|
||||
|
||||
# Reset counter since metrics were flushed
|
||||
@last_flush = 0
|
||||
|
||||
if should_clear?
|
||||
#Reset counter since metrics were cleared
|
||||
@last_clear = 0
|
||||
initialize_metrics
|
||||
end
|
||||
|
||||
filter_matched(event)
|
||||
return [event]
|
||||
end
|
||||
|
||||
private
|
||||
def initialize_metrics
|
||||
@metric_meters = Hash.new { |h,k| h[k] = Metriks.meter metric_key(k) }
|
||||
@metric_timers = Hash.new { |h,k| h[k] = Metriks.timer metric_key(k) }
|
||||
end
|
||||
|
||||
def metric_key(key)
|
||||
"#{@random_key_preffix}_#{key}"
|
||||
end
|
||||
|
||||
def should_flush?
|
||||
@last_flush >= @flush_interval && (@metric_meters.any? || @metric_timers.any?)
|
||||
end
|
||||
|
||||
def should_clear?
|
||||
@clear_interval > 0 && @last_clear >= @clear_interval
|
||||
end
|
||||
end # class LogStash::Filters::Metrics
|
||||
|
|
108
spec/filters/metrics.rb
Normal file
108
spec/filters/metrics.rb
Normal file
|
@ -0,0 +1,108 @@
|
|||
require "logstash/filters/metrics"
|
||||
|
||||
describe LogStash::Filters::Metrics do
|
||||
|
||||
context "with basic config" do
|
||||
context "when no events were received" do
|
||||
it "should not flush" do
|
||||
config = {"meter" => ["http.%{response}"]}
|
||||
filter = LogStash::Filters::Metrics.new config
|
||||
filter.register
|
||||
|
||||
events = filter.flush
|
||||
insist { events }.nil?
|
||||
end
|
||||
end
|
||||
|
||||
context "when events are received" do
|
||||
context "on the first flush" do
|
||||
it "should flush counts" do
|
||||
config = {"meter" => ["http.%{response}"]}
|
||||
filter = LogStash::Filters::Metrics.new config
|
||||
filter.register
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
filter.filter LogStash::Event.new({"response" => 404})
|
||||
|
||||
events = filter.flush
|
||||
insist { events.length } == 1
|
||||
insist { events.first["http.200.count"] } == 2
|
||||
insist { events.first["http.404.count"] } == 1
|
||||
end
|
||||
end
|
||||
|
||||
context "on the second flush" do
|
||||
it "should not reset counts" do
|
||||
config = {"meter" => ["http.%{response}"]}
|
||||
filter = LogStash::Filters::Metrics.new config
|
||||
filter.register
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
filter.filter LogStash::Event.new({"response" => 404})
|
||||
|
||||
events = filter.flush
|
||||
events = filter.flush
|
||||
insist { events.length } == 1
|
||||
insist { events.first["http.200.count"] } == 2
|
||||
insist { events.first["http.404.count"] } == 1
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context "with multiple instances" do
|
||||
it "counts should be independent" do
|
||||
config_tag1 = {"meter" => ["http.%{response}"], "tags" => ["tag1"]}
|
||||
config_tag2 = {"meter" => ["http.%{response}"], "tags" => ["tag2"]}
|
||||
filter_tag1 = LogStash::Filters::Metrics.new config_tag1
|
||||
filter_tag2 = LogStash::Filters::Metrics.new config_tag2
|
||||
event_tag1 = LogStash::Event.new({"response" => 200, "tags" => [ "tag1" ]})
|
||||
event_tag2 = LogStash::Event.new({"response" => 200, "tags" => [ "tag2" ]})
|
||||
event2_tag2 = LogStash::Event.new({"response" => 200, "tags" => [ "tag2" ]})
|
||||
filter_tag1.register
|
||||
filter_tag2.register
|
||||
|
||||
[event_tag1, event_tag2, event2_tag2].each do |event|
|
||||
filter_tag1.filter event
|
||||
filter_tag2.filter event
|
||||
end
|
||||
|
||||
events_tag1 = filter_tag1.flush
|
||||
events_tag2 = filter_tag2.flush
|
||||
|
||||
insist { events_tag1.first["http.200.count"] } == 1
|
||||
insist { events_tag2.first["http.200.count"] } == 2
|
||||
end
|
||||
end
|
||||
|
||||
context "when a custom flush_interval is set" do
|
||||
it "should flush only when required" do
|
||||
config = {"meter" => ["http.%{response}"], "flush_interval" => 15}
|
||||
filter = LogStash::Filters::Metrics.new config
|
||||
filter.register
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
|
||||
insist { filter.flush }.nil? # 5s
|
||||
insist { filter.flush }.nil? # 10s
|
||||
insist { filter.flush.length } == 1 # 15s
|
||||
insist { filter.flush }.nil? # 20s
|
||||
insist { filter.flush }.nil? # 25s
|
||||
insist { filter.flush.length } == 1 # 30s
|
||||
end
|
||||
end
|
||||
|
||||
context "when a custom clear_interval is set" do
|
||||
it "should clear the metrics after interval has passed" do
|
||||
config = {"meter" => ["http.%{response}"], "clear_interval" => 15}
|
||||
filter = LogStash::Filters::Metrics.new config
|
||||
filter.register
|
||||
filter.filter LogStash::Event.new({"response" => 200})
|
||||
|
||||
insist { filter.flush.first["http.200.count"] } == 1 # 5s
|
||||
insist { filter.flush.first["http.200.count"] } == 1 # 10s
|
||||
insist { filter.flush.first["http.200.count"] } == 1 # 15s
|
||||
insist { filter.flush }.nil? # 20s
|
||||
end
|
||||
end
|
||||
|
||||
end
|
Loading…
Add table
Add a link
Reference in a new issue