Mirror of https://github.com/elastic/logstash.git (synced 2025-04-24 14:47:19 -04:00)
Commit 305f699451
2 changed files with 235 additions and 0 deletions
113  lib/logstash/filters/sort.rb  Normal file
@@ -0,0 +1,113 @@
require "logstash/filters/base"
|
||||
require "logstash/namespace"
|
||||
|
||||
# The sort filter sorts a batch of events, or the events received within a time
# window, by timestamp.
#
# The original goal of this filter was to merge logs from different sources by
# log time; for example, in real-time log collection, events can be sorted in
# batches of 3000 or within a 30-second window.
#
# The config looks like this:
#
#     filter {
#       sort {
#         sortSize => 3000
#         sortInterval => "30s"
#         sortBy => "asce"
#       }
#     }
class LogStash::Filters::Sort < LogStash::Filters::Base

  config_name "sort"
  milestone 1

  # 'sortSize' is the maximum number of events to buffer before they are sorted. (default: 1000)
  config :sortSize, :validate => :number, :default => 1000

  # 'sortInterval' is how long to wait before the buffered events are sorted. (default: "1m")
  config :sortInterval, :validate => :string, :default => "1m"

  # 'sortBy' can only be "asce" or "desc" (default: "asce"); events are sorted by timestamp in ascending or descending order.
  config :sortBy, :validate => ["asce", "desc"], :default => "asce"

  public
  def register
    require "thread"
    require "rufus/scheduler"

    @mutex = Mutex.new
    @sortingDone = false
    @sortingArray = Array.new
    @scheduler = Rufus::Scheduler.start_new
    @job = @scheduler.every @sortInterval do
      @logger.info("Scheduler Activated")
      @mutex.synchronize {
        sort
      }
    end
  end # def register

  public
  def filter(event)
    @logger.info("do sort filter")
    if event == LogStash::SHUTDOWN
      @job.trigger()
      @job.unschedule()
      @logger.info("sort filter thread shutdown.")
      return
    end

    # Events that have already been sorted carry a "sorted" tag; events without
    # it are cancelled here, buffered, and re-emitted once they have been sorted.
    if event.tags.nil? || !event.tags.include?("sorted")
      event.cancel
    else
      return
    end

    @mutex.synchronize {
      @sortingArray.push(event.clone)

      if (@sortingArray.length == @sortSize)
        sort
      end

      if (@sortingDone)
        while sortedEvent = @sortingArray.pop
          sortedEvent.tags = Array.new if sortedEvent.tags.nil?
          sortedEvent.tags << "sorted"
          filter_matched(sortedEvent)
          yield sortedEvent
        end # while @sortingArray.pop
        # reset sortingDone flag
        @sortingDone = false
      end
    }
  end # def filter

  private
  def sort
    # Note: the comparator is inverted relative to 'sortBy' because the buffer
    # is later drained with Array#pop, which removes elements from the end.
    if (@sortBy == "asce")
      @sortingArray.sort! { |eventA, eventB| eventB.timestamp <=> eventA.timestamp }
    else
      @sortingArray.sort! { |eventA, eventB| eventA.timestamp <=> eventB.timestamp }
    end
    @sortingDone = true
  end # def sort

  # Flush any pending messages.
  public
  def flush
    events = []
    if (@sortingDone)
      @mutex.synchronize {
        while sortedEvent = @sortingArray.pop
          sortedEvent.tags = Array.new if sortedEvent.tags.nil?
          sortedEvent.tags << "sorted"
          events << sortedEvent
        end # while @sortingArray.pop
      }
      # reset sortingDone flag.
      @sortingDone = false
    end
    return events
  end # def flush
end # class LogStash::Filters::Sort
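One subtlety in the file above: the comparator in "sort" is inverted relative to 'sortBy' because the buffer is drained with Array#pop, which removes elements from the end. A minimal standalone Ruby sketch of that interplay (the hashes and timestamps below are made up for illustration and are not part of the commit):

    # Illustration of the sort-then-pop draining used by the filter.
    buffer = [
      { "message" => "later",   "timestamp" => Time.utc(2013, 1, 2) },
      { "message" => "earlier", "timestamp" => Time.utc(2013, 1, 1) },
    ]

    # "asce" branch: sort the buffer newest-first...
    buffer.sort! { |a, b| b["timestamp"] <=> a["timestamp"] }

    # ...then pop from the end, which emits events oldest-first.
    while (event = buffer.pop)
      puts event["message"]  # prints "earlier", then "later"
    end

The same pop-based draining is reused in "flush", so flushed events come out in the same order that "filter" would emit them.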
122  spec/filters/sort.rb  Normal file
@@ -0,0 +1,122 @@
require "test_utils"
|
||||
require "logstash/filters/sort"
|
||||
|
||||
describe LogStash::Filters::Sort do
|
||||
extend LogStash::RSpec
|
||||
|
||||
describe "sort when sortSize is full" do
|
||||
config <<-CONFIG
|
||||
filter {
|
||||
sort {
|
||||
sortSize => 2
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
events = [
|
||||
{
|
||||
"@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||
"message" => "later message"
|
||||
},
|
||||
{
|
||||
"@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||
"message" => "earlier message"
|
||||
}
|
||||
]
|
||||
|
||||
sample(events) do
|
||||
insist { subject }.is_a? Array
|
||||
insist { subject.length } == 2
|
||||
subject.each_with_index do |s,i|
|
||||
if i == 0 # first one should be the earlier message
|
||||
insist { s["message"] } == "earlier message"
|
||||
end
|
||||
if i == 1 # second one should be the later message
|
||||
insist { s["message"]} == "later message"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "sort by desc" do
|
||||
config <<-CONFIG
|
||||
filter {
|
||||
sort {
|
||||
sortSize => 3
|
||||
sortBy => "desc"
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
events = [
|
||||
{
|
||||
"@timestamp" => Time.iso8601("2013-01-03T00:00:00.000Z"),
|
||||
"message" => "third message"
|
||||
},
|
||||
{
|
||||
"@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||
"message" => "first message"
|
||||
},
|
||||
{
|
||||
"@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||
"message" => "second message"
|
||||
}
|
||||
]
|
||||
|
||||
sample(events) do
|
||||
insist { subject }.is_a? Array
|
||||
insist { subject.length } == 3
|
||||
subject.each_with_index do |s,i|
|
||||
if i == 0 # first one should be the third message
|
||||
insist { s["message"] } == "third message"
|
||||
end
|
||||
if i == 1 # second one should be the second message
|
||||
insist { s["message"]} == "second message"
|
||||
end
|
||||
if i == 2 # third one should be the third message
|
||||
insist { s["message"]} == "first message"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
  # (Ignored) Currently this case can't pass because it depends on the filter's
  # flush function being called from the test harness; there is a TODO for that
  # in test_utils.rb (# TODO(sissel): pipeline flush needs to be implemented.).
  # The case is meant to cover the scenario where sorting is triggered by the
  # scheduler, so it has to sleep a few seconds waiting for the scheduler to
  # fire, and the result can only be checked after the events have been flushed.

# describe "sort when sort interval reached" do
|
||||
# config <<-CONFIG
|
||||
# filter {
|
||||
# sort {
|
||||
# sortInterval => "1s"
|
||||
# }
|
||||
# }
|
||||
# CONFIG
|
||||
|
||||
# events = [
|
||||
# {
|
||||
# "@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||
# "message" => "later message"
|
||||
# },
|
||||
# {
|
||||
# "@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||
# "message" => "earlier message"
|
||||
# }
|
||||
# ]
|
||||
|
||||
# sample(events) do
|
||||
# sleep(2)
|
||||
# insist { subject }.is_a? Array
|
||||
# insist { subject.length } == 2
|
||||
# subject.each_with_index do |s,i|
|
||||
# if i == 0 # first one should be the earlier message
|
||||
# insist { s["message"] } == "earlier message"
|
||||
# end
|
||||
# if i == 1 # second one should be the later message
|
||||
# insist { s["message"]} == "later message"
|
||||
# end
|
||||
# end
|
||||
# end
|
||||
# end
|
||||
end
|
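The disabled case above hinges on the scheduler firing before the result is checked. For reference, the rufus-scheduler calls the filter relies on (Rufus::Scheduler.start_new, every, trigger, unschedule, exactly as used in register and filter) can be exercised on their own; a minimal sketch, with a made-up interval and block body:

    require "rufus/scheduler"

    scheduler = Rufus::Scheduler.start_new
    job = scheduler.every "1s" do
      puts "timer fired"   # in the filter, this is where the buffer is sorted
    end

    sleep 3                # let the job fire a couple of times

    job.trigger            # fire once more, immediately
    job.unschedule         # stop the recurring job, as the filter does on shutdown

Once pipeline flush exists in test_utils.rb, the commented-out spec could sleep past the 1-second interval and then assert on the flushed, sorted events.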