mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
76aa2bbc76
commit
c7119fa608
4 changed files with 90 additions and 22 deletions
|
@ -8,5 +8,4 @@ gem.requirements << "jar org.apache.logging.log4j:log4j-core, 2.6.2"
|
|||
gem.requirements << "jar com.fasterxml.jackson.core:jackson-core, 2.7.3"
|
||||
gem.requirements << "jar com.fasterxml.jackson.core:jackson-databind, 2.7.3"
|
||||
gem.requirements << "jar com.fasterxml.jackson.core:jackson-annotations, 2.7.3"
|
||||
gem.requirements << "jar com.fasterxml.jackson.module:jackson-module-afterburner, 2.7.3"
|
||||
gem.requirements << "jar com.fasterxml.jackson.dataformat:jackson-dataformat-cbor, 2.7.3"
|
||||
|
|
|
@ -8,7 +8,6 @@ rescue LoadError
|
|||
require 'org/slf4j/slf4j-api/1.7.21/slf4j-api-1.7.21.jar'
|
||||
require 'com/fasterxml/jackson/core/jackson-annotations/2.7.3/jackson-annotations-2.7.3.jar'
|
||||
require 'org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar'
|
||||
require 'com/fasterxml/jackson/module/jackson-module-afterburner/2.7.3/jackson-module-afterburner-2.7.3.jar'
|
||||
require 'com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.7.3/jackson-dataformat-cbor-2.7.3.jar'
|
||||
require 'com/fasterxml/jackson/core/jackson-core/2.7.3/jackson-core-2.7.3.jar'
|
||||
end
|
||||
|
@ -20,7 +19,6 @@ if defined? Jars
|
|||
require_jar( 'org.slf4j', 'slf4j-api', '1.7.21' )
|
||||
require_jar( 'com.fasterxml.jackson.core', 'jackson-annotations', '2.7.3' )
|
||||
require_jar( 'org.apache.logging.log4j', 'log4j-slf4j-impl', '2.6.2' )
|
||||
require_jar( 'com.fasterxml.jackson.module', 'jackson-module-afterburner', '2.7.3' )
|
||||
require_jar( 'com.fasterxml.jackson.dataformat', 'jackson-dataformat-cbor', '2.7.3' )
|
||||
require_jar( 'com.fasterxml.jackson.core', 'jackson-core', '2.7.3' )
|
||||
end
|
||||
|
|
|
@ -15,7 +15,7 @@ module LogStash; module Util
|
|||
# Push an object to the queue if the queue is full
|
||||
# it will block until the object can be added to the queue.
|
||||
#
|
||||
# @param [Object] Object to add to the queue
|
||||
# @param [obj] Object to add to the queue
|
||||
def push(obj)
|
||||
@queue.put(obj)
|
||||
end
|
||||
|
@ -53,7 +53,7 @@ module LogStash; module Util
|
|||
# allow the worker thread to report the execution time of the filter + output
|
||||
@inflight_clocks = {}
|
||||
@batch_size = batch_size
|
||||
@wait_for = wait_for
|
||||
@wait_for = TimeUnit::NANOSECONDS.convert(wait_for, TimeUnit::MILLISECONDS)
|
||||
end
|
||||
|
||||
def close
|
||||
|
@ -66,7 +66,7 @@ module LogStash; module Util
|
|||
|
||||
def set_batch_dimensions(batch_size, wait_for)
|
||||
@batch_size = batch_size
|
||||
@wait_for = wait_for
|
||||
@wait_for = TimeUnit::NANOSECONDS.convert(wait_for, TimeUnit::MILLISECONDS)
|
||||
end
|
||||
|
||||
def set_events_metric(metric)
|
||||
|
@ -107,12 +107,11 @@ module LogStash; module Util
|
|||
# create a new empty batch
|
||||
# @return [ReadBatch] a new empty read batch
|
||||
def new_batch
|
||||
ReadBatch.new(@queue, @batch_size, @wait_for)
|
||||
ReadBatch.new(@queue, 0, 0)
|
||||
end
|
||||
|
||||
def read_batch
|
||||
batch = new_batch
|
||||
batch.read_next
|
||||
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
|
||||
start_metrics(batch)
|
||||
batch
|
||||
end
|
||||
|
@ -174,7 +173,7 @@ module LogStash; module Util
|
|||
|
||||
class ReadBatch
|
||||
def initialize(queue, size, wait)
|
||||
@queue = queue
|
||||
@queue = queue.queue
|
||||
@size = size
|
||||
@wait = wait
|
||||
|
||||
|
@ -182,20 +181,9 @@ module LogStash; module Util
|
|||
# @cancelled = Hash.new
|
||||
|
||||
#Sizing HashSet to size/load_factor to ensure no rehashing
|
||||
@originals = HashSet.new(size * 4 / 3 + 1, 0.75)
|
||||
@is_iterating = false # Atomic Boolean maybe? Although batches are not shared across threads
|
||||
@acked_batch = nil
|
||||
end
|
||||
|
||||
def read_next
|
||||
read_size = @queue.queue.drainTo(@originals, @size)
|
||||
if read_size < @size
|
||||
(@size - read_size).times do |_|
|
||||
e = @queue.poll(@wait)
|
||||
return if e.nil?
|
||||
@originals.add(e)
|
||||
end
|
||||
end
|
||||
@originals = org.logstash.common.LsQueueUtils.drain(@queue, @size, @wait)
|
||||
end
|
||||
|
||||
def merge(event)
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
package org.logstash.common;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
||||
/**
|
||||
* Utilities around {@link BlockingQueue}.
|
||||
*/
|
||||
public final class LsQueueUtils {
|
||||
|
||||
private LsQueueUtils() {
|
||||
//Utility Class
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Drains {@link JrubyEventExtLibrary.RubyEvent} from {@link BlockingQueue} with a timeout.</p>
|
||||
* <p>The timeout will be reset as soon as a single {@link JrubyEventExtLibrary.RubyEvent} was
|
||||
* drained from the {@link BlockingQueue}. Draining {@link JrubyEventExtLibrary.RubyEvent}
|
||||
* stops as soon as either the required number of {@link JrubyEventExtLibrary.RubyEvent}s
|
||||
* were pulled from the queue or the timeout value has gone by without an event drained.</p>
|
||||
* @param queue Blocking Queue to drain {@link JrubyEventExtLibrary.RubyEvent}s
|
||||
* from
|
||||
* @param count Number of {@link JrubyEventExtLibrary.RubyEvent}s to drain from
|
||||
* {@link BlockingQueue}
|
||||
* @param nanos Timeout in Nanoseconds
|
||||
* @return Collection of {@link JrubyEventExtLibrary.RubyEvent} drained from
|
||||
* {@link BlockingQueue}
|
||||
* @throws InterruptedException On Interrupt during {@link BlockingQueue#poll()} or
|
||||
* {@link BlockingQueue#drainTo(Collection)}
|
||||
*/
|
||||
public static Collection<JrubyEventExtLibrary.RubyEvent> drain(
|
||||
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue, final int count, final long nanos
|
||||
) throws InterruptedException {
|
||||
int left = count;
|
||||
final Collection<JrubyEventExtLibrary.RubyEvent> collection =
|
||||
new HashSet<>(4 * count / 3 + 1);
|
||||
while (left > 0) {
|
||||
final int drained = drain(queue, collection, left, nanos);
|
||||
if (drained == 0) {
|
||||
break;
|
||||
}
|
||||
left -= drained;
|
||||
}
|
||||
return collection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to drain a given number of {@link JrubyEventExtLibrary.RubyEvent} from
|
||||
* {@link BlockingQueue} with a timeout.
|
||||
* @param queue Blocking Queue to drain {@link JrubyEventExtLibrary.RubyEvent}s
|
||||
* from
|
||||
* @param count Number of {@link JrubyEventExtLibrary.RubyEvent}s to drain from
|
||||
* {@link BlockingQueue}
|
||||
* @param nanos Timeout in Nanoseconds
|
||||
* @return Collection of {@link JrubyEventExtLibrary.RubyEvent} drained from
|
||||
* {@link BlockingQueue}
|
||||
* @throws InterruptedException On Interrupt during {@link BlockingQueue#poll()} or
|
||||
* {@link BlockingQueue#drainTo(Collection)}
|
||||
*/
|
||||
private static int drain(final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue,
|
||||
final Collection<JrubyEventExtLibrary.RubyEvent> collection, final int count,
|
||||
final long nanos) throws InterruptedException {
|
||||
final long deadline = System.nanoTime() + nanos;
|
||||
int added = 0;
|
||||
while (added < count) {
|
||||
added += queue.drainTo(collection, count - added);
|
||||
if (added < count) {
|
||||
final JrubyEventExtLibrary.RubyEvent event =
|
||||
queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
|
||||
if (event == null) {
|
||||
break;
|
||||
}
|
||||
collection.add(event);
|
||||
added++;
|
||||
}
|
||||
}
|
||||
return added;
|
||||
}
|
||||
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue