mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
distinguish between user set and default pipeline settings
This commit is contained in:
parent
b55d307cde
commit
2952abedc6
2 changed files with 50 additions and 9 deletions
|
@ -24,17 +24,18 @@ class LogStash::Agent < Clamp::Command
|
||||||
option ["-w", "--pipeline-workers"], "COUNT",
|
option ["-w", "--pipeline-workers"], "COUNT",
|
||||||
I18n.t("logstash.runner.flag.pipeline-workers"),
|
I18n.t("logstash.runner.flag.pipeline-workers"),
|
||||||
:attribute_name => :pipeline_workers,
|
:attribute_name => :pipeline_workers,
|
||||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers] {|s| validate_positive_integer(s) }
|
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:default_pipeline_workers]
|
||||||
|
|
||||||
|
|
||||||
option ["-b", "--pipeline-batch-size"], "SIZE",
|
option ["-b", "--pipeline-batch-size"], "SIZE",
|
||||||
I18n.t("logstash.runner.flag.pipeline-batch-size"),
|
I18n.t("logstash.runner.flag.pipeline-batch-size"),
|
||||||
:attribute_name => :pipeline_batch_size,
|
:attribute_name => :pipeline_batch_size,
|
||||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size] {|s| validate_positive_integer(s) }
|
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_size]
|
||||||
|
|
||||||
option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
|
option ["-u", "--pipeline-batch-delay"], "DELAY_IN_MS",
|
||||||
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
|
I18n.t("logstash.runner.flag.pipeline-batch-delay"),
|
||||||
:attribute_name => :pipeline_batch_delay,
|
:attribute_name => :pipeline_batch_delay,
|
||||||
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay] {|s| validate_positive_integer(s) }
|
:default => LogStash::Pipeline::DEFAULT_SETTINGS[:pipeline_batch_delay]
|
||||||
|
|
||||||
option ["-l", "--log"], "FILE",
|
option ["-l", "--log"], "FILE",
|
||||||
I18n.t("logstash.agent.flag.log"),
|
I18n.t("logstash.agent.flag.log"),
|
||||||
|
@ -66,6 +67,23 @@ class LogStash::Agent < Clamp::Command
|
||||||
:attribute_name => :unsafe_shutdown,
|
:attribute_name => :unsafe_shutdown,
|
||||||
:default => false
|
:default => false
|
||||||
|
|
||||||
|
def initialize(*args)
|
||||||
|
super(*args)
|
||||||
|
@pipeline_settings ||= { :pipeline_id => "base" }
|
||||||
|
end
|
||||||
|
|
||||||
|
def pipeline_workers=(pipeline_workers_value)
|
||||||
|
@pipeline_settings[:pipeline_workers] = validate_positive_integer(pipeline_workers_value)
|
||||||
|
end
|
||||||
|
|
||||||
|
def pipeline_batch_size=(pipeline_batch_size_value)
|
||||||
|
@pipeline_settings[:pipeline_batch_size] = validate_positive_integer(pipeline_batch_size_value)
|
||||||
|
end
|
||||||
|
|
||||||
|
def pipeline_batch_delay=(pipeline_batch_delay_value)
|
||||||
|
@pipeline_settings[:pipeline_batch_delay] = validate_positive_integer(pipeline_batch_delay_value)
|
||||||
|
end
|
||||||
|
|
||||||
def validate_positive_integer(str_arg)
|
def validate_positive_integer(str_arg)
|
||||||
int_arg = str_arg.to_i
|
int_arg = str_arg.to_i
|
||||||
if str_arg !~ /^\d+$/ || int_arg < 1
|
if str_arg !~ /^\d+$/ || int_arg < 1
|
||||||
|
@ -141,12 +159,7 @@ class LogStash::Agent < Clamp::Command
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
begin
|
||||||
pipeline = LogStash::Pipeline.new(@config_string, {
|
pipeline = LogStash::Pipeline.new(@config_string, @pipeline_settings)
|
||||||
:pipeline_workers => pipeline_workers,
|
|
||||||
:pipeline_batch_size => pipeline_batch_size,
|
|
||||||
:pipeline_batch_delay => pipeline_batch_delay,
|
|
||||||
:pipeline_id => "base"
|
|
||||||
})
|
|
||||||
rescue LoadError => e
|
rescue LoadError => e
|
||||||
fail("Configuration problem.")
|
fail("Configuration problem.")
|
||||||
end
|
end
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
require "spec_helper"
|
require "spec_helper"
|
||||||
require "logstash/runner"
|
require "logstash/runner"
|
||||||
require "stud/task"
|
require "stud/task"
|
||||||
|
require "stud/trap"
|
||||||
|
|
||||||
class NullRunner
|
class NullRunner
|
||||||
def run(args); end
|
def run(args); end
|
||||||
|
@ -35,6 +36,33 @@ describe LogStash::Runner do
|
||||||
args = ["welp"]
|
args = ["welp"]
|
||||||
expect(subject.run(args).wait).to eq(1)
|
expect(subject.run(args).wait).to eq(1)
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "pipeline settings" do
|
||||||
|
let(:pipeline_string) { "input { stdin {} } output { stdout {} }" }
|
||||||
|
let(:base_pipeline_settings) { { :pipeline_id => "base" } }
|
||||||
|
let(:pipeline) { double("pipeline") }
|
||||||
|
|
||||||
|
before(:each) do
|
||||||
|
task = Stud::Task.new { 1 }
|
||||||
|
allow(pipeline).to receive(:run).and_return(task)
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when pipeline workers is not defined by the user" do
|
||||||
|
it "should not pass the value to the pipeline" do
|
||||||
|
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline)
|
||||||
|
args = ["agent", "-e", pipeline_string]
|
||||||
|
subject.run(args).wait
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
context "when pipeline workers is defined by the user" do
|
||||||
|
it "should pass the value to the pipeline" do
|
||||||
|
base_pipeline_settings[:pipeline_workers] = 2
|
||||||
|
expect(LogStash::Pipeline).to receive(:new).with(pipeline_string, base_pipeline_settings).and_return(pipeline)
|
||||||
|
args = ["agent", "-w", "2", "-e", pipeline_string]
|
||||||
|
subject.run(args).wait
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue