Fix threadsafe workers in output delegators to not instantiate extras

Fixes #4763
This commit is contained in:
Andrew Cholakian 2016-03-04 14:42:33 -06:00
parent 0793ea7b8b
commit 92ba0bfb95
2 changed files with 110 additions and 74 deletions

View file

@ -1,5 +1,6 @@
# encoding: utf-8
require "concurrent/atomic/atomic_fixnum"
java_import "java.util.concurrent.CopyOnWriteArrayList"
# This class goes hand in hand with the pipeline to provide a pool of
# free workers to be used by pipeline worker threads. The pool is
@ -8,72 +9,27 @@ require "concurrent/atomic/atomic_fixnum"
#
# This plugin also records some basic statistics
module LogStash class OutputDelegator
attr_reader :workers, :config, :worker_count, :threadsafe
attr_reader :workers, :config, :threadsafe
# The *args this takes are the same format that a Outputs::Base takes. A list of hashes with parameters in them
# Internally these just get merged together into a single hash
def initialize(logger, klass, default_worker_count, metric, *args)
def initialize(logger, klass, default_worker_count, metric, *plugin_args)
@logger = logger
@threadsafe = klass.threadsafe?
@config = args.reduce({}, :merge)
@config = plugin_args.reduce({}, :merge)
@klass = klass
@workers = java.util.concurrent.CopyOnWriteArrayList.new
@default_worker_count = default_worker_count
@registered = false
# Create an instance of the input so we can fetch the identifier
output = @klass.new(*args)
output = @klass.new(@config)
# Scope the metrics to the plugin
namespaced_metric = metric.namespace(output.plugin_unique_name.to_sym)
@metric_events = namespaced_metric.namespace(:events)
# We define this as an array regardless of threadsafety
# to make reporting simpler, even though a threadsafe plugin will just have
# a single instance
#
# Older plugins invoke the instance method Outputs::Base#workers_not_supported
# To detect these we need an instance to be created first :()
# TODO: In the next major version after 2.x remove support for this
@workers = [@klass.new(*args)]
@workers.first.register # Needed in case register calls `workers_not_supported`
# DO NOT move this statement before the instantiation of the first single instance
# Read the note above to understand why
@worker_count = calculate_worker_count(default_worker_count)
@logger.debug("Will start workers for output", :worker_count => @worker_count, :class => klass)
warn_on_worker_override!
# This queue is used to manage sharing across threads
@worker_queue = SizedQueue.new(@worker_count)
@workers += (@worker_count - 1).times.map do
inst = @klass.new(*args)
inst.metric = @metric
inst.register
inst
end
@workers.each { |w| @worker_queue << w }
@events_received = Concurrent::AtomicFixnum.new(0)
# One might wonder why we don't use something like
# define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)
# and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time!
# The other 2% you get weird errors about rebinding to the same object
# Until we switch to Jruby 9.x keep the define_singleton_method parts
# the way they are, with a block
# See https://github.com/jruby/jruby/issues/3582
if threadsafe?
@threadsafe_worker = @workers.first
define_singleton_method(:multi_receive) do |events|
threadsafe_multi_receive(events)
end
else
define_singleton_method(:multi_receive) do |events|
worker_multi_receive(events)
end
end
end
def threadsafe?
@ -98,11 +54,14 @@ module LogStash class OutputDelegator
@config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
end
def calculate_worker_count(default_worker_count)
def target_worker_count
# Remove in 5.0 after all plugins upgraded to use class level declarations
raise ArgumentError, "Attempted to detect target worker count before instantiating a worker to test for legacy workers_not_supported!" if @workers.size == 0
if @threadsafe || @klass.workers_not_supported?
1
else
@config["workers"] || default_worker_count
@config["workers"] || @default_worker_count
end
end
@ -111,7 +70,59 @@ module LogStash class OutputDelegator
end
def register
@workers.each {|w| w.register}
raise ArgumentError, "Attempted to register #{self} twice!" if @registered
@registered = true
# We define this as an array regardless of threadsafety
# to make reporting simpler, even though a threadsafe plugin will just have
# a single instance
#
# Older plugins invoke the instance method Outputs::Base#workers_not_supported
# To detect these we need an instance to be created first :()
# TODO: In the next major version after 2.x remove support for this
@workers << @klass.new(@config)
@workers.first.register # Needed in case register calls `workers_not_supported`
@logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass)
# Threadsafe versions don't need additional workers
setup_additional_workers!(target_worker_count) unless @threadsafe
# We skip the first worker because that's pre-registered to deal with legacy workers_not_supported
@workers.subList(1,@workers.size).each(&:register)
setup_multi_receive!
end
def setup_additional_workers!(target_worker_count)
warn_on_worker_override!
(target_worker_count - 1).times do
inst = @klass.new(@config)
inst.metric = @metric
@workers << inst
end
# This queue is used to manage sharing across threads
@worker_queue = SizedQueue.new(target_worker_count)
@workers.each {|w| @worker_queue << w }
end
def setup_multi_receive!
# One might wonder why we don't use something like
# define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)
# and the answer is this is buggy on Jruby 1.7.x . It works 98% of the time!
# The other 2% you get weird errors about rebinding to the same object
# Until we switch to Jruby 9.x keep the define_singleton_method parts
# the way they are, with a block
# See https://github.com/jruby/jruby/issues/3582
if threadsafe?
@threadsafe_worker = @workers.first
define_singleton_method(:multi_receive) do |events|
threadsafe_multi_receive(events)
end
else
define_singleton_method(:multi_receive) do |events|
worker_multi_receive(events)
end
end
end
def threadsafe_multi_receive(events)
@ -138,9 +149,13 @@ module LogStash class OutputDelegator
def do_close
@logger.debug("closing output delegator", :klass => self)
@worker_count.times do
worker = @worker_queue.pop
worker.do_close
if @threadsafe
@workers.each(&:do_close)
else
worker_count.times do
worker = @worker_queue.pop
worker.do_close
end
end
end
@ -153,10 +168,17 @@ module LogStash class OutputDelegator
if @threadsafe
0
else
# The pipeline reporter can run before the outputs are registered trying to pull a value here
# In that case @worker_queue is empty, we just return 0
return 0 unless @worker_queue
@workers.size - @worker_queue.size
end
end
def worker_count
@workers.size
end
private
# Needed for testing, so private
attr_reader :threadsafe_worker, :worker_queue

