diff --git a/lib/logstash/filters/elapsed.rb b/lib/logstash/filters/elapsed.rb new file mode 100644 index 000000000..64881f5b9 --- /dev/null +++ b/lib/logstash/filters/elapsed.rb @@ -0,0 +1,256 @@ +# elapsed filter +# +# This filter tracks a pair of start/end events and calculates the elapsed +# time between them. + +require "logstash/filters/base" +require "logstash/namespace" +require 'thread' + +# The elapsed filter tracks a pair of start/end events and uses their +# timestamps to calculate the elapsed time between them. +# +# The filter has been developed to track the execution time of processes and +# other long tasks. +# +# The configuration looks like this: +# +# filter { +# elapsed { +# start_tag => "start event tag" +# end_tag => "end event tag" +# unique_id_field => "id field name" +# timeout => seconds +# new_event_on_match => true/false +# } +# } +# +# The events managed by this filter must have some particular properties. +# The event describing the start of the task (the "start event") must contain +# a tag equal to 'start_tag'. On the other side, the event describing the end +# of the task (the "end event") must contain a tag equal to 'end_tag'. Both +# these two kinds of event need to own an ID field which identify uniquely that +# particular task. The name of this field is stored in 'unique_id_field'. +# +# You can use a Grok filter to prepare the events for the elapsed filter. +# An example of configuration can be: +# +# filter { +# grok { +# match => ["message", "%{TIMESTAMP_ISO8601} START id: (?.*)"] +# add_tag => [ "taskStarted" ] +# } +# +# grok { +# match => ["message", "%{TIMESTAMP_ISO8601} END id: (?.*)"] +# add_tag => [ "taskTerminated"] +# } +# +# elapsed { +# start_tag => "taskStarted" +# end_tag => "taskTerminated" +# unique_id_field => "task_id" +# } +# } +# +# The elapsed filter collects all the "start events". If two, or more, "start +# events" have the same ID, only the first one is recorded, the others are +# discarded. +# +# When an "end event" matching a previously collected "start event" is +# received, there is a match. The configuration property 'new_event_on_match' +# tells where to insert the elapsed information: they can be added to the +# "end event" or a new "match event" can be created. Both events store the +# following information: +# - the tags "elapsed" and "elapsed.match" +# - the field "elapsed.time" with the difference, in seconds, between +# the two events timestamps +# - an ID filed with the task ID +# - the field "elapsed.timestamp_start" with the timestamp of the "start event" +# +# If the "end event" does not arrive before "timeout" seconds, the +# "start event" is discarded and an "expired event" is generated. This event +# contains: +# - the tags "elapsed" and "elapsed.expired_error" +# - a field called "elapsed.time" with the age, in seconds, of the +# "start event" +# - an ID filed with the task ID +# - the field "elapsed.timestamp_start" with the timestamp of the "start event" +# +class LogStash::Filters::Elapsed < LogStash::Filters::Base + PREFIX = "elapsed." + ELAPSED_FIELD = PREFIX + "time" + TIMESTAMP_START_EVENT_FIELD = PREFIX + "timestamp_start" + HOST_FIELD = "host" + + ELAPSED_TAG = "elapsed" + EXPIRED_ERROR_TAG = PREFIX + "expired_error" + END_WITHOUT_START_TAG = PREFIX + "end_wtihout_start" + MATCH_TAG = PREFIX + "match" + + config_name "elapsed" + milestone 1 + + # The name of the tag identifying the "start event" + config :start_tag, :validate => :string, :required => true + + # The name of the tag identifying the "end event" + config :end_tag, :validate => :string, :required => true + + # The name of the field containing the task ID. + # This value must uniquely identify the task in the system, otherwise + # it's impossible to match the couple of events. + config :unique_id_field, :validate => :string, :required => true + + # The amount of seconds after an "end event" can be considered lost. + # The corresponding "start event" is discarded and an "expired event" + # is generated. The default value is 30 minutes (1800 seconds). + config :timeout, :validate => :number, :required => false, :default => 1800 + + # This property manage what to do when an "end event" matches a "start event". + # If it's set to 'false' (default value), the elapsed information are added + # to the "end event"; if it's set to 'true' a new "match event" is created. + config :new_event_on_match, :validate => :boolean, :required => false, :default => false + + public + def register + @mutex = Mutex.new + # This is the state of the filter. The keys are the "unique_id_field", + # the values are couples of values: + @start_events = {} + + @logger.info("Elapsed, timeout: #{@timeout} seconds") + end + + # Getter method used for the tests + def start_events + @start_events + end + + def filter(event) + return unless filter?(event) + + unique_id = event[@unique_id_field] + return if unique_id.nil? + + if(start_event?(event)) + filter_matched(event) + @logger.info("Elapsed, 'start event' received", start_tag: @start_tag, unique_id_field: @unique_id_field) + + @mutex.synchronize do + unless(@start_events.has_key?(unique_id)) + @start_events[unique_id] = LogStash::Filters::Elapsed::Element.new(event) + end + end + + elsif(end_event?(event)) + filter_matched(event) + @logger.info("Elapsed, 'end event' received", end_tag: @end_tag, unique_id_field: @unique_id_field) + + @mutex.lock + if(@start_events.has_key?(unique_id)) + start_event = @start_events.delete(unique_id).event + @mutex.unlock + elapsed = event["@timestamp"] - start_event["@timestamp"] + if(@new_event_on_match) + elapsed_event = new_elapsed_event(elapsed, unique_id, start_event["@timestamp"]) + filter_matched(elapsed_event) + yield elapsed_event if block_given? + else + return add_elapsed_info(event, elapsed, unique_id, start_event["@timestamp"]) + end + else + @mutex.unlock + # The "start event" did not arrive. + event.tag(END_WITHOUT_START_TAG) + end + end + end # def filter + + # The method is invoked by LogStash every 5 seconds. + def flush() + expired_elements = [] + + @mutex.synchronize do + increment_age_by(5) + expired_elements = remove_expired_elements() + end + + return create_expired_events_from(expired_elements) + end + + private + def increment_age_by(seconds) + @start_events.each_pair do |key, element| + element.age += seconds + end + end + + # Remove the expired "start events" from the internal + # buffer and return them. + def remove_expired_elements() + expired = [] + @start_events.delete_if do |key, element| + if(element.age >= @timeout) + expired << element + next true + end + next false + end + + return expired + end + + def create_expired_events_from(expired_elements) + events = [] + expired_elements.each do |element| + error_event = LogStash::Event.new + error_event.tag(ELAPSED_TAG) + error_event.tag(EXPIRED_ERROR_TAG) + + error_event[HOST_FIELD] = Socket.gethostname + error_event[@unique_id_field] = element.event[@unique_id_field] + error_event[ELAPSED_FIELD] = element.age + error_event[TIMESTAMP_START_EVENT_FIELD] = element.event["@timestamp"] + + events << error_event + filter_matched(error_event) + end + + return events + end + + def start_event?(event) + return (event["tags"] != nil && event["tags"].include?(@start_tag)) + end + + def end_event?(event) + return (event["tags"] != nil && event["tags"].include?(@end_tag)) + end + + def new_elapsed_event(elapsed_time, unique_id, timestamp_start_event) + new_event = LogStash::Event.new + new_event[HOST_FIELD] = Socket.gethostname + return add_elapsed_info(new_event, elapsed_time, unique_id, timestamp_start_event) + end + + def add_elapsed_info(event, elapsed_time, unique_id, timestamp_start_event) + event.tag(ELAPSED_TAG) + event.tag(MATCH_TAG) + + event[ELAPSED_FIELD] = elapsed_time + event[@unique_id_field] = unique_id + event[TIMESTAMP_START_EVENT_FIELD] = timestamp_start_event + + return event + end +end # class LogStash::Filters::Elapsed + +class LogStash::Filters::Elapsed::Element + attr_accessor :event, :age + + def initialize(event) + @event = event + @age = 0 + end +end diff --git a/spec/filters/elapsed.rb b/spec/filters/elapsed.rb new file mode 100644 index 000000000..067705a4b --- /dev/null +++ b/spec/filters/elapsed.rb @@ -0,0 +1,294 @@ +require 'socket' +require "logstash/filters/elapsed" + +describe LogStash::Filters::Elapsed do + START_TAG = "startTag" + END_TAG = "endTag" + ID_FIELD = "uniqueIdField" + + def event(data) + data["message"] ||= "Log message" + LogStash::Event.new(data) + end + + def start_event(data) + data["tags"] ||= [] + data["tags"] << START_TAG + event(data) + end + + def end_event(data = {}) + data["tags"] ||= [] + data["tags"] << END_TAG + event(data) + end + + before(:each) do + setup_filter() + end + + def setup_filter(config = {}) + @config = {"start_tag" => START_TAG, "end_tag" => END_TAG, "unique_id_field" => ID_FIELD} + @config.merge!(config) + @filter = LogStash::Filters::Elapsed.new(@config) + @filter.register + end + + context "General validation" do + describe "receiving an event without start or end tag" do + it "does not record it" do + @filter.filter(event("message" => "Log message")) + insist { @filter.start_events.size } == 0 + end + end + + describe "receiving an event with a different start/end tag from the ones specified in the configuration" do + it "does not record it" do + @filter.filter(event("tags" => ["tag1", "tag2"])) + insist { @filter.start_events.size } == 0 + end + end + end + + context "Start event" do + describe "receiving an event with a valid start tag" do + describe "but without an unique id field" do + it "does not record it" do + @filter.filter(event("tags" => ["tag1", START_TAG])) + insist { @filter.start_events.size } == 0 + end + end + + describe "and a valid id field" do + it "records it" do + event = start_event(ID_FIELD => "id123") + @filter.filter(event) + + insist { @filter.start_events.size } == 1 + insist { @filter.start_events["id123"].event } == event + end + end + end + + describe "receiving two 'start events' for the same id field" do + it "keeps the first one and does not save the second one" do + args = {"tags" => [START_TAG], ID_FIELD => "id123"} + first_event = event(args) + second_event = event(args) + + @filter.filter(first_event) + @filter.filter(second_event) + + insist { @filter.start_events.size } == 1 + insist { @filter.start_events["id123"].event } == first_event + end + end + end + + context "End event" do + describe "receiving an event with a valid end tag" do + describe "and without an id" do + it "does nothing" do + insist { @filter.start_events.size } == 0 + @filter.filter(end_event()) + insist { @filter.start_events.size } == 0 + end + end + + describe "and with an id" do + describe "but without a previous 'start event'" do + it "adds a tag 'elapsed.end_witout_start' to the 'end event'" do + end_event = end_event(ID_FIELD => "id_123") + + @filter.filter(end_event) + + insist { end_event["tags"].include?("elapsed.end_wtihout_start") } == true + end + end + end + end + end + + context "Start/end events interaction" do + describe "receiving a 'start event'" do + before(:each) do + @id_value = "id_123" + @start_event = start_event(ID_FIELD => @id_value) + @filter.filter(@start_event) + end + + describe "and receiving an event with a valid end tag" do + describe "and without an id" do + it "does nothing" do + @filter.filter(end_event()) + insist { @filter.start_events.size } == 1 + insist { @filter.start_events[@id_value].event } == @start_event + end + end + + describe "and an id different from the one of the 'start event'" do + it "does nothing" do + different_id_value = @id_value + "_different" + @filter.filter(end_event(ID_FIELD => different_id_value)) + + insist { @filter.start_events.size } == 1 + insist { @filter.start_events[@id_value].event } == @start_event + end + end + + describe "and the same id of the 'start event'" do + it "deletes the recorded 'start event'" do + insist { @filter.start_events.size } == 1 + + @filter.filter(end_event(ID_FIELD => @id_value)) + + insist { @filter.start_events.size } == 0 + end + + shared_examples_for "match event" do + it "contains the tag 'elapsed'" do + insist { @match_event["tags"].include?("elapsed") } == true + end + + it "contains the tag tag 'elapsed.match'" do + insist { @match_event["tags"].include?("elapsed.match") } == true + end + + it "contains an 'elapsed.time field' with the elapsed time" do + insist { @match_event["elapsed.time"] } == 10 + end + + it "contains an 'elapsed.timestamp_start field' with the timestamp of the 'start event'" do + insist { @match_event["elapsed.timestamp_start"] } == @start_event["@timestamp"] + end + + it "contains an 'id field'" do + insist { @match_event[ID_FIELD] } == @id_value + end + end + + context "if 'new_event_on_match' is set to 'true'" do + before(:each) do + # I need to create a new filter because I need to set + # the config property 'new_event_on_match" to 'true'. + setup_filter("new_event_on_match" => true) + @start_event = start_event(ID_FIELD => @id_value) + @filter.filter(@start_event) + + end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + @filter.filter(end_event) do |new_event| + @match_event = new_event + end + end + + context "creates a new event that" do + it_behaves_like "match event" + + it "contains the 'host field'" do + insist { @match_event["host"] } == Socket.gethostname + end + end + end + + context "if 'new_event_on_match' is set to 'false'" do + before(:each) do + end_timestamp = @start_event["@timestamp"] + 10 + end_event = end_event(ID_FIELD => @id_value, "@timestamp" => end_timestamp) + @filter.filter(end_event) + + @match_event = end_event + end + + context "modifies the 'end event' that" do + it_behaves_like "match event" + end + end + + end + end + end + end + + describe "#flush" do + def setup(timeout = 1000) + @config["timeout"] = timeout + @filter = LogStash::Filters::Elapsed.new(@config) + @filter.register + + @start_event_1 = start_event(ID_FIELD => "1") + @start_event_2 = start_event(ID_FIELD => "2") + @start_event_3 = start_event(ID_FIELD => "3") + + @filter.filter(@start_event_1) + @filter.filter(@start_event_2) + @filter.filter(@start_event_3) + + # Force recorded events to different ages + @filter.start_events["2"].age = 25 + @filter.start_events["3"].age = 26 + end + + it "increments the 'age' of all the recorded 'start events' by 5 seconds" do + setup() + old_age = ages() + + @filter.flush() + + ages().each_with_index do |new_age, i| + insist { new_age } == (old_age[i] + 5) + end + end + + def ages() + @filter.start_events.each_value.map{|element| element.age } + end + + context "if the 'timeout interval' is set to 30 seconds" do + before(:each) do + setup(30) + + @expired_events = @filter.flush() + + insist { @filter.start_events.size } == 1 + insist { @expired_events.size } == 2 + end + + it "deletes the recorded 'start events' with 'age' greater, or equal to, the timeout" do + insist { @filter.start_events.key?("1") } == true + insist { @filter.start_events.key?("2") } == false + insist { @filter.start_events.key?("3") } == false + end + + it "creates a new event with tag 'elapsed.expired_error' for each expired 'start event'" do + insist { @expired_events[0]["tags"].include?("elapsed.expired_error") } == true + insist { @expired_events[1]["tags"].include?("elapsed.expired_error") } == true + end + + it "creates a new event with tag 'elapsed' for each expired 'start event'" do + insist { @expired_events[0]["tags"].include?("elapsed") } == true + insist { @expired_events[1]["tags"].include?("elapsed") } == true + end + + it "creates a new event containing the 'id field' of the expired 'start event'" do + insist { @expired_events[0][ID_FIELD] } == "2" + insist { @expired_events[1][ID_FIELD] } == "3" + end + + it "creates a new event containing an 'elapsed.time field' with the age of the expired 'start event'" do + insist { @expired_events[0]["elapsed.time"] } == 30 + insist { @expired_events[1]["elapsed.time"] } == 31 + end + + it "creates a new event containing an 'elapsed.timestamp_start field' with the timestamp of the expired 'start event'" do + insist { @expired_events[0]["elapsed.timestamp_start"] } == @start_event_2["@timestamp"] + insist { @expired_events[1]["elapsed.timestamp_start"] } == @start_event_3["@timestamp"] + end + + it "creates a new event containing a 'host field' for each expired 'start event'" do + insist { @expired_events[0]["host"] } == Socket.gethostname + insist { @expired_events[1]["host"] } == Socket.gethostname + end + end + end +end