mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
JAVAFICATION: Move more of the pipeline code to Java
* More pipeline code pulled to Java superclass * Stronger typing for PQ write client code Fixes #9697
This commit is contained in:
parent
1cc5358892
commit
3c30459c80
10 changed files with 130 additions and 87 deletions
|
@ -7,11 +7,7 @@ require "logstash/outputs/base"
|
|||
require "logstash/instrument/collector"
|
||||
require "logstash/compiler"
|
||||
|
||||
java_import org.logstash.common.DeadLetterQueueFactory
|
||||
java_import org.logstash.common.SourceWithMetadata
|
||||
java_import org.logstash.common.io.DeadLetterQueueWriter
|
||||
java_import org.logstash.config.ir.CompiledPipeline
|
||||
java_import org.logstash.config.ir.ConfigCompiler
|
||||
|
||||
module LogStash; class JavaBasePipeline < AbstractPipeline
|
||||
include LogStash::Util::Loggable
|
||||
|
@ -20,7 +16,7 @@ module LogStash; class JavaBasePipeline < AbstractPipeline
|
|||
|
||||
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
|
||||
@logger = self.logger
|
||||
super pipeline_config, namespaced_metric, @logger, @queue
|
||||
super pipeline_config, namespaced_metric, @logger
|
||||
@lir_execution = CompiledPipeline.new(
|
||||
lir,
|
||||
LogStash::Plugins::PluginFactory.new(
|
||||
|
@ -64,23 +60,15 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
|||
:events_filtered,
|
||||
:started_at,
|
||||
:thread,
|
||||
:filter_queue_client,
|
||||
:input_queue_client
|
||||
:filter_queue_client
|
||||
|
||||
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
|
||||
|
||||
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
|
||||
begin
|
||||
@queue = LogStash::QueueFactory.create(pipeline_config.settings)
|
||||
rescue => e
|
||||
@logger.error("Logstash failed to create queue", default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
|
||||
raise e
|
||||
end
|
||||
super
|
||||
@worker_threads = []
|
||||
|
||||
@input_queue_client = @queue.write_client
|
||||
@filter_queue_client = @queue.read_client
|
||||
@filter_queue_client = queue.read_client
|
||||
# Note that @inflight_batches as a central mechanism for tracking inflight
|
||||
# batches will fail if we have multiple read clients here.
|
||||
@filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
|
||||
|
@ -215,7 +203,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
|||
|
||||
def close
|
||||
@filter_queue_client.close
|
||||
@queue.close
|
||||
queue.close
|
||||
close_dlq_writer
|
||||
end
|
||||
|
||||
|
@ -336,7 +324,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
|
|||
def inputworker(plugin)
|
||||
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
|
||||
begin
|
||||
plugin.run(LogStash::WrappedWriteClient.new(@input_queue_client, pipeline_id.to_s.to_sym, metric, plugin.id.to_sym))
|
||||
plugin.run(LogStash::WrappedWriteClient.new(input_queue_client, pipeline_id.to_s.to_sym, metric, plugin.id.to_sym))
|
||||
rescue => e
|
||||
if plugin.stop?
|
||||
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
|
||||
|
|
|
@ -12,11 +12,6 @@ require "logstash/instrument/collector"
|
|||
require "logstash/filter_delegator"
|
||||
require "logstash/compiler"
|
||||
|
||||
java_import org.logstash.common.DeadLetterQueueFactory
|
||||
java_import org.logstash.common.SourceWithMetadata
|
||||
java_import org.logstash.common.io.DeadLetterQueueWriter
|
||||
java_import org.logstash.config.ir.ConfigCompiler
|
||||
|
||||
module LogStash; class BasePipeline < AbstractPipeline
|
||||
include LogStash::Util::Loggable
|
||||
|
||||
|
@ -24,7 +19,7 @@ module LogStash; class BasePipeline < AbstractPipeline
|
|||
|
||||
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
|
||||
@logger = self.logger
|
||||
super pipeline_config, namespaced_metric, @logger, @queue
|
||||
super pipeline_config, namespaced_metric, @logger
|
||||
@mutex = Mutex.new
|
||||
|
||||
@inputs = nil
|
||||
|
@ -89,24 +84,16 @@ module LogStash; class Pipeline < BasePipeline
|
|||
:events_filtered,
|
||||
:started_at,
|
||||
:thread,
|
||||
:filter_queue_client,
|
||||
:input_queue_client
|
||||
:filter_queue_client
|
||||
|
||||
MAX_INFLIGHT_WARN_THRESHOLD = 10_000
|
||||
|
||||
def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
|
||||
begin
|
||||
@queue = LogStash::QueueFactory.create(pipeline_config.settings)
|
||||
rescue => e
|
||||
@logger.error("Logstash failed to create queue", default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
|
||||
raise e
|
||||
end
|
||||
super
|
||||
|
||||
@worker_threads = []
|
||||
|
||||
@input_queue_client = @queue.write_client
|
||||
@filter_queue_client = @queue.read_client
|
||||
@filter_queue_client = queue.read_client
|
||||
@signal_queue = java.util.concurrent.LinkedBlockingQueue.new
|
||||
# Note that @inflight_batches as a central mechanism for tracking inflight
|
||||
# batches will fail if we have multiple read clients here.
|
||||
|
@ -241,7 +228,7 @@ module LogStash; class Pipeline < BasePipeline
|
|||
|
||||
def close
|
||||
@filter_queue_client.close
|
||||
@queue.close
|
||||
queue.close
|
||||
close_dlq_writer
|
||||
end
|
||||
|
||||
|
@ -432,8 +419,7 @@ module LogStash; class Pipeline < BasePipeline
|
|||
def inputworker(plugin)
|
||||
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
|
||||
begin
|
||||
input_queue_client = wrapped_write_client(plugin.id.to_sym)
|
||||
plugin.run(input_queue_client)
|
||||
plugin.run(wrapped_write_client(plugin.id.to_sym))
|
||||
rescue => e
|
||||
if plugin.stop?
|
||||
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
|
||||
|
@ -636,7 +622,7 @@ module LogStash; class Pipeline < BasePipeline
|
|||
def wrapped_write_client(plugin_id)
|
||||
#need to ensure that metrics are initialized one plugin at a time, else a race condition can exist.
|
||||
@mutex.synchronize do
|
||||
LogStash::WrappedWriteClient.new(@input_queue_client, pipeline_id.to_s.to_sym, metric, plugin_id)
|
||||
LogStash::WrappedWriteClient.new(input_queue_client, pipeline_id.to_s.to_sym, metric, plugin_id)
|
||||
end
|
||||
end
|
||||
end; end
|
||||
|
|
|
@ -16,14 +16,15 @@ import org.logstash.common.BufferedTokenizerExt;
|
|||
import org.logstash.config.ir.compiler.FilterDelegatorExt;
|
||||
import org.logstash.config.ir.compiler.OutputDelegatorExt;
|
||||
import org.logstash.config.ir.compiler.OutputStrategyExt;
|
||||
import org.logstash.execution.*;
|
||||
import org.logstash.execution.AbstractPipelineExt;
|
||||
import org.logstash.execution.AbstractWrappedQueueExt;
|
||||
import org.logstash.execution.ConvergeResultExt;
|
||||
import org.logstash.execution.EventDispatcherExt;
|
||||
import org.logstash.execution.ExecutionContextExt;
|
||||
import org.logstash.execution.AbstractPipelineExt;
|
||||
import org.logstash.execution.PipelineReporterExt;
|
||||
import org.logstash.execution.QueueReadClientBase;
|
||||
import org.logstash.execution.ShutdownWatcherExt;
|
||||
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
||||
import org.logstash.ext.JRubyLogstashErrorsExt;
|
||||
import org.logstash.ext.JRubyWrappedWriteClientExt;
|
||||
import org.logstash.ext.JrubyAckedReadClientExt;
|
||||
|
@ -78,6 +79,8 @@ public final class RubyUtil {
|
|||
|
||||
public static final RubyClass ACKED_READ_CLIENT_CLASS;
|
||||
|
||||
public static final RubyClass ABSTRACT_WRITE_CLIENT_CLASS;
|
||||
|
||||
public static final RubyClass MEMORY_WRITE_CLIENT_CLASS;
|
||||
|
||||
public static final RubyClass ACKED_WRITE_CLIENT_CLASS;
|
||||
|
@ -359,6 +362,11 @@ public final class RubyUtil {
|
|||
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
|
||||
);
|
||||
ABSTRACT_WRAPPED_QUEUE_CLASS.defineAnnotatedMethods(AbstractWrappedQueueExt.class);
|
||||
ABSTRACT_WRITE_CLIENT_CLASS = LOGSTASH_MODULE.defineClassUnder(
|
||||
"AbstractQueueWriteClient", RUBY.getObject(),
|
||||
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
|
||||
);
|
||||
ABSTRACT_WRITE_CLIENT_CLASS.defineAnnotatedMethods(JRubyAbstractQueueWriteClientExt.class);
|
||||
WRAPPED_WRITE_CLIENT_CLASS =
|
||||
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
|
||||
QUEUE_READ_CLIENT_BASE_CLASS =
|
||||
|
@ -367,10 +375,14 @@ public final class RubyUtil {
|
|||
setupLogstashClass(QUEUE_READ_CLIENT_BASE_CLASS, JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class);
|
||||
ACKED_READ_CLIENT_CLASS =
|
||||
setupLogstashClass(QUEUE_READ_CLIENT_BASE_CLASS, JrubyAckedReadClientExt::new, JrubyAckedReadClientExt.class);
|
||||
MEMORY_WRITE_CLIENT_CLASS =
|
||||
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
|
||||
ACKED_WRITE_CLIENT_CLASS =
|
||||
setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class);
|
||||
MEMORY_WRITE_CLIENT_CLASS = setupLogstashClass(
|
||||
ABSTRACT_WRITE_CLIENT_CLASS, JrubyMemoryWriteClientExt::new,
|
||||
JrubyMemoryWriteClientExt.class
|
||||
);
|
||||
ACKED_WRITE_CLIENT_CLASS = setupLogstashClass(
|
||||
ABSTRACT_WRITE_CLIENT_CLASS, JrubyAckedWriteClientExt::new,
|
||||
JrubyAckedWriteClientExt.class
|
||||
);
|
||||
WRAPPED_SYNCHRONOUS_QUEUE_CLASS = setupLogstashClass(
|
||||
ABSTRACT_WRAPPED_QUEUE_CLASS, JrubyWrappedSynchronousQueueExt::new,
|
||||
JrubyWrappedSynchronousQueueExt.class
|
||||
|
@ -480,15 +492,15 @@ public final class RubyUtil {
|
|||
);
|
||||
CONVERGE_RESULT_CLASS = setupLogstashClass(ConvergeResultExt::new, ConvergeResultExt.class);
|
||||
ACTION_RESULT_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"ActionResult", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
|
||||
"ActionResult", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
|
||||
);
|
||||
ACTION_RESULT_CLASS.defineAnnotatedMethods(ConvergeResultExt.ActionResultExt.class);
|
||||
SUCCESSFUL_ACTION_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"SuccessfulAction", ACTION_RESULT_CLASS, ConvergeResultExt.SuccessfulActionExt::new
|
||||
"SuccessfulAction", ACTION_RESULT_CLASS, ConvergeResultExt.SuccessfulActionExt::new
|
||||
);
|
||||
SUCCESSFUL_ACTION_CLASS.defineAnnotatedMethods(ConvergeResultExt.SuccessfulActionExt.class);
|
||||
FAILED_ACTION_CLASS = CONVERGE_RESULT_CLASS.defineClassUnder(
|
||||
"FailedAction", ACTION_RESULT_CLASS, ConvergeResultExt.FailedActionExt::new
|
||||
"FailedAction", ACTION_RESULT_CLASS, ConvergeResultExt.FailedActionExt::new
|
||||
);
|
||||
FAILED_ACTION_CLASS.defineAnnotatedMethods(ConvergeResultExt.FailedActionExt.class);
|
||||
HOOKS_REGISTRY_CLASS =
|
||||
|
|
|
@ -13,6 +13,7 @@ import org.jruby.runtime.ThreadContext;
|
|||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.execution.AbstractWrappedQueueExt;
|
||||
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
||||
import org.logstash.ext.JrubyAckedReadClientExt;
|
||||
import org.logstash.ext.JrubyAckedWriteClientExt;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
|
@ -71,7 +72,7 @@ public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected IRubyObject getWriteClient(final ThreadContext context) {
|
||||
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
|
||||
return JrubyAckedWriteClientExt.create(queue, isClosed);
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,8 @@ import java.security.NoSuchAlgorithmException;
|
|||
import java.util.Arrays;
|
||||
import java.util.UUID;
|
||||
import org.apache.commons.codec.binary.Hex;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyBasicObject;
|
||||
|
@ -21,12 +23,14 @@ import org.jruby.javasupport.JavaUtil;
|
|||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.ackedqueue.QueueFactoryExt;
|
||||
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
||||
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
|
||||
import org.logstash.common.DeadLetterQueueFactory;
|
||||
import org.logstash.common.IncompleteSourceWithMetadataException;
|
||||
import org.logstash.config.ir.ConfigCompiler;
|
||||
import org.logstash.config.ir.PipelineIR;
|
||||
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
||||
import org.logstash.instrument.metrics.AbstractMetricExt;
|
||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||
import org.logstash.instrument.metrics.NullMetricExt;
|
||||
|
@ -34,6 +38,8 @@ import org.logstash.instrument.metrics.NullMetricExt;
|
|||
@JRubyClass(name = "AbstractPipeline")
|
||||
public final class AbstractPipelineExt extends RubyBasicObject {
|
||||
|
||||
private static final Logger LOGGER = LogManager.getLogger(AbstractPipelineExt.class);
|
||||
|
||||
private static final RubyArray CAPACITY_NAMESPACE =
|
||||
RubyArray.newArray(RubyUtil.RUBY, RubyUtil.RUBY.newSymbol("capacity"));
|
||||
|
||||
|
@ -96,21 +102,23 @@ public final class AbstractPipelineExt extends RubyBasicObject {
|
|||
|
||||
private PipelineReporterExt reporter;
|
||||
|
||||
private IRubyObject queue;
|
||||
private AbstractWrappedQueueExt queue;
|
||||
|
||||
private JRubyAbstractQueueWriteClientExt inputQueueClient;
|
||||
|
||||
public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod(required = 4)
|
||||
public AbstractPipelineExt initialize(final ThreadContext context, final IRubyObject[] args)
|
||||
@JRubyMethod
|
||||
public AbstractPipelineExt initialize(final ThreadContext context,
|
||||
final IRubyObject pipelineConfig, final IRubyObject namespacedMetric,
|
||||
final IRubyObject rubyLogger)
|
||||
throws NoSuchAlgorithmException, IncompleteSourceWithMetadataException {
|
||||
final IRubyObject namespacedMetric = args[1];
|
||||
queue = args[3];
|
||||
reporter = new PipelineReporterExt(
|
||||
context.runtime, RubyUtil.PIPELINE_REPORTER_CLASS).initialize(context, args[2], this
|
||||
context.runtime, RubyUtil.PIPELINE_REPORTER_CLASS).initialize(context, rubyLogger, this
|
||||
);
|
||||
pipelineSettings = args[0];
|
||||
pipelineSettings = pipelineConfig;
|
||||
configString = (RubyString) pipelineSettings.callMethod(context, "config_string");
|
||||
configHash = context.runtime.newString(
|
||||
Hex.encodeHexString(
|
||||
|
@ -118,6 +126,13 @@ public final class AbstractPipelineExt extends RubyBasicObject {
|
|||
)
|
||||
);
|
||||
settings = pipelineSettings.callMethod(context, "settings");
|
||||
try {
|
||||
queue = QueueFactoryExt.create(context, null, settings);
|
||||
} catch (final Exception ex) {
|
||||
LOGGER.error("Logstash failed to create queue.", ex);
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
inputQueueClient = queue.writeClient(context);
|
||||
final IRubyObject id = getSetting(context, "pipeline.id");
|
||||
if (id.isNil()) {
|
||||
pipelineId = id();
|
||||
|
@ -286,8 +301,13 @@ public final class AbstractPipelineExt extends RubyBasicObject {
|
|||
return context.nil;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "input_queue_client")
|
||||
public JRubyAbstractQueueWriteClientExt inputQueueClient() {
|
||||
return inputQueueClient;
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public IRubyObject queue() {
|
||||
public AbstractWrappedQueueExt queue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import org.jruby.anno.JRubyClass;
|
|||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
|
||||
|
||||
@JRubyClass(name = "AbstractWrappedQueue")
|
||||
public abstract class AbstractWrappedQueueExt extends RubyBasicObject {
|
||||
|
@ -16,7 +17,7 @@ public abstract class AbstractWrappedQueueExt extends RubyBasicObject {
|
|||
}
|
||||
|
||||
@JRubyMethod(name = "write_client")
|
||||
public final IRubyObject writeClient(final ThreadContext context) {
|
||||
public final JRubyAbstractQueueWriteClientExt writeClient(final ThreadContext context) {
|
||||
return getWriteClient(context);
|
||||
}
|
||||
|
||||
|
@ -32,7 +33,7 @@ public abstract class AbstractWrappedQueueExt extends RubyBasicObject {
|
|||
|
||||
protected abstract IRubyObject doClose(ThreadContext context);
|
||||
|
||||
protected abstract IRubyObject getWriteClient(ThreadContext context);
|
||||
protected abstract JRubyAbstractQueueWriteClientExt getWriteClient(ThreadContext context);
|
||||
|
||||
protected abstract IRubyObject getReadClient();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
package org.logstash.ext;
|
||||
|
||||
import java.util.Collection;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyBasicObject;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
|
||||
@JRubyClass(name = "AbstractQueueWriteClient")
|
||||
public abstract class JRubyAbstractQueueWriteClientExt extends RubyBasicObject {
|
||||
|
||||
protected JRubyAbstractQueueWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
|
||||
super(runtime, metaClass);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = {"push", "<<"}, required = 1)
|
||||
public final JRubyAbstractQueueWriteClientExt rubyPush(final ThreadContext context,
|
||||
final IRubyObject event) throws InterruptedException {
|
||||
doPush(context, (JrubyEventExtLibrary.RubyEvent) event);
|
||||
return this;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@JRubyMethod(name = "push_batch", required = 1)
|
||||
public final JRubyAbstractQueueWriteClientExt rubyPushBatch(final ThreadContext context,
|
||||
final IRubyObject batch) throws InterruptedException {
|
||||
doPushBatch(context, (Collection<JrubyEventExtLibrary.RubyEvent>) batch);
|
||||
return this;
|
||||
}
|
||||
|
||||
protected abstract JRubyAbstractQueueWriteClientExt doPush(ThreadContext context,
|
||||
JrubyEventExtLibrary.RubyEvent event) throws InterruptedException;
|
||||
|
||||
protected abstract JRubyAbstractQueueWriteClientExt doPushBatch(ThreadContext context,
|
||||
Collection<JrubyEventExtLibrary.RubyEvent> batch) throws InterruptedException;
|
||||
}
|
|
@ -4,7 +4,6 @@ import java.util.Collection;
|
|||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
|
@ -13,14 +12,14 @@ import org.logstash.RubyUtil;
|
|||
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
||||
|
||||
@JRubyClass(name = "AckedWriteClient")
|
||||
public final class JrubyAckedWriteClientExt extends RubyObject {
|
||||
public final class JrubyAckedWriteClientExt extends JRubyAbstractQueueWriteClientExt {
|
||||
|
||||
private JRubyAckedQueueExt queue;
|
||||
|
||||
private AtomicBoolean closed = new AtomicBoolean();
|
||||
|
||||
@JRubyMethod(meta = true, required = 2)
|
||||
public static JrubyAckedWriteClientExt create(final ThreadContext context, IRubyObject recv,
|
||||
public static JrubyAckedWriteClientExt create(final ThreadContext context, final IRubyObject recv,
|
||||
final IRubyObject queue, final IRubyObject closed) {
|
||||
return new JrubyAckedWriteClientExt(
|
||||
context.runtime, RubyUtil.ACKED_WRITE_CLIENT_CLASS,
|
||||
|
@ -31,9 +30,9 @@ public final class JrubyAckedWriteClientExt extends RubyObject {
|
|||
);
|
||||
}
|
||||
|
||||
public static JrubyAckedWriteClientExt create(JRubyAckedQueueExt queue, AtomicBoolean closed) {
|
||||
public static JrubyAckedWriteClientExt create(final JRubyAckedQueueExt queue, final AtomicBoolean closed) {
|
||||
return new JrubyAckedWriteClientExt(
|
||||
RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue, closed);
|
||||
RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue, closed);
|
||||
}
|
||||
|
||||
public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
|
||||
|
@ -47,17 +46,19 @@ public final class JrubyAckedWriteClientExt extends RubyObject {
|
|||
this.closed = closed;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = {"push", "<<"}, required = 1)
|
||||
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) {
|
||||
@Override
|
||||
protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context,
|
||||
final JrubyEventExtLibrary.RubyEvent event) {
|
||||
ensureOpen();
|
||||
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
|
||||
queue.rubyWrite(context, event.getEvent());
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "push_batch", required = 1)
|
||||
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) {
|
||||
@Override
|
||||
protected JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context,
|
||||
final Collection<JrubyEventExtLibrary.RubyEvent> batch) {
|
||||
ensureOpen();
|
||||
for (final IRubyObject event : (Collection<JrubyEventExtLibrary.RubyEvent>) batch) {
|
||||
for (final IRubyObject event : batch) {
|
||||
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
|
||||
}
|
||||
return this;
|
||||
|
|
|
@ -4,16 +4,13 @@ import java.util.Collection;
|
|||
import java.util.concurrent.BlockingQueue;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.common.LsQueueUtils;
|
||||
|
||||
@JRubyClass(name = "MemoryWriteClient")
|
||||
public final class JrubyMemoryWriteClientExt extends RubyObject {
|
||||
public final class JrubyMemoryWriteClientExt extends JRubyAbstractQueueWriteClientExt {
|
||||
|
||||
private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
|
||||
|
||||
|
@ -22,31 +19,29 @@ public final class JrubyMemoryWriteClientExt extends RubyObject {
|
|||
}
|
||||
|
||||
private JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass,
|
||||
BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
|
||||
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
|
||||
super(runtime, metaClass);
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
public static JrubyMemoryWriteClientExt create(
|
||||
BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
|
||||
final BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
|
||||
return new JrubyMemoryWriteClientExt(RubyUtil.RUBY,
|
||||
RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue);
|
||||
RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = {"push", "<<"}, required = 1)
|
||||
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event)
|
||||
throws InterruptedException {
|
||||
queue.put((JrubyEventExtLibrary.RubyEvent) event);
|
||||
@Override
|
||||
protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context,
|
||||
final JrubyEventExtLibrary.RubyEvent event)
|
||||
throws InterruptedException {
|
||||
queue.put(event);
|
||||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "push_batch", required = 1)
|
||||
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch)
|
||||
throws InterruptedException {
|
||||
Collection<JrubyEventExtLibrary.RubyEvent> typedBatch =
|
||||
(Collection<JrubyEventExtLibrary.RubyEvent>)batch;
|
||||
LsQueueUtils.addAll(queue, typedBatch);
|
||||
@Override
|
||||
public JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context,
|
||||
final Collection<JrubyEventExtLibrary.RubyEvent> batch) throws InterruptedException {
|
||||
LsQueueUtils.addAll(queue, batch);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueE
|
|||
}
|
||||
|
||||
@Override
|
||||
protected IRubyObject getWriteClient(final ThreadContext context) {
|
||||
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
|
||||
return JrubyMemoryWriteClientExt.create(queue);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue