prevent reloading of configurations containing non reloadable plugins

Fixes #4884
This commit is contained in:
Joao Duarte 2016-03-23 22:38:43 +00:00 committed by João Duarte
parent 15aed859de
commit 0793ea7b8b
4 changed files with 116 additions and 14 deletions

View file

@ -77,6 +77,12 @@ class LogStash::Agent
def register_pipeline(pipeline_id, settings)
pipeline = create_pipeline(settings.merge(:pipeline_id => pipeline_id, :metric => metric))
return unless pipeline.is_a?(LogStash::Pipeline)
if @auto_reload && pipeline.non_reloadable_plugins.any?
@logger.error(I18n.t("logstash.agent.non_reloadable_config_register"),
:pipeline_id => pipeline_id,
:plugins => pipeline.non_reloadable_plugins.map(&:class))
return
end
@pipelines[pipeline_id] = pipeline
end
@ -109,6 +115,12 @@ class LogStash::Agent
@node_uuid ||= SecureRandom.uuid
end
def running_pipelines?
@upgrade_mutex.synchronize do
@pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }.any?
end
end
private
def start_webserver
options = {:http_host => @web_api_http_host, :http_port => @web_api_http_port }
@ -183,6 +195,10 @@ class LogStash::Agent
if old_pipeline.config_str == new_pipeline.config_str
@logger.debug("no configuration change for pipeline",
:pipeline => id, :config => old_pipeline.config_str)
elsif new_pipeline.non_reloadable_plugins.any?
@logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"),
:pipeline_id => id,
:plugins => new_pipeline.non_reloadable_plugins.map(&:class))
else
@logger.warn("fetched new config for pipeline. upgrading..",
:pipeline => id, :config => new_pipeline.config_str)
@ -222,12 +238,6 @@ class LogStash::Agent
@pipelines.each { |id, _| stop_pipeline(id) }
end
def running_pipelines?
@upgrade_mutex.synchronize do
@pipelines.select {|pipeline_id, _| running_pipeline?(pipeline_id) }.any?
end
end
def running_pipeline?(pipeline_id)
thread = @pipelines[pipeline_id].thread
thread.is_a?(Thread) && thread.alive?

View file

@ -46,6 +46,10 @@ module LogStash; class Pipeline
}
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
RELOAD_INCOMPATIBLE_PLUGINS = [
"LogStash::Inputs::Stdin"
]
def self.validate_config(config_str, settings = {})
begin
# There should be a better way to test this
@ -546,4 +550,11 @@ module LogStash; class Pipeline
.each {|t| t.delete("blocked_on") }
.each {|t| t.delete("status") }
end
def non_reloadable_plugins
(inputs + filters + outputs).select do |plugin|
RELOAD_INCOMPATIBLE_PLUGINS.include?(plugin.class.name)
end
end
end end

View file

@ -68,6 +68,11 @@ en:
data loss.
forced_sigint: >-
SIGINT received. Terminating immediately..
non_reloadable_config_reload: >-
Unable to reload configuration because it does not support dynamic reloading
non_reloadable_config_register: |-
Logstash was not able to load configuration since it does not support
dynamic reloading and -r or --auto-reload flag was enabled
web_api:
flag:
http_host: Web API binding host

View file

@ -19,8 +19,9 @@ describe LogStash::Agent do
describe "register_pipeline" do
let(:pipeline_id) { "main" }
let(:config_string) { "input { } filter { } output { }" }
let(:settings) { {
:config_string => "input { } filter { } output { }",
:config_string => config_string,
:pipeline_workers => 4
} }
@ -37,7 +38,7 @@ describe LogStash::Agent do
end
describe "#execute" do
let(:sample_config) { "input { generator { count => 100000 } } output { stdout { } }" }
let(:sample_config) { "input { generator { count => 100000 } } output { }" }
let(:config_file) { Stud::Temporary.pathname }
before :each do
@ -51,15 +52,21 @@ describe LogStash::Agent do
end
context "when auto_reload is false" do
let(:agent_args) { { :logger => logger, :auto_reload => false, :reload_interval => 0.01, :config_path => config_file } }
let(:agent_args) { { :logger => logger, :auto_reload => false } }
let(:pipeline_id) { "main" }
let(:pipeline_settings) { { :config_path => config_file } }
before :each do
allow(subject).to receive(:sleep)
allow(subject).to receive(:clean_state?).and_return(false)
allow(subject).to receive(:running_pipelines?).and_return(true)
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
end
context "if state is clean" do
before :each do
allow(subject).to receive(:running_pipelines?).and_return(true)
allow(subject).to receive(:sleep)
allow(subject).to receive(:clean_state?).and_return(false)
end
it "should not reload_state!" do
expect(subject).to_not receive(:reload_state!)
t = Thread.new { subject.execute }
@ -68,10 +75,49 @@ describe LogStash::Agent do
t.join
end
end
context "when calling reload_state!" do
context "with a config that contains reload incompatible plugins" do
let(:second_pipeline_config) { "input { stdin {} } filter { } output { }" }
it "does not reload if new config contains reload incompatible plugins" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to_not receive(:upgrade_pipeline)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.send(:reload_state!)
sleep 0.1
Stud.stop!(t)
t.join
end
end
context "with a config that does not contain reload incompatible plugins" do
let(:second_pipeline_config) { "input { generator { } } filter { } output { }" }
it "does not reload if new config contains reload incompatible plugins" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to receive(:upgrade_pipeline)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
subject.send(:reload_state!)
sleep 0.1
Stud.stop!(t)
t.join
end
end
end
end
context "when auto_reload is true" do
let(:agent_args) { { :logger => logger, :auto_reload => true, :reload_interval => 0.01 } }
let(:pipeline_id) { "main" }
let(:pipeline_settings) { { :config_path => config_file } }
before(:each) do
subject.register_pipeline(pipeline_id, pipeline_settings)
end
context "if state is clean" do
it "should periodically reload_state" do
allow(subject).to receive(:clean_state?).and_return(false)
@ -82,6 +128,36 @@ describe LogStash::Agent do
t.join
end
end
context "when calling reload_state!" do
context "with a config that contains reload incompatible plugins" do
let(:second_pipeline_config) { "input { stdin {} } filter { } output { }" }
it "does not reload if new config contains reload incompatible plugins" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to_not receive(:upgrade_pipeline)
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
sleep 0.1
Stud.stop!(t)
t.join
end
end
context "with a config that does not contain reload incompatible plugins" do
let(:second_pipeline_config) { "input { generator { } } filter { } output { }" }
it "does not reload if new config contains reload incompatible plugins" do
t = Thread.new { subject.execute }
sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready?
expect(subject).to receive(:upgrade_pipeline).at_least(2).times
File.open(config_file, "w") { |f| f.puts second_pipeline_config }
sleep 0.1
Stud.stop!(t)
t.join
end
end
end
end
end
@ -166,7 +242,7 @@ describe LogStash::Agent do
end
describe "#fetch_config" do
let(:file_config) { "input { generator { count => 100 } } output { stdout { } }" }
let(:file_config) { "input { generator { count => 100 } } output { }" }
let(:cli_config) { "filter { drop { } } " }
let(:tmp_config_path) { Stud::Temporary.pathname }
let(:agent_args) { { :logger => logger, :config_string => "filter { drop { } } ", :config_path => tmp_config_path } }