diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 32ad2ba8b..0a42eeae9 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -88,9 +88,9 @@ class LogStash::Agent # @param pipeline_id [String] pipeline string identifier # @param settings [Hash] settings that will be passed when creating the pipeline. # keys should be symbols such as :pipeline_workers and :pipeline_batch_delay - def register_pipeline(pipeline_id, settings = @settings) + def register_pipeline(settings) pipeline_settings = settings.clone - pipeline_settings.set("pipeline.id", pipeline_id) + pipeline_id = pipeline_settings.get("pipeline.id") pipeline = create_pipeline(pipeline_settings) return unless pipeline.is_a?(LogStash::Pipeline) diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 78d291829..924d42a8f 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -261,7 +261,7 @@ class LogStash::Runner < Clamp::StrictCommand @agent = create_agent(@settings) - @agent.register_pipeline("main", @settings) + @agent.register_pipeline(@settings) # enable sigint/sigterm before starting the agent # to properly handle a stalled agent diff --git a/logstash-core/spec/api/spec_helper.rb b/logstash-core/spec/api/spec_helper.rb index bcb37ecfc..a984f13cd 100644 --- a/logstash-core/spec/api/spec_helper.rb +++ b/logstash-core/spec/api/spec_helper.rb @@ -59,7 +59,7 @@ class LogStashRunner def start # We start a pipeline that will generate a finite number of events # before starting the expectations - agent.register_pipeline("main", @settings) + agent.register_pipeline(@settings) @agent_task = Stud::Task.new { agent.execute } @agent_task.wait end diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 699eecd9d..27be48cd7 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -9,6 +9,7 @@ require_relative "../support/helpers" describe LogStash::Agent do let(:agent_settings) { LogStash::SETTINGS } + let(:default_pipeline_id) { LogStash::SETTINGS.get("pipeline.id") } let(:agent_args) { {} } let(:pipeline_settings) { agent_settings.clone } let(:pipeline_args) { {} } @@ -41,7 +42,6 @@ describe LogStash::Agent do end describe "register_pipeline" do - let(:pipeline_id) { "main" } let(:config_string) { "input { } filter { } output { }" } let(:agent_args) do { @@ -57,7 +57,7 @@ describe LogStash::Agent do expect(arg1).to eq(config_string) expect(arg2.to_hash).to include(agent_args) end - subject.register_pipeline(pipeline_id, agent_settings) + subject.register_pipeline(agent_settings) end end @@ -90,10 +90,9 @@ describe LogStash::Agent do "path.config" => config_file } end - let(:pipeline_id) { "main" } before(:each) do - subject.register_pipeline(pipeline_id, pipeline_settings) + subject.register_pipeline(pipeline_settings) end context "if state is clean" do @@ -195,10 +194,9 @@ describe LogStash::Agent do "path.config" => config_file, } end - let(:pipeline_id) { "main" } before(:each) do - subject.register_pipeline(pipeline_id, pipeline_settings) + subject.register_pipeline(pipeline_settings) end context "if state is clean" do @@ -249,7 +247,6 @@ describe LogStash::Agent do end describe "#reload_state!" do - let(:pipeline_id) { "main" } let(:first_pipeline_config) { "input { } filter { } output { }" } let(:second_pipeline_config) { "input { generator {} } filter { } output { }" } let(:pipeline_args) { { @@ -259,13 +256,13 @@ describe LogStash::Agent do } } before(:each) do - subject.register_pipeline(pipeline_id, pipeline_settings) + subject.register_pipeline(pipeline_settings) end context "when fetching a new state" do it "upgrades the state" do expect(subject).to receive(:fetch_config).and_return(second_pipeline_config) - expect(subject).to receive(:upgrade_pipeline).with(pipeline_id, kind_of(LogStash::Pipeline)) + expect(subject).to receive(:upgrade_pipeline).with(default_pipeline_id, kind_of(LogStash::Pipeline)) subject.reload_state! end end @@ -285,7 +282,6 @@ describe LogStash::Agent do "config.reload.interval" => 0.01, "config.string" => pipeline_config } } - let(:pipeline_id) { "main" } context "environment variable templating" do before :each do @@ -299,14 +295,13 @@ describe LogStash::Agent do it "doesn't upgrade the state" do allow(subject).to receive(:fetch_config).and_return(pipeline_config) - subject.register_pipeline(pipeline_id, pipeline_settings) - expect(subject.pipelines[pipeline_id].inputs.first.message).to eq("foo-bar") + subject.register_pipeline(pipeline_settings) + expect(subject.pipelines[default_pipeline_id].inputs.first.message).to eq("foo-bar") end end end describe "#upgrade_pipeline" do - let(:pipeline_id) { "main" } let(:pipeline_config) { "input { } filter { } output { }" } let(:pipeline_args) { { "config.string" => pipeline_config, @@ -315,7 +310,7 @@ describe LogStash::Agent do let(:new_pipeline_config) { "input { generator {} } output { }" } before(:each) do - subject.register_pipeline(pipeline_id, pipeline_settings) + subject.register_pipeline(pipeline_settings) end after(:each) do @@ -330,14 +325,14 @@ describe LogStash::Agent do end it "leaves the state untouched" do - subject.send(:"reload_pipeline!", pipeline_id) - expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config) + subject.send(:"reload_pipeline!", default_pipeline_id) + expect(subject.pipelines[default_pipeline_id].config_str).to eq(pipeline_config) end context "and current state is empty" do it "should not start a pipeline" do expect(subject).to_not receive(:start_pipeline) - subject.send(:"reload_pipeline!", pipeline_id) + subject.send(:"reload_pipeline!", default_pipeline_id) end end end @@ -350,13 +345,13 @@ describe LogStash::Agent do allow(subject).to receive(:start_pipeline) end it "updates the state" do - subject.send(:"reload_pipeline!", pipeline_id) - expect(subject.pipelines[pipeline_id].config_str).to eq(new_config) + subject.send(:"reload_pipeline!", default_pipeline_id) + expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config) end it "starts the pipeline" do expect(subject).to receive(:stop_pipeline) expect(subject).to receive(:start_pipeline) - subject.send(:"reload_pipeline!", pipeline_id) + subject.send(:"reload_pipeline!", default_pipeline_id) end end end @@ -430,7 +425,7 @@ describe LogStash::Agent do Thread.abort_on_exception = true @t = Thread.new do - subject.register_pipeline("main", pipeline_settings) + subject.register_pipeline(pipeline_settings) subject.execute end