diff --git a/logstash-core/lib/logstash/output_delegator.rb b/logstash-core/lib/logstash/output_delegator.rb index 6c2b83cab..751523d0d 100644 --- a/logstash-core/lib/logstash/output_delegator.rb +++ b/logstash-core/lib/logstash/output_delegator.rb @@ -17,25 +17,35 @@ module LogStash; class OutputDelegator @threadsafe = klass.threadsafe? @config = args.reduce({}, :merge) @klass = klass - @worker_count = calculate_worker_count(default_worker_count) - - warn_on_worker_override! - - @worker_queue = SizedQueue.new(@worker_count) # We define this as an array regardless of threadsafety - # to make reporting simpler - @workers = @worker_count.times.map do - w = @klass.new(*args) - w.register - @worker_queue << w - w + # to make reporting simpler, even though a threadsafe plugin will just have + # a single instance + # + # Older plugins invoke the instance method Outputs::Base#workers_not_supported + # To detect these we need an instance to be created first :() + # TODO: In the next major version after 2.x remove support for this + @workers = [@klass.new(*args)] + @workers.first.register # Needed in case register calls `workers_not_supported` + + # DO NOT move this statement before the instantiation of the first single instance + # Read the note above to understand why + @worker_count = calculate_worker_count(default_worker_count) + warn_on_worker_override! + # This queue is used to manage sharing across threads + @worker_queue = SizedQueue.new(@worker_count) + + @workers += (@worker_count - 1).times.map do + inst = @klass.new(*args) + inst.register + inst end + @workers.each { |w| @worker_queue << w } @events_received = Concurrent::AtomicFixnum.new(0) - if threadsafe + if threadsafe? @threadsafe_worker = @workers.first self.define_singleton_method(:multi_receive, method(:threadsafe_multi_receive)) else @@ -43,18 +53,26 @@ module LogStash; class OutputDelegator end end + def threadsafe? + !!@threadsafe + end + def warn_on_worker_override! # The user has configured extra workers, but this plugin doesn't support it :( - if @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported? - message = @workers_not_supported_message + if worker_limits_overriden? + message = @klass.workers_not_supported_message if message - @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message)) + @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => @klass.config_name, :worker_count => @config["workers"], :message => message)) else - @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers)) + @logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => @klass.config_name, :worker_count => @config["workers"], :message => message)) end end end + def worker_limits_overriden? + @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported? + end + def calculate_worker_count(default_worker_count) if @threadsafe || @klass.workers_not_supported? 1 @@ -71,7 +89,6 @@ module LogStash; class OutputDelegator @workers.each {|w| w.register} end - # Threadsafe outputs have a much simpler def threadsafe_multi_receive(events) @events_received.increment(events.length) @@ -81,6 +98,7 @@ module LogStash; class OutputDelegator def worker_multi_receive(events) @events_received.increment(events.length) + @logger.debug("worker queue pop") worker = @worker_queue.pop begin worker.multi_receive(events) @@ -110,4 +128,16 @@ module LogStash; class OutputDelegator @workers.size - @worker_queue.size end end + + private + + # Needed for tests + def threadsafe_worker + @threadsafe_worker + end + + # Needed for tests + def worker_queue + @worker_queue + end end end \ No newline at end of file diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 2be042d66..bc72367e3 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -25,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin # Note that this setting may not be useful for all outputs. config :workers, :validate => :number, :default => 1 - attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins + attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins, :workers_not_supported def self.declare_threadsafe! declare_workers_not_supported! @@ -41,14 +41,20 @@ class LogStash::Outputs::Base < LogStash::Plugin @workers_not_supported = true end + def self.workers_not_supported_message + @workers_not_supported_message + end + def self.workers_not_supported? - @workers_not_supported == true + !!@workers_not_supported end public # TODO: Remove this in the next major version after Logstash 2.x + # Post 2.x it should raise an error and tell people to use the class level + # declaration def workers_not_supported(message=nil) - raise ArgumentError, "Outputs::Base#workers_not_supported is no longer a valid part of the logstash API, please use the class method Outputs::Base.declare_workers_not_supported!" + self.class.declare_workers_not_supported!(message) end public diff --git a/logstash-core/spec/logstash/output_delegator_spec.rb b/logstash-core/spec/logstash/output_delegator_spec.rb index 323ccc636..4c9a2241c 100644 --- a/logstash-core/spec/logstash/output_delegator_spec.rb +++ b/logstash-core/spec/logstash/output_delegator_spec.rb @@ -1,51 +1,122 @@ # encoding: utf-8 require 'spec_helper' - - describe LogStash::OutputDelegator do let(:logger) { double("logger") } - let(:out_klass) { double("output klass") } - let(:out_inst) { double("output instance") } + let(:events) { 7.times.map { LogStash::Event.new }} + let(:default_worker_count) { 1 } - subject { described_class.new(logger, out_klass, 1) } + subject { described_class.new(logger, out_klass, default_worker_count) } - before do - allow(out_klass).to receive(:new).with(any_args).and_return(out_inst) - allow(out_klass).to receive(:threadsafe?).and_return(false) - allow(out_klass).to receive(:workers_not_supported?).and_return(false) - allow(out_inst).to receive(:register) - allow(logger).to receive(:debug).with(any_args) - end - - it "should initialize cleanly" do - expect { subject }.not_to raise_error - end - - context "after having received a batch of events" do - let(:events) { 7.times.map { LogStash::Event.new }} + context "with a plain output plugin" do + let(:out_klass) { double("output klass") } + let(:out_inst) { double("output instance") } before do + allow(out_klass).to receive(:new).with(any_args).and_return(out_inst) + allow(out_klass).to receive(:threadsafe?).and_return(false) + allow(out_klass).to receive(:workers_not_supported?).and_return(false) + allow(out_inst).to receive(:register) allow(out_inst).to receive(:multi_receive) - subject.multi_receive(events) + allow(logger).to receive(:debug).with(any_args) end - it "should pass the events through" do - expect(out_inst).to have_received(:multi_receive).with(events) + it "should initialize cleanly" do + expect { subject }.not_to raise_error end - it "should increment the number of events received" do - expect(subject.events_received).to eql(events.length) + context "after having received a batch of events" do + before do + subject.multi_receive(events) + end + + it "should pass the events through" do + expect(out_inst).to have_received(:multi_receive).with(events) + end + + it "should increment the number of events received" do + expect(subject.events_received).to eql(events.length) + end + end + + it "should register all workers on register" do + expect(out_inst).to receive(:register) + subject.register + end + + it "should close all workers when closing" do + expect(out_inst).to receive(:do_close) + subject.do_close + end + + describe "concurrency and worker support" do + describe "non-threadsafe outputs that allow workers" do + let(:default_worker_count) { 3 } + + before do + allow(out_klass).to receive(:threadsafe?).and_return(false) + allow(out_klass).to receive(:workers_not_supported?).and_return(false) + end + + it "should instantiate multiple workers" do + expect(subject.workers.length).to eql(default_worker_count) + end + + it "should send received events to the worker" do + expect(out_inst).to receive(:multi_receive).with(events) + subject.multi_receive(events) + end + end + + describe "threadsafe outputs" do + before do + allow(out_klass).to receive(:threadsafe?).and_return(true) + allow(out_klass).to receive(:workers_not_supported?).and_return(false) + end + + it "should return true when threadsafe? is invoked" do + expect(subject.threadsafe?).to eql(true) + end + + it "should define a threadsafe_worker" do + expect(subject.send(:threadsafe_worker)).to eql(out_inst) + end + + it "should utilize threadsafe_multi_receive" do + expect(subject.send(:threadsafe_worker)).to receive(:multi_receive).with(events) + subject.multi_receive(events) + end + + it "should not utilize the worker queue" do + expect(subject.send(:worker_queue)).not_to receive(:pop) + subject.multi_receive(events) + end + + it "should send received events to the worker" do + expect(out_inst).to receive(:multi_receive).with(events) + subject.multi_receive(events) + end + end end end - it "should register all workers on register" do - expect(out_inst).to receive(:register) - subject.register + # This may seem suspiciously similar to the class in outputs/base_spec + # but, in fact, we need a whole new class because using this even once + # will immutably modify the base class + class LogStash::Outputs::NOOPDelLegacyNoWorkers < ::LogStash::Outputs::Base + LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason" + + def register + workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON) + end end - it "should close all workers when closing" do - expect(out_inst).to receive(:do_close) - subject.do_close + describe "legacy output workers_not_supported" do + let(:default_worker_count) { 2 } + let(:out_klass) { LogStash::Outputs::NOOPDelLegacyNoWorkers } + + it "should only setup one worker" do + expect(subject.worker_count).to eql(1) + end end end diff --git a/logstash-core/spec/logstash/outputs/base_spec.rb b/logstash-core/spec/logstash/outputs/base_spec.rb index 59c5a8919..44d49a60b 100644 --- a/logstash-core/spec/logstash/outputs/base_spec.rb +++ b/logstash-core/spec/logstash/outputs/base_spec.rb @@ -15,6 +15,14 @@ class LogStash::Outputs::NOOP < LogStash::Outputs::Base end end +class LogStash::Outputs::NOOPLegacyNoWorkers < ::LogStash::Outputs::Base + LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason" + + def register + workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON) + end +end + describe "LogStash::Outputs::Base#new" do it "should instantiate cleanly" do params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 } @@ -24,4 +32,9 @@ describe "LogStash::Outputs::Base#new" do LogStash::Outputs::NOOP.new(params.dup) end.not_to raise_error end + + it "should move workers_not_supported declarations up to the class level" do + LogStash::Outputs::NOOPLegacyNoWorkers.new.register + expect(LogStash::Outputs::NOOPLegacyNoWorkers.workers_not_supported?).to eql(true) + end end