enable agent to stop pipeline by pipeline_id (#12582) (#12616)

* This PR allows the agent to stop pipeline by pipeline_id instead of fetching the full set of pipelines from elasticsearch and compute the pipeline actions internally

Fixed: #12560
This commit is contained in:
kaisecheng 2021-02-02 15:37:17 +01:00 committed by GitHub
parent ed7fb06a36
commit 73154014cb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 0 deletions

View file

@ -213,6 +213,15 @@ class LogStash::Agent
logger.error("An exception happened when converging configuration", attributes)
end
##
# Shut down a pipeline and wait for it to fully stop.
# WARNING: Calling from `Plugin#initialize` or `Plugin#register` will result in deadlock.
# @param pipeline_id [String]
def stop_pipeline(pipeline_id)
action = LogStash::PipelineAction::Stop.new(pipeline_id.to_sym)
converge_state_with_resolved_actions([action])
end
# Calculate the Logstash uptime in milliseconds
#
# @return [Integer] Uptime in milliseconds
@ -329,6 +338,15 @@ class LogStash::Agent
end
end
# Beware the usage with #resolve_actions_and_converge_state
# Calling this method in `Plugin#register` causes deadlock.
# For example, resolve_actions_and_converge_state -> pipeline reload_action -> plugin register -> converge_state_with_resolved_actions
def converge_state_with_resolved_actions(pipeline_actions)
@convergence_lock.synchronize do
converge_state(pipeline_actions)
end
end
# We depends on a series of task derived from the internal state and what
# need to be run, theses actions are applied to the current pipelines to converge to
# the desired state.

View file

@ -333,6 +333,32 @@ describe LogStash::Agent do
end
end
describe "#stop_pipeline" do
let(:config_string) { "input { generator { id => 'old'} } output { }" }
let(:mock_config_pipeline) { mock_pipeline_config(:main, config_string, pipeline_settings) }
let(:source_loader) { TestSourceLoader.new(mock_config_pipeline) }
subject { described_class.new(agent_settings, source_loader) }
before(:each) do
expect(subject.converge_state_and_update).to be_a_successful_converge
expect(subject.get_pipeline('main').running?).to be_truthy
end
after(:each) do
subject.shutdown
end
context "when agent stops the pipeline" do
it "should stop successfully", :aggregate_failures do
converge_result = subject.stop_pipeline('main')
expect(converge_result).to be_a_successful_converge
expect(subject.get_pipeline('main').stopped?).to be_truthy
end
end
end
context "#started_at" do
it "return the start time when the agent is started" do
expect(described_class::STARTED_AT).to be_kind_of(Time)