diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 8d148bc41..e7d7a8952 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -17,7 +17,7 @@ require "logstash/pipeline_reporter" require "logstash/output_delegator" module LogStash; class Pipeline - attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id + attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger DEFAULT_SETTINGS = { :default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent, @@ -26,6 +26,7 @@ module LogStash; class Pipeline :flush_interval => 5, # in seconds :flush_timeout_interval => 60 # in seconds } + MAX_INFLIGHT_WARN_THRESHOLD = 10_000 def initialize(config_str, settings = {}) @pipeline_id = settings[:pipeline_id] || self.object_id @@ -157,11 +158,16 @@ module LogStash; class Pipeline pipeline_workers = safe_pipeline_worker_count batch_size = @settings[:pipeline_batch_size] batch_delay = @settings[:pipeline_batch_delay] + max_inflight = batch_size * pipeline_workers @logger.info("Starting pipeline", :id => self.pipeline_id, :pipeline_workers => pipeline_workers, :batch_size => batch_size, - :batch_delay => batch_delay) + :batch_delay => batch_delay, + :max_inflight => max_inflight) + if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD + @logger.warn "CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})" + end pipeline_workers.times do |t| @worker_threads << Thread.new do diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index b410816ef..43926eecd 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -276,6 +276,36 @@ describe LogStash::Pipeline do end end + describe "max inflight warning" do + let(:config) { "input { dummyinput {} } output { dummyoutput {} }" } + let(:batch_size) { 1 } + let(:pipeline) { LogStash::Pipeline.new(config, :pipeline_batch_size => batch_size, :pipeline_workers => 1) } + let(:logger) { pipeline.logger } + let(:warning_prefix) { /CAUTION: Recommended inflight events max exceeded!/ } + + before(:each) do + allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput) + allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec) + allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput) + allow(logger).to receive(:warn) + thread = Thread.new { pipeline.run } + pipeline.shutdown + thread.join + end + + it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do + expect(logger).not_to have_received(:warn).with(warning_prefix) + end + + context "with a too large inflight count" do + let(:batch_size) { LogStash::Pipeline::MAX_INFLIGHT_WARN_THRESHOLD + 1 } + + it "should raise a max inflight warning if the max_inflight count is exceeded" do + expect(logger).to have_received(:warn).with(warning_prefix) + end + end + end + context "compiled filter funtions" do context "new events should propagate down the filters" do