# Mirror of https://github.com/elastic/logstash.git
# Synced 2025-04-22 05:37:21 -04:00
# 91 lines, 1.6 KiB, Ruby
# pipeline tests
#
# Stand-alone harness that wires inputs -> filters -> outputs together with
# plain threads and bounded queues.

# Put ./lib on the load path so the logstash sources resolve.
$: << "lib"
require "logstash/config/file"

# ARGV[0] is handed to the config loader as-is (presumably a config file
# path -- confirm against LogStash::Config::File).
config = LogStash::Config::File.new(nil, ARGV[0])
agent = LogStash::Agent.new

# parse_config is invoked with the agent as self via instance_eval
# (presumably because it is not part of Agent's public API -- confirm).
inputs, filters, outputs = agent.instance_eval { parse_config(config) }

# register is called for its side effects only, so iterate with each.
inputs.each(&:register)
filters.each(&:register)
outputs.each(&:register)

# Bounded hand-off queues: i2f is given to the inputs and drained by the
# filter worker; f2o is filled by the filter worker and drained by the
# output worker.
i2f = SizedQueue.new(16)
f2o = SizedQueue.new(16)

# With no filters configured, alias the queues so whatever the inputs put on
# i2f lands directly on the queue the output worker reads.
i2f = f2o if filters.empty?

# Start one thread per input. Each input is handed the i2f queue through
# run(i2f); a StandardError escaping run is printed rather than left to
# terminate the thread unreported.
input_threads = inputs.map do |i|
  t = Thread.new do
    begin
      i.run(i2f)
    rescue => e
      puts :input => i.class, :exception => e
    end
  end
  # Label the thread with the input's class; twait prints thread[:name].
  t[:name] = i.class
  t
end

#input_supervisor_thread = Thread.new do
  #while true
    #input_threads.collect(&:join)
    #i2f << :shutdown
  #end
#end

# Filter worker: pops events from i2f, runs every filter over each event in
# order, then pushes the event onto f2o.
#
# When no filters are configured the thread exits immediately (i2f is then
# the same queue object as f2o, so there is nothing to do here).
filter_thread = Thread.new(filters) do |active_filters|
  begin
    # Keep draining for the lifetime of the pipeline. Handling a single
    # event and returning would leave every later event stuck in i2f and
    # eventually block the inputs once the bounded queue fills up.
    while active_filters.any?
      event = i2f.pop
      active_filters.each do |filter|
        filter.filter(event)
      end
      f2o << event
    end
  rescue => e
    # Same reporting style as the output worker.
    puts :filter_thread => e
  end
end
filter_thread[:name] = "filterworker"

# Output worker: pops events from f2o forever and hands each one to every
# configured output via receive.
#
# The rescue is the only way out of the loop: it reports any StandardError,
# including the RuntimeError("SHUTDOWN") that shutdown raises into this
# thread.
output_thread = Thread.new do
  begin
    while true
      event = f2o.pop
      outputs.each do |output|
        output.receive(event)
      end
    end
  rescue => e
    puts :output_thread => e
  end
end
output_thread[:name] = "outputworker"

# Wait for +thread+ to finish, printing a progress line before and after.
#
# Thread#join re-raises the exception that terminated the thread; any
# StandardError raised that way is rescued and printed (keyed by the thread)
# instead of propagating, in which case the "donewaiting" line is skipped.
#
# @param thread [Thread] thread to join; its thread-local :name is printed
# @return [nil]
def twait(thread)
  puts :waiting => thread[:name]
  thread.join
  puts :donewaiting => thread[:name]
rescue => e
  puts thread => e
end

# Stop the pipeline threads by raising RuntimeError("SHUTDOWN") inside them
# and waiting for each one to finish.
#
# Input threads are stopped first, one at a time, then the output thread.
# The filter thread is accepted for symmetry but deliberately left alone:
# its shutdown is commented out below.
#
# @param input [Array<Thread>] input threads
# @param filter [Thread] filter worker thread (currently unused)
# @param output [Thread] output worker thread
def shutdown(input, filter, output)
  input.each do |i|
    i.raise("SHUTDOWN")
    twait(i)
  end

  #filter.raise("SHUTDOWN")
  #twait(filter)
  output.raise("SHUTDOWN")
  twait(output)
end

# On SIGINT (Ctrl-C): announce it, stop the worker threads, then exit with
# status 1.
trap("INT") do
  puts "SIGINT"
  shutdown(input_threads, filter_thread, output_thread)
  exit 1
end

#[*input_threads, filter_thread, output_thread].collect(&:join)
# Joining the workers is disabled above; instead the main thread just sleeps,
# so the harness runs for at most 30 seconds unless interrupted.
sleep 30