JAVAFICATION: Cleanup OutputDelegatorExt

Fixes #9740
This commit is contained in:
Armin 2018-06-13 17:09:08 +02:00 committed by Armin Braun
parent 30e055b1bf
commit 5c6e3e71d7
17 changed files with 358 additions and 207 deletions

View file

@ -13,15 +13,17 @@ import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.BufferedTokenizerExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.execution.AbstractPipelineExt;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.ConvergeResultExt;
import org.logstash.execution.EventDispatcherExt;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.execution.PipelineReporterExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.execution.ShutdownWatcherExt;
@ -94,7 +96,11 @@ public final class RubyUtil {
public static final RubyClass ACKED_QUEUE_CLASS;
public static final RubyClass OUTPUT_DELEGATOR_CLASS;
public static final RubyClass ABSTRACT_OUTPUT_DELEGATOR_CLASS;
public static final RubyClass RUBY_OUTPUT_DELEGATOR_CLASS;
public static final RubyClass JAVA_OUTPUT_DELEGATOR_CLASS;
public static final RubyClass FILTER_DELEGATOR_CLASS;
@ -399,8 +405,17 @@ public final class RubyUtil {
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);
OUTPUT_DELEGATOR_CLASS = setupLogstashClass(
OutputDelegatorExt::new, OutputDelegatorExt.class
ABSTRACT_OUTPUT_DELEGATOR_CLASS = LOGSTASH_MODULE.defineClassUnder(
"AbstractOutputDelegator", RUBY.getObject(),
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
ABSTRACT_OUTPUT_DELEGATOR_CLASS.defineAnnotatedMethods(AbstractOutputDelegatorExt.class);
RUBY_OUTPUT_DELEGATOR_CLASS = setupLogstashClass(
ABSTRACT_OUTPUT_DELEGATOR_CLASS, OutputDelegatorExt::new, OutputDelegatorExt.class
);
JAVA_OUTPUT_DELEGATOR_CLASS = setupLogstashClass(
ABSTRACT_OUTPUT_DELEGATOR_CLASS, JavaOutputDelegatorExt::new,
JavaOutputDelegatorExt.class
);
FILTER_DELEGATOR_CLASS = setupLogstashClass(
FilterDelegatorExt::new, FilterDelegatorExt.class

View file

@ -15,12 +15,12 @@ import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.Rubyfier;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.ComputeStepSyntaxElement;
import org.logstash.config.ir.compiler.Dataset;
import org.logstash.config.ir.compiler.DatasetCompiler;
import org.logstash.config.ir.compiler.EventCondition;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.compiler.SplitDataset;
import org.logstash.config.ir.graph.IfVertex;
@ -58,7 +58,7 @@ public final class CompiledPipeline {
/**
* Configured outputs.
*/
private final Map<String, OutputDelegatorExt> outputs;
private final Map<String, AbstractOutputDelegatorExt> outputs;
/**
* Parsed pipeline configuration graph.
@ -103,9 +103,9 @@ public final class CompiledPipeline {
/**
* Sets up all Ruby outputs learnt from {@link PipelineIR}.
*/
private Map<String, OutputDelegatorExt> setupOutputs() {
private Map<String, AbstractOutputDelegatorExt> setupOutputs() {
final Collection<PluginVertex> outs = pipelineIR.getOutputPluginVertices();
final Map<String, OutputDelegatorExt> res = new HashMap<>(outs.size());
final Map<String, AbstractOutputDelegatorExt> res = new HashMap<>(outs.size());
outs.forEach(v -> {
final PluginDefinition def = v.getPluginDefinition();
final SourceWithMetadata source = v.getSourceWithMetadata();

View file

@ -0,0 +1,130 @@
package org.logstash.config.ir.compiler;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
@JRubyClass(name = "AbstractOutputDelegator")
public abstract class AbstractOutputDelegatorExt extends RubyObject {
private AbstractMetricExt metric;
protected AbstractNamespacedMetricExt namespacedMetric;
private IRubyObject metricEvents;
private RubyString id;
private LongCounter eventMetricOut;
private LongCounter eventMetricIn;
private LongCounter eventMetricTime;
public AbstractOutputDelegatorExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public IRubyObject register(final ThreadContext context) {
doRegister(context);
return context.nil;
}
@JRubyMethod(name = "do_close")
public IRubyObject doClose(final ThreadContext context) {
close(context);
return context.nil;
}
@JRubyMethod(name = "reloadable?")
public IRubyObject isReloadable(final ThreadContext context) {
return reloadable(context);
}
@JRubyMethod
public IRubyObject concurrency(final ThreadContext context) {
return getConcurrency(context);
}
@JRubyMethod(name = "config_name")
public IRubyObject configName(final ThreadContext context) {
return getConfigName(context);
}
@JRubyMethod(name = "id")
public IRubyObject getId() {
return id;
}
@JRubyMethod
public IRubyObject metric() {
return metric;
}
@JRubyMethod(name = "namespaced_metric")
public IRubyObject namespacedMetric() {
return namespacedMetric;
}
@JRubyMethod(name = "metric_events")
public IRubyObject metricEvents() {
return metricEvents;
}
@JRubyMethod(name = "multi_receive")
public IRubyObject multiReceive(final IRubyObject events) {
final RubyArray batch = (RubyArray) events;
final int count = batch.size();
eventMetricIn.increment((long) count);
final long start = System.nanoTime();
doOutput(batch);
eventMetricTime.increment(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
);
eventMetricOut.increment((long) count);
return this;
}
protected void initMetrics(final String id, final AbstractMetricExt metric) {
this.metric = metric;
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
this.id = RubyString.newString(context.runtime, id);
namespacedMetric = metric.namespace(context, context.runtime.newSymbol(id));
metricEvents = namespacedMetric.namespace(context, MetricKeys.EVENTS_KEY);
namespacedMetric.gauge(
context, MetricKeys.NAME_KEY, configName(context)
);
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
eventMetricTime = LongCounter.fromRubyBase(
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
);
}
protected abstract IRubyObject getConfigName(ThreadContext context);
protected abstract IRubyObject getConcurrency(ThreadContext context);
protected abstract void doOutput(Collection<JrubyEventExtLibrary.RubyEvent> batch);
protected abstract void close(ThreadContext context);
protected abstract void doRegister(ThreadContext context);
protected abstract IRubyObject reloadable(ThreadContext context);
}

View file

@ -157,7 +157,7 @@ public final class DatasetCompiler {
* @return Output Dataset
*/
public static ComputeStepSyntaxElement<Dataset> outputDataset(final Collection<Dataset> parents,
final OutputDelegatorExt output, final boolean terminal) {
final AbstractOutputDelegatorExt output, final boolean terminal) {
final ClassFields fields = new ClassFields();
final Closure clearSyntax;
final Closure computeSyntax;

View file

@ -9,7 +9,6 @@ import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
@ -47,17 +46,14 @@ public final class FilterDelegatorExt extends RubyObject {
this.filter = filter;
this.filterClass = filter.getSingletonClass().getRealClass();
final IRubyObject namespacedMetric = filter.callMethod(context, "metric");
metricEvents = namespacedMetric.callMethod(context, "namespace", RubyUtil.RUBY.newSymbol("events"));
metricEvents = namespacedMetric.callMethod(context, "namespace", MetricKeys.EVENTS_KEY);
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
eventMetricTime = LongCounter.fromRubyBase(
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
);
namespacedMetric.callMethod(
context, "gauge",
new IRubyObject[]{
RubySymbol.newSymbol(context.runtime, "name"), configName(context)
}
context, "gauge", new IRubyObject[]{MetricKeys.NAME_KEY, configName(context)}
);
flushes = filter.respondsTo("flush");
return this;

View file

@ -0,0 +1,76 @@
package org.logstash.config.ir.compiler;
import java.util.Collection;
import java.util.function.Consumer;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;
@JRubyClass(name = "JavaOutputDelegator")
public final class JavaOutputDelegatorExt extends AbstractOutputDelegatorExt {
private static final RubySymbol CONCURRENCY = RubyUtil.RUBY.newSymbol("java");
private RubyString configName;
private Consumer<Collection<JrubyEventExtLibrary.RubyEvent>> outputFunction;
private Runnable closeAction;
private Runnable registerAction;
public JavaOutputDelegatorExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
public static JavaOutputDelegatorExt create(final String configName, final String id,
final AbstractMetricExt metric,
final Consumer<Collection<JrubyEventExtLibrary.RubyEvent>> outputFunction,
final Runnable closeAction, final Runnable registerAction) {
final JavaOutputDelegatorExt instance =
new JavaOutputDelegatorExt(RubyUtil.RUBY, RubyUtil.JAVA_OUTPUT_DELEGATOR_CLASS);
instance.initMetrics(id, metric);
instance.configName = RubyUtil.RUBY.newString(configName);
instance.outputFunction = outputFunction;
instance.closeAction = closeAction;
instance.registerAction = registerAction;
return instance;
}
@Override
protected IRubyObject getConfigName(final ThreadContext context) {
return configName;
}
@Override
protected IRubyObject getConcurrency(final ThreadContext context) {
return CONCURRENCY;
}
@Override
protected void doOutput(final Collection<JrubyEventExtLibrary.RubyEvent> batch) {
outputFunction.accept(batch);
}
@Override
protected void close(final ThreadContext context) {
closeAction.run();
}
@Override
protected void doRegister(final ThreadContext context) {
registerAction.run();
}
@Override
protected IRubyObject reloadable(final ThreadContext context) {
return context.tru;
}
}

View file

@ -1,25 +1,23 @@
package org.logstash.config.ir.compiler;
import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.WorkerLoop;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;
@JRubyClass(name = "OutputDelegator")
public final class OutputDelegatorExt extends RubyObject {
public final class OutputDelegatorExt extends AbstractOutputDelegatorExt {
private static final long serialVersionUID = 1L;
@ -27,59 +25,30 @@ public final class OutputDelegatorExt extends RubyObject {
private OutputStrategyExt.AbstractOutputStrategyExt strategy;
private IRubyObject metric;
private IRubyObject namespacedMetric;
private IRubyObject metricEvents;
private RubyString id;
private LongCounter eventMetricOut;
private LongCounter eventMetricIn;
private LongCounter eventMetricTime;
@JRubyMethod(name = "initialize", optional = 5)
public IRubyObject init(final ThreadContext context, final IRubyObject[] arguments) {
outputClass = arguments[0];
metric = arguments[1];
final RubyHash args = (RubyHash) arguments[4];
id = (RubyString) args.op_aref(context, RubyString.newString(context.runtime, "id"));
namespacedMetric = metric.callMethod(context, "namespace", id.intern19());
metricEvents = namespacedMetric.callMethod(
context, "namespace", RubySymbol.newSymbol(context.runtime, "events")
public OutputDelegatorExt initialize(final ThreadContext context, final IRubyObject[] arguments) {
return initialize(
context, (RubyHash) arguments[4], arguments[0], (AbstractMetricExt) arguments[1],
(ExecutionContextExt) arguments[2],
(OutputStrategyExt.OutputStrategyRegistryExt) arguments[3]
);
namespacedMetric.callMethod(
context, "gauge",
new IRubyObject[]{
RubySymbol.newSymbol(context.runtime, "name"), configName(context)
}
);
eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY);
eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY);
eventMetricTime = LongCounter.fromRubyBase(
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
);
strategy = (OutputStrategyExt.AbstractOutputStrategyExt) (
(OutputStrategyExt.OutputStrategyRegistryExt) arguments[3])
.classFor(context, concurrency(context)).newInstance(
context,
new IRubyObject[]{outputClass, namespacedMetric, arguments[2], args},
Block.NULL_BLOCK
);
return this;
}
@VisibleForTesting
public OutputDelegatorExt initForTesting(
final OutputStrategyExt.AbstractOutputStrategyExt strategy
) {
eventMetricOut = LongCounter.DUMMY_COUNTER;
eventMetricIn = LongCounter.DUMMY_COUNTER;
eventMetricTime = LongCounter.DUMMY_COUNTER;
this.strategy = strategy;
public OutputDelegatorExt initialize(final ThreadContext context, final RubyHash args,
final IRubyObject outputClass, final AbstractMetricExt metric,
final ExecutionContextExt executionContext,
final OutputStrategyExt.OutputStrategyRegistryExt strategyRegistry) {
this.outputClass = outputClass;
initMetrics(
args.op_aref(context, RubyString.newString(context.runtime, "id")).asJavaString(),
metric
);
strategy = (OutputStrategyExt.AbstractOutputStrategyExt) strategyRegistry.classFor(
context, concurrency(context)
).newInstance(
context, new IRubyObject[]{outputClass, namespacedMetric, executionContext, args},
Block.NULL_BLOCK
);
return this;
}
@ -88,75 +57,43 @@ public final class OutputDelegatorExt extends RubyObject {
}
@JRubyMethod
public IRubyObject register(final ThreadContext context) {
return strategy.register(context);
}
@JRubyMethod(name = "do_close")
public IRubyObject doClose(final ThreadContext context) {
return strategy.doClose(context);
}
@JRubyMethod(name = "reloadable?")
public IRubyObject isReloadable(final ThreadContext context) {
return outputClass.callMethod(context, "reloadable?");
}
@JRubyMethod
public IRubyObject concurrency(final ThreadContext context) {
return outputClass.callMethod(context, "concurrency");
}
@JRubyMethod(name = "config_name")
public IRubyObject configName(final ThreadContext context) {
return outputClass.callMethod(context, "config_name");
}
@JRubyMethod
public IRubyObject id(final ThreadContext context) {
return id;
}
@JRubyMethod
public IRubyObject metric(final ThreadContext context) {
return metric;
}
@JRubyMethod(name = "namespaced_metric")
public IRubyObject namespacedMetric(final ThreadContext context) {
return namespacedMetric;
}
@JRubyMethod(name = "metric_events")
public IRubyObject metricEvents(final ThreadContext context) {
return metricEvents;
}
@JRubyMethod
public IRubyObject strategy(final ThreadContext context) {
@VisibleForTesting
public IRubyObject strategy() {
return strategy;
}
public IRubyObject multiReceive(final RubyArray events) {
@Override
protected IRubyObject getConfigName(final ThreadContext context) {
return outputClass.callMethod(context, "config_name");
}
@Override
protected IRubyObject getConcurrency(final ThreadContext context) {
return outputClass.callMethod(context, "concurrency");
}
@Override
protected void doOutput(final Collection<JrubyEventExtLibrary.RubyEvent> batch) {
try {
return multiReceive(WorkerLoop.THREAD_CONTEXT.get(), events);
strategy.multiReceive(WorkerLoop.THREAD_CONTEXT.get(), (IRubyObject) batch);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
@JRubyMethod(name = "multi_receive")
public IRubyObject multiReceive(final ThreadContext context, final IRubyObject events)
throws InterruptedException {
final RubyArray batch = (RubyArray) events;
final int count = batch.size();
eventMetricIn.increment((long) count);
final long start = System.nanoTime();
strategy.multiReceive(context, batch);
eventMetricTime.increment(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
);
eventMetricOut.increment((long) count);
return this;
@Override
protected void close(final ThreadContext context) {
strategy.doClose(context);
}
@Override
protected void doRegister(final ThreadContext context) {
strategy.register(context);
}
@Override
protected IRubyObject reloadable(final ThreadContext context) {
return outputClass.callMethod(context, "reloadable?");
}
}

View file

@ -21,7 +21,7 @@ public final class RubyIntegration {
IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
OutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column,
AbstractOutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column,
IRubyObject args);
FilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column,

View file

@ -33,6 +33,7 @@ import org.logstash.config.ir.PipelineIR;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.NullMetricExt;
@JRubyClass(name = "AbstractPipeline")
@ -69,8 +70,6 @@ public class AbstractPipelineExt extends RubyBasicObject {
private static final RubySymbol PIPELINES_KEY = RubyUtil.RUBY.newSymbol("pipelines");
private static final RubySymbol EVENTS_KEY = RubyUtil.RUBY.newSymbol("events");
private static final RubySymbol TYPE_KEY = RubyUtil.RUBY.newSymbol("type");
private static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue");
@ -296,7 +295,7 @@ public class AbstractPipelineExt extends RubyBasicObject {
);
dataMetrics.gauge(context, STORAGE_TYPE, context.runtime.newString(fileStore.type()));
dataMetrics.gauge(context, PATH, dirPath);
pipelineMetric.gauge(context, EVENTS_KEY, inner.ruby_unread_count(context));
pipelineMetric.gauge(context, MetricKeys.EVENTS_KEY, inner.ruby_unread_count(context));
}
return context.nil;
}

View file

@ -166,7 +166,7 @@ public final class PipelineReporterExt extends RubyBasicObject {
final OutputDelegatorExt delegator = (OutputDelegatorExt) output;
final RubyHash hash = RubyHash.newHash(context.runtime);
hash.op_aset(context, TYPE_KEY, delegator.configName(context));
hash.op_aset(context, ID_KEY, delegator.id(context));
hash.op_aset(context, ID_KEY, delegator.getId());
hash.op_aset(context, CONCURRENCY_KEY, delegator.concurrency(context));
result.add(hash);
});

View file

@ -9,6 +9,10 @@ public final class MetricKeys {
// Constant Holder
}
public static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name");
public static final RubySymbol EVENTS_KEY = RubyUtil.RUBY.newSymbol("events");
public static final RubySymbol OUT_KEY = RubyUtil.RUBY.newSymbol("out");
public static final RubySymbol IN_KEY = RubyUtil.RUBY.newSymbol("in");

View file

@ -17,6 +17,12 @@ public final class NullMetricExt extends AbstractSimpleMetricExt {
private IRubyObject collector;
public static NullMetricExt create() {
return new NullMetricExt(
RubyUtil.RUBY, RubyUtil.NULL_METRIC_CLASS
).initialize(RubyUtil.RUBY.getCurrentContext(), new IRubyObject[0]);
}
public NullMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}

View file

@ -21,6 +21,7 @@ import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
@ -110,7 +111,7 @@ public final class PluginFactoryExt {
@SuppressWarnings("unchecked")
@Override
public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return (OutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT,
@ -120,7 +121,7 @@ public final class PluginFactoryExt {
}
@JRubyMethod(required = 4)
public OutputDelegatorExt buildOutput(final ThreadContext context,
public AbstractOutputDelegatorExt buildOutput(final ThreadContext context,
final IRubyObject[] args) {
return buildOutput(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3]
@ -214,7 +215,7 @@ public final class PluginFactoryExt {
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
rubyArgs.putAll(newArgs);
if (type == PluginLookup.PluginType.OUTPUT) {
return new OutputDelegatorExt(context.runtime, RubyUtil.OUTPUT_DELEGATOR_CLASS).init(
return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize(
context,
new IRubyObject[]{
klass, typeScopedMetric, executionCntx,

View file

@ -7,22 +7,20 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyArray;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.ext.JrubyEventExtLibrary;
@ -181,15 +179,10 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
MatcherAssert.assertThat(testEvent.getEvent().getField("foo"), CoreMatchers.nullValue());
}
private Supplier<OutputStrategyExt.AbstractOutputStrategyExt> mockOutputSupplier() {
return () -> new OutputStrategyExt.SimpleAbstractOutputStrategyExt(RubyUtil.RUBY, RubyUtil.RUBY.getObject()) {
@Override
@SuppressWarnings("unchecked")
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
((RubyArray) events).forEach(event -> EVENT_SINKS.get(runId).add((JrubyEventExtLibrary.RubyEvent) event));
return this;
}
};
private Supplier<Consumer<Collection<JrubyEventExtLibrary.RubyEvent>>> mockOutputSupplier() {
return () -> events -> events.forEach(
event -> EVENT_SINKS.get(runId).add((JrubyEventExtLibrary.RubyEvent) event)
);
}
/**
@ -201,11 +194,12 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
private final Map<String, Supplier<IRubyObject>> filters;
private final Map<String, Supplier<OutputStrategyExt.AbstractOutputStrategyExt>> outputs;
private final Map<String, Supplier<Consumer<Collection<JrubyEventExtLibrary.RubyEvent>>>> outputs;
MockPluginFactory(final Map<String, Supplier<IRubyObject>> inputs,
final Map<String, Supplier<IRubyObject>> filters,
final Map<String, Supplier<OutputStrategyExt.AbstractOutputStrategyExt>> outputs) {
final Map<String, Supplier<Consumer<Collection<JrubyEventExtLibrary.RubyEvent>>>> outputs
) {
this.inputs = inputs;
this.filters = filters;
this.outputs = outputs;
@ -218,18 +212,16 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
}
@Override
public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
public AbstractOutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return new OutputDelegatorExt(
RubyUtil.RUBY, RubyUtil.OUTPUT_DELEGATOR_CLASS)
.initForTesting(setupPlugin(name, outputs));
return PipelineTestUtil.buildOutput(setupPlugin(name, outputs));
}
@Override
public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return new FilterDelegatorExt(
RubyUtil.RUBY, RubyUtil.OUTPUT_DELEGATOR_CLASS)
RubyUtil.RUBY, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS)
.initForTesting(setupPlugin(name, filters));
}

View file

@ -0,0 +1,23 @@
package org.logstash.config.ir;
import java.util.Collection;
import java.util.function.Consumer;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.NullMetricExt;
public final class PipelineTestUtil {
private PipelineTestUtil() {
//Utility Class
}
public static AbstractOutputDelegatorExt buildOutput(
final Consumer<Collection<JrubyEventExtLibrary.RubyEvent>> consumer) {
return JavaOutputDelegatorExt.create(
"someClassName", "someId", NullMetricExt.create(), consumer, () -> {},
() -> {}
);
}
}

View file

@ -2,12 +2,11 @@ package org.logstash.config.ir.compiler;
import java.util.Collections;
import org.jruby.RubyArray;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.FieldReference;
import org.logstash.RubyUtil;
import org.logstash.config.ir.PipelineTestUtil;
import org.logstash.ext.JrubyEventExtLibrary;
import static org.hamcrest.CoreMatchers.is;
@ -24,16 +23,7 @@ public final class DatasetCompilerTest {
assertThat(
DatasetCompiler.outputDataset(
Collections.emptyList(),
new OutputDelegatorExt(RubyUtil.RUBY, RubyUtil.OUTPUT_DELEGATOR_CLASS)
.initForTesting(
new OutputStrategyExt.SimpleAbstractOutputStrategyExt(
RubyUtil.RUBY, RubyUtil.RUBY.getObject()
) {
@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
return this;
}
}),
PipelineTestUtil.buildOutput(events -> {}),
true
).instantiate().compute(RubyUtil.RUBY.newArray(), false, false),
nullValue()

View file

@ -18,10 +18,9 @@ import org.logstash.instrument.metrics.NamespacedMetricExt;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.logstash.RubyUtil.EXECUTION_CONTEXT_CLASS;
import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS;
import static org.logstash.RubyUtil.OUTPUT_DELEGATOR_CLASS;
import static org.logstash.RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS;
import static org.logstash.RubyUtil.RUBY;
public class OutputDelegatorTest extends RubyEnvTestCase {
@ -77,23 +76,15 @@ public class OutputDelegatorTest extends RubyEnvTestCase {
@Test
public void multiReceivePassesBatch() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
assertEquals(events, fakeOutClass.getMultiReceiveArgs());
assertEquals(EVENT_COUNT, ((RubyArray) fakeOutClass.getMultiReceiveArgs()).size());
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
outputDelegator.multiReceive(events);
assertEquals(events, fakeOutClass.getMultiReceiveArgs());
assertEquals(EVENT_COUNT, ((RubyArray) fakeOutClass.getMultiReceiveArgs()).size());
}
@Test
public void multiReceiveIncrementsEventCount() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
outputDelegator.multiReceive(events);
assertEquals(EVENT_COUNT, getMetricLongValue("in"));
assertEquals(EVENT_COUNT, getMetricLongValue("out"));
@ -106,11 +97,7 @@ public class OutputDelegatorTest extends RubyEnvTestCase {
try {
fakeOutClass.setMultiReceiveDelay(delay);
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
outputDelegator.multiReceive(events);
millis = getMetricLongValue("duration_in_millis");
} finally {
fakeOutClass.setMultiReceiveDelay(0);
@ -159,11 +146,11 @@ public class OutputDelegatorTest extends RubyEnvTestCase {
assertEquals(pair.symbol, outStrategy);
// test that strategy classes are correctly instantiated
IRubyObject strategyClass = outputDelegator.strategy(RUBY.getCurrentContext());
IRubyObject strategyClass = outputDelegator.strategy();
assertThat(strategyClass).isInstanceOf(pair.klazz);
// test that metrics are properly set on the instance
assertEquals(outputDelegator.namespacedMetric(RUBY.getCurrentContext()), fakeOutClass.getMetricArgs());
assertEquals(outputDelegator.namespacedMetric(), fakeOutClass.getMetricArgs());
}
}
@ -185,12 +172,8 @@ public class OutputDelegatorTest extends RubyEnvTestCase {
outputDelegator.doClose(RUBY.getCurrentContext());
assertEquals(1, fakeOutClass.getCloseCallCount());
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), RUBY.newArray(0));
assertEquals(1, fakeOutClass.getMultiReceiveCallCount());
} catch (InterruptedException e) {
fail("multireceive error: " + e);
}
outputDelegator.multiReceive(RUBY.newArray(0));
assertEquals(1, fakeOutClass.getMultiReceiveCallCount());
}
}
@ -201,14 +184,13 @@ public class OutputDelegatorTest extends RubyEnvTestCase {
}
private OutputDelegatorExt constructOutputDelegator() {
OutputDelegatorExt outputDelegator = (OutputDelegatorExt)
new OutputDelegatorExt(RUBY, OUTPUT_DELEGATOR_CLASS).init(RUBY.getCurrentContext(), new IRubyObject[]{
fakeOutClass,
metric,
executionContext,
OutputStrategyExt.OutputStrategyRegistryExt.instance(RUBY.getCurrentContext(), null),
pluginArgs
});
OutputDelegatorExt outputDelegator = new OutputDelegatorExt(RUBY, RUBY_OUTPUT_DELEGATOR_CLASS).initialize(RUBY.getCurrentContext(), new IRubyObject[]{
fakeOutClass,
metric,
executionContext,
OutputStrategyExt.OutputStrategyRegistryExt.instance(RUBY.getCurrentContext(), null),
pluginArgs
});
return outputDelegator;
}