mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
7ab80bfd2d
commit
125ef10f2c
2 changed files with 7 additions and 23 deletions
|
@ -4,7 +4,6 @@ module LogStash; module Util
|
|||
class WrappedSynchronousQueue
|
||||
java_import java.util.concurrent.ArrayBlockingQueue
|
||||
java_import java.util.concurrent.TimeUnit
|
||||
java_import java.util.HashSet
|
||||
java_import org.logstash.common.LsQueueUtils
|
||||
|
||||
def initialize (size)
|
||||
|
@ -13,26 +12,12 @@ module LogStash; module Util
|
|||
|
||||
attr_reader :queue
|
||||
|
||||
# Push an object to the queue if the queue is full
|
||||
# it will block until the object can be added to the queue.
|
||||
#
|
||||
# @param [obj] Object to add to the queue
|
||||
def push(obj)
|
||||
@queue.put(obj)
|
||||
end
|
||||
alias_method(:<<, :push)
|
||||
|
||||
# Block for X millis
|
||||
def poll(millis)
|
||||
@queue.poll(millis, TimeUnit::MILLISECONDS)
|
||||
end
|
||||
|
||||
def write_client
|
||||
WriteClient.new(self)
|
||||
WriteClient.new(@queue)
|
||||
end
|
||||
|
||||
def read_client
|
||||
ReadClient.new(self)
|
||||
ReadClient.new(@queue)
|
||||
end
|
||||
|
||||
def close
|
||||
|
@ -61,7 +46,7 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def empty?
|
||||
@queue.queue.isEmpty
|
||||
@queue.isEmpty
|
||||
end
|
||||
|
||||
def set_batch_dimensions(batch_size, wait_for)
|
||||
|
@ -159,8 +144,7 @@ module LogStash; module Util
|
|||
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
|
||||
# @cancelled = Hash.new
|
||||
|
||||
#Sizing HashSet to size/load_factor to ensure no rehashing
|
||||
@originals = LsQueueUtils.drain(queue.queue, size, wait)
|
||||
@originals = LsQueueUtils.drain(queue, size, wait)
|
||||
end
|
||||
|
||||
def merge(event)
|
||||
|
@ -201,7 +185,7 @@ module LogStash; module Util
|
|||
|
||||
class WriteClient
|
||||
def initialize(queue)
|
||||
@queue = queue.queue
|
||||
@queue = queue
|
||||
end
|
||||
|
||||
def push(event)
|
||||
|
|
|
@ -21,8 +21,8 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
|||
end
|
||||
|
||||
describe "WriteClient | ReadClient" do
|
||||
let(:write_client) { LogStash::Util::WrappedSynchronousQueue::WriteClient.new(subject)}
|
||||
let(:read_client) { LogStash::Util::WrappedSynchronousQueue::ReadClient.new(subject)}
|
||||
let(:write_client) { subject.write_client }
|
||||
let(:read_client) { subject.read_client }
|
||||
|
||||
context "when reading from the queue" do
|
||||
let(:collector) { LogStash::Instrument::Collector.new }
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue