mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
- Add my attempt at a locking 'sliding window' class that would prevent
new additions if the 'want' buffer was full.
This commit is contained in:
parent
03707d1e20
commit
0e68c317cb
1 changed files with 43 additions and 0 deletions
43
lib/util.rb
43
lib/util.rb
|
@ -1,3 +1,12 @@
|
|||
|
||||
class TrackingMutex < Mutex
|
||||
def synchronize(&blk)
|
||||
puts "Enter #{self} @ #{Thread.current} + #{caller[0]}"
|
||||
super { blk.call }
|
||||
puts "Exit #{self} @ #{Thread.current} + #{caller[0]}"
|
||||
end
|
||||
end
|
||||
|
||||
module LogStash
|
||||
class Util
|
||||
def self.collapse(hash)
|
||||
|
@ -37,6 +46,40 @@ module LogStash
|
|||
return duration.to_s[0 .. precision]
|
||||
end # def to_s
|
||||
end # class StopWatch
|
||||
|
||||
class SlidingWindowSet
|
||||
def initialize(window_size = 2)
|
||||
@want = Set.new
|
||||
@lock = TrackingMutex.new
|
||||
@cv = ConditionVariable.new
|
||||
@window_size = window_size
|
||||
end
|
||||
|
||||
def <<(val)
|
||||
@lock.synchronize do
|
||||
if @want.length >= @window_size
|
||||
$stderr.puts "sliding window closed (#{@want.length} vs #{@window_size})"
|
||||
@cv.wait(@lock)
|
||||
$stderr.puts "sliding window reopend (#{@want.length} vs #{@window_size})"
|
||||
end
|
||||
@want << val
|
||||
end
|
||||
end
|
||||
|
||||
def delete(val)
|
||||
@lock.synchronize do
|
||||
$stderr.puts "Deleting #{val}"
|
||||
@want.delete(val)
|
||||
if @want.length < @window_size
|
||||
@cv.notify
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def include?(val)
|
||||
return @want.include?(val)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue