mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
b4c7c97452
commit
d6d0c672ae
4 changed files with 20 additions and 25 deletions
|
@ -88,9 +88,9 @@ class LogStash::Agent
|
||||||
# @param pipeline_id [String] pipeline string identifier
|
# @param pipeline_id [String] pipeline string identifier
|
||||||
# @param settings [Hash] settings that will be passed when creating the pipeline.
|
# @param settings [Hash] settings that will be passed when creating the pipeline.
|
||||||
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
|
# keys should be symbols such as :pipeline_workers and :pipeline_batch_delay
|
||||||
def register_pipeline(pipeline_id, settings = @settings)
|
def register_pipeline(settings)
|
||||||
pipeline_settings = settings.clone
|
pipeline_settings = settings.clone
|
||||||
pipeline_settings.set("pipeline.id", pipeline_id)
|
pipeline_id = pipeline_settings.get("pipeline.id")
|
||||||
|
|
||||||
pipeline = create_pipeline(pipeline_settings)
|
pipeline = create_pipeline(pipeline_settings)
|
||||||
return unless pipeline.is_a?(LogStash::Pipeline)
|
return unless pipeline.is_a?(LogStash::Pipeline)
|
||||||
|
|
|
@ -261,7 +261,7 @@ class LogStash::Runner < Clamp::StrictCommand
|
||||||
|
|
||||||
@agent = create_agent(@settings)
|
@agent = create_agent(@settings)
|
||||||
|
|
||||||
@agent.register_pipeline("main", @settings)
|
@agent.register_pipeline(@settings)
|
||||||
|
|
||||||
# enable sigint/sigterm before starting the agent
|
# enable sigint/sigterm before starting the agent
|
||||||
# to properly handle a stalled agent
|
# to properly handle a stalled agent
|
||||||
|
|
|
@ -59,7 +59,7 @@ class LogStashRunner
|
||||||
def start
|
def start
|
||||||
# We start a pipeline that will generate a finite number of events
|
# We start a pipeline that will generate a finite number of events
|
||||||
# before starting the expectations
|
# before starting the expectations
|
||||||
agent.register_pipeline("main", @settings)
|
agent.register_pipeline(@settings)
|
||||||
@agent_task = Stud::Task.new { agent.execute }
|
@agent_task = Stud::Task.new { agent.execute }
|
||||||
@agent_task.wait
|
@agent_task.wait
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,6 +9,7 @@ require_relative "../support/helpers"
|
||||||
describe LogStash::Agent do
|
describe LogStash::Agent do
|
||||||
|
|
||||||
let(:agent_settings) { LogStash::SETTINGS }
|
let(:agent_settings) { LogStash::SETTINGS }
|
||||||
|
let(:default_pipeline_id) { LogStash::SETTINGS.get("pipeline.id") }
|
||||||
let(:agent_args) { {} }
|
let(:agent_args) { {} }
|
||||||
let(:pipeline_settings) { agent_settings.clone }
|
let(:pipeline_settings) { agent_settings.clone }
|
||||||
let(:pipeline_args) { {} }
|
let(:pipeline_args) { {} }
|
||||||
|
@ -41,7 +42,6 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "register_pipeline" do
|
describe "register_pipeline" do
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
let(:config_string) { "input { } filter { } output { }" }
|
let(:config_string) { "input { } filter { } output { }" }
|
||||||
let(:agent_args) do
|
let(:agent_args) do
|
||||||
{
|
{
|
||||||
|
@ -57,7 +57,7 @@ describe LogStash::Agent do
|
||||||
expect(arg1).to eq(config_string)
|
expect(arg1).to eq(config_string)
|
||||||
expect(arg2.to_hash).to include(agent_args)
|
expect(arg2.to_hash).to include(agent_args)
|
||||||
end
|
end
|
||||||
subject.register_pipeline(pipeline_id, agent_settings)
|
subject.register_pipeline(agent_settings)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -90,10 +90,9 @@ describe LogStash::Agent do
|
||||||
"path.config" => config_file
|
"path.config" => config_file
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
end
|
end
|
||||||
|
|
||||||
context "if state is clean" do
|
context "if state is clean" do
|
||||||
|
@ -195,10 +194,9 @@ describe LogStash::Agent do
|
||||||
"path.config" => config_file,
|
"path.config" => config_file,
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
end
|
end
|
||||||
|
|
||||||
context "if state is clean" do
|
context "if state is clean" do
|
||||||
|
@ -249,7 +247,6 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#reload_state!" do
|
describe "#reload_state!" do
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
let(:first_pipeline_config) { "input { } filter { } output { }" }
|
let(:first_pipeline_config) { "input { } filter { } output { }" }
|
||||||
let(:second_pipeline_config) { "input { generator {} } filter { } output { }" }
|
let(:second_pipeline_config) { "input { generator {} } filter { } output { }" }
|
||||||
let(:pipeline_args) { {
|
let(:pipeline_args) { {
|
||||||
|
@ -259,13 +256,13 @@ describe LogStash::Agent do
|
||||||
} }
|
} }
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when fetching a new state" do
|
context "when fetching a new state" do
|
||||||
it "upgrades the state" do
|
it "upgrades the state" do
|
||||||
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
|
expect(subject).to receive(:fetch_config).and_return(second_pipeline_config)
|
||||||
expect(subject).to receive(:upgrade_pipeline).with(pipeline_id, kind_of(LogStash::Pipeline))
|
expect(subject).to receive(:upgrade_pipeline).with(default_pipeline_id, kind_of(LogStash::Pipeline))
|
||||||
subject.reload_state!
|
subject.reload_state!
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -285,7 +282,6 @@ describe LogStash::Agent do
|
||||||
"config.reload.interval" => 0.01,
|
"config.reload.interval" => 0.01,
|
||||||
"config.string" => pipeline_config
|
"config.string" => pipeline_config
|
||||||
} }
|
} }
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
|
|
||||||
context "environment variable templating" do
|
context "environment variable templating" do
|
||||||
before :each do
|
before :each do
|
||||||
|
@ -299,14 +295,13 @@ describe LogStash::Agent do
|
||||||
|
|
||||||
it "doesn't upgrade the state" do
|
it "doesn't upgrade the state" do
|
||||||
allow(subject).to receive(:fetch_config).and_return(pipeline_config)
|
allow(subject).to receive(:fetch_config).and_return(pipeline_config)
|
||||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
expect(subject.pipelines[pipeline_id].inputs.first.message).to eq("foo-bar")
|
expect(subject.pipelines[default_pipeline_id].inputs.first.message).to eq("foo-bar")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#upgrade_pipeline" do
|
describe "#upgrade_pipeline" do
|
||||||
let(:pipeline_id) { "main" }
|
|
||||||
let(:pipeline_config) { "input { } filter { } output { }" }
|
let(:pipeline_config) { "input { } filter { } output { }" }
|
||||||
let(:pipeline_args) { {
|
let(:pipeline_args) { {
|
||||||
"config.string" => pipeline_config,
|
"config.string" => pipeline_config,
|
||||||
|
@ -315,7 +310,7 @@ describe LogStash::Agent do
|
||||||
let(:new_pipeline_config) { "input { generator {} } output { }" }
|
let(:new_pipeline_config) { "input { generator {} } output { }" }
|
||||||
|
|
||||||
before(:each) do
|
before(:each) do
|
||||||
subject.register_pipeline(pipeline_id, pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
end
|
end
|
||||||
|
|
||||||
after(:each) do
|
after(:each) do
|
||||||
|
@ -330,14 +325,14 @@ describe LogStash::Agent do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "leaves the state untouched" do
|
it "leaves the state untouched" do
|
||||||
subject.send(:"reload_pipeline!", pipeline_id)
|
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||||
expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config)
|
expect(subject.pipelines[default_pipeline_id].config_str).to eq(pipeline_config)
|
||||||
end
|
end
|
||||||
|
|
||||||
context "and current state is empty" do
|
context "and current state is empty" do
|
||||||
it "should not start a pipeline" do
|
it "should not start a pipeline" do
|
||||||
expect(subject).to_not receive(:start_pipeline)
|
expect(subject).to_not receive(:start_pipeline)
|
||||||
subject.send(:"reload_pipeline!", pipeline_id)
|
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -350,13 +345,13 @@ describe LogStash::Agent do
|
||||||
allow(subject).to receive(:start_pipeline)
|
allow(subject).to receive(:start_pipeline)
|
||||||
end
|
end
|
||||||
it "updates the state" do
|
it "updates the state" do
|
||||||
subject.send(:"reload_pipeline!", pipeline_id)
|
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||||
expect(subject.pipelines[pipeline_id].config_str).to eq(new_config)
|
expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config)
|
||||||
end
|
end
|
||||||
it "starts the pipeline" do
|
it "starts the pipeline" do
|
||||||
expect(subject).to receive(:stop_pipeline)
|
expect(subject).to receive(:stop_pipeline)
|
||||||
expect(subject).to receive(:start_pipeline)
|
expect(subject).to receive(:start_pipeline)
|
||||||
subject.send(:"reload_pipeline!", pipeline_id)
|
subject.send(:"reload_pipeline!", default_pipeline_id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -430,7 +425,7 @@ describe LogStash::Agent do
|
||||||
Thread.abort_on_exception = true
|
Thread.abort_on_exception = true
|
||||||
|
|
||||||
@t = Thread.new do
|
@t = Thread.new do
|
||||||
subject.register_pipeline("main", pipeline_settings)
|
subject.register_pipeline(pipeline_settings)
|
||||||
subject.execute
|
subject.execute
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue