mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- call register methods, silly goose.
- implement start_inputs - respect an inputs 'threads' setting
This commit is contained in:
parent
b7a8e9236b
commit
2655d81dfb
1 changed files with 28 additions and 12 deletions
|
@ -37,6 +37,9 @@ class LogStash::Pipeline
|
|||
|
||||
def compile_config
|
||||
code = []
|
||||
code << "@inputs = []"
|
||||
code << "@filters = []"
|
||||
code << "@outputs = []"
|
||||
sections = @config.recursive_select(LogStash::Config::AST::PluginSection)
|
||||
sections.each do |s|
|
||||
code << s.compile_initializer
|
||||
|
@ -45,13 +48,6 @@ class LogStash::Pipeline
|
|||
# start inputs
|
||||
code << "class << self"
|
||||
definitions = []
|
||||
definitions << "def start_inputs(queue)"
|
||||
["input"].each do |type|
|
||||
sections.select { |s| s.plugin_type.text_value == type }.each do |s|
|
||||
definitions << s.compile.split("\n").map { |e| " #{e}" }.join("\n")
|
||||
end
|
||||
end
|
||||
definitions << "end"
|
||||
|
||||
["filter", "output"].each do |type|
|
||||
definitions << "def #{type}(event)"
|
||||
|
@ -69,9 +65,9 @@ class LogStash::Pipeline
|
|||
|
||||
def run
|
||||
@input_threads = []
|
||||
start_inputs(@input_to_filter)
|
||||
start_filters(@input_to_filter, @filter_to_output) if filters?
|
||||
start_outputs(@filter_to_output)
|
||||
start_inputs
|
||||
start_filters if filters?
|
||||
start_outputs
|
||||
|
||||
@logger.info("Pipeline started")
|
||||
wait_inputs
|
||||
|
@ -107,13 +103,29 @@ class LogStash::Pipeline
|
|||
@output_thread.join
|
||||
end
|
||||
|
||||
def start_inputs
|
||||
moreinputs = []
|
||||
@inputs.each do |input|
|
||||
if input.threadable && input.threads > 1
|
||||
(input.threads-1).times do |i|
|
||||
moreinputs << input.clone
|
||||
end
|
||||
end
|
||||
end
|
||||
@inputs += moreinputs
|
||||
|
||||
@inputs.each do |input|
|
||||
start_input(input)
|
||||
end
|
||||
end
|
||||
|
||||
def start_filters
|
||||
@filter_threads = [
|
||||
Thread.new { filterworker }
|
||||
]
|
||||
end
|
||||
|
||||
def start_outputs(queue)
|
||||
def start_outputs
|
||||
@output_thread = Thread.new do
|
||||
outputworker
|
||||
end
|
||||
|
@ -124,6 +136,7 @@ class LogStash::Pipeline
|
|||
end
|
||||
|
||||
def inputworker(plugin)
|
||||
LogStash::Util::set_thread_name("<#{plugin.class.config_name}")
|
||||
begin
|
||||
plugin.run(@input_to_filter)
|
||||
rescue ShutdownSignal
|
||||
|
@ -145,6 +158,8 @@ class LogStash::Pipeline
|
|||
end # def inputworker
|
||||
|
||||
def filterworker
|
||||
LogStash::Util::set_thread_name("|worker")
|
||||
@filters.each(&:register)
|
||||
begin
|
||||
while true
|
||||
event = @input_to_filter.pop
|
||||
|
@ -162,10 +177,11 @@ class LogStash::Pipeline
|
|||
end # def filterworker
|
||||
|
||||
def outputworker
|
||||
LogStash::Util::set_thread_name(">output")
|
||||
@outputs.each(&:register)
|
||||
while true
|
||||
event = @filter_to_output.pop
|
||||
break if event == ShutdownSignal
|
||||
|
||||
output(event)
|
||||
end # while true
|
||||
@outputs.each(&:teardown)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue