mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
add test case, found some issue and fixed them.
This commit is contained in:
parent
4920d04d45
commit
6cf2e57a19
2 changed files with 123 additions and 5 deletions
|
@ -31,7 +31,7 @@ require "logstash/namespace"
|
||||||
class LogStash::Filters::Sort < LogStash::Filters::Base
|
class LogStash::Filters::Sort < LogStash::Filters::Base
|
||||||
|
|
||||||
config_name "sort"
|
config_name "sort"
|
||||||
plugin_status "experimental"
|
milestone 3
|
||||||
|
|
||||||
config :sortSize, :validate => :number, :default => 1000
|
config :sortSize, :validate => :number, :default => 1000
|
||||||
config :sortInterval, :validate => :string, :default => "1m"
|
config :sortInterval, :validate => :string, :default => "1m"
|
||||||
|
@ -64,20 +64,23 @@ class LogStash::Filters::Sort < LogStash::Filters::Base
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
# if the event is sorted, a "sorted" tag will be marked.
|
# if the event is sorted, a "sorted" tag will be marked, so for those unsorted event, cancel them first.
|
||||||
if (!event.tags.include?"sorted")
|
if event.tags.nil? || !event.tags.include?("sorted")
|
||||||
event.cancel
|
event.cancel
|
||||||
else
|
else
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
@mutex.synchronize{
|
@mutex.synchronize{
|
||||||
|
@sortingArray.push(event.clone)
|
||||||
|
|
||||||
if (@sortingArray.length == @sortSize)
|
if (@sortingArray.length == @sortSize)
|
||||||
sort
|
sort
|
||||||
end
|
end
|
||||||
|
|
||||||
if (@sortingDone)
|
if (@sortingDone)
|
||||||
while sortedEvent = @sortingArray.pop
|
while sortedEvent = @sortingArray.pop
|
||||||
|
sortedEvent.tags = Array.new if sortedEvent.tags.nil?
|
||||||
sortedEvent.tags << "sorted"
|
sortedEvent.tags << "sorted"
|
||||||
filter_matched(sortedEvent)
|
filter_matched(sortedEvent)
|
||||||
yield sortedEvent
|
yield sortedEvent
|
||||||
|
@ -85,8 +88,6 @@ class LogStash::Filters::Sort < LogStash::Filters::Base
|
||||||
# reset sortingDone flag
|
# reset sortingDone flag
|
||||||
@sortingDone = false
|
@sortingDone = false
|
||||||
end
|
end
|
||||||
|
|
||||||
@sortingArray.push(event.clone)
|
|
||||||
}
|
}
|
||||||
end # def filter
|
end # def filter
|
||||||
|
|
||||||
|
|
117
spec/filters/sort.rb
Normal file
117
spec/filters/sort.rb
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
require "test_utils"
|
||||||
|
require "logstash/filters/sort"
|
||||||
|
|
||||||
|
describe LogStash::Filters::Sort do
|
||||||
|
extend LogStash::RSpec
|
||||||
|
|
||||||
|
describe "sort when sortSize is full" do
|
||||||
|
config <<-CONFIG
|
||||||
|
filter {
|
||||||
|
sort {
|
||||||
|
sortSize => 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
events = [
|
||||||
|
{
|
||||||
|
"@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||||
|
"message" => "later message"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||||
|
"message" => "earlier message"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
sample(events) do
|
||||||
|
insist { subject }.is_a? Array
|
||||||
|
insist { subject.length } == 2
|
||||||
|
subject.each_with_index do |s,i|
|
||||||
|
if i == 0 # first one should be the earlier message
|
||||||
|
insist { s["message"] } == "earlier message"
|
||||||
|
end
|
||||||
|
if i == 1 # second one should be the later message
|
||||||
|
insist { s["message"]} == "later message"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "sort by desc" do
|
||||||
|
config <<-CONFIG
|
||||||
|
filter {
|
||||||
|
sort {
|
||||||
|
sortSize => 3
|
||||||
|
sortBy => "desc"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CONFIG
|
||||||
|
|
||||||
|
events = [
|
||||||
|
{
|
||||||
|
"@timestamp" => Time.iso8601("2013-01-03T00:00:00.000Z"),
|
||||||
|
"message" => "third message"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||||
|
"message" => "first message"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||||
|
"message" => "second message"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
|
||||||
|
sample(events) do
|
||||||
|
insist { subject }.is_a? Array
|
||||||
|
insist { subject.length } == 3
|
||||||
|
subject.each_with_index do |s,i|
|
||||||
|
if i == 0 # first one should be the third message
|
||||||
|
insist { s["message"] } == "third message"
|
||||||
|
end
|
||||||
|
if i == 1 # second one should be the second message
|
||||||
|
insist { s["message"]} == "second message"
|
||||||
|
end
|
||||||
|
if i == 2 # third one should be the third message
|
||||||
|
insist { s["message"]} == "first message"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# describe "sort when sort interval reached" do
|
||||||
|
# config <<-CONFIG
|
||||||
|
# filter {
|
||||||
|
# sort {
|
||||||
|
# sortInterval => "1s"
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# CONFIG
|
||||||
|
|
||||||
|
# events = [
|
||||||
|
# {
|
||||||
|
# "@timestamp" => Time.iso8601("2013-01-02T00:00:00.000Z"),
|
||||||
|
# "message" => "later message"
|
||||||
|
# },
|
||||||
|
# {
|
||||||
|
# "@timestamp" => Time.iso8601("2013-01-01T00:00:00.000Z"),
|
||||||
|
# "message" => "earlier message"
|
||||||
|
# }
|
||||||
|
# ]
|
||||||
|
|
||||||
|
# sample(events) do
|
||||||
|
# sleep(2)
|
||||||
|
# insist { subject }.is_a? Array
|
||||||
|
# insist { subject.length } == 2
|
||||||
|
# subject.each_with_index do |s,i|
|
||||||
|
# if i == 0 # first one should be the earlier message
|
||||||
|
# insist { s["message"] } == "earlier message"
|
||||||
|
# end
|
||||||
|
# if i == 1 # second one should be the later message
|
||||||
|
# insist { s["message"]} == "later message"
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
# end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue