Further improve check on "running pipelines" after SIGHUP (#13995) (#14040)

As pointed out in this post merge review comment, there is a window
where we could miss a pipeline transitioning from 'loading' to 'running'
in the original fix, as separate calls are made to the pipeline registry.
This commit fixes that by making a single call to the pipeline registry which
allows for also returning pipelines in the `loading` state.

Co-authored-by: Ry Biesemeyer <yaauie@users.noreply.github.com>
(cherry picked from commit 1291b5edcc)

Co-authored-by: Rob Bavey <rob.bavey@elastic.co>
This commit is contained in:
github-actions[bot] 2022-04-27 15:44:08 +01:00 committed by GitHub
parent 8079f0a434
commit 788c72b67e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 6 deletions

View file

@ -321,7 +321,7 @@ class LogStash::Agent
end
def no_pipeline?
@pipelines_registry.running_pipelines.empty? && @pipelines_registry.loading_pipelines.empty?
@pipelines_registry.running_pipelines(include_loading: true).empty?
end
private

View file

@ -24,8 +24,8 @@ module LogStash
@pipeline = pipeline
@loading = Concurrent::AtomicBoolean.new(false)
# this class uses a lock to ensure thread safe visibility.
@lock = Mutex.new
# this class uses a reentrant lock to ensure thread safe visibility.
@lock = Monitor.new
end
def terminated?
@ -61,6 +61,12 @@ module LogStash
end
end
def synchronize
@lock.synchronize do
yield self
end
end
def pipeline
@lock.synchronize { @pipeline }
end
@ -265,8 +271,8 @@ module LogStash
end
# @return [Hash{String=>Pipeline}]
def running_pipelines
select_pipelines { |state| state.running? }
def running_pipelines(include_loading: false)
select_pipelines { |state| state.running? || (include_loading && state.loading?) }
end
def loading_pipelines
@ -296,7 +302,7 @@ module LogStash
# @return [Hash{String=>Pipeline}]
def select_pipelines(&optional_state_filter)
@states.each_with_object({}) do |(id, state), memo|
if state && (!block_given? || yield(state))
if state && (!block_given? || state.synchronize(&optional_state_filter))
memo[id] = state.pipeline
end
end