View file

@ -31,6 +31,7 @@ describe LogStash::OutputDelegator do
context "after having received a batch of events" do
before do
subject.register
subject.multi_receive(events)
end
@ -43,25 +44,35 @@ describe LogStash::OutputDelegator do
end
end
it "should register all workers on register" do
expect(out_inst).to receive(:register)
subject.register
end
it "should close all workers when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
describe "closing" do
before do
subject.register
end
it "should register all workers on register" do
expect(out_inst).to have_received(:register)
end
it "should close all workers when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
end
end
describe "concurrency and worker support" do
before do
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
end
describe "non-threadsafe outputs that allow workers" do
let(:default_worker_count) { 3 }
before do
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
subject.register
end
it "should instantiate multiple workers" do
@ -77,9 +88,7 @@ describe LogStash::OutputDelegator do
describe "threadsafe outputs" do
before do
allow(out_klass).to receive(:threadsafe?).and_return(true)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
subject.register
end
it "should return true when threadsafe? is invoked" do
@ -96,14 +105,18 @@ describe LogStash::OutputDelegator do
end
it "should not utilize the worker queue" do
expect(subject.send(:worker_queue)).not_to receive(:pop)
subject.multi_receive(events)
expect(subject.send(:worker_queue)).to be_nil
end
it "should send received events to the worker" do
expect(out_inst).to receive(:multi_receive).with(events)
subject.multi_receive(events)
end
it "should close all workers when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
end
end
end
end
@ -123,11 +136,12 @@ describe LogStash::OutputDelegator do
let(:default_worker_count) { 2 }
let(:out_klass) { LogStash::Outputs::NOOPDelLegacyNoWorkers }
before do
before(:each) do
allow(logger).to receive(:debug).with(any_args)
end
it "should only setup one worker" do
subject.register
expect(subject.worker_count).to eql(1)
end
end