mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- only yield to Agent#run's block when all inputs and outputs have been
registered. Consider it like a 'ready' callback.
This commit is contained in:
parent
0b961c1c1c
commit
02e1d53280
1 changed files with 16 additions and 4 deletions
|
@ -213,7 +213,7 @@ class LogStash::Agent
|
|||
end # def configure
|
||||
|
||||
public
|
||||
def run
|
||||
def run(&block)
|
||||
LogStash::Util::set_thread_name(self.class.name)
|
||||
|
||||
ok = parse_options
|
||||
|
@ -226,7 +226,7 @@ class LogStash::Agent
|
|||
# Load the config file
|
||||
config = LogStash::Config::File.new(@config_file)
|
||||
|
||||
run_with_config(config)
|
||||
run_with_config(config, &block)
|
||||
end # def run
|
||||
|
||||
def run_with_config(config)
|
||||
|
@ -265,6 +265,8 @@ class LogStash::Agent
|
|||
filter_queue = SizedQueue.new(10)
|
||||
output_queue = LogStash::MultiQueue.new
|
||||
|
||||
ready_queue = Queue.new
|
||||
|
||||
input_target = @filters.length > 0 ? filter_queue : output_queue
|
||||
# Start inputs
|
||||
@inputs.each do |input|
|
||||
|
@ -272,6 +274,7 @@ class LogStash::Agent
|
|||
@threads[input] = Thread.new(input_target) do |input_target|
|
||||
input.logger = @logger
|
||||
input.register
|
||||
ready_queue << input
|
||||
input.run(input_target)
|
||||
end # new thread for thsi input
|
||||
end # @inputs.each
|
||||
|
@ -313,6 +316,7 @@ class LogStash::Agent
|
|||
output_queue.add_queue(queue)
|
||||
@threads["outputs/#{output.to_s}"] = Thread.new(queue) do |queue|
|
||||
output.register
|
||||
ready_queue << output
|
||||
begin
|
||||
LogStash::Util::set_thread_name("output/#{output.to_s}")
|
||||
output.logger = @logger
|
||||
|
@ -323,18 +327,26 @@ class LogStash::Agent
|
|||
end
|
||||
rescue Exception => e
|
||||
@logger.warn(["Output #{output.to_s} thread exception", e])
|
||||
# TODO(sissel): should we abort after too many failures?
|
||||
retry
|
||||
end
|
||||
end # begin/rescue
|
||||
end # Thread.new
|
||||
end # @outputs.each
|
||||
|
||||
# Wait for all inputs and outputs to be registered.
|
||||
wait_count = outputs.size + inputs.size
|
||||
while wait_count > 0 and ready_queue.pop
|
||||
wait_count -= 1
|
||||
end
|
||||
|
||||
# yield to a block in case someone's waiting for us to be done setting up
|
||||
# like tests, etc.
|
||||
yield if block_given?
|
||||
|
||||
# TODO(sissel): Monitor what's going on? Sleep forever? what?
|
||||
while sleep 5
|
||||
end
|
||||
end # def run
|
||||
end # def run_with_config
|
||||
|
||||
public
|
||||
def stop
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue