JAVAFICATION+PERFORMANCE: Port OutputDelegatorStrategies to Java

Fixes #9410
This commit is contained in:
Armin 2018-04-20 09:15:24 +02:00 committed by Armin Braun
parent cab415dd94
commit 155801520c
11 changed files with 402 additions and 171 deletions

View file

@ -1,33 +0,0 @@
# Remove this in Logstash 6.0
module LogStash module OutputDelegatorStrategies class Legacy
attr_reader :worker_count, :workers
def initialize(klass, metric, execution_context, plugin_args)
@worker_count = (plugin_args["workers"] || 1).to_i
@workers = @worker_count.times.map { klass.new(plugin_args) }
@workers.each do |w|
w.metric = metric
w.execution_context = execution_context
end
@worker_queue = SizedQueue.new(@worker_count)
@workers.each {|w| @worker_queue << w}
end
def register
@workers.each(&:register)
end
def multi_receive(events)
worker = @worker_queue.pop
worker.multi_receive(events)
ensure
@worker_queue << worker if worker
end
def do_close
# No mutex needed since this is only called when the pipeline is clear
@workers.each(&:do_close)
end
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:legacy, self)
end; end; end

View file

@ -1,22 +0,0 @@
module LogStash module OutputDelegatorStrategies class Shared
def initialize(klass, metric, execution_context, plugin_args)
@output = klass.new(plugin_args)
@output.metric = metric
@output.execution_context = execution_context
end
def register
@output.register
end
def multi_receive(events)
@output.multi_receive(events)
end
def do_close
@output.do_close
end
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:shared, self)
end; end; end

View file

@ -1,25 +0,0 @@
module LogStash module OutputDelegatorStrategies class Single
def initialize(klass, metric, execution_context, plugin_args)
@output = klass.new(plugin_args)
@output.metric = metric
@output.execution_context = execution_context
@mutex = Mutex.new
end
def register
@output.register
end
def multi_receive(events)
@mutex.synchronize do
@output.multi_receive(events)
end
end
def do_close
# No mutex needed since this is only called when the pipeline is clear
@output.do_close
end
::LogStash::OutputDelegatorStrategyRegistry.instance.register(:single, self)
end; end; end

View file

@ -1,36 +0,0 @@
module LogStash; class OutputDelegatorStrategyRegistry
class InvalidStrategyError < StandardError; end
# This is generally used as a singleton
# Except perhaps during testing
def self.instance
@instance ||= self.new
end
def initialize()
@map = {}
end
def classes
@map.values
end
def types
@map.keys
end
def class_for(type)
klass = @map[type]
if !klass
raise InvalidStrategyError, "Could not find output delegator strategy of type '#{type}'. Valid strategies: #{@strategy_registry.types}"
end
klass
end
def register(type, klass)
@map[type] = klass
end
end; end

View file

@ -1,10 +1,5 @@
# encoding: utf-8
require "logstash/output_delegator_strategy_registry"
require "logstash/output_delegator_strategies/shared"
require "logstash/output_delegator_strategies/single"
require "logstash/output_delegator_strategies/legacy"
module LogStash
module Plugins

View file

