diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb index b9892df3a..61546f547 100644 --- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb @@ -253,7 +253,7 @@ module LogStash; module Util @generated = Hash.new @iterating_temp = Hash.new - @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads + @is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads @acked_batch = nil end @@ -274,7 +274,7 @@ module LogStash; module Util return if event.nil? || @originals.key?(event) # take care not to cause @generated to change during iteration # @iterating_temp is merged after the iteration - if iterating? + if @is_iterating @iterating_temp[event] = true else # the periodic flush could generate events outside of an each iteration @@ -293,14 +293,14 @@ module LogStash; module Util # below the checks for @cancelled.include?(e) have been replaced by e.cancelled? # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor - @iterating = true + @is_iterating = true @originals.each do |e, _| blk.call(e) unless e.cancelled? end @generated.each do |e, _| blk.call(e) unless e.cancelled? end - @iterating = false + @is_iterating = false update_generated end @@ -332,10 +332,6 @@ module LogStash; module Util private - def iterating? - @iterating - end - def update_generated @generated.update(@iterating_temp) @iterating_temp.clear diff --git a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb index 676d47e69..706149f34 100644 --- a/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb @@ -3,8 +3,8 @@ module LogStash; module Util class WrappedSynchronousQueue java_import java.util.concurrent.ArrayBlockingQueue - java_import java.util.concurrent.SynchronousQueue java_import java.util.concurrent.TimeUnit + java_import java.util.HashSet def initialize (size) @queue = ArrayBlockingQueue.new(size) @@ -177,10 +177,8 @@ module LogStash; module Util # @cancelled = Hash.new #Sizing HashSet to size/load_factor to ensure no rehashing - @originals = java.util.HashSet.new(size * 4 / 3 + 1, 0.75) - @generated = Hash.new - @iterating_temp = Hash.new - @iterating = false # Atomic Boolean maybe? Although batches are not shared across threads + @originals = HashSet.new(size * 4 / 3 + 1, 0.75) + @is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads @acked_batch = nil end @@ -199,11 +197,12 @@ module LogStash; module Util return if event.nil? || @originals.contains(event) # take care not to cause @generated to change during iteration # @iterating_temp is merged after the iteration - if iterating? - @iterating_temp[event] = true + if @is_iterating + @iterating_temp = HashSet.new if @iterating_temp.nil? + @iterating_temp.add(event) else # the periodic flush could generate events outside of an each iteration - @generated[event] = true + @originals.add(event) end end @@ -215,33 +214,23 @@ module LogStash; module Util def each(&blk) # take care not to cause @originals or @generated to change during iteration - @iterating = true + @is_iterating = true # below the checks for @cancelled.include?(e) have been replaced by e.cancelled? # TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor @originals.each do |e| blk.call(e) unless e.cancelled? end - - @generated.each do |e, _| - blk.call(e) unless e.cancelled? - end - @iterating = false - update_generated - end - - def size - filtered_size - end - - def starting_size - @originals.size + @is_iterating = false + update_generated unless @iterating_temp.nil? end def filtered_size - @originals.size + @generated.size + @originals.size end + alias_method(:size, :filtered_size) + def cancelled_size # TODO: disabled for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor raise("cancelled_size is unsupported ") @@ -250,13 +239,11 @@ module LogStash; module Util private - def iterating? - @iterating - end - def update_generated - @generated.update(@iterating_temp) - @iterating_temp.clear + @originals.add_all(@iterating_temp) + # Iterating Temp will not be used again in the lifecycle of the batch so we + # give a hint to the garbage collector here + @iterating_temp = nil end end