JAVAFICATION: Move AckedQueue Read Batch to Java

Fixes #9028
This commit is contained in:
Armin 2018-01-24 15:16:30 +01:00 committed by Armin Braun
parent a5bac509d5
commit 64bcbfb136
4 changed files with 126 additions and 53 deletions

View file

@ -129,7 +129,7 @@ module LogStash; module Util
# create a new empty batch
# @return [ReadBatch] a new empty read batch
def new_batch
ReadBatch.new(@queue, 0, 0)
LogStash::AckedReadBatch.new(@queue, 0, 0)
end
def read_batch
@ -137,7 +137,7 @@ module LogStash; module Util
raise QueueClosedError.new("Attempt to take a batch from a closed AckedQueue")
end
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
batch = LogStash::AckedReadBatch.new(@queue, @batch_size, @wait_for)
start_metrics(batch)
batch
end
@ -175,52 +175,5 @@ module LogStash; module Util
@pipeline_metric.increment(:out, filtered_size)
end
end
class ReadBatch
def initialize(queue, size, wait)
@generated = Hash.new
@acked_batch = queue.read_batch(size, wait)
@originals = @acked_batch.nil? ? Hash.new : @acked_batch.get_elements
end
def close
# this will ack the whole batch, regardless of whether some
# events were cancelled or failed
return if @acked_batch.nil?
@acked_batch.close
end
def merge(event)
return if event.nil? || @originals.key?(event)
@generated[event] = true
end
def to_a
events = []
each {|e| events << e}
events
end
def each(&blk)
@originals.each do |e, _|
blk.call(e) unless e.cancelled?
end
@generated.each do |e, _|
blk.call(e) unless e.cancelled?
end
end
def size
filtered_size
end
def starting_size
@originals.size
end
def filtered_size
@originals.size + @generated.size
end
end
end
end end

View file

@ -11,6 +11,7 @@ import org.jruby.runtime.ObjectAllocator;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.RubyAckedBatch;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.ext.JrubyAckedReadBatchExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyMemoryReadBatchExt;
@ -50,6 +51,8 @@ public final class RubyUtil {
public static final RubyClass MEMORY_READ_BATCH_CLASS;
public static final RubyClass ACKED_READ_BATCH_CLASS;
public static final RubyClass WRAPPED_WRITE_CLIENT_CLASS;
public static final RubyClass MEMORY_READ_CLIENT_CLASS;
@ -68,6 +71,8 @@ public final class RubyUtil {
);
MEMORY_READ_BATCH_CLASS =
setupLogstashClass(JrubyMemoryReadBatchExt::new, JrubyMemoryReadBatchExt.class);
ACKED_READ_BATCH_CLASS =
setupLogstashClass(JrubyAckedReadBatchExt::new, JrubyAckedReadBatchExt.class);
WRAPPED_WRITE_CLIENT_CLASS =
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
MEMORY_READ_CLIENT_CLASS =

View file

@ -0,0 +1,115 @@
package org.logstash.ext;
import java.util.Collection;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyEnumerator;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.ackedqueue.ext.RubyAckedBatch;
@JRubyClass(name = "AckedReadBatch")
public final class JrubyAckedReadBatchExt extends RubyObject {
private RubyAckedBatch ackedBatch;
private RubyHash originals;
private RubyHash generated;
public JrubyAckedReadBatchExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize", required = 3)
public IRubyObject ruby_initialize(final ThreadContext context, final IRubyObject queue,
final IRubyObject size, final IRubyObject timeout) {
final IRubyObject batch =
queue.callMethod(context, "read_batch", new IRubyObject[]{size, timeout});
if (batch.isNil()) {
originals = RubyHash.newHash(context.runtime);
ackedBatch = null;
} else {
ackedBatch = (RubyAckedBatch) batch;
originals = (RubyHash) ackedBatch.ruby_get_elements(context);
}
generated = RubyHash.newHash(context.runtime);
return this;
}
@JRubyMethod
public IRubyObject merge(final ThreadContext context, final IRubyObject event) {
if (!event.isNil() && !originals.containsKey(event)) {
generated.put(event, context.tru);
}
return this;
}
@JRubyMethod(name = "to_a")
public RubyArray toArray(final ThreadContext context) {
final RubyArray result = context.runtime.newArray(filteredSize());
for (final JrubyEventExtLibrary.RubyEvent event
: (Collection<JrubyEventExtLibrary.RubyEvent>) originals.keys()) {
if (!JrubyMemoryReadBatchExt.isCancelled(event)) {
result.add(event);
}
}
for (final JrubyEventExtLibrary.RubyEvent event
: (Collection<JrubyEventExtLibrary.RubyEvent>) generated.keys()) {
if (!JrubyMemoryReadBatchExt.isCancelled(event)) {
result.add(event);
}
}
return result;
}
@JRubyMethod
public IRubyObject each(final ThreadContext context, final Block block) {
if (!block.isGiven()) {
return RubyEnumerator.enumeratorizeWithSize(
context, this, "each", args -> getRuntime().newFixnum(filteredSize())
);
}
for (final JrubyEventExtLibrary.RubyEvent event :
(Collection<JrubyEventExtLibrary.RubyEvent>) originals.keys()) {
if (!JrubyMemoryReadBatchExt.isCancelled(event)) {
block.yield(context, event);
}
}
for (final JrubyEventExtLibrary.RubyEvent event :
(Collection<JrubyEventExtLibrary.RubyEvent>) generated.keys()) {
if (!JrubyMemoryReadBatchExt.isCancelled(event)) {
block.yield(context, event);
}
}
return this;
}
@JRubyMethod
public IRubyObject close(final ThreadContext context) {
if (ackedBatch != null) {
ackedBatch.ruby_close(context);
}
return this;
}
@JRubyMethod(name = {"size", "filtered_size"})
public IRubyObject rubySize(final ThreadContext context) {
return context.runtime.newFixnum(filteredSize());
}
@JRubyMethod(name = "starting_size")
public IRubyObject rubyStartingSize(final ThreadContext context) {
return context.runtime.newFixnum(originals.size());
}
public int filteredSize() {
return originals.size() + generated.size();
}
}

View file

@ -27,6 +27,10 @@ public final class JrubyMemoryReadBatchExt extends RubyObject {
this.events = events;
}
public static boolean isCancelled(final IRubyObject event) {
return ((JrubyEventExtLibrary.RubyEvent) event).getEvent().isCancelled();
}
public static JrubyMemoryReadBatchExt create(LinkedHashSet<IRubyObject> events) {
JrubyMemoryReadBatchExt batch = new JrubyMemoryReadBatchExt(RubyUtil.RUBY,
RubyUtil.MEMORY_READ_BATCH_CLASS, events);
@ -77,8 +81,4 @@ public final class JrubyMemoryReadBatchExt extends RubyObject {
}
return this;
}
private static boolean isCancelled(final IRubyObject event) {
return ((JrubyEventExtLibrary.RubyEvent) event).getEvent().isCancelled();
}
}