mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
parent
8ee1c73b95
commit
973bb06ec4
1 changed files with 47 additions and 6 deletions
|
@ -1,10 +1,12 @@
|
||||||
# encoding: utf-8
|
# encoding: utf-8
|
||||||
|
|
||||||
require "benchmark"
|
require "benchmark"
|
||||||
|
require "thread"
|
||||||
|
|
||||||
INITIAL_MESSAGE = ">>> lorem ipsum start".freeze
|
INITIAL_MESSAGE = ">>> lorem ipsum start".freeze
|
||||||
LAST_MESSAGE = ">>> lorem ipsum stop".freeze
|
LAST_MESSAGE = ">>> lorem ipsum stop".freeze
|
||||||
LOGSTASH_BIN = File.join(File.expand_path("../../../bin/", __FILE__), "logstash")
|
LOGSTASH_BIN = File.join(File.expand_path("../../../bin/", __FILE__), "logstash")
|
||||||
|
REFRESH_COUNT = 100
|
||||||
|
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
|
@ -37,9 +39,40 @@ def feed_input_interval(io, seconds, lines, last_message)
|
||||||
count
|
count
|
||||||
end
|
end
|
||||||
|
|
||||||
def output_reader(io, regex)
|
# below stats counter and output reader threads are sharing state using
|
||||||
|
# the @stats_lock mutex, @stats_count and @stats. this is a bit messy and should be
|
||||||
|
# refactored into a proper class eventually
|
||||||
|
|
||||||
|
def detach_stats_counter
|
||||||
|
Thread.new do
|
||||||
|
loop do
|
||||||
|
start = @stats_lock.synchronize{@stats_count}
|
||||||
|
sleep(1)
|
||||||
|
@stats_lock.synchronize{@stats << (@stats_count - start)}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# detach_output_reader spawns a thread that will fill in the @stats instance var with tps samples for every seconds
|
||||||
|
# @stats access is synchronized using the @stats_lock mutex but can be safely used
|
||||||
|
# once the output reader thread is completed.
|
||||||
|
def detach_output_reader(io, regex)
|
||||||
Thread.new(io, regex) do |io, regex|
|
Thread.new(io, regex) do |io, regex|
|
||||||
expect_output(io, regex)
|
i = 0
|
||||||
|
@stats = []
|
||||||
|
@stats_count = 0
|
||||||
|
@stats_lock = Mutex.new
|
||||||
|
t = detach_stats_counter
|
||||||
|
|
||||||
|
expect_output(io, regex) do
|
||||||
|
i += 1
|
||||||
|
# avoid mutex synchronize on every loop cycle, using REFRESH_COUNT = 100 results in
|
||||||
|
# much lower mutex overhead and still provides a good resolution since we are typically
|
||||||
|
# have 2000..100000 tps
|
||||||
|
@stats_lock.synchronize{@stats_count = i} if (i % REFRESH_COUNT) == 0
|
||||||
|
end
|
||||||
|
|
||||||
|
@stats_lock.synchronize{t.kill}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -50,10 +83,16 @@ end
|
||||||
def expect_output(io, regex)
|
def expect_output(io, regex)
|
||||||
io.each_line do |line|
|
io.each_line do |line|
|
||||||
puts("received: #{line}") if @debug
|
puts("received: #{line}") if @debug
|
||||||
|
yield if block_given?
|
||||||
break if line =~ regex
|
break if line =~ regex
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def percentile(array, percentile)
|
||||||
|
count = (array.length * (1.0 - percentile)).floor
|
||||||
|
array.sort[-count..-1]
|
||||||
|
end
|
||||||
|
|
||||||
#
|
#
|
||||||
## script main
|
## script main
|
||||||
|
|
||||||
|
@ -90,7 +129,7 @@ required_events_count = options[:events].to_i # total number of events to feed,
|
||||||
required_run_time = options[:time].to_i
|
required_run_time = options[:time].to_i
|
||||||
input_lines = read_input_file(options[:input])
|
input_lines = read_input_file(options[:input])
|
||||||
|
|
||||||
puts("will run with config file=#{options[:config]}, input file=#{options[:input]}") if @debug
|
puts("using config file=#{options[:config]}, input file=#{options[:input]}") if @debug
|
||||||
|
|
||||||
command = [LOGSTASH_BIN, "-f", options[:config], "2>&1"]
|
command = [LOGSTASH_BIN, "-f", options[:config], "2>&1"]
|
||||||
puts("launching #{command.join(" ")}") if @debug
|
puts("launching #{command.join(" ")}") if @debug
|
||||||
|
@ -106,7 +145,7 @@ IO.popen(command.join(" "), "r+") do |io|
|
||||||
expect_output(io, /#{INITIAL_MESSAGE}/)
|
expect_output(io, /#{INITIAL_MESSAGE}/)
|
||||||
|
|
||||||
puts("starting output reader thread") if @debug
|
puts("starting output reader thread") if @debug
|
||||||
@reader = output_reader(io, /#{LAST_MESSAGE}/)
|
reader = detach_output_reader(io, /#{LAST_MESSAGE}/)
|
||||||
puts("starting feeding input") if @debug
|
puts("starting feeding input") if @debug
|
||||||
|
|
||||||
elaspsed = Benchmark.realtime do
|
elaspsed = Benchmark.realtime do
|
||||||
|
@ -117,8 +156,10 @@ IO.popen(command.join(" "), "r+") do |io|
|
||||||
end
|
end
|
||||||
|
|
||||||
puts("waiting for output reader to complete") if @debug
|
puts("waiting for output reader to complete") if @debug
|
||||||
@reader.join
|
reader.join
|
||||||
end
|
end
|
||||||
|
|
||||||
puts("elaspsed=#{"%.2f" % elaspsed}s, events=#{real_events_count}, tps=#{"%.0f" % (real_events_count / elaspsed)}")
|
# the reader thread updates the @stats tps array
|
||||||
|
p = percentile(@stats, 0.70)
|
||||||
|
puts("elaspsed=#{"%.2f" % elaspsed}s, events=#{real_events_count}, avg tps=#{"%.0f" % (real_events_count / elaspsed)}, avg top 30% tps=#{"%.0f" % (p.reduce(:+) / p.size)}, best tps=#{p.last}")
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue