Warn on potential memory issues when a large number of inflight messages are present

Fixes #4410
This commit is contained in:
Andrew Cholakian 2015-12-31 16:47:20 -06:00
parent 90a12e79b6
commit e5cc9c407d
2 changed files with 38 additions and 2 deletions

View file

@ -17,7 +17,7 @@ require "logstash/pipeline_reporter"
require "logstash/output_delegator"
module LogStash; class Pipeline
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id
attr_reader :inputs, :filters, :outputs, :worker_threads, :events_consumed, :events_filtered, :reporter, :pipeline_id, :logger
DEFAULT_SETTINGS = {
:default_pipeline_workers => LogStash::Config::CpuCoreStrategy.fifty_percent,
@ -26,6 +26,7 @@ module LogStash; class Pipeline
:flush_interval => 5, # in seconds
:flush_timeout_interval => 60 # in seconds
}
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
def initialize(config_str, settings = {})
@pipeline_id = settings[:pipeline_id] || self.object_id
@ -157,11 +158,16 @@ module LogStash; class Pipeline
pipeline_workers = safe_pipeline_worker_count
batch_size = @settings[:pipeline_batch_size]
batch_delay = @settings[:pipeline_batch_delay]
max_inflight = batch_size * pipeline_workers
@logger.info("Starting pipeline",
:id => self.pipeline_id,
:pipeline_workers => pipeline_workers,
:batch_size => batch_size,
:batch_delay => batch_delay)
:batch_delay => batch_delay,
:max_inflight => max_inflight)
if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
@logger.warn "CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})"
end
pipeline_workers.times do |t|
@worker_threads << Thread.new do

View file

@ -276,6 +276,36 @@ describe LogStash::Pipeline do
end
end
describe "max inflight warning" do
let(:config) { "input { dummyinput {} } output { dummyoutput {} }" }
let(:batch_size) { 1 }
let(:pipeline) { LogStash::Pipeline.new(config, :pipeline_batch_size => batch_size, :pipeline_workers => 1) }
let(:logger) { pipeline.logger }
let(:warning_prefix) { /CAUTION: Recommended inflight events max exceeded!/ }
before(:each) do
allow(LogStash::Plugin).to receive(:lookup).with("input", "dummyinput").and_return(DummyInput)
allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec)
allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(DummyOutput)
allow(logger).to receive(:warn)
thread = Thread.new { pipeline.run }
pipeline.shutdown
thread.join
end
it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do
expect(logger).not_to have_received(:warn).with(warning_prefix)
end
context "with a too large inflight count" do
let(:batch_size) { LogStash::Pipeline::MAX_INFLIGHT_WARN_THRESHOLD + 1 }
it "should raise a max inflight warning if the max_inflight count is exceeded" do
expect(logger).to have_received(:warn).with(warning_prefix)
end
end
end
context "compiled filter funtions" do
context "new events should propagate down the filters" do