ensure pipeline.id is correctly propagated

Fixes #6530
This commit is contained in:
Joao Duarte 2017-01-12 16:25:14 +00:00 committed by João Duarte
parent 903cdd6331
commit a2158e5608
4 changed files with 20 additions and 25 deletions

View file

@ -88,9 +88,9 @@ class LogStash::Agent
# @param pipeline_id [String] pipeline string identifier
# @param settings [Hash] settings that will be passed when creating the pipeline.
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
def register_pipeline(pipeline_id, settings = @settings)
def register_pipeline(settings)
pipeline_settings = settings.clone
pipeline_settings.set("pipeline.id", pipeline_id)
pipeline_id = pipeline_settings.get("pipeline.id")
pipeline = create_pipeline(pipeline_settings)
return unless pipeline.is_a?(LogStash::Pipeline)

View file

@ -261,7 +261,7 @@ class LogStash::Runner < Clamp::StrictCommand
@agent = create_agent(@settings)
@agent.register_pipeline("main", @settings)
@agent.register_pipeline(@settings)
# enable sigint/sigterm before starting the agent
# to properly handle a stalled agent

View file

@ -59,7 +59,7 @@ class LogStashRunner
def start
# We start a pipeline that will generate a finite number of events
# before starting the expectations
agent.register_pipeline("main", @settings)
agent.register_pipeline(@settings)
@agent_task = Stud::Task.new { agent.execute }
@agent_task.wait
end

View file

@ -9,6 +9,7 @@ require_relative "../support/helpers"
describe LogStash::Agent do
let(:agent_settings) { LogStash::SETTINGS }
let(:default_pipeline_id) { LogStash::SETTINGS.get("pipeline.id") }
let(:agent_args) { {} }
let(:pipeline_settings) { agent_settings.clone }
let(:pipeline_args) { {} }
@ -41,7 +42,6 @@ describe LogStash::Agent do
end
describe "register_pipeline" do
let(:pipeline_id) { "main" }
let(:config_string) { "input { } filter { } output { }" }
let(:agent_args) do
{
@ -57,7 +57,7 @@ describe LogStash::Agent do
expect(arg1).to eq(config_string)
expect(arg2.to_hash).to include(agent_args)
end
subject.register_pipeline(pipeline_id, agent_settings)
subject.register_pipeline(agent_settings)
end
end
@ -90,10 +90,9 @@ describe LogStash::Agent do
"path.config" => config_file
}
end
let(:pipeline_id) { "main" }
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
subject.register_pipeline(pipeline_settings)
end
context "if state is clean" do
@ -195,10 +194,9 @@ describe LogStash::Agent do
"path.config" => config_file,
}
end
let(:pipeline_id) { "main" }
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
subject.register_pipeline(pipeline_settings)
end
context "if state is clean" do
@ -249,7 +247,6 @@ describe LogStash::Agent do
end
describe "#reload_state!" do
let(:pipeline_id) { "main" }
let(:first_pipeline_config) { "input { } filter { } output { }" }
let(:second_pipeline_config) { "input { generator {} } filter { } output { }" }
let(:pipeline_args) { {
@ -259,13 +256,13 @@ describe LogStash::Agent do
} }
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
subject.register_pipeline(pipeline_settings)
end
context "when fetching a new state" do
it "upgrades the state" do
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
expect(subject).to receive(:upgrade_pipeline).with(pipeline_id, kind_of(LogStash::Pipeline))
expect(subject).to receive(:upgrade_pipeline).with(default_pipeline_id, kind_of(LogStash::Pipeline))
subject.reload_state!
end
end
@ -285,7 +282,6 @@ describe LogStash::Agent do
"config.reload.interval" => 0.01,
"config.string" => pipeline_config
} }
let(:pipeline_id) { "main" }
context "environment variable templating" do
before :each do
@ -299,14 +295,13 @@ describe LogStash::Agent do
it "doesn't upgrade the state" do
allow(subject).to receive(:fetch_config).and_return(pipeline_config)
subject.register_pipeline(pipeline_id, pipeline_settings)
expect(subject.pipelines[pipeline_id].inputs.first.message).to eq("foo-bar")
subject.register_pipeline(pipeline_settings)
expect(subject.pipelines[default_pipeline_id].inputs.first.message).to eq("foo-bar")
end
end
end
describe "#upgrade_pipeline" do
let(:pipeline_id) { "main" }
let(:pipeline_config) { "input { } filter { } output { }" }
let(:pipeline_args) { {
"config.string" => pipeline_config,
@ -315,7 +310,7 @@ describe LogStash::Agent do
let(:new_pipeline_config) { "input { generator {} } output { }" }
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
subject.register_pipeline(pipeline_settings)
end
after(:each) do
@ -330,14 +325,14 @@ describe LogStash::Agent do
end
it "leaves the state untouched" do
subject.send(:"reload_pipeline!", pipeline_id)
expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config)
subject.send(:"reload_pipeline!", default_pipeline_id)
expect(subject.pipelines[default_pipeline_id].config_str).to eq(pipeline_config)
end
context "and current state is empty" do
it "should not start a pipeline" do
expect(subject).to_not receive(:start_pipeline)
subject.send(:"reload_pipeline!", pipeline_id)
subject.send(:"reload_pipeline!", default_pipeline_id)
end
end
end
@ -350,13 +345,13 @@ describe LogStash::Agent do
allow(subject).to receive(:start_pipeline)
end
it "updates the state" do
subject.send(:"reload_pipeline!", pipeline_id)
expect(subject.pipelines[pipeline_id].config_str).to eq(new_config)
subject.send(:"reload_pipeline!", default_pipeline_id)
expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config)
end
it "starts the pipeline" do
expect(subject).to receive(:stop_pipeline)
expect(subject).to receive(:start_pipeline)
subject.send(:"reload_pipeline!", pipeline_id)
subject.send(:"reload_pipeline!", default_pipeline_id)
end
end
end
@ -430,7 +425,7 @@ describe LogStash::Agent do
Thread.abort_on_exception = true
@t = Thread.new do
subject.register_pipeline("main", pipeline_settings)
subject.register_pipeline(pipeline_settings)
subject.execute
end