mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
44fa63eef8
commit
9622f9b5b1
2 changed files with 50 additions and 10 deletions
|
@ -97,7 +97,8 @@ class LogStash::Agent
|
|||
|
||||
def reload_state!
|
||||
@upgrade_mutex.synchronize do
|
||||
@pipelines.each do |pipeline_id, _|
|
||||
@pipelines.each do |pipeline_id, pipeline|
|
||||
next if pipeline.settings.get("config.reload.automatic") == false
|
||||
begin
|
||||
reload_pipeline!(pipeline_id)
|
||||
rescue => e
|
||||
|
|
|
@ -94,7 +94,7 @@ describe LogStash::Agent do
|
|||
end
|
||||
end
|
||||
|
||||
context "when calling reload_state!" do
|
||||
context "when calling reload_pipeline!" do
|
||||
context "with a config that contains reload incompatible plugins" do
|
||||
let(:second_pipeline_config) { "input { stdin {} } filter { } output { }" }
|
||||
|
||||
|
@ -103,7 +103,7 @@ describe LogStash::Agent do
|
|||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to_not receive(:upgrade_pipeline)
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
@ -119,6 +119,44 @@ describe LogStash::Agent do
|
|||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to receive(:upgrade_pipeline).once.and_call_original
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
||||
subject.shutdown
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
context "when calling reload_state!" do
|
||||
context "with a pipeline with auto reloading turned off" do
|
||||
let(:second_pipeline_config) { "input { generator { } } filter { } output { }" }
|
||||
let(:pipeline_args) { { "config.reload.automatic" => false } }
|
||||
|
||||
it "does not try to reload the pipeline" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to_not receive(:reload_pipeline!)
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.reload_state!
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
t.join
|
||||
|
||||
subject.shutdown
|
||||
end
|
||||
end
|
||||
|
||||
context "with a pipeline with auto reloading turned on" do
|
||||
let(:second_pipeline_config) { "input { generator { } } filter { } output { }" }
|
||||
let(:pipeline_args) { { "config.reload.automatic" => true } }
|
||||
|
||||
it "tries to reload the pipeline" do
|
||||
t = Thread.new { subject.execute }
|
||||
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
|
||||
expect(subject).to receive(:reload_pipeline!).once.and_call_original
|
||||
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
|
||||
subject.reload_state!
|
||||
sleep 0.1
|
||||
Stud.stop!(t)
|
||||
|
@ -197,7 +235,8 @@ describe LogStash::Agent do
|
|||
let(:second_pipeline_config) { "input { generator {} } filter { } output { }" }
|
||||
let(:pipeline_args) { {
|
||||
"config.string" => first_pipeline_config,
|
||||
"pipeline.workers" => 4
|
||||
"pipeline.workers" => 4,
|
||||
"config.reload.automatic" => true
|
||||
} }
|
||||
|
||||
before(:each) do
|
||||
|
@ -272,14 +311,14 @@ describe LogStash::Agent do
|
|||
end
|
||||
|
||||
it "leaves the state untouched" do
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", pipeline_id)
|
||||
expect(subject.pipelines[pipeline_id].config_str).to eq(pipeline_config)
|
||||
end
|
||||
|
||||
context "and current state is empty" do
|
||||
it "should not start a pipeline" do
|
||||
expect(subject).to_not receive(:start_pipeline)
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", pipeline_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -292,13 +331,13 @@ describe LogStash::Agent do
|
|||
allow(subject).to receive(:start_pipeline)
|
||||
end
|
||||
it "updates the state" do
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", pipeline_id)
|
||||
expect(subject.pipelines[pipeline_id].config_str).to eq(new_config)
|
||||
end
|
||||
it "starts the pipeline" do
|
||||
expect(subject).to receive(:stop_pipeline)
|
||||
expect(subject).to receive(:start_pipeline)
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", pipeline_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -406,7 +445,7 @@ describe LogStash::Agent do
|
|||
f.fsync
|
||||
end
|
||||
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
|
||||
# wait until pipeline restarts
|
||||
sleep(0.01) until dummy_output2.events.size > 0
|
||||
|
@ -471,7 +510,7 @@ describe LogStash::Agent do
|
|||
f.fsync
|
||||
end
|
||||
|
||||
subject.reload_state!
|
||||
subject.send(:"reload_pipeline!", "main")
|
||||
end
|
||||
|
||||
it "does not increase the successful reload count" do
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue