From 973bb06ec4a7cee7a55b5edb4bf8ca6fe31ef15b Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Fri, 26 Sep 2014 13:57:19 -0400 Subject: [PATCH] track per second tps and report top & avg tps Fixes #1773 --- test/integration/run.rb | 53 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/test/integration/run.rb b/test/integration/run.rb index ad94d5fba..efa2cabef 100644 --- a/test/integration/run.rb +++ b/test/integration/run.rb @@ -1,10 +1,12 @@ # encoding: utf-8 require "benchmark" +require "thread" INITIAL_MESSAGE = ">>> lorem ipsum start".freeze LAST_MESSAGE = ">>> lorem ipsum stop".freeze LOGSTASH_BIN = File.join(File.expand_path("../../../bin/", __FILE__), "logstash") +REFRESH_COUNT = 100 Thread.abort_on_exception = true @@ -37,9 +39,40 @@ def feed_input_interval(io, seconds, lines, last_message) count end -def output_reader(io, regex) +# below stats counter and output reader threads are sharing state using +# the @stats_lock mutex, @stats_count and @stats. this is a bit messy and should be +# refactored into a proper class eventually + +def detach_stats_counter + Thread.new do + loop do + start = @stats_lock.synchronize{@stats_count} + sleep(1) + @stats_lock.synchronize{@stats << (@stats_count - start)} + end + end +end + +# detach_output_reader spawns a thread that will fill in the @stats instance var with tps samples for every seconds +# @stats access is synchronized using the @stats_lock mutex but can be safely used +# once the output reader thread is completed. +def detach_output_reader(io, regex) Thread.new(io, regex) do |io, regex| - expect_output(io, regex) + i = 0 + @stats = [] + @stats_count = 0 + @stats_lock = Mutex.new + t = detach_stats_counter + + expect_output(io, regex) do + i += 1 + # avoid mutex synchronize on every loop cycle, using REFRESH_COUNT = 100 results in + # much lower mutex overhead and still provides a good resolution since we are typically + # have 2000..100000 tps + @stats_lock.synchronize{@stats_count = i} if (i % REFRESH_COUNT) == 0 + end + + @stats_lock.synchronize{t.kill} end end @@ -50,10 +83,16 @@ end def expect_output(io, regex) io.each_line do |line| puts("received: #{line}") if @debug + yield if block_given? break if line =~ regex end end +def percentile(array, percentile) + count = (array.length * (1.0 - percentile)).floor + array.sort[-count..-1] +end + # ## script main @@ -90,7 +129,7 @@ required_events_count = options[:events].to_i # total number of events to feed, required_run_time = options[:time].to_i input_lines = read_input_file(options[:input]) -puts("will run with config file=#{options[:config]}, input file=#{options[:input]}") if @debug +puts("using config file=#{options[:config]}, input file=#{options[:input]}") if @debug command = [LOGSTASH_BIN, "-f", options[:config], "2>&1"] puts("launching #{command.join(" ")}") if @debug @@ -106,7 +145,7 @@ IO.popen(command.join(" "), "r+") do |io| expect_output(io, /#{INITIAL_MESSAGE}/) puts("starting output reader thread") if @debug - @reader = output_reader(io, /#{LAST_MESSAGE}/) + reader = detach_output_reader(io, /#{LAST_MESSAGE}/) puts("starting feeding input") if @debug elaspsed = Benchmark.realtime do @@ -117,8 +156,10 @@ IO.popen(command.join(" "), "r+") do |io| end puts("waiting for output reader to complete") if @debug - @reader.join + reader.join end - puts("elaspsed=#{"%.2f" % elaspsed}s, events=#{real_events_count}, tps=#{"%.0f" % (real_events_count / elaspsed)}") + # the reader thread updates the @stats tps array + p = percentile(@stats, 0.70) + puts("elaspsed=#{"%.2f" % elaspsed}s, events=#{real_events_count}, avg tps=#{"%.0f" % (real_events_count / elaspsed)}, avg top 30% tps=#{"%.0f" % (p.reduce(:+) / p.size)}, best tps=#{p.last}") end