Merge branch 'feature/3852'

This commit is contained in:
guyboertje 2015-09-24 15:30:12 +01:00
commit e0ab036b34
10 changed files with 322 additions and 9 deletions

View file

@ -2,6 +2,7 @@
require "clamp" # gem 'clamp'
require "logstash/environment"
require "logstash/errors"
require "logstash/config/cpu_core_strategy"
require "uri"
require "net/http"
LogStash::Environment.load_locale!
@ -21,7 +22,8 @@ class LogStash::Agent < Clamp::Command
option ["-w", "--filterworkers"], "COUNT",
I18n.t("logstash.agent.flag.filterworkers"),
:attribute_name => :filter_workers, :default => 1, &:to_i
:attribute_name => :filter_workers,
:default => LogStash::Config::CpuCoreStrategy.fifty_percent, &:to_i
option ["-l", "--log"], "FILE",
I18n.t("logstash.agent.flag.log"),

View file

@ -0,0 +1,32 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/config/defaults"
module LogStash module Config module CpuCoreStrategy
extend self
def maximum
LogStash::Config::Defaults.cpu_cores
end
def fifty_percent
[1, (maximum * 0.5)].max.floor
end
def seventy_five_percent
[1, (maximum * 0.75)].max.floor
end
def twenty_five_percent
[1, (maximum * 0.25)].max.floor
end
def max_minus_one
[1, (maximum - 1)].max.floor
end
def max_minus_two
[1, (maximum - 2)].max.floor
end
end end end

View file

@ -0,0 +1,12 @@
# encoding: utf-8
require "logstash/namespace"
require "concurrent"
module LogStash module Config module Defaults
extend self
def cpu_cores
Concurrent.processor_count
end
end end end

View file

@ -10,6 +10,8 @@ require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/util/reporter"
require "logstash/config/cpu_core_strategy"
require "logstash/util/defaults_printer"
class LogStash::Pipeline
@ -38,7 +40,7 @@ class LogStash::Pipeline
# if no filters, pipe inputs directly to outputs
@filter_to_output = filters? ? SizedQueue.new(20) : @input_to_filter
@settings = {
"filter-workers" => 1,
"filter-workers" => LogStash::Config::CpuCoreStrategy.fifty_percent
}
# @ready requires thread safety since it is typically polled from outside the pipeline thread
@ -51,10 +53,10 @@ class LogStash::Pipeline
end
def configure(setting, value)
if setting == "filter-workers"
if setting == "filter-workers" && value > 1
# Abort if we have any filters that aren't threadsafe
if value > 1 && @filters.any? { |f| !f.threadsafe? }
plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
plugins = @filters.select { |f| !f.threadsafe? }.collect { |f| f.class.config_name }
if !plugins.size.zero?
raise LogStash::ConfigurationError, "Cannot use more than 1 filter worker because the following plugins don't work with more than one worker: #{plugins.join(", ")}"
end
end
@ -66,6 +68,8 @@ class LogStash::Pipeline
end
def run
@logger.terminal(LogStash::Util::DefaultsPrinter.print(@settings))
begin
start_inputs
start_filters if filters?
@ -139,10 +143,17 @@ class LogStash::Pipeline
def start_filters
@filters.each(&:register)
@filter_threads = @settings["filter-workers"].times.collect do
to_start = @settings["filter-workers"]
@filter_threads = to_start.times.collect do
Thread.new { filterworker }
end
actually_started = @filter_threads.select(&:alive?).size
msg = "Worker threads expected: #{to_start}, worker threads started: #{actually_started}"
if actually_started < to_start
@logger.warn(msg)
else
@logger.info(msg)
end
@flusher_thread = Thread.new { Stud.interval(5) { @input_to_filter.push(LogStash::FLUSH) } }
end
@ -191,7 +202,7 @@ class LogStash::Pipeline
end # def inputworker
def filterworker
LogStash::Util::set_thread_name("|worker")
LogStash::Util.set_thread_name("|worker")
begin
while true
event = @input_to_filter.pop
@ -218,7 +229,7 @@ class LogStash::Pipeline
end # def filterworker
def outputworker
LogStash::Util::set_thread_name(">output")
LogStash::Util.set_thread_name(">output")
@outputs.each(&:worker_setup)
while true

View file

@ -0,0 +1,31 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/util"
require "logstash/util/worker_threads_default_printer"
# This class exists to format the settings for defaults used
module LogStash module Util class DefaultsPrinter
def self.print(settings)
new(settings).print
end
def initialize(settings)
@settings = settings
@printers = [workers]
end
def print
collector = []
@printers.each do |printer|
printer.visit(collector)
end
"Default settings used: " + collector.join(', ')
end
private
def workers
WorkerThreadsDefaultPrinter.new(@settings)
end
end end end

View file

@ -0,0 +1,17 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/util"
# This class exists to format the settings for default worker threads
module LogStash module Util class WorkerThreadsDefaultPrinter
def initialize(settings)
@setting = settings.fetch('filter-workers', 1)
end
def visit(collector)
collector.push "Filter workers: #{@setting}"
end
end end end

View file

