diff --git a/lib/logstash/filters/sort.rb b/lib/logstash/filters/sort.rb new file mode 100644 index 000000000..295606a88 --- /dev/null +++ b/lib/logstash/filters/sort.rb @@ -0,0 +1,113 @@ +require "logstash/filters/base" +require "logstash/namespace" + +# The sort filter is for sorting a amount of events or a period of events by timestamp. +# +# The original goal of this filter was to merge the logs from different sources by the time of log, +# for example, in real-time log collection, logs can be sorted by amount of 3000 logs or +# can be sorted in 30 seconds. +# +# The config looks like this: +# +# filter { +# sort { +# sortSize => 3000 +# sortInterval => "30s" +# sortBy => "asce" +# } +# } +class LogStash::Filters::Sort < LogStash::Filters::Base + + config_name "sort" + milestone 1 + + # The 'sortSize' is the window size which how many logs should be sorted.(default 1000) + config :sortSize, :validate => :number, :default => 1000 + + # The 'sortInterval' is the time window which how long the logs should be sorted. (default 1m) + config :sortInterval, :validate => :string, :default => "1m" + + # The 'sortBy' can only be "asce" or "desc" (defaults asce), sorted by timestamp asce or desc. + config :sortBy, :validate => ["asce", "desc"], :default => "asce" + + public + def register + require "thread" + require "rufus/scheduler" + + @mutex = Mutex.new + @sortingDone = false + @sortingArray = Array.new + @scheduler = Rufus::Scheduler.start_new + @job = @scheduler.every @sortInterval do + @logger.info("Scheduler Activated") + @mutex.synchronize{ + sort + } + end + end # def register + + public + def filter(event) + @logger.info("do sort filter") + if event == LogStash::SHUTDOWN + @job.trigger() + @job.unschedule() + @logger.info("sort filter thread shutdown.") + return + end + + # if the event is sorted, a "sorted" tag will be marked, so for those unsorted event, cancel them first. + if event.tags.nil? || !event.tags.include?("sorted") + event.cancel + else + return + end + + @mutex.synchronize{ + @sortingArray.push(event.clone) + + if (@sortingArray.length == @sortSize) + sort + end + + if (@sortingDone) + while sortedEvent = @sortingArray.pop + sortedEvent.tags = Array.new if sortedEvent.tags.nil? + sortedEvent.tags << "sorted" + filter_matched(sortedEvent) + yield sortedEvent + end # while @sortingArray.pop + # reset sortingDone flag + @sortingDone = false + end + } + end # def filter + + private + def sort + if (@sortBy == "asce") + @sortingArray.sort! { |eventA, eventB| eventB.timestamp <=> eventA.timestamp } + else + @sortingArray.sort! { |eventA, eventB| eventA.timestamp <=> eventB.timestamp } + end + @sortingDone = true + end # def sort + + # Flush any pending messages. + public + def flush + events = [] + if (@sortingDone) + @mutex.synchronize{ + while sortedEvent = @sortingArray.pop + sortedEvent.tags << "sorted" + events << sortedEvent + end # while @sortingArray.pop + } + # reset sortingDone flag. + @sortingDone = false + end + return events + end # def flush +end # \ No newline at end of file diff --git a/spec/filters/sort.rb b/spec/filters/sort.rb new file mode 100644 index 000000000..ff1c29ffc --- /dev/null +++ b/spec/filters/sort.rb @@ -0,0 +1,122 @@ +require "test_utils" +require "logstash/filters/sort" + +describe LogStash::Filters::Sort do + extend LogStash::RSpec + + describe "sort when sortSize is full" do + config <<-CONFIG + filter { + sort { + sortSize => 2 + } + } + CONFIG + + events = [ + { + "@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"), + "message" => "later message" + }, + { + "@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"), + "message" => "earlier message" + } + ] + + sample(events) do + insist { subject }.is_a? Array + insist { subject.length } == 2 + subject.each_with_index do |s,i| + if i == 0 # first one should be the earlier message + insist { s["message"] } == "earlier message" + end + if i == 1 # second one should be the later message + insist { s["message"]} == "later message" + end + end + end + end + + describe "sort by desc" do + config <<-CONFIG + filter { + sort { + sortSize => 3 + sortBy => "desc" + } + } + CONFIG + + events = [ + { + "@timestamp" => Time.iso8601("2013-01-03T00:00:00.000Z"), + "message" => "third message" + }, + { + "@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"), + "message" => "first message" + }, + { + "@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"), + "message" => "second message" + } + ] + + sample(events) do + insist { subject }.is_a? Array + insist { subject.length } == 3 + subject.each_with_index do |s,i| + if i == 0 # first one should be the third message + insist { s["message"] } == "third message" + end + if i == 1 # second one should be the second message + insist { s["message"]} == "second message" + end + if i == 2 # third one should be the third message + insist { s["message"]} == "first message" + end + end + end + end + + # (Ignored) Currently this case can't pass because of the case depends on the flush function of the filter in the test, + # there was a TODO marked in the code (test_utils.rb, # TODO(sissel): pipeline flush needs to be implemented.), + # and the case wants to test the scenario which sort was triggered by a scheduler, so in this case, it needs to sleep few seconds + # waiting the scheduler triggered, and after the events were flushed, then the result can be checked. + + # describe "sort when sort interval reached" do + # config <<-CONFIG + # filter { + # sort { + # sortInterval => "1s" + # } + # } + # CONFIG + + # events = [ + # { + # "@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"), + # "message" => "later message" + # }, + # { + # "@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"), + # "message" => "earlier message" + # } + # ] + + # sample(events) do + # sleep(2) + # insist { subject }.is_a? Array + # insist { subject.length } == 2 + # subject.each_with_index do |s,i| + # if i == 0 # first one should be the earlier message + # insist { s["message"] } == "earlier message" + # end + # if i == 1 # second one should be the later message + # insist { s["message"]} == "later message" + # end + # end + # end + # end +end