@ -2,10 +2,6 @@
require "logstash/execution_context"
require "spec_helper"
require "support/shared_contexts"
require "logstash/output_delegator_strategy_registry"
require "logstash/output_delegator_strategies/shared"
require "logstash/output_delegator_strategies/single"
require "logstash/output_delegator_strategies/legacy"
describe LogStash::OutputDelegator do
@ -34,24 +30,55 @@ describe LogStash::OutputDelegator do
include_context "execution_context"
class FakeOutClass
def self.set_out_strategy(out_strategy)
@@out_strategy = out_strategy
end
def self.set_out_inst(out_inst)
@@out_inst = out_inst
end
def self.name
"example"
end
def self.concurrency
@@out_strategy
end
def self.config_name
"dummy_plugin"
end
class << self
def new(args)
if args == {"id" => "foo", "arg1" => "val1"}
@@out_inst
else
raise "unexpected plugin arguments"
end
end
end
end
let(:out_klass) {FakeOutClass}
subject { described_class.new(out_klass, metric, execution_context, ::LogStash::OutputDelegatorStrategyRegistry.instance, plugin_args) }
context "with a plain output plugin" do
let(:out_klass) { double("output klass") }
let(:out_inst) { double("output instance") }
let(:concurrency) { :single }
before(:each) do
# use the same metric instance
allow(out_klass).to receive(:new).with(plugin_args).and_return(out_inst)
allow(out_klass).to receive(:name).and_return("example")
allow(out_klass).to receive(:concurrency).with(any_args).and_return concurrency
allow(out_klass).to receive(:config_name).and_return("dummy_plugin")
allow(out_inst).to receive(:register)
allow(out_inst).to receive(:multi_receive)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:execution_context=).with(execution_context)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
FakeOutClass.set_out_inst(out_inst)
FakeOutClass.set_out_strategy(:single)
end
it "should initialize cleanly" do
@ -120,6 +147,10 @@ describe LogStash::OutputDelegator do
context "with strategy #{strategy_concurrency}" do
let(:concurrency) { strategy_concurrency }
before(:each) do
FakeOutClass.set_out_strategy(strategy_concurrency)
end
it "should find the correct concurrency type for the output" do
expect(subject.concurrency).to eq(strategy_concurrency)
end
@ -135,15 +166,15 @@ describe LogStash::OutputDelegator do
[[:register], [:do_close], [:multi_receive, [[]] ] ].each do |method, args|
context "strategy objects" do
before do
allow(subject.strategy).to receive(method)
allow(out_inst).to receive(method)
end
it "should delegate #{method} to the strategy" do
subject.send(method, *args)
if args
expect(subject.strategy).to have_received(method).with(*args)
expect(out_inst).to have_received(method).with(*args)
else
expect(subject.strategy).to have_received(method).with(no_args)
expect(out_inst).to have_received(method).with(no_args)
end
end
end

View file

@ -12,6 +12,7 @@ import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.ext.JrubyAckedReadClientExt;
@ -32,11 +33,6 @@ public final class RubyUtil {
*/
public static final Ruby RUBY;
/**
* Logstash Ruby Module.
*/
public static final RubyModule LOGSTASH_MODULE;
public static final RubyClass RUBY_EVENT_CLASS;
public static final RubyClass RUBY_TIMESTAMP_CLASS;
@ -71,16 +67,77 @@ public final class RubyUtil {
public static final RubyClass FILTER_DELEGATOR_CLASS;
public static final RubyClass OUTPUT_STRATEGY_REGISTRY;
public static final RubyClass OUTPUT_STRATEGY_ABSTRACT;
public static final RubyClass OUTPUT_STRATEGY_SIMPLE_ABSTRACT;
public static final RubyClass OUTPUT_STRATEGY_LEGACY;
public static final RubyClass OUTPUT_STRATEGY_SINGLE;
public static final RubyClass OUTPUT_STRATEGY_SHARED;
/**
* Logstash Ruby Module.
*/
private static final RubyModule LOGSTASH_MODULE;
private static final RubyModule OUTPUT_DELEGATOR_STRATEGIES;
static {
RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
OUTPUT_STRATEGY_REGISTRY = setupLogstashClass(
OutputStrategyExt.OutputStrategyRegistryExt::new,
OutputStrategyExt.OutputStrategyRegistryExt.class
);
OUTPUT_DELEGATOR_STRATEGIES =
RUBY.defineModuleUnder("OutputDelegatorStrategies", LOGSTASH_MODULE);
OUTPUT_STRATEGY_ABSTRACT = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
"AbstractStrategy", RUBY.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
OUTPUT_STRATEGY_SIMPLE_ABSTRACT = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
"SimpleAbstractStrategy", OUTPUT_STRATEGY_ABSTRACT,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
OUTPUT_STRATEGY_LEGACY = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
"Legacy", OUTPUT_STRATEGY_ABSTRACT,
OutputStrategyExt.LegacyOutputStrategyExt::new
);
OUTPUT_STRATEGY_SINGLE = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
"Single", OUTPUT_STRATEGY_SIMPLE_ABSTRACT,
OutputStrategyExt.SingleOutputStrategyExt::new
);
OUTPUT_STRATEGY_SHARED = OUTPUT_DELEGATOR_STRATEGIES.defineClassUnder(
"Shared", OUTPUT_STRATEGY_SIMPLE_ABSTRACT,
OutputStrategyExt.SharedOutputStrategyExt::new
);
OUTPUT_STRATEGY_ABSTRACT.defineAnnotatedMethods(OutputStrategyExt.AbstractOutputStrategyExt.class);
OUTPUT_STRATEGY_ABSTRACT.defineAnnotatedMethods(OutputStrategyExt.SimpleAbstractOutputStrategyExt.class);
OUTPUT_STRATEGY_SHARED.defineAnnotatedMethods(OutputStrategyExt.SharedOutputStrategyExt.class);
OUTPUT_STRATEGY_SINGLE.defineAnnotatedMethods(OutputStrategyExt.SingleOutputStrategyExt.class);
OUTPUT_STRATEGY_LEGACY.defineAnnotatedMethods(OutputStrategyExt.LegacyOutputStrategyExt.class);
final OutputStrategyExt.OutputStrategyRegistryExt outputStrategyRegistry =
(OutputStrategyExt.OutputStrategyRegistryExt) OutputStrategyExt.OutputStrategyRegistryExt
.instance(RUBY.getCurrentContext(), OUTPUT_DELEGATOR_STRATEGIES);
outputStrategyRegistry.register(
RUBY.getCurrentContext(), RUBY.newSymbol("shared"), OUTPUT_STRATEGY_SHARED
);
outputStrategyRegistry.register(
RUBY.getCurrentContext(), RUBY.newSymbol("legacy"), OUTPUT_STRATEGY_LEGACY
);
outputStrategyRegistry.register(
RUBY.getCurrentContext(), RUBY.newSymbol("single"), OUTPUT_STRATEGY_SINGLE
);
RUBY_TIMESTAMP_CLASS = setupLogstashClass(
JrubyTimestampExtLibrary.RubyTimestamp::new, JrubyTimestampExtLibrary.RubyTimestamp.class
);
WRAPPED_WRITE_CLIENT_CLASS =
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
QUEUE_READ_CLIENT_BASE_CLASS =
setupLogstashClass(ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR, QueueReadClientBase.class);
setupLogstashClass(ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR, QueueReadClientBase.class);
MEMORY_READ_CLIENT_CLASS =
setupLogstashClass(QUEUE_READ_CLIENT_BASE_CLASS, JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class);
ACKED_READ_CLIENT_CLASS =
@ -91,9 +148,9 @@ public final class RubyUtil {
setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class);
WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
JrubyWrappedSynchronousQueueExt.class);
JrubyWrappedSynchronousQueueExt.class);
WRAPPED_ACKED_QUEUE_CLASS = setupLogstashClass(JRubyWrappedAckedQueueExt::new,
JRubyWrappedAckedQueueExt.class);
JRubyWrappedAckedQueueExt.class);
ACKED_QUEUE_CLASS = setupLogstashClass(JRubyAckedQueueExt::new, JRubyAckedQueueExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class

View file

@ -25,7 +25,7 @@ public final class OutputDelegatorExt extends RubyObject {
private IRubyObject outputClass;
private IRubyObject strategy;
private OutputStrategyExt.AbstractOutputStrategyExt strategy;
private IRubyObject metric;
@ -62,8 +62,9 @@ public final class OutputDelegatorExt extends RubyObject {
eventMetricTime = LongCounter.fromRubyBase(
metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY
);
strategy = ((RubyClass)
arguments[3].callMethod(context, "class_for", concurrency(context))
strategy = (OutputStrategyExt.AbstractOutputStrategyExt) ((RubyClass)
((OutputStrategyExt.OutputStrategyRegistryExt) arguments[3])
.classFor(context, concurrency(context))
).newInstance(
context,
new IRubyObject[]{outputClass, namespacedMetric, arguments[2], args},
@ -73,7 +74,9 @@ public final class OutputDelegatorExt extends RubyObject {
}
@VisibleForTesting
public OutputDelegatorExt initForTesting(final IRubyObject strategy) {
public OutputDelegatorExt initForTesting(
final OutputStrategyExt.AbstractOutputStrategyExt strategy
) {
eventMetricOut = LongCounter.DUMMY_COUNTER;
eventMetricIn = LongCounter.DUMMY_COUNTER;
eventMetricTime = LongCounter.DUMMY_COUNTER;
@ -87,12 +90,12 @@ public final class OutputDelegatorExt extends RubyObject {
@JRubyMethod
public IRubyObject register(final ThreadContext context) {
return strategy.callMethod(context, "register");
return strategy.register(context);
}
@JRubyMethod(name = "do_close")
public IRubyObject doClose(final ThreadContext context) {
return strategy.callMethod(context, "do_close");
return strategy.doClose(context);
}
@JRubyMethod(name = "reloadable?")
@ -136,16 +139,21 @@ public final class OutputDelegatorExt extends RubyObject {
}
public IRubyObject multiReceive(final RubyArray events) {
return multiReceive(WorkerLoop.THREAD_CONTEXT.get(), events);
try {
return multiReceive(WorkerLoop.THREAD_CONTEXT.get(), events);
} catch (final InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
@JRubyMethod(name = "multi_receive")
public IRubyObject multiReceive(final ThreadContext context, final IRubyObject events) {
public IRubyObject multiReceive(final ThreadContext context, final IRubyObject events)
throws InterruptedException {
final RubyArray batch = (RubyArray) events;
final int count = batch.size();
eventMetricIn.increment((long) count);
final long start = System.nanoTime();
strategy.callMethod(context, "multi_receive", batch);
strategy.multiReceive(context, batch);
eventMetricTime.increment(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
);

View file

@ -0,0 +1,250 @@
package org.logstash.config.ir.compiler;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
public final class OutputStrategyExt {
private OutputStrategyExt() {
// Just a holder for the nested classes
}
@JRubyClass(name = "OutputDelegatorStrategyRegistry")
public static final class OutputStrategyRegistryExt extends RubyObject {
private static OutputStrategyRegistryExt instance;
private RubyHash map;
public OutputStrategyRegistryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(meta = true)
public static synchronized IRubyObject instance(final ThreadContext context,
final IRubyObject recv) {
if (instance == null) {
instance = new OutputStrategyRegistryExt(
context.runtime, RubyUtil.OUTPUT_STRATEGY_REGISTRY
);
instance.init(context);
}
return instance;
}
@JRubyMethod(name = "initialize")
public IRubyObject init(final ThreadContext context) {
map = RubyHash.newHash(context.runtime);
return this;
}
@JRubyMethod
public IRubyObject classes(final ThreadContext context) {
return map.rb_values();
}
@JRubyMethod
public IRubyObject types(final ThreadContext context) {
return map.keys();
}
@JRubyMethod
public IRubyObject register(final ThreadContext context, final IRubyObject type,
final IRubyObject klass) {
return map.op_aset(context, type, klass);
}
@JRubyMethod(name = "class_for")
@SuppressWarnings("unchecked")
public IRubyObject classFor(final ThreadContext context, final IRubyObject type) {
final IRubyObject klass = map.op_aref(context, type);
if (!klass.isTrue()) {
throw new IllegalArgumentException(
String.format(
"Could not find output delegator strategy of type '%s'. Value strategies: %s",
type.asJavaString(),
map.rb_values().stream().map(v -> ((IRubyObject) v).asJavaString())
.collect(Collectors.joining(", "))
)
);
}
return klass;
}
}
@JRubyClass(name = "AbstractStrategy")
public abstract static class AbstractOutputStrategyExt extends RubyObject {
public AbstractOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
@SuppressWarnings("unchecked")
public final IRubyObject register(final ThreadContext context) {
return reg(context);
}
@JRubyMethod(name = "do_close")
@SuppressWarnings("unchecked")
public final IRubyObject doClose(final ThreadContext context) {
return close(context);
}
@JRubyMethod(name = "multi_receive")
public final IRubyObject multiReceive(final ThreadContext context, final IRubyObject events)
throws InterruptedException {
return output(context, events);
}
protected abstract IRubyObject output(ThreadContext context, IRubyObject events)
throws InterruptedException;
protected abstract IRubyObject close(ThreadContext context);
protected abstract IRubyObject reg(ThreadContext context);
}
@JRubyClass(name = "Legacy", parent = "AbstractStrategy")
public static final class LegacyOutputStrategyExt extends OutputStrategyExt.AbstractOutputStrategyExt {
private BlockingQueue<IRubyObject> workerQueue;
private IRubyObject workerCount;
private RubyArray workers;
public LegacyOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize", optional = 4)
public IRubyObject init(final ThreadContext context, final IRubyObject[] args) {
final RubyHash pluginArgs = (RubyHash) args[3];
workerCount = pluginArgs.op_aref(context, context.runtime.newString("workers"));
if (workerCount.isNil()) {
workerCount = RubyFixnum.one(context.runtime);
}
final int count = workerCount.convertToInteger().getIntValue();
workerQueue = new ArrayBlockingQueue<>(count);
workers = context.runtime.newArray(count);
for (int i = 0; i < count; ++i) {
// Calling "new" here manually to allow mocking the ctor in RSpec Tests
final IRubyObject output = args[0].callMethod(context, "new", pluginArgs);
output.callMethod(context, "metric=", args[1]);
output.callMethod(context, "execution_context=", args[2]);
workers.append(output);
workerQueue.add(output);
}
return this;
}
@JRubyMethod(name = "worker_count")
public IRubyObject workerCount(final ThreadContext context) {
return workerCount;
}
@JRubyMethod
public IRubyObject workers(final ThreadContext context) {
return workers;
}
@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) throws InterruptedException {
final IRubyObject worker = workerQueue.take();
try {
return worker.callMethod(context, "multi_receive", events);
} finally {
workerQueue.put(worker);
}
}
@Override
@SuppressWarnings("unchecked")
protected IRubyObject close(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "do_close"));
return this;
}
@Override
@SuppressWarnings("unchecked")
protected IRubyObject reg(final ThreadContext context) {
workers.forEach(worker -> ((IRubyObject) worker).callMethod(context, "register"));
return this;
}
}
@JRubyClass(name = "SimpleAbstractStrategy", parent = "AbstractStrategy")
public abstract static class SimpleAbstractOutputStrategyExt
extends OutputStrategyExt.AbstractOutputStrategyExt {
private IRubyObject output;
public SimpleAbstractOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize", optional = 4)
public IRubyObject init(final ThreadContext context, final IRubyObject[] args) {
// Calling "new" here manually to allow mocking the ctor in RSpec Tests
output = args[0].callMethod(context, "new", args[3]);
output.callMethod(context, "metric=", args[1]);
output.callMethod(context, "execution_context=", args[2]);
return this;
}
@Override
protected final IRubyObject close(final ThreadContext context) {
return output.callMethod(context, "do_close");
}
@Override
protected final IRubyObject reg(final ThreadContext context) {
return output.callMethod(context, "register");
}
protected final IRubyObject doOutput(final ThreadContext context, final IRubyObject events) {
return output.callMethod(context, "multi_receive", events);
}
}
@JRubyClass(name = "Single", parent = "SimpleAbstractStrategy")
public static final class SingleOutputStrategyExt extends SimpleAbstractOutputStrategyExt {
public SingleOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
synchronized (this) {
return doOutput(context, events);
}
}
}
@JRubyClass(name = "Shared", parent = "SimpleAbstractStrategy")
public static final class SharedOutputStrategyExt extends SimpleAbstractOutputStrategyExt {
public SharedOutputStrategyExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
return doOutput(context, events);
}
}
}

View file

@ -10,8 +10,10 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.jruby.RubyArray;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.After;
import org.junit.Before;
@ -20,6 +22,7 @@ import org.logstash.Event;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.ext.JrubyEventExtLibrary;
@ -32,7 +35,7 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
* Globally accessible map of test run id to a queue of {@link JrubyEventExtLibrary.RubyEvent}
* that can be used by Ruby outputs.
*/
public static final Map<Long, Collection<JrubyEventExtLibrary.RubyEvent>> EVENT_SINKS =
private static final Map<Long, Collection<JrubyEventExtLibrary.RubyEvent>> EVENT_SINKS =
new ConcurrentHashMap<>();
/**
@ -178,20 +181,15 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
MatcherAssert.assertThat(testEvent.getEvent().getField("foo"), CoreMatchers.nullValue());
}
private Supplier<IRubyObject> mockOutputSupplier() {
return () -> RubyUtil.RUBY.evalScriptlet(
String.join(
"\n",
"output = Object.new",
"output.define_singleton_method(:multi_receive) do |batch|",
String.format(
"batch.to_a.each {|e| org.logstash.config.ir.CompiledPipelineTest::EVENT_SINKS.get(%d).put(e)}",
runId
),
"end",
"output"
)
);
private Supplier<OutputStrategyExt.AbstractOutputStrategyExt> mockOutputSupplier() {
return () -> new OutputStrategyExt.SimpleAbstractOutputStrategyExt(RubyUtil.RUBY, RubyUtil.RUBY.getObject()) {
@Override
@SuppressWarnings("unchecked")
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
((RubyArray) events).forEach(event -> EVENT_SINKS.get(runId).add((JrubyEventExtLibrary.RubyEvent) event));
return this;
}
};
}
/**
@ -203,11 +201,11 @@ public final class CompiledPipelineTest extends RubyEnvTestCase {
private final Map<String, Supplier<IRubyObject>> filters;
private final Map<String, Supplier<IRubyObject>> outputs;
private final Map<String, Supplier<OutputStrategyExt.AbstractOutputStrategyExt>> outputs;
MockPluginFactory(final Map<String, Supplier<IRubyObject>> inputs,
final Map<String, Supplier<IRubyObject>> filters,
final Map<String, Supplier<IRubyObject>> outputs) {
final Map<String, Supplier<OutputStrategyExt.AbstractOutputStrategyExt>> outputs) {
this.inputs = inputs;
this.filters = filters;
this.outputs = outputs;

View file

@ -2,6 +2,8 @@ package org.logstash.config.ir.compiler;
import java.util.Collections;
import org.jruby.RubyArray;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.FieldReference;
@ -23,9 +25,15 @@ public final class DatasetCompilerTest {
DatasetCompiler.outputDataset(
Collections.emptyList(),
new OutputDelegatorExt(RubyUtil.RUBY, RubyUtil.OUTPUT_DELEGATOR_CLASS)
.initForTesting(RubyUtil.RUBY.evalScriptlet(
"output = Object.new\noutput.define_singleton_method(:multi_receive) do |batch|\nend\noutput"
)),
.initForTesting(
new OutputStrategyExt.SimpleAbstractOutputStrategyExt(
RubyUtil.RUBY, RubyUtil.RUBY.getObject()
) {
@Override
protected IRubyObject output(final ThreadContext context, final IRubyObject events) {
return this;
}
}),
true
).instantiate().compute(RubyUtil.RUBY.newArray(), false, false),
nullValue()