mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- finally fix the elasticsearch output test
This commit is contained in:
parent
adc39d1932
commit
e0e10882f3
1 changed files with 41 additions and 73 deletions
|
@ -10,68 +10,31 @@ require "logstash/outputs/elasticsearch"
|
|||
require "logstash/search/elasticsearch"
|
||||
require "logstash/search/query"
|
||||
|
||||
require "spoon" # rubygem 'spoon' - implements posix_spawn via FFI
|
||||
require "tmpdir"
|
||||
#require "spoon" # rubygem 'spoon' - implements posix_spawn via FFI
|
||||
|
||||
class TestOutputElasticSearch < Test::Unit::TestCase
|
||||
ELASTICSEARCH_VERSION = "0.16.0"
|
||||
|
||||
def setup
|
||||
#start_elasticsearch
|
||||
#@cluster = "logstash-test-1234"
|
||||
|
||||
@output = LogStash::Outputs::Elasticsearch.new({
|
||||
#"host" => ["localhost"],
|
||||
#"index" => ["test"],
|
||||
"type" => ["foo"],
|
||||
"embedded" => ["true"],
|
||||
#"cluster" => [@cluster],
|
||||
})
|
||||
@output.register
|
||||
@tmpdir = Dir.mktmpdir
|
||||
Dir.chdir(@tmpdir) do
|
||||
@output = LogStash::Outputs::Elasticsearch.new({
|
||||
"type" => ["foo"],
|
||||
"embedded" => ["true"],
|
||||
})
|
||||
@output.register
|
||||
end
|
||||
end # def setup
|
||||
|
||||
def start_elasticsearch
|
||||
# install
|
||||
version = self.class::ELASTICSEARCH_VERSION
|
||||
system("make -C #{File.dirname(__FILE__)}/../../setup/elasticsearch/ init-elasticsearch-#{version} wipe-elasticsearch-#{version} #{$DEBUG ? "" : "> /dev/null 2>&1"}")
|
||||
|
||||
#1.upto(30) do
|
||||
# Pick a random port
|
||||
#@port_http = (rand * 30000 + 20000).to_i
|
||||
#@port_tcp = (rand * 30000 + 20000).to_i
|
||||
#end # try a few times to launch ES on a random port.
|
||||
|
||||
# Listen on random ports, I don't need them anyway.
|
||||
@port_http = 0
|
||||
@port_tcp = 0
|
||||
|
||||
teardown if @es_pid
|
||||
@cluster = "logstash-test-#{$$}"
|
||||
|
||||
puts "Starting ElasticSearch #{version}"
|
||||
@clusterflags = "-Des.cluster.name=#{@cluster}"
|
||||
|
||||
ENV["ESFLAGS"] = "-Des.http.port=#{@port_http} -Des.transport.tcp.port=#{@port_tcp} "
|
||||
ENV["ESFLAGS"] += @clusterflags
|
||||
ENV["ESFLAGS"] += " > /dev/null 2>&1" if !$DEBUG
|
||||
cmd = ["make", "-C", "#{File.dirname(__FILE__)}/../../setup/elasticsearch/",]
|
||||
cmd << "-s" if !$DEBUG
|
||||
cmd << "run-elasticsearch-#{version}"
|
||||
@es_pid = Spoon.spawnp(*cmd)
|
||||
|
||||
# Assume it's up and happy, or will be.
|
||||
#raise "ElasticSearch failed to start or was otherwise not running properly?"
|
||||
end # def start_elasticsearch
|
||||
|
||||
def teardown
|
||||
# Kill the whole process group for elasticsearch
|
||||
#Process.kill("KILL", -1 * @es_pid) rescue nil
|
||||
#Process.kill("KILL", @es_pid) rescue nil
|
||||
|
||||
# TODO(sissel): Until I fix the way elasticsearch server is run,
|
||||
# we'll use pkill...
|
||||
#system("pkill -9 -f 'java.*#{@clusterflags}.*Bootstrap'")
|
||||
#
|
||||
@output.teardown
|
||||
if @tmpdir !~ /^\/tmp/
|
||||
$stderr.puts("Tempdir is '#{@tmpdir}' - not in /tmp, I won't " \
|
||||
"remove in case it's not safe.")
|
||||
else
|
||||
FileUtils.rm_r(@tmpdir)
|
||||
end
|
||||
end # def teardown
|
||||
|
||||
def test_elasticsearch_basic
|
||||
|
@ -89,32 +52,37 @@ class TestOutputElasticSearch < Test::Unit::TestCase
|
|||
end
|
||||
|
||||
tries = 30
|
||||
es = LogStash::Search::ElasticSearch.new(:cluster => @cluster)
|
||||
loop do
|
||||
es = LogStash::Search::ElasticSearch.new(:type => :local)
|
||||
while tries > 0
|
||||
tries -= 1
|
||||
puts "Tries left: #{tries}" if $DEBUG
|
||||
query = LogStash::Search::Query.new(:query_string => "*", :count => 5)
|
||||
es.search(query, async=false) do |result|
|
||||
if events.size == result.events.size
|
||||
puts "Found #{result.events.size} events, ready to verify!"
|
||||
expected = events.clone
|
||||
assert_equal(events.size, result.events.size)
|
||||
#events.each { |e| p :expect => e }
|
||||
result.events.each do |event|
|
||||
assert(expected.include?(event), "Found event in results that was not expected: #{event.inspect}\n\nExpected: #{events.map{ |a| a.inspect }.join("\n")}")
|
||||
end
|
||||
begin
|
||||
es.search(query, async=false) do |result|
|
||||
if events.size == result.events.size
|
||||
puts "Found #{result.events.size} events, ready to verify!"
|
||||
expected = events.clone
|
||||
assert_equal(events.size, result.events.size)
|
||||
#events.each { |e| p :expect => e }
|
||||
result.events.each do |event|
|
||||
assert(expected.include?(event), "Found event in results that was not expected: #{event.inspect}\n\nExpected: #{events.map{ |a| a.inspect }.join("\n")}")
|
||||
end
|
||||
|
||||
return
|
||||
else
|
||||
tries -= 1
|
||||
if tries <= 0
|
||||
assert(false, "Gave up trying to query elasticsearch. Maybe we aren't indexing properly?")
|
||||
return
|
||||
end
|
||||
end # if events.size == hits.size
|
||||
end # es.search
|
||||
else
|
||||
tries -= 1
|
||||
if tries <= 0
|
||||
assert(false, "Gave up trying to query elasticsearch. Maybe we aren't indexing properly?")
|
||||
return
|
||||
end
|
||||
end # if events.size == hits.size
|
||||
end # es.search
|
||||
rescue org.elasticsearch.action.search.SearchPhaseExecutionException => e
|
||||
# ignore
|
||||
end
|
||||
|
||||
sleep 0.2
|
||||
end # loop
|
||||
end # while tries > 0
|
||||
end # def test_elasticsearch_basic
|
||||
end # class TestOutputElasticSearch
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue