mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
- fix redis tests
This commit is contained in:
parent
a4aa87e691
commit
0ec869bca0
2 changed files with 12 additions and 46 deletions
|
@ -5,23 +5,21 @@ def populate(key, event_count)
|
|||
require "logstash/event"
|
||||
redis = Redis.new(:host => "localhost")
|
||||
event_count.times do |value|
|
||||
event = LogStash::Event.new("@fields" => { "sequence" => value })
|
||||
event = LogStash::Event.new("sequence" => value)
|
||||
Stud::try(10.times) do
|
||||
redis.rpush(key, event.to_json)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def process(plugins, event_count)
|
||||
def process(pipeline, queue, event_count)
|
||||
sequence = 0
|
||||
redis = plugins.first
|
||||
output = Shiftback.new do |event|
|
||||
insist { event["sequence"] } == sequence
|
||||
sequence += 1
|
||||
redis.teardown if sequence == event_count
|
||||
Thread.new { pipeline.run }
|
||||
event_count.times do |i|
|
||||
event = queue.pop
|
||||
insist { event["sequence"] } == i
|
||||
end
|
||||
redis.register
|
||||
redis.run(output)
|
||||
pipeline.shutdown
|
||||
end # process
|
||||
|
||||
describe "inputs/redis" do
|
||||
|
@ -36,13 +34,13 @@ describe "inputs/redis" do
|
|||
type => "blah"
|
||||
key => "#{key}"
|
||||
data_type => "list"
|
||||
format => json_event
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
before(:each) { populate(key, event_count) }
|
||||
input { |plugins| process(plugins, event_count) }
|
||||
|
||||
input { |pipeline, queue| process(pipeline, queue, event_count) }
|
||||
end
|
||||
|
||||
describe "read events from a list with batch_count=5" do
|
||||
|
@ -55,12 +53,11 @@ describe "inputs/redis" do
|
|||
key => "#{key}"
|
||||
data_type => "list"
|
||||
batch_count => #{rand(20)+1}
|
||||
format => json_event
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
before(:each) { populate(key, event_count) }
|
||||
input { |plugins| process(plugins, event_count) }
|
||||
input { |pipeline, queue| process(pipeline, queue, event_count) }
|
||||
end
|
||||
end
|
||||
|
|
|
@ -38,8 +38,7 @@ describe LogStash::Outputs::Redis do
|
|||
id, element = redis.blpop(key, 0)
|
||||
event = LogStash::Event.new(JSON.parse(element))
|
||||
insist { event["sequence"] } == value
|
||||
insist { event.message } == "hello world"
|
||||
insist { event.type } == "generator"
|
||||
insist { event["message"] } == "hello world"
|
||||
end
|
||||
|
||||
# The list should now be empty
|
||||
|
@ -47,35 +46,6 @@ describe LogStash::Outputs::Redis do
|
|||
end # agent
|
||||
end
|
||||
|
||||
describe "skips a message which can't be encoded as json" do
|
||||
key = 10.times.collect { rand(10).to_s }.join("")
|
||||
|
||||
config <<-CONFIG
|
||||
input {
|
||||
generator {
|
||||
message => "\xAD\u0000"
|
||||
count => 1
|
||||
type => "generator"
|
||||
}
|
||||
}
|
||||
output {
|
||||
redis {
|
||||
host => "127.0.0.1"
|
||||
key => "#{key}"
|
||||
data_type => list
|
||||
}
|
||||
}
|
||||
CONFIG
|
||||
|
||||
agent do
|
||||
# Query redis directly and inspect the goodness.
|
||||
redis = Redis.new(:host => "127.0.0.1")
|
||||
|
||||
# The list should contain no elements.
|
||||
insist { redis.llen(key) } == 0
|
||||
end # agent
|
||||
end
|
||||
|
||||
describe "batch mode" do
|
||||
key = 10.times.collect { rand(10).to_s }.join("")
|
||||
event_count = 200000
|
||||
|
@ -116,8 +86,7 @@ describe LogStash::Outputs::Redis do
|
|||
id, element = redis.blpop(key, 0)
|
||||
event = LogStash::Event.new(JSON.parse(element))
|
||||
insist { event["sequence"] } == value
|
||||
insist { event.message } == "hello world"
|
||||
insist { event.type } == "generator"
|
||||
insist { event["message"] } == "hello world"
|
||||
end
|
||||
|
||||
# The list should now be empty
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue