JAVAFICATION: Move QueueFactory to Java

Fixes #9693
This commit is contained in:
Armin 2018-05-31 11:46:32 +02:00 committed by Armin Braun
parent 0bbcf9a921
commit 0aa798f350
10 changed files with 174 additions and 85 deletions

View file

@ -5,7 +5,6 @@ require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/instrument/collector"
require "logstash/queue_factory"
require "logstash/compiler"
java_import org.logstash.common.DeadLetterQueueFactory

View file

@ -10,7 +10,6 @@ require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/instrument/collector"
require "logstash/filter_delegator"
require "logstash/queue_factory"
require "logstash/compiler"
java_import org.logstash.common.DeadLetterQueueFactory

View file

@ -1,33 +0,0 @@
# encoding: utf-8
require "fileutils"
require "logstash/event"
module LogStash
class QueueFactory
def self.create(settings)
queue_type = settings.get("queue.type")
queue_page_capacity = settings.get("queue.page_capacity")
queue_max_bytes = settings.get("queue.max_bytes")
queue_max_events = settings.get("queue.max_events")
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
checkpoint_max_interval = settings.get("queue.checkpoint.interval")
queue_path = ::File.join(settings.get("path.queue"), settings.get("pipeline.id"))
case queue_type
when "persisted"
# persisted is the disk based acked queue
FileUtils.mkdir_p(queue_path)
LogStash::WrappedAckedQueue.new(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
when "memory"
# memory is the legacy and default setting
LogStash::WrappedSynchronousQueue.new(
settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2
)
else
raise ConfigurationError, "Invalid setting `#{queue_type}` for `queue.type`, supported types are: 'memory' or 'persisted'"
end
end
end
end

View file

@ -1,5 +1,4 @@
# encoding: utf-8
require "logstash/queue_factory"
require "logstash/settings"
require "stud/temporary"

View file

@ -8,6 +8,7 @@ import org.jruby.RubyModule;
import org.jruby.anno.JRubyClass;
import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.ObjectAllocator;
import org.logstash.ackedqueue.QueueFactoryExt;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
@ -15,6 +16,7 @@ import org.logstash.common.BufferedTokenizerExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.EventDispatcherExt;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.LogstashPipelineExt;
@ -79,6 +81,8 @@ public final class RubyUtil {
public static final RubyClass ACKED_WRITE_CLIENT_CLASS;
public static final RubyClass ABSTRACT_WRAPPED_QUEUE_CLASS;
public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS;
public static final RubyClass WRAPPED_ACKED_QUEUE_CLASS;
@ -169,6 +173,8 @@ public final class RubyUtil {
public static final RubyClass PIPELINE_REPORTER_SNAPSHOT_CLASS;
public static final RubyClass QUEUE_FACTORY_CLASS;
public static final RubyClass HOOKS_REGISTRY_CLASS;
public static final RubyClass LOGSTASH_PIPELINE_CLASS;
@ -339,6 +345,11 @@ public final class RubyUtil {
RUBY_TIMESTAMP_CLASS = setupLogstashClass(
JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class
);
ABSTRACT_WRAPPED_QUEUE_CLASS = LOGSTASH_MODULE.defineClassUnder(
"AbstractWrappedQueue", RUBY.getObject(),
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
ABSTRACT_WRAPPED_QUEUE_CLASS.defineAnnotatedMethods(AbstractWrappedQueueExt.class);
WRAPPED_WRITE_CLIENT_CLASS =
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
QUEUE_READ_CLIENT_BASE_CLASS =
@ -351,12 +362,16 @@ public final class RubyUtil {
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
ACKED_WRITE_CLIENT_CLASS =
setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class);
WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
JrubyWrappedSynchronousQueueExt.class);
WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(JRubyWrappedAckedQueueExt::new,
JRubyWrappedAckedQueueExt.class);
WRAPPED_SYNCHRONOUS_QUEUE_CLASS = setupLogstashClass(
ABSTRACT_WRAPPED_QUEUE_CLASS, JrubyWrappedSynchronousQueueExt::new,
JrubyWrappedSynchronousQueueExt.class
);
WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(
ABSTRACT_WRAPPED_QUEUE_CLASS, JRubyWrappedAckedQueueExt::new,
JRubyWrappedAckedQueueExt.class
);
ACKED_QUEUE_CLASS = setupLogstashClass(JRubyAckedQueueExt::new, JRubyAckedQueueExt.class);
QUEUE_FACTORY_CLASS = setupLogstashClass(QueueFactoryExt::new, QueueFactoryExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);

View file

@ -0,0 +1,74 @@
package org.logstash.ackedqueue;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
@JRubyClass(name = "QueueFactory")
public final class QueueFactoryExt extends RubyBasicObject {
public QueueFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(meta = true)
public static AbstractWrappedQueueExt create(final ThreadContext context, final IRubyObject recv,
final IRubyObject settings) throws IOException {
final String type = getSetting(context, settings, "queue.type").asJavaString();
if ("persisted".equals(type)) {
final Path queuePath = Paths.get(
getSetting(context, settings, "path.queue").asJavaString(),
getSetting(context, settings, "pipeline.id").asJavaString()
);
Files.createDirectories(queuePath);
return new JRubyWrappedAckedQueueExt(context.runtime, RubyUtil.WRAPPED_ACKED_QUEUE_CLASS)
.initialize(
context, new IRubyObject[]{
context.runtime.newString(queuePath.toString()),
getSetting(context, settings, "queue.page_capacity"),
getSetting(context, settings, "queue.max_events"),
getSetting(context, settings, "queue.checkpoint.writes"),
getSetting(context, settings, "queue.checkpoint.acks"),
getSetting(context, settings, "queue.checkpoint.interval"),
getSetting(context, settings, "queue.max_bytes")
}
);
} else if ("memory".equals(type)) {
return new JrubyWrappedSynchronousQueueExt(
context.runtime, RubyUtil.WRAPPED_SYNCHRONOUS_QUEUE_CLASS
).initialize(
context, context.runtime.newFixnum(
getSetting(context, settings, "pipeline.batch.size")
.convertToInteger().getIntValue()
* getSetting(context, settings, "pipeline.workers")
.convertToInteger().getIntValue()
)
);
} else {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Invalid setting `%s` for `queue.type`, supported types are: 'memory' or 'persisted'",
type
)
);
}
}
private static IRubyObject getSetting(final ThreadContext context, final IRubyObject settings,
final String name) {
return settings.callMethod(context, "get_value", context.runtime.newString(name));
}
}

View file

@ -6,25 +6,25 @@ import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Arity;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
@JRubyClass(name = "WrappedAckedQueue")
public final class JRubyWrappedAckedQueueExt extends RubyObject {
public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
private JRubyAckedQueueExt queue;
private final AtomicBoolean isClosed = new AtomicBoolean();
@JRubyMethod(name = "initialize", optional = 7)
public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) throws IOException {
@JRubyMethod(optional = 7)
public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxEvents = RubyFixnum.num2int(args[2]);
@ -36,7 +36,7 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);
this.queue.open();
return context.nil;
return this;
}
public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass) {
@ -53,16 +53,6 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
isClosed.set(true);
}
@JRubyMethod(name = "close")
public IRubyObject rubyClose(ThreadContext context) {
try {
close();
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return context.nil;
}
@JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write");
@ -75,22 +65,31 @@ public final class JRubyWrappedAckedQueueExt extends RubyObject {
return queue.ruby_read_batch(context, size, wait);
}
@JRubyMethod(name = "write_client")
public IRubyObject rubyWriteClient(final ThreadContext context) {
return JrubyAckedWriteClientExt.create(queue, isClosed);
}
@JRubyMethod(name = "read_client")
public IRubyObject rubyReadClient(final ThreadContext context) {
return JrubyAckedReadClientExt.create(queue);
}
@JRubyMethod(name = "is_empty?")
public IRubyObject rubyIsEmpty(ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}
@Override
protected IRubyObject getWriteClient(final ThreadContext context) {
return JrubyAckedWriteClientExt.create(queue, isClosed);
}
@Override
protected IRubyObject getReadClient() {
return JrubyAckedReadClientExt.create(queue);
}
@Override
protected IRubyObject doClose(final ThreadContext context) {
try {
close();
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return context.nil;
}
private void checkIfClosed(String action) {
if (isClosed.get()) {
throw new RuntimeException("Attempted to " + action + " on a closed AckedQueue");

View file

@ -0,0 +1,38 @@
package org.logstash.execution;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "AbstractWrappedQueue")
public abstract class AbstractWrappedQueueExt extends RubyBasicObject {
public AbstractWrappedQueueExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "write_client")
public final IRubyObject writeClient(final ThreadContext context) {
return getWriteClient(context);
}
@JRubyMethod(name = "read_client")
public final IRubyObject readClient() {
return getReadClient();
}
@JRubyMethod
public final IRubyObject close(final ThreadContext context) {
return doClose(context);
}
protected abstract IRubyObject doClose(ThreadContext context);
protected abstract IRubyObject getWriteClient(ThreadContext context);
protected abstract IRubyObject getReadClient();
}

View file

@ -1,5 +1,7 @@
package org.logstash.ext;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
@ -7,14 +9,10 @@ import org.logstash.RubyUtil;
import org.logstash.common.LsQueueUtils;
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
import org.logstash.execution.QueueReadClient;
import org.logstash.execution.QueueReadClientBase;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@JRubyClass(name = "MemoryReadClient", parent = "QueueReadClientBase")
public final class JrubyMemoryReadClientExt extends QueueReadClientBase implements QueueReadClient {
public final class JrubyMemoryReadClientExt extends QueueReadClientBase {
private BlockingQueue queue;

View file

@ -1,19 +1,18 @@
package org.logstash.ext;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.logstash.execution.AbstractWrappedQueueExt;
@JRubyClass(name = "WrappedSynchronousQueue")
public final class JrubyWrappedSynchronousQueueExt extends RubyObject {
public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueExt {
private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
@ -21,27 +20,29 @@ public final class JrubyWrappedSynchronousQueueExt extends RubyObject {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize")
@JRubyMethod
@SuppressWarnings("unchecked")
public void rubyInitialize(final ThreadContext context, IRubyObject size) {
public JrubyWrappedSynchronousQueueExt initialize(final ThreadContext context,
IRubyObject size) {
int typedSize = ((RubyNumeric)size).getIntValue();
this.queue = new ArrayBlockingQueue<>(typedSize);
return this;
}
@JRubyMethod(name = "write_client")
public IRubyObject getWriteClient(final ThreadContext context) {
@Override
protected IRubyObject getWriteClient(final ThreadContext context) {
return JrubyMemoryWriteClientExt.create(queue);
}
@JRubyMethod(name = "read_client")
public IRubyObject getReadClient(final ThreadContext context) {
@Override
protected IRubyObject getReadClient() {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
return JrubyMemoryReadClientExt.create(queue, 125, 50);
}
@JRubyMethod(name = "close")
public IRubyObject rubyClose(final ThreadContext context) {
@Override
public IRubyObject doClose(final ThreadContext context) {
// no op
return this;
}