@ -0,0 +1,123 @@
# encoding: utf-8
require "spec_helper"
require "logstash/config/cpu_core_strategy"
describe LogStash::Config::CpuCoreStrategy do
before do
allow(LogStash::Config::Defaults).to receive(:cpu_cores).and_return(cores)
end
context 'when the machine has 6 cores' do
let(:cores) { 6 }
it ".maximum should return 6" do
expect(described_class.maximum).to eq(6)
end
it ".fifty_percent should return 3" do
expect(described_class.fifty_percent).to eq(3)
end
it ".seventy_five_percent should return 4" do
expect(described_class.seventy_five_percent).to eq(4)
end
it ".twenty_five_percent should return 1" do
expect(described_class.twenty_five_percent).to eq(1)
end
it ".max_minus_one should return 5" do
expect(described_class.max_minus_one).to eq(5)
end
it ".max_minus_two should return 4" do
expect(described_class.max_minus_two).to eq(4)
end
end
context 'when the machine has 4 cores' do
let(:cores) { 4 }
it ".maximum should return 4" do
expect(described_class.maximum).to eq(4)
end
it ".fifty_percent should return 2" do
expect(described_class.fifty_percent).to eq(2)
end
it ".seventy_five_percent should return 3" do
expect(described_class.seventy_five_percent).to eq(3)
end
it ".twenty_five_percent should return 1" do
expect(described_class.twenty_five_percent).to eq(1)
end
it ".max_minus_one should return 3" do
expect(described_class.max_minus_one).to eq(3)
end
it ".max_minus_two should return 2" do
expect(described_class.max_minus_two).to eq(2)
end
end
context 'when the machine has 2 cores' do
let(:cores) { 2 }
it ".maximum should return 2" do
expect(described_class.maximum).to eq(2)
end
it ".fifty_percent should return 1" do
expect(described_class.fifty_percent).to eq(1)
end
it ".seventy_five_percent should return 1" do
expect(described_class.seventy_five_percent).to eq(1)
end
it ".twenty_five_percent should return 1" do
expect(described_class.twenty_five_percent).to eq(1)
end
it ".max_minus_one should return 1" do
expect(described_class.max_minus_one).to eq(1)
end
it ".max_minus_two should return 1" do
expect(described_class.max_minus_two).to eq(1)
end
end
context 'when the machine has 1 core' do
let(:cores) { 1 }
it ".maximum should return 1" do
expect(described_class.maximum).to eq(1)
end
it ".fifty_percent should return 1" do
expect(described_class.fifty_percent).to eq(1)
end
it ".seventy_five_percent should return 1" do
expect(described_class.seventy_five_percent).to eq(1)
end
it ".twenty_five_percent should return 1" do
expect(described_class.twenty_five_percent).to eq(1)
end
it ".max_minus_one should return 1" do
expect(described_class.max_minus_one).to eq(1)
end
it ".max_minus_two should return 1" do
expect(described_class.max_minus_two).to eq(1)
end
end
end

View file

@ -0,0 +1,10 @@
# encoding: utf-8
require "spec_helper"
require "logstash/config/defaults"
describe LogStash::Config::Defaults do
it ".cpu_cores should return a positive integer" do
expect(described_class.cpu_cores.nil?).to be false
expect(described_class.cpu_cores.zero?).to be false
end
end

View file

@ -0,0 +1,49 @@
# encoding: utf-8
require "spec_helper"
require "logstash/util/defaults_printer"
describe LogStash::Util::DefaultsPrinter do
shared_examples "a defaults printer" do
it 'the .print method returns a defaults description' do
expect(actual_block.call).to eq(expected)
end
end
let(:workers) { 1 }
let(:expected) { "Default settings used: Filter workers: #{workers}" }
let(:settings) { {} }
describe 'class methods API' do
let(:actual_block) do
-> {described_class.print(settings)}
end
context 'when the settings hash is empty' do
it_behaves_like "a defaults printer"
end
context 'when the settings hash has content' do
let(:workers) { 42 }
let(:settings) { {'filter-workers' => workers} }
it_behaves_like "a defaults printer"
end
end
describe 'instance method API' do
let(:actual_block) do
-> {described_class.new(settings).print}
end
context 'when the settings hash is empty' do
it_behaves_like "a defaults printer"
end
context 'when the settings hash has content' do
let(:workers) { 13 }
let(:settings) { {'filter-workers' => workers} }
it_behaves_like "a defaults printer"
end
end
end

View file

@ -0,0 +1,26 @@
# encoding: utf-8
require "spec_helper"
require "logstash/util/worker_threads_default_printer"
describe LogStash::Util::WorkerThreadsDefaultPrinter do
let(:settings) { {} }
let(:collector) { [] }
subject { described_class.new(settings) }
context 'when the settings hash is empty' do
it 'the #visit method returns a string with 1 filter worker' do
subject.visit(collector)
expect(collector.first).to eq("Filter workers: 1")
end
end
context 'when the settings hash has content' do
let(:settings) { {'filter-workers' => 42} }
it 'the #visit method returns a string with 42 filter workers' do
subject.visit(collector)
expect(collector.first).to eq("Filter workers: 42")
end
end
end