MINOR: Improve batch iteration

Fixes #7837
This commit is contained in:
Armin 2017-07-28 12:34:56 +02:00 committed by Armin Braun
parent 896fa6b7a8
commit d6006ece00
2 changed files with 21 additions and 38 deletions

View file

@ -253,7 +253,7 @@ module LogStash; module Util
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@acked_batch = nil
end
@ -274,7 +274,7 @@ module LogStash; module Util
return if event.nil? || @originals.key?(event)
# take care not to cause @generated to change during iteration
# @iterating_temp is merged after the iteration
if iterating?
if @is_iterating
@iterating_temp[event] = true
else
# the periodic flush could generate events outside of an each iteration
@ -293,14 +293,14 @@ module LogStash; module Util
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@iterating = true
@is_iterating = true
@originals.each do |e, _|
blk.call(e) unless e.cancelled?
end
@generated.each do |e, _|
blk.call(e) unless e.cancelled?
end
@iterating = false
@is_iterating = false
update_generated
end
@ -332,10 +332,6 @@ module LogStash; module Util
private
def iterating?
@iterating
end
def update_generated
@generated.update(@iterating_temp)
@iterating_temp.clear

View file

@ -3,8 +3,8 @@
module LogStash; module Util
class WrappedSynchronousQueue
java_import java.util.concurrent.ArrayBlockingQueue
java_import java.util.concurrent.SynchronousQueue
java_import java.util.concurrent.TimeUnit
java_import java.util.HashSet
def initialize (size)
@queue = ArrayBlockingQueue.new(size)
@ -177,10 +177,8 @@ module LogStash; module Util
# @cancelled = Hash.new
#Sizing HashSet to size/load_factor to ensure no rehashing
@originals = java.util.HashSet.new(size * 4 / 3 + 1, 0.75)
@generated = Hash.new
@iterating_temp = Hash.new
@iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@originals = HashSet.new(size * 4 / 3 + 1, 0.75)
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@acked_batch = nil
end
@ -199,11 +197,12 @@ module LogStash; module Util
return if event.nil? || @originals.contains(event)
# take care not to cause @generated to change during iteration
# @iterating_temp is merged after the iteration
if iterating?
@iterating_temp[event] = true
if @is_iterating
@iterating_temp = HashSet.new if @iterating_temp.nil?
@iterating_temp.add(event)
else
# the periodic flush could generate events outside of an each iteration
@generated[event] = true
@originals.add(event)
end
end
@ -215,33 +214,23 @@ module LogStash; module Util
def each(&blk)
# take care not to cause @originals or @generated to change during iteration
@iterating = true
@is_iterating = true
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@originals.each do |e|
blk.call(e) unless e.cancelled?
end
@generated.each do |e, _|
blk.call(e) unless e.cancelled?
end
@iterating = false
update_generated
end
def size
filtered_size
end
def starting_size
@originals.size
@is_iterating = false
update_generated unless @iterating_temp.nil?
end
def filtered_size
@originals.size + @generated.size
@originals.size
end
alias_method(:size, :filtered_size)
def cancelled_size
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
raise("cancelled_size is unsupported ")
@ -250,13 +239,11 @@ module LogStash; module Util
private
def iterating?
@iterating
end
def update_generated
@generated.update(@iterating_temp)
@iterating_temp.clear
@originals.add_all(@iterating_temp)
# Iterating Temp will not be used again in the lifecycle of the batch so we
# give a hint to the garbage collector here
@iterating_temp = nil
end
end