diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index f58a88c75..347516f9e 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -321,7 +321,7 @@ class LogStash::Agent end def no_pipeline? - @pipelines_registry.running_pipelines.empty? && @pipelines_registry.loading_pipelines.empty? + @pipelines_registry.running_pipelines(include_loading: true).empty? end private diff --git a/logstash-core/lib/logstash/pipelines_registry.rb b/logstash-core/lib/logstash/pipelines_registry.rb index 20186775b..7d63d4e97 100644 --- a/logstash-core/lib/logstash/pipelines_registry.rb +++ b/logstash-core/lib/logstash/pipelines_registry.rb @@ -24,8 +24,8 @@ module LogStash @pipeline = pipeline @loading = Concurrent::AtomicBoolean.new(false) - # this class uses a lock to ensure thread safe visibility. - @lock = Mutex.new + # this class uses a reentrant lock to ensure thread safe visibility. + @lock = Monitor.new end def terminated? @@ -61,6 +61,12 @@ module LogStash end end + def synchronize + @lock.synchronize do + yield self + end + end + def pipeline @lock.synchronize { @pipeline } end @@ -265,8 +271,8 @@ module LogStash end # @return [Hash{String=>Pipeline}] - def running_pipelines - select_pipelines { |state| state.running? } + def running_pipelines(include_loading: false) + select_pipelines { |state| state.running? || (include_loading && state.loading?) } end def loading_pipelines @@ -296,7 +302,7 @@ module LogStash # @return [Hash{String=>Pipeline}] def select_pipelines(&optional_state_filter) @states.each_with_object({}) do |(id, state), memo| - if state && (!block_given? || yield(state)) + if state && (!block_given? || state.synchronize(&optional_state_filter)) memo[id] = state.pipeline end end