From 2952abedc656ad88b0ed86427f4557d1e41470a8 Mon Sep 17 00:00:00 2001 From: Joao Duarte Date: Tue, 12 Jan 2016 16:27:59 +0000 Subject: [PATCH] distinguish between user set and default pipeline settings --- logstash-core/lib/logstash/agent.rb | 31 +++++++++++++++------- logstash-core/spec/logstash/runner_spec.rb | 28 +++++++++++++++++++ 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index de0b9ca8c..06b096b1c 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -24,17 +24,18 @@ class LogStash::Agent < Clamp::Command option ["-w", "--pipeline-workers"], "COUNT", I18n.t("logstash.runner.flag.pipeline-workers"), :attribute_name => :pipeline_workers, - :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] {|s| validate_positive_integer(s) } + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] + option ["-b", "--pipeline-batch-size"], "SIZE", I18n.t("logstash.runner.flag.pipeline-batch-size"), :attribute_name => :pipeline_batch_size, - :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size] {|s| validate_positive_integer(s) } + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size] option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS", I18n.t("logstash.runner.flag.pipeline-batch-delay"), :attribute_name => :pipeline_batch_delay, - :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay] {|s| validate_positive_integer(s) } + :default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay] option ["-l", "--log"], "FILE", I18n.t("logstash.agent.flag.log"), @@ -66,6 +67,23 @@ class LogStash::Agent < Clamp::Command :attribute_name => :unsafe_shutdown, :default => false + def initialize(*args) + super(*args) + @pipeline_settings ||= { :pipeline_id => "base" } + end + + def pipeline_workers=(pipeline_workers_value) + @pipeline_settings[:pipeline_workers] = validate_positive_integer(pipeline_workers_value) + end + + def pipeline_batch_size=(pipeline_batch_size_value) + @pipeline_settings[:pipeline_batch_size] = validate_positive_integer(pipeline_batch_size_value) + end + + def pipeline_batch_delay=(pipeline_batch_delay_value) + @pipeline_settings[:pipeline_batch_delay] = validate_positive_integer(pipeline_batch_delay_value) + end + def validate_positive_integer(str_arg) int_arg = str_arg.to_i if str_arg !~ /^\d+$/ || int_arg < 1 @@ -141,12 +159,7 @@ class LogStash::Agent < Clamp::Command end begin - pipeline = LogStash::Pipeline.new(@config_string, { - :pipeline_workers => pipeline_workers, - :pipeline_batch_size => pipeline_batch_size, - :pipeline_batch_delay => pipeline_batch_delay, - :pipeline_id => "base" - }) + pipeline = LogStash::Pipeline.new(@config_string, @pipeline_settings) rescue LoadError => e fail("Configuration problem.") end diff --git a/logstash-core/spec/logstash/runner_spec.rb b/logstash-core/spec/logstash/runner_spec.rb index b61cab7bf..cc9415be0 100644 --- a/logstash-core/spec/logstash/runner_spec.rb +++ b/logstash-core/spec/logstash/runner_spec.rb @@ -2,6 +2,7 @@ require "spec_helper" require "logstash/runner" require "stud/task" +require "stud/trap" class NullRunner def run(args); end @@ -35,6 +36,33 @@ describe LogStash::Runner do args = ["welp"] expect(subject.run(args).wait).to eq(1) end + end + describe "pipeline settings" do + let(:pipeline_string) { "input { stdin {} } output { stdout {} }" } + let(:base_pipeline_settings) { { :pipeline_id => "base" } } + let(:pipeline) { double("pipeline") } + + before(:each) do + task = Stud::Task.new { 1 } + allow(pipeline).to receive(:run).and_return(task) + end + + context "when pipeline workers is not defined by the user" do + it "should not pass the value to the pipeline" do + expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline) + args = ["agent", "-e", pipeline_string] + subject.run(args).wait + end + end + + context "when pipeline workers is defined by the user" do + it "should pass the value to the pipeline" do + base_pipeline_settings[:pipeline_workers] = 2 + expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline) + args = ["agent", "-w", "2", "-e", pipeline_string] + subject.run(args).wait + end + end end end