PERFORMANCE: Simplify batch iteration

Fixes #8443
This commit is contained in:
Armin 2017-10-06 09:28:05 +02:00 committed by Armin Braun
parent 826c4e9307
commit 67347529f4
3 changed files with 6 additions and 52 deletions

View file

@ -250,8 +250,6 @@ module LogStash; module Util
# @cancelled = Hash.new
@generated = Hash.new
@iterating_temp = Hash.new
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@acked_batch = nil
end
@ -270,14 +268,7 @@ module LogStash; module Util
def merge(event)
return if event.nil? || @originals.key?(event)
# take care not to cause @generated to change during iteration
# @iterating_temp is merged after the iteration
if @is_iterating
@iterating_temp[event] = true
else
# the periodic flush could generate events outside of an each iteration
@generated[event] = true
end
@generated[event] = true
end
def cancel(event)
@ -293,19 +284,14 @@ module LogStash; module Util
end
def each(&blk)
# take care not to cause @originals or @generated to change during iteration
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@is_iterating = true
@originals.each do |e, _|
blk.call(e) unless e.cancelled?
end
@generated.each do |e, _|
blk.call(e) unless e.cancelled?
end
@is_iterating = false
update_generated
end
def size
@ -333,13 +319,6 @@ module LogStash; module Util
def flush_signal_received?
false
end
private
def update_generated
@generated.update(@iterating_temp)
@iterating_temp.clear
end
end
class WriteClient

View file

@ -161,21 +161,12 @@ module LogStash; module Util
# @cancelled = Hash.new
#Sizing HashSet to size/load_factor to ensure no rehashing
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
@originals = LsQueueUtils.drain(queue.queue, size, wait)
end
def merge(event)
return if event.nil? || @originals.contains(event)
# take care not to cause @generated to change during iteration
# @iterating_temp is merged after the iteration
if @is_iterating
@iterating_temp = HashSet.new if @iterating_temp.nil?
@iterating_temp.add(event)
else
# the periodic flush could generate events outside of an each iteration
@originals.add(event)
end
return if event.nil?
@originals.add(event)
end
def cancel(event)
@ -186,21 +177,14 @@ module LogStash; module Util
def to_a
events = []
each {|e| events << e}
@originals.each {|e| events << e unless e.cancelled?}
events
end
def each(&blk)
# take care not to cause @originals or @generated to change during iteration
@is_iterating = true
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@originals.each do |e|
blk.call(e) unless e.cancelled?
end
@is_iterating = false
update_generated unless @iterating_temp.nil?
@originals.each {|e| blk.call(e) unless e.cancelled?}
end
def filtered_size
@ -214,15 +198,6 @@ module LogStash; module Util
raise("cancelled_size is unsupported ")
# @cancelled.size
end
private
def update_generated
@originals.add_all(@iterating_temp)
# Iterating Temp will not be used again in the lifecycle of the batch so we
# give a hint to the garbage collector here
@iterating_temp = nil
end
end
class WriteClient

View file

@ -100,7 +100,7 @@ describe LogStash::Util::WrappedSynchronousQueue do
write_client.push_batch(batch)
read_batch = read_client.read_batch
expect(read_batch.size).to eq(5)
read_batch.each do |data|
read_batch.to_a.each do |data|
message = data.get("message")
expect(messages).to include(message)
messages.delete(message)