mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
f75f8579bd
commit
d66e0c507c
3 changed files with 22 additions and 7 deletions
|
@ -5,6 +5,7 @@ module LogStash; module Util
|
|||
java_import java.util.concurrent.ArrayBlockingQueue
|
||||
java_import java.util.concurrent.TimeUnit
|
||||
java_import java.util.HashSet
|
||||
java_import org.logstash.common.LsQueueUtils
|
||||
|
||||
def initialize (size)
|
||||
@queue = ArrayBlockingQueue.new(size)
|
||||
|
@ -183,7 +184,7 @@ module LogStash; module Util
|
|||
#Sizing HashSet to size/load_factor to ensure no rehashing
|
||||
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
|
||||
@acked_batch = nil
|
||||
@originals = org.logstash.common.LsQueueUtils.drain(@queue, @size, @wait)
|
||||
@originals = LsQueueUtils.drain(@queue, @size, @wait)
|
||||
end
|
||||
|
||||
def merge(event)
|
||||
|
@ -242,7 +243,7 @@ module LogStash; module Util
|
|||
|
||||
class WriteClient
|
||||
def initialize(queue)
|
||||
@queue = queue
|
||||
@queue = queue.queue
|
||||
end
|
||||
|
||||
def get_new_batch
|
||||
|
@ -250,18 +251,18 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def push(event)
|
||||
@queue.push(event)
|
||||
@queue.put(event)
|
||||
end
|
||||
alias_method(:<<, :push)
|
||||
|
||||
def push_batch(batch)
|
||||
batch.each do |event|
|
||||
push(event)
|
||||
end
|
||||
LsQueueUtils.addAll(@queue, batch.events)
|
||||
end
|
||||
end
|
||||
|
||||
class WriteBatch
|
||||
attr_reader :events
|
||||
|
||||
def initialize
|
||||
@events = []
|
||||
end
|
||||
|
|
|
@ -61,7 +61,7 @@ describe LogStash::Util::WrappedSynchronousQueue do
|
|||
context "when we have item in the queue" do
|
||||
it "records the `duration_in_millis`" do
|
||||
batch = write_client.get_new_batch
|
||||
5.times {|i| batch.push("value-#{i}")}
|
||||
5.times {|i| batch.push(LogStash::Event.new({"message" => "value-#{i}"}))}
|
||||
write_client.push_batch(batch)
|
||||
|
||||
read_batch = read_client.read_batch
|
||||
|
|
|
@ -15,6 +15,20 @@ public final class LsQueueUtils {
|
|||
//Utility Class
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds all {@link JrubyEventExtLibrary.RubyEvent} in the given collection to the given queue
|
||||
* in a blocking manner, only returning once all events have been added to the queue.
|
||||
* @param queue Queue to add Events to
|
||||
* @param events Events to add to Queue
|
||||
* @throws InterruptedException On interrupt during blocking queue add
|
||||
*/
|
||||
public static void addAll(final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue,
|
||||
final Collection<JrubyEventExtLibrary.RubyEvent> events) throws InterruptedException {
|
||||
for (final JrubyEventExtLibrary.RubyEvent event : events) {
|
||||
queue.put(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Drains {@link JrubyEventExtLibrary.RubyEvent} from {@link BlockingQueue} with a timeout.</p>
|
||||
* <p>The timeout will be reset as soon as a single {@link JrubyEventExtLibrary.RubyEvent} was
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue