mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Merge branch 'master' of github.com:logstash/logstash
This commit is contained in:
commit
8323258d97
2 changed files with 209 additions and 0 deletions
91
pl.rb
Normal file
91
pl.rb
Normal file
|
@ -0,0 +1,91 @@
|
|||
# pipeline tests
|
||||
|
||||
$: << "lib"
|
||||
require "logstash/config/file"
|
||||
config = LogStash::Config::File.new(nil, ARGV[0])
|
||||
agent = LogStash::Agent.new
|
||||
inputs, filters, outputs = agent.instance_eval { parse_config(config) }
|
||||
|
||||
inputs.collect(&:register)
|
||||
filters.collect(&:register)
|
||||
outputs.collect(&:register)
|
||||
|
||||
i2f = SizedQueue.new(16)
|
||||
f2o = SizedQueue.new(16)
|
||||
i2f = f2o if filters.empty?
|
||||
|
||||
input_threads = inputs.collect do |i|
|
||||
t = Thread.new do
|
||||
begin
|
||||
i.run(i2f)
|
||||
rescue => e
|
||||
puts :input => i.class, :exception => e
|
||||
end
|
||||
end
|
||||
t[:name] = i.class
|
||||
t
|
||||
end
|
||||
|
||||
#input_supervisor_thread = Thread.new do
|
||||
#while true
|
||||
#input_threads.collect(&:join)
|
||||
#i2f << :shutdown
|
||||
#end
|
||||
#end
|
||||
|
||||
filter_thread = Thread.new(filters) do |filters|
|
||||
if filters.any?
|
||||
event = i2f.pop
|
||||
filters.each do |filter|
|
||||
filter.filter(event)
|
||||
end
|
||||
f2o << event
|
||||
end
|
||||
end
|
||||
filter_thread[:name] = "filterworker"
|
||||
|
||||
output_thread = Thread.new do
|
||||
begin
|
||||
while true
|
||||
event = f2o.pop
|
||||
outputs.each do |output|
|
||||
output.receive(event)
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
puts :output_thread => e
|
||||
end
|
||||
end
|
||||
output_thread[:name] = "outputworker"
|
||||
|
||||
def twait(thread)
|
||||
begin
|
||||
puts :waiting => thread[:name]
|
||||
thread.join
|
||||
puts :donewaiting => thread[:name]
|
||||
rescue => e
|
||||
puts thread => e
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown(input, filter, output)
|
||||
input.each do |i|
|
||||
i.raise("SHUTDOWN")
|
||||
twait(i)
|
||||
end
|
||||
|
||||
#filter.raise("SHUTDOWN")
|
||||
#twait(filter)
|
||||
output.raise("SHUTDOWN")
|
||||
twait(output)
|
||||
end
|
||||
|
||||
trap("INT") do
|
||||
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
|
||||
exit 1
|
||||
end
|
||||
|
||||
#[*input_threads, filter_thread, output_thread].collect(&:join)
|
||||
sleep 30
|
||||
|
||||
|
118
pl2.rb
Normal file
118
pl2.rb
Normal file
|
@ -0,0 +1,118 @@
|
|||
$: << "lib"
|
||||
require "logstash/config/file"
|
||||
|
||||
class Pipeline
|
||||
class ShutdownSignal; end
|
||||
|
||||
def initialize(configstr)
|
||||
# hacks for now to parse a config string
|
||||
config = LogStash::Config::File.new(nil, configstr)
|
||||
agent = LogStash::Agent.new
|
||||
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }
|
||||
|
||||
@inputs.collect(&:register)
|
||||
@filters.collect(&:register)
|
||||
@outputs.collect(&:register)
|
||||
|
||||
@input_to_filter = SizedQueue(16)
|
||||
@filter_to_output = SizedQueue(16)
|
||||
|
||||
# If no filters, pipe inputs to outputs
|
||||
if @filters.empty?
|
||||
input_to_filter = filter_to_output
|
||||
end
|
||||
end
|
||||
|
||||
def run
|
||||
# one thread per input
|
||||
@input_threads = @inputs.collect do |input|
|
||||
Thread.new(input) do |input|
|
||||
inputworker(input)
|
||||
end
|
||||
end
|
||||
|
||||
# one filterworker thread
|
||||
#@filter_threads = @filters.collect do |input
|
||||
# TODO(sissel): THIS IS WHERE I STOPPED WORKING
|
||||
|
||||
# one outputworker thread
|
||||
|
||||
# Now monitor input threads state
|
||||
# if all inputs are terminated, send shutdown signal to @input_to_filter
|
||||
end
|
||||
|
||||
def inputworker(plugin)
|
||||
begin
|
||||
plugin.run(@input_to_filter)
|
||||
rescue ShutdownSignal
|
||||
plugin.teardown
|
||||
rescue => e
|
||||
@logger.error("Exception in plugin #{plugin.class}, restarting plugin.",
|
||||
"plugin" => plugin.inspect, "exception" => e)
|
||||
plugin.teardown
|
||||
retry
|
||||
end
|
||||
end # def
|
||||
|
||||
def filterworker
|
||||
begin
|
||||
while true
|
||||
event << @input_to_filter
|
||||
break if event == :shutdown
|
||||
@filters.each do |filter|
|
||||
filter.filter(event)
|
||||
end
|
||||
next if event.cancelled?
|
||||
@filter_to_output << event
|
||||
end
|
||||
rescue => e
|
||||
@logger.error("Exception in plugin #{plugin.class}",
|
||||
"plugin" => plugin.inspect, "exception" => e)
|
||||
end
|
||||
@filters.each(&:teardown)
|
||||
end # def filterworker
|
||||
|
||||
def outputworker
|
||||
begin
|
||||
while true
|
||||
event << @filter_to_output
|
||||
break if event == :shutdown
|
||||
@outputs.each do |output|
|
||||
output.receive(event)
|
||||
end
|
||||
end
|
||||
rescue => e
|
||||
@logger.error("Exception in plugin #{plugin.class}",
|
||||
"plugin" => plugin.inspect, "exception" => e)
|
||||
end
|
||||
@outputs.each(&:teardown)
|
||||
end # def filterworker
|
||||
end # class Pipeline
|
||||
|
||||
def twait(thread)
|
||||
begin
|
||||
puts :waiting => thread[:name]
|
||||
thread.join
|
||||
puts :donewaiting => thread[:name]
|
||||
rescue => e
|
||||
puts thread => e
|
||||
end
|
||||
end
|
||||
|
||||
def shutdown(input, filter, output)
|
||||
input.each do |i|
|
||||
i.raise("SHUTDOWN")
|
||||
end
|
||||
|
||||
#filter.raise("SHUTDOWN")
|
||||
#twait(filter)
|
||||
output.raise("SHUTDOWN")
|
||||
twait(output)
|
||||
end
|
||||
|
||||
trap("INT") do
|
||||
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
|
||||
exit 1
|
||||
end
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue