mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Add basic defaults module and spec. Use defaults in pipeline and agent
move/rename defaults under config, use Concurrent.processor_count remove method `filter_workers_count` add CpuCoreStrategy module functions/tests. Use fifty_percent as default. use extend self for Module functions added defaults printing facility change comment add tests to defaults printer subsystem closes #3852
This commit is contained in:
parent
112b28149e
commit
5b47009648
10 changed files with 322 additions and 9 deletions
|
@ -2,6 +2,7 @@
|
||||||
require "clamp" # gem 'clamp'
|
require "clamp" # gem 'clamp'
|
||||||
require "logstash/environment"
|
require "logstash/environment"
|
||||||
require "logstash/errors"
|
require "logstash/errors"
|
||||||
|
require "logstash/config/cpu_core_strategy"
|
||||||
require "uri"
|
require "uri"
|
||||||
require "net/http"
|
require "net/http"
|
||||||
LogStash::Environment.load_locale!
|
LogStash::Environment.load_locale!
|
||||||
|
@ -21,7 +22,8 @@ class LogStash::Agent < Clamp::Command
|
||||||
|
|
||||||
option ["-w", "--filterworkers"], "COUNT",
|
option ["-w", "--filterworkers"], "COUNT",
|
||||||
I18n.t("logstash.agent.flag.filterworkers"),
|
I18n.t("logstash.agent.flag.filterworkers"),
|
||||||
:attribute_name => :filter_workers, :default => 1, &:to_i
|
:attribute_name => :filter_workers,
|
||||||
|
:default => LogStash::Config::CpuCoreStrategy.fifty_percent, &:to_i
|
||||||
|
|
||||||
option ["-l", "--log"], "FILE",
|
option ["-l", "--log"], "FILE",
|
||||||
I18n.t("logstash.agent.flag.log"),
|
I18n.t("logstash.agent.flag.log"),
|
||||||
|
|
32
lib/logstash/config/cpu_core_strategy.rb
Normal file
32
lib/logstash/config/cpu_core_strategy.rb
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "logstash/namespace"
|
||||||
|
require "logstash/config/defaults"
|
||||||
|
|
||||||
|
module LogStash module Config module CpuCoreStrategy
|
||||||
|
|
||||||
|
extend self
|
||||||
|
|
||||||
|
def maximum
|
||||||
|
LogStash::Config::Defaults.cpu_cores
|
||||||
|
end
|
||||||
|
|
||||||
|
def fifty_percent
|
||||||
|
[1, (maximum * 0.5)].max.floor
|
||||||
|
end
|
||||||
|
|
||||||
|
def seventy_five_percent
|
||||||
|
[1, (maximum * 0.75)].max.floor
|
||||||
|
end
|
||||||
|
|
||||||
|
def twenty_five_percent
|
||||||
|
[1, (maximum * 0.25)].max.floor
|
||||||
|
end
|
||||||
|
|
||||||
|
def max_minus_one
|
||||||
|
[1, (maximum - 1)].max.floor
|
||||||
|
end
|
||||||
|
|
||||||
|
def max_minus_two
|
||||||
|
[1, (maximum - 2)].max.floor
|
||||||
|
end
|
||||||
|
end end end
|
12
lib/logstash/config/defaults.rb
Normal file
12
lib/logstash/config/defaults.rb
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "logstash/namespace"
|
||||||
|
require "concurrent"
|
||||||
|
|
||||||
|
module LogStash module Config module Defaults
|
||||||
|
|
||||||
|
extend self
|
||||||
|
|
||||||
|
def cpu_cores
|
||||||
|
Concurrent.processor_count
|
||||||
|
end
|
||||||
|
end end end
|
|
@ -10,6 +10,8 @@ require "logstash/filters/base"
|
||||||
require "logstash/inputs/base"
|
require "logstash/inputs/base"
|
||||||
require "logstash/outputs/base"
|
require "logstash/outputs/base"
|
||||||
require "logstash/util/reporter"
|
require "logstash/util/reporter"
|
||||||
|
require "logstash/config/cpu_core_strategy"
|
||||||
|
require "logstash/util/defaults_printer"
|
||||||
|
|
||||||
class LogStash::Pipeline
|
class LogStash::Pipeline
|
||||||
|
|
||||||
|
@ -38,7 +40,7 @@ class LogStash::Pipeline
|
||||||
# if no filters, pipe inputs directly to outputs
|
# if no filters, pipe inputs directly to outputs
|
||||||
@filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter
|
@filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter
|
||||||
@settings = {
|
@settings = {
|
||||||
"filter-workers" => 1,
|
"filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent
|
||||||
}
|
}
|
||||||
|
|
||||||
# @ready requires thread safety since it is typically polled from outside the pipeline thread
|
# @ready requires thread safety since it is typically polled from outside the pipeline thread
|
||||||
|
@ -51,10 +53,10 @@ class LogStash::Pipeline
|
||||||
end
|
end
|
||||||
|
|
||||||
def configure(setting, value)
|
def configure(setting, value)
|
||||||
if setting == "filter-workers"
|
if setting == "filter-workers" && value > 1
|
||||||
# Abort if we have any filters that aren't threadsafe
|
# Abort if we have any filters that aren't threadsafe
|
||||||
if value > 1 && @filters.any? { |f| !f.threadsafe? }
|
plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
|
||||||
plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
|
if !plugins.size.zero?
|
||||||
raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
|
raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -66,6 +68,8 @@ class LogStash::Pipeline
|
||||||
end
|
end
|
||||||
|
|
||||||
def run
|
def run
|
||||||
|
@logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))
|
||||||
|
|
||||||
begin
|
begin
|
||||||
start_inputs
|
start_inputs
|
||||||
start_filters if filters?
|
start_filters if filters?
|
||||||
|
@ -139,10 +143,17 @@ class LogStash::Pipeline
|
||||||
|
|
||||||
def start_filters
|
def start_filters
|
||||||
@filters.each(&:register)
|
@filters.each(&:register)
|
||||||
@filter_threads = @settings["filter-workers"].times.collect do
|
to_start = @settings["filter-workers"]
|
||||||
|
@filter_threads = to_start.times.collect do
|
||||||
Thread.new { filterworker }
|
Thread.new { filterworker }
|
||||||
end
|
end
|
||||||
|
actually_started = @filter_threads.select(&:alive?).size
|
||||||
|
msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
|
||||||
|
if actually_started < to_start
|
||||||
|
@logger.warn(msg)
|
||||||
|
else
|
||||||
|
@logger.info(msg)
|
||||||
|
end
|
||||||
@flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
|
@flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -191,7 +202,7 @@ class LogStash::Pipeline
|
||||||
end # def inputworker
|
end # def inputworker
|
||||||
|
|
||||||
def filterworker
|
def filterworker
|
||||||
LogStash::Util::set_thread_name("|worker")
|
LogStash::Util.set_thread_name("|worker")
|
||||||
begin
|
begin
|
||||||
while true
|
while true
|
||||||
event = @input_to_filter.pop
|
event = @input_to_filter.pop
|
||||||
|
@ -218,7 +229,7 @@ class LogStash::Pipeline
|
||||||
end # def filterworker
|
end # def filterworker
|
||||||
|
|
||||||
def outputworker
|
def outputworker
|
||||||
LogStash::Util::set_thread_name(">output")
|
LogStash::Util.set_thread_name(">output")
|
||||||
@outputs.each(&:worker_setup)
|
@outputs.each(&:worker_setup)
|
||||||
|
|
||||||
while true
|
while true
|
||||||
|
|
31
lib/logstash/util/defaults_printer.rb
Normal file
31
lib/logstash/util/defaults_printer.rb
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "logstash/namespace"
|
||||||
|
require "logstash/util"
|
||||||
|
require "logstash/util/worker_threads_default_printer"
|
||||||
|
|
||||||
|
|
||||||
|
# This class exists to format the settings for defaults used
|
||||||
|
module LogStash module Util class DefaultsPrinter
|
||||||
|
def self.print(settings)
|
||||||
|
new(settings).print
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(settings)
|
||||||
|
@settings = settings
|
||||||
|
@printers = [workers]
|
||||||
|
end
|
||||||
|
|
||||||
|
def print
|
||||||
|
collector = []
|
||||||
|
@printers.each do |printer|
|
||||||
|
printer.visit(collector)
|
||||||
|
end
|
||||||
|
"Default settings used: " + collector.join(', ')
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def workers
|
||||||
|
WorkerThreadsDefaultPrinter.new(@settings)
|
||||||
|
end
|
||||||
|
end end end
|
17
lib/logstash/util/worker_threads_default_printer.rb
Normal file
17
lib/logstash/util/worker_threads_default_printer.rb
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "logstash/namespace"
|
||||||
|
require "logstash/util"
|
||||||
|
|
||||||
|
# This class exists to format the settings for default worker threads
|
||||||
|
module LogStash module Util class WorkerThreadsDefaultPrinter
|
||||||
|
|
||||||
|
def initialize(settings)
|
||||||
|
@setting = settings.fetch('filter-workers', 1)
|
||||||
|
end
|
||||||
|
|
||||||
|
def visit(collector)
|
||||||
|
collector.push "Filter workers: #{@setting}"
|
||||||
|
end
|
||||||
|
|
||||||
|
end end end
|
||||||
|
|
123
spec/core/config_cpu_core_strategy_spec.rb
Normal file
123
spec/core/config_cpu_core_strategy_spec.rb
Normal file
|
@ -0,0 +1,123 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "spec_helper"
|
||||||
|
require "logstash/config/cpu_core_strategy"
|
||||||
|
|
||||||
|
describe LogStash::Config::CpuCoreStrategy do
|
||||||
|
|
||||||
|
before do
|
||||||
|
allow(LogStash::Config::Defaults).to receive(:cpu_cores).and_return(cores)
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the machine has 6 cores' do
|
||||||
|
let(:cores) { 6 }
|
||||||
|
|
||||||
|
it ".maximum should return 6" do
|
||||||
|
expect(described_class.maximum).to eq(6)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".fifty_percent should return 3" do
|
||||||
|
expect(described_class.fifty_percent).to eq(3)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".seventy_five_percent should return 4" do
|
||||||
|
expect(described_class.seventy_five_percent).to eq(4)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".twenty_five_percent should return 1" do
|
||||||
|
expect(described_class.twenty_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_one should return 5" do
|
||||||
|
expect(described_class.max_minus_one).to eq(5)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_two should return 4" do
|
||||||
|
expect(described_class.max_minus_two).to eq(4)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the machine has 4 cores' do
|
||||||
|
let(:cores) { 4 }
|
||||||
|
|
||||||
|
it ".maximum should return 4" do
|
||||||
|
expect(described_class.maximum).to eq(4)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".fifty_percent should return 2" do
|
||||||
|
expect(described_class.fifty_percent).to eq(2)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".seventy_five_percent should return 3" do
|
||||||
|
expect(described_class.seventy_five_percent).to eq(3)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".twenty_five_percent should return 1" do
|
||||||
|
expect(described_class.twenty_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_one should return 3" do
|
||||||
|
expect(described_class.max_minus_one).to eq(3)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_two should return 2" do
|
||||||
|
expect(described_class.max_minus_two).to eq(2)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the machine has 2 cores' do
|
||||||
|
let(:cores) { 2 }
|
||||||
|
|
||||||
|
it ".maximum should return 2" do
|
||||||
|
expect(described_class.maximum).to eq(2)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".fifty_percent should return 1" do
|
||||||
|
expect(described_class.fifty_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".seventy_five_percent should return 1" do
|
||||||
|
expect(described_class.seventy_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".twenty_five_percent should return 1" do
|
||||||
|
expect(described_class.twenty_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_one should return 1" do
|
||||||
|
expect(described_class.max_minus_one).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_two should return 1" do
|
||||||
|
expect(described_class.max_minus_two).to eq(1)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the machine has 1 core' do
|
||||||
|
let(:cores) { 1 }
|
||||||
|
|
||||||
|
it ".maximum should return 1" do
|
||||||
|
expect(described_class.maximum).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".fifty_percent should return 1" do
|
||||||
|
expect(described_class.fifty_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".seventy_five_percent should return 1" do
|
||||||
|
expect(described_class.seventy_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".twenty_five_percent should return 1" do
|
||||||
|
expect(described_class.twenty_five_percent).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_one should return 1" do
|
||||||
|
expect(described_class.max_minus_one).to eq(1)
|
||||||
|
end
|
||||||
|
|
||||||
|
it ".max_minus_two should return 1" do
|
||||||
|
expect(described_class.max_minus_two).to eq(1)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
end
|
10
spec/core/config_defaults_spec.rb
Normal file
10
spec/core/config_defaults_spec.rb
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "spec_helper"
|
||||||
|
require "logstash/config/defaults"
|
||||||
|
|
||||||
|
describe LogStash::Config::Defaults do
|
||||||
|
it ".cpu_cores should return a positive integer" do
|
||||||
|
expect(described_class.cpu_cores.nil?).to be false
|
||||||
|
expect(described_class.cpu_cores.zero?).to be false
|
||||||
|
end
|
||||||
|
end
|
49
spec/util/defaults_printer_spec.rb
Normal file
49
spec/util/defaults_printer_spec.rb
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "spec_helper"
|
||||||
|
require "logstash/util/defaults_printer"
|
||||||
|
|
||||||
|
describe LogStash::Util::DefaultsPrinter do
|
||||||
|
shared_examples "a defaults printer" do
|
||||||
|
it 'the .print method returns a defaults description' do
|
||||||
|
expect(actual_block.call).to eq(expected)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:workers) { 1 }
|
||||||
|
let(:expected) { "Default settings used: Filter workers: #{workers}" }
|
||||||
|
let(:settings) { {} }
|
||||||
|
|
||||||
|
describe 'class methods API' do
|
||||||
|
let(:actual_block) do
|
||||||
|
-> {described_class.print(settings)}
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the settings hash is empty' do
|
||||||
|
it_behaves_like "a defaults printer"
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the settings hash has content' do
|
||||||
|
let(:workers) { 42 }
|
||||||
|
let(:settings) { {'filter-workers' => workers} }
|
||||||
|
|
||||||
|
it_behaves_like "a defaults printer"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe 'instance method API' do
|
||||||
|
let(:actual_block) do
|
||||||
|
-> {described_class.new(settings).print}
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the settings hash is empty' do
|
||||||
|
it_behaves_like "a defaults printer"
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the settings hash has content' do
|
||||||
|
let(:workers) { 13 }
|
||||||
|
let(:settings) { {'filter-workers' => workers} }
|
||||||
|
|
||||||
|
it_behaves_like "a defaults printer"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
26
spec/util/worker_threads_default_printer_spec.rb
Normal file
26
spec/util/worker_threads_default_printer_spec.rb
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
# encoding: utf-8
|
||||||
|
require "spec_helper"
|
||||||
|
require "logstash/util/worker_threads_default_printer"
|
||||||
|
|
||||||
|
describe LogStash::Util::WorkerThreadsDefaultPrinter do
|
||||||
|
let(:settings) { {} }
|
||||||
|
let(:collector) { [] }
|
||||||
|
|
||||||
|
subject { described_class.new(settings) }
|
||||||
|
|
||||||
|
context 'when the settings hash is empty' do
|
||||||
|
it 'the #visit method returns a string with 1 filter worker' do
|
||||||
|
subject.visit(collector)
|
||||||
|
expect(collector.first).to eq("Filter workers: 1")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when the settings hash has content' do
|
||||||
|
let(:settings) { {'filter-workers' => 42} }
|
||||||
|
|
||||||
|
it 'the #visit method returns a string with 42 filter workers' do
|
||||||
|
subject.visit(collector)
|
||||||
|
expect(collector.first).to eq("Filter workers: 42")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue