JAVAFICATION: Move in memory queue read batch to Java

Fixes #8956
This commit is contained in:
Armin 2018-01-15 21:13:04 +01:00 committed by Armin Braun
parent 0aa609c9ff
commit c77b6b5cbe
4 changed files with 79 additions and 36 deletions

View file

@ -83,11 +83,11 @@ module LogStash; module Util
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, 0, 0)
LogStash::MemoryReadBatch.new(java.util.LinkedHashSet.new(0))
end
def read_batch
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
batch = LogStash::MemoryReadBatch.new(LsQueueUtils.drain(@queue, @batch_size, @wait_for))
start_metrics(batch)
batch
end
@ -125,38 +125,6 @@ module LogStash; module Util
end
end
class ReadBatch
def initialize(queue, size, wait)
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
# @cancelled = Hash.new
@originals = LsQueueUtils.drain(queue, size, wait)
end
def merge(event)
return if event.nil?
@originals.add(event)
end
def to_a
events = []
@originals.each {|e| events << e unless e.cancelled?}
events
end
def each(&blk)
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
@originals.each {|e| blk.call(e) unless e.cancelled?}
end
def filtered_size
@originals.size
end
alias_method(:size, :filtered_size)
end
class WriteClient
def initialize(queue)
@queue = queue

View file

@ -11,6 +11,7 @@ import org.jruby.runtime.ObjectAllocator;
import org.logstash.ackedqueue.ext.AbstractJRubyQueue;
import org.logstash.ackedqueue.ext.RubyAckedBatch;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyMemoryReadBatchExt;
import org.logstash.ext.JrubyTimestampExtLibrary;
/**
@ -42,12 +43,16 @@ public final class RubyUtil {
public static final RubyClass TIMESTAMP_PARSER_ERROR;
public static final RubyClass MEMORY_READ_BATCH_CLASS;
static {
RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
RUBY_TIMESTAMP_CLASS = setupLogstashClass(
JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class
);
MEMORY_READ_BATCH_CLASS =
setupLogstashClass(JrubyMemoryReadBatchExt::new, JrubyMemoryReadBatchExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);

View file

@ -45,12 +45,12 @@ public final class LsQueueUtils {
* @throws InterruptedException On Interrupt during {@link BlockingQueue#poll()} or
* {@link BlockingQueue#drainTo(Collection)}
*/
public static Collection<JrubyEventExtLibrary.RubyEvent> drain(
public static LinkedHashSet<JrubyEventExtLibrary.RubyEvent> drain(
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue, final int count, final long nanos
) throws InterruptedException {
int left = count;
//todo: make this an ArrayList once we remove the Ruby pipeline/execution
final Collection<JrubyEventExtLibrary.RubyEvent> collection =
final LinkedHashSet<JrubyEventExtLibrary.RubyEvent> collection =
new LinkedHashSet<>(4 * count / 3 + 1);
do {
final int drained = drain(queue, collection, left, nanos);

View file

@ -0,0 +1,70 @@
package org.logstash.ext;
import java.util.LinkedHashSet;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyEnumerator;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "MemoryReadBatch")
public class JrubyMemoryReadBatchExt extends RubyObject {
private LinkedHashSet<IRubyObject> events;
public JrubyMemoryReadBatchExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize", required = 1)
@SuppressWarnings("unchecked")
public void ruby_initialize(final ThreadContext context, final IRubyObject events) {
this.events = (LinkedHashSet<IRubyObject>) events.toJava(LinkedHashSet.class);
}
@JRubyMethod(name = "to_a")
public RubyArray toArray(final ThreadContext context) {
final RubyArray result = context.runtime.newArray(events.size());
for (final IRubyObject event : events) {
if (!isCancelled(event)) {
result.add(event);
}
}
return result;
}
@JRubyMethod(required = 1)
public IRubyObject merge(final ThreadContext context, final IRubyObject event) {
events.add(event);
return this;
}
@JRubyMethod(name = "filtered_size", alias = "size")
public IRubyObject filteredSize(final ThreadContext context) {
return context.runtime.newFixnum(events.size());
}
@JRubyMethod
public IRubyObject each(final ThreadContext context, final Block block) {
if (!block.isGiven()) {
return RubyEnumerator.enumeratorizeWithSize(
context, this, "each", args -> getRuntime().newFixnum(events.size())
);
}
for (final IRubyObject event : events) {
if (!isCancelled(event)) {
block.yield(context, event);
}
}
return this;
}
private static boolean isCancelled(final IRubyObject event) {
return ((JrubyEventExtLibrary.RubyEvent) event).getEvent().isCancelled();
}
}