From 0ec869bca0825c6308af37483520db9d899058a3 Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Tue, 20 Aug 2013 22:59:04 -0700 Subject: [PATCH] - fix redis tests --- spec/inputs/redis.rb | 23 ++++++++++------------- spec/outputs/redis.rb | 35 ++--------------------------------- 2 files changed, 12 insertions(+), 46 deletions(-) diff --git a/spec/inputs/redis.rb b/spec/inputs/redis.rb index 6a145ef84..bec6d6a45 100644 --- a/spec/inputs/redis.rb +++ b/spec/inputs/redis.rb @@ -5,23 +5,21 @@ def populate(key, event_count) require "logstash/event" redis = Redis.new(:host => "localhost") event_count.times do |value| - event = LogStash::Event.new("@fields" => { "sequence" => value }) + event = LogStash::Event.new("sequence" => value) Stud::try(10.times) do redis.rpush(key, event.to_json) end end end -def process(plugins, event_count) +def process(pipeline, queue, event_count) sequence = 0 - redis = plugins.first - output = Shiftback.new do |event| - insist { event["sequence"] } == sequence - sequence += 1 - redis.teardown if sequence == event_count + Thread.new { pipeline.run } + event_count.times do |i| + event = queue.pop + insist { event["sequence"] } == i end - redis.register - redis.run(output) + pipeline.shutdown end # process describe "inputs/redis" do @@ -36,13 +34,13 @@ describe "inputs/redis" do type => "blah" key => "#{key}" data_type => "list" - format => json_event } } CONFIG before(:each) { populate(key, event_count) } - input { |plugins| process(plugins, event_count) } + + input { |pipeline, queue| process(pipeline, queue, event_count) } end describe "read events from a list with batch_count=5" do @@ -55,12 +53,11 @@ describe "inputs/redis" do key => "#{key}" data_type => "list" batch_count => #{rand(20)+1} - format => json_event } } CONFIG before(:each) { populate(key, event_count) } - input { |plugins| process(plugins, event_count) } + input { |pipeline, queue| process(pipeline, queue, event_count) } end end diff --git a/spec/outputs/redis.rb b/spec/outputs/redis.rb index 2a3edf040..3a50e6a4e 100644 --- a/spec/outputs/redis.rb +++ b/spec/outputs/redis.rb @@ -38,8 +38,7 @@ describe LogStash::Outputs::Redis do id, element = redis.blpop(key, 0) event = LogStash::Event.new(JSON.parse(element)) insist { event["sequence"] } == value - insist { event.message } == "hello world" - insist { event.type } == "generator" + insist { event["message"] } == "hello world" end # The list should now be empty @@ -47,35 +46,6 @@ describe LogStash::Outputs::Redis do end # agent end - describe "skips a message which can't be encoded as json" do - key = 10.times.collect { rand(10).to_s }.join("") - - config <<-CONFIG - input { - generator { - message => "\xAD\u0000" - count => 1 - type => "generator" - } - } - output { - redis { - host => "127.0.0.1" - key => "#{key}" - data_type => list - } - } - CONFIG - - agent do - # Query redis directly and inspect the goodness. - redis = Redis.new(:host => "127.0.0.1") - - # The list should contain no elements. - insist { redis.llen(key) } == 0 - end # agent - end - describe "batch mode" do key = 10.times.collect { rand(10).to_s }.join("") event_count = 200000 @@ -116,8 +86,7 @@ describe LogStash::Outputs::Redis do id, element = redis.blpop(key, 0) event = LogStash::Event.new(JSON.parse(element)) insist { event["sequence"] } == value - insist { event.message } == "hello world" - insist { event.type } == "generator" + insist { event["message"] } == "hello world" end # The list should now be empty