mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
Separate "not terminated" pipeline state into "running" and "loading".
This change fix the behavior of considering as "running" also pipelines that are still in "loading", both "loading" and "running" is considered as "not terminated".
Fixed a flakyness in tests due to different ways to looks at the same thing: pipeline status.
The pipeline status is determined by both `pipeline.running?` and by `agent.pipelines_running`.
The first checks for an atomic boolean in pipeline object, the second check for the status in PipelineRegistry.
Fixes #12190
(cherry picked from commit 79d8f47437
)
This commit is contained in:
parent
7608deecfb
commit
4e82485d39
4 changed files with 58 additions and 4 deletions
|
@ -297,6 +297,10 @@ class LogStash::Agent
|
|||
@pipelines_registry.running_pipelines
|
||||
end
|
||||
|
||||
def loading_pipelines
|
||||
@pipelines_registry.loading_pipelines
|
||||
end
|
||||
|
||||
def non_running_pipelines
|
||||
@pipelines_registry.non_running_pipelines
|
||||
end
|
||||
|
|
|
@ -35,6 +35,19 @@ module LogStash
|
|||
end
|
||||
end
|
||||
|
||||
def running?
|
||||
@lock.synchronize do
|
||||
# not terminated and not loading
|
||||
@loading.false? && !@pipeline.finished_execution?
|
||||
end
|
||||
end
|
||||
|
||||
def loading?
|
||||
@lock.synchronize do
|
||||
@loading.true?
|
||||
end
|
||||
end
|
||||
|
||||
def set_loading(is_loading)
|
||||
@lock.synchronize do
|
||||
@loading.value = is_loading
|
||||
|
@ -253,7 +266,11 @@ module LogStash
|
|||
|
||||
# @return [Hash{String=>Pipeline}]
|
||||
def running_pipelines
|
||||
select_pipelines { |state| !state.terminated? }
|
||||
select_pipelines { |state| state.running? }
|
||||
end
|
||||
|
||||
def loading_pipelines
|
||||
select_pipelines { |state| state.loading? }
|
||||
end
|
||||
|
||||
# @return [Hash{String=>Pipeline}]
|
||||
|
|
|
@ -210,13 +210,14 @@ describe LogStash::PipelinesRegistry do
|
|||
end
|
||||
end
|
||||
|
||||
# make sure we entered the block executioin
|
||||
# make sure we entered the block execution
|
||||
wait(10).for {in_block.true?}.to be_truthy
|
||||
|
||||
# at this point the thread is suspended waiting on queue
|
||||
|
||||
# since in reloading state, running_pipelines is not empty
|
||||
expect(subject.running_pipelines).not_to be_empty
|
||||
expect(subject.running_pipelines).to be_empty
|
||||
expect(subject.loading_pipelines).not_to be_empty
|
||||
|
||||
# unblock thread
|
||||
queue.push(:dummy)
|
||||
|
@ -224,6 +225,7 @@ describe LogStash::PipelinesRegistry do
|
|||
|
||||
# 3rd call: finished_execution? is true
|
||||
expect(subject.running_pipelines).to be_empty
|
||||
expect(subject.loading_pipelines).to be_empty
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -271,6 +273,33 @@ describe LogStash::PipelinesRegistry do
|
|||
end
|
||||
|
||||
context "pipelines collections" do
|
||||
context "with a reloading pipeline" do
|
||||
before :each do
|
||||
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||
# expect(pipeline).to receive(:finished_execution?).and_return(false)
|
||||
in_block = Concurrent::AtomicBoolean.new(false)
|
||||
queue = Queue.new # threadsafe queue
|
||||
thread = Thread.new(in_block) do |in_block|
|
||||
subject.reload_pipeline(pipeline_id) do
|
||||
in_block.make_true
|
||||
# sleep(3) # simulate a long loading pipeline
|
||||
queue.pop
|
||||
end
|
||||
end
|
||||
# make sure we entered the block execution
|
||||
wait(10).for {in_block.true?}.to be_truthy
|
||||
end
|
||||
|
||||
it "should not find running pipelines" do
|
||||
expect(subject.running_pipelines).to be_empty
|
||||
end
|
||||
|
||||
it "should not find non_running pipelines" do
|
||||
# non running pipelines are those terminated
|
||||
expect(subject.non_running_pipelines).to be_empty
|
||||
end
|
||||
end
|
||||
|
||||
context "with a non terminated pipelines" do
|
||||
before :each do
|
||||
subject.create_pipeline(pipeline_id, pipeline) { true }
|
||||
|
|
|
@ -95,7 +95,7 @@ RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|
|
|||
expect(pipeline.running?).to be_truthy
|
||||
end
|
||||
expect(pipeline.config_str).to eq(pipeline_config.config_string)
|
||||
expect(agent.running_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
|
||||
expect(agent.running_pipelines.keys.map(&:to_s) + agent.loading_pipelines.keys.map(&:to_s)).to include(pipeline_config.pipeline_id.to_s)
|
||||
end
|
||||
|
||||
failure_message do |agent|
|
||||
|
@ -108,6 +108,10 @@ RSpec::Matchers.define :have_running_pipeline? do |pipeline_config|
|
|||
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines but its not running"
|
||||
elsif pipeline.config_str != pipeline_config.config_string
|
||||
"Found '#{pipeline_config.pipeline_id.to_s}' in the list of pipelines and running, but the config_string doesn't match,\nExpected:\n#{pipeline_config.config_string}\n\ngot:\n#{pipeline.config_str}"
|
||||
elsif agent.running_pipelines.keys.map(&:to_s).include?(pipeline_config.pipeline_id.to_s)
|
||||
"Found '#{pipeline_config.pipeline_id.to_s}' in running but not included in the list of agent.running_pipelines or agent.loading_pipelines"
|
||||
else
|
||||
"Unrecognized error condition, probably you missed to track properly a newly added expect in :have_running_pipeline?"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue