Allow Finite pipeline to shutdown internal pipelines

With the creation of the x-pack we have added our first internal
pipeline, but if you were running the monitoring pipeline with a
finite pipeline (LIKE generator count => X) when the finite has
completed processing all the events logstash would refuse to stop.

This PR fixes the problem by adding a new pipeline settings called
system in the shutdown loop we will check if all the user defined
pipeline are completed if its the case we will shutdown any internal
pipeline and logtash will stop gracefully.

Fixes #6943
This commit is contained in:
Pier-Hugues Pellerin 2017-04-05 23:36:35 -04:00
parent 02d46525a7
commit 111c95c254
5 changed files with 79 additions and 3 deletions

View file

@ -75,9 +75,12 @@ class LogStash::Agent
Stud.interval(@reload_interval) { reload_state! }
else
while !Stud.stop?
if clean_state? || running_pipelines?
sleep 0.5
else
if running_user_defined_pipelines?
sleep(0.5)
elsif running_pipelines?
logger.debug("Shutting down system pipelines")
shutdown_pipelines
elsif clean_state? || !running_pipelines?
break
end
end
@ -189,6 +192,15 @@ class LogStash::Agent
end
end
def running_user_defined_pipelines?
@upgrade_mutex.synchronize do
@pipelines.select do |pipeline_id, _|
pipeline = @pipelines[pipeline_id]
pipeline.running? && !pipeline.system?
end.any?
end
end
def close_pipeline(id)
pipeline = @pipelines[id]
if pipeline

View file

@ -25,6 +25,7 @@ module LogStash
Setting::Numeric.new("config.reload.interval", 3), # in seconds
Setting::Boolean.new("metric.collect", true),
Setting::String.new("pipeline.id", "main"),
Setting::Boolean.new("pipeline.system", false),
Setting::PositiveInteger.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum),
Setting::PositiveInteger.new("pipeline.output.workers", 1),
Setting::PositiveInteger.new("pipeline.batch.size", 125),

View file

@ -257,6 +257,10 @@ module LogStash; class Pipeline < BasePipeline
@running.false?
end
def system?
settings.get_value("pipeline.system")
end
# register_plugin simply calls the plugin #register method and catches & logs any error
# @param plugin [Plugin] the plugin to register
# @return [Plugin] the registered plugin

View file

@ -99,6 +99,30 @@ describe LogStash::Agent do
subject.register_pipeline(pipeline_settings)
end
context "when a system pipeline is running" do
context "when one pipeline is finite" do
let(:pipeline_args) {
{
"path.config" => "a",
"config.string" => "input { generator { count => 1000 }} output { null {} }"
}
}
let(:system_pipeline_settings) do
s = agent_settings.clone
s.set("path.config", "")
s.set("config.string", "input { generator {}} output { null {} }")
s.set("pipeline.id", ".monitoring")
s.set("pipeline.system", true)
s
end
it "stops logstash at the end of the execution of the finite pipeline" do
subject.register_pipeline(system_pipeline_settings)
expect(subject.execute).to be_nil
end
end
end
context "if state is clean" do
before :each do
allow(subject).to receive(:running_pipelines?).and_return(true)

View file

@ -854,4 +854,39 @@ describe LogStash::Pipeline do
expect(pipeline1.instance_variables).to eq(pipeline2.instance_variables)
end
end
context "#system" do
after do
pipeline.close # close the queue
end
let(:pipeline) { LogStash::Pipeline.new(config_string, settings) }
let(:config_string) { "input { generator {} } output { null {} }" }
context "when the pipeline is a system pipeline" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set("pipeline.system", true)
s.set("config.string", config_string)
s
end
it "returns true" do
expect(pipeline.system?).to be_truthy
end
end
context "when the pipeline is not a system pipeline" do
let(:settings) do
s = LogStash::SETTINGS.clone
s.set("pipeline.system", false)
s.set("config.string", config_string)
s
end
it "returns true" do
expect(pipeline.system?).to be_falsey
end
end
end
end