port output_delegator_spec to Java

Fixes #9717
This commit is contained in:
Dan Hermann 2018-06-07 20:21:54 -05:00
parent d538b524f8
commit cc6ee1aeeb
4 changed files with 383 additions and 200 deletions

View file

@ -65,6 +65,7 @@ task javaTests(type: Test) {
exclude '/org/logstash/RSpecTests.class'
exclude '/org/logstash/config/ir/ConfigCompilerTest.class'
exclude '/org/logstash/config/ir/CompiledPipelineTest.class'
exclude '/org/logstash/config/ir/OutputDelegatorTest.class'
}
task rubyTests(type: Test) {
@ -74,6 +75,7 @@ task rubyTests(type: Test) {
include '/org/logstash/RSpecTests.class'
include '/org/logstash/config/ir/ConfigCompilerTest.class'
include '/org/logstash/config/ir/CompiledPipelineTest.class'
include '/org/logstash/config/ir/OutputDelegatorTest.class'
}
test {

View file

@ -1,200 +0,0 @@
# encoding: utf-8
require "spec_helper"
require "support/shared_contexts"
describe LogStash::OutputDelegator do
let(:events) { 7.times.map { LogStash::Event.new }}
let(:plugin_args) { {"id" => "foo", "arg1" => "val1"} }
let(:metric) {
LogStash::Instrument::NamespacedMetric.new(
LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:output]
)
}
let(:counter_in) {
counter = metric.counter(:in)
counter.increment(0)
counter
}
let(:counter_out) {
counter = metric.counter(:out)
counter.increment(0)
counter
}
let(:counter_time) {
counter = metric.counter(:duration_in_millis)
counter.increment(0)
counter
}
include_context "execution_context"
class FakeOutClass
def self.set_out_strategy(out_strategy)
@@out_strategy = out_strategy
end
def self.set_out_inst(out_inst)
@@out_inst = out_inst
end
def self.name
"example"
end
def self.concurrency
@@out_strategy
end
def self.config_name
"dummy_plugin"
end
class << self
def new(args)
if args == {"id" => "foo", "arg1" => "val1"}
@@out_inst
else
raise "unexpected plugin arguments"
end
end
end
end
let(:out_klass) {FakeOutClass}
subject { described_class.new(out_klass, metric, execution_context, ::LogStash::OutputDelegatorStrategyRegistry.instance, plugin_args) }
context "with a plain output plugin" do
let(:out_inst) { double("output instance") }
before(:each) do
# use the same metric instance
allow(out_inst).to receive(:register)
allow(out_inst).to receive(:multi_receive)
allow(out_inst).to receive(:metric=).with(any_args)
allow(out_inst).to receive(:execution_context=).with(execution_context)
allow(out_inst).to receive(:id).and_return("a-simple-plugin")
FakeOutClass.set_out_inst(out_inst)
FakeOutClass.set_out_strategy(:single)
end
it "should initialize cleanly" do
expect { subject }.not_to raise_error
end
it "should push the name of the plugin to the metric" do
described_class.new(out_klass, metric, execution_context, ::LogStash::OutputDelegatorStrategyRegistry.instance, plugin_args)
expect(metric.collector.snapshot_metric.metric_store.get_with_path("output/foo")[:output][:foo][:name].value).to eq(out_klass.config_name)
end
context "after having received a batch of events" do
before do
subject.register
end
it "should pass the events through" do
expect(out_inst).to receive(:multi_receive).with(events)
subject.multi_receive(events)
end
it "should increment the number of events received" do
subject.multi_receive(events)
store = metric.collector.snapshot_metric.metric_store.get_with_path("output/foo")[:output][:foo][:events]
number_of_events = events.length
expect(store[:in].value).to eq(number_of_events)
expect(store[:out].value).to eq(number_of_events)
end
it "should record the `duration_in_millis`" do
value = 0
while value == 0
subject.multi_receive(events)
store = metric.collector.snapshot_metric.metric_store.get_with_path("output/foo")[:output][:foo][:events]
value = store[:duration_in_millis].value
end
expect(value).to be > 0
end
end
describe "closing" do
before do
subject.register
end
it "should register the output plugin instance on register" do
expect(out_inst).to have_received(:register)
end
it "should close the output plugin instance when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
end
end
describe "concurrency strategies" do
it "should have :single as the default" do
expect(subject.concurrency).to eq :single
end
[
[:shared, ::LogStash::OutputDelegatorStrategies::Shared],
[:single, ::LogStash::OutputDelegatorStrategies::Single],
[:legacy, ::LogStash::OutputDelegatorStrategies::Legacy],
].each do |strategy_concurrency,klass|
context "with strategy #{strategy_concurrency}" do
let(:concurrency) { strategy_concurrency }
before(:each) do
FakeOutClass.set_out_strategy(strategy_concurrency)
end
it "should find the correct concurrency type for the output" do
expect(subject.concurrency).to eq(strategy_concurrency)
end
it "should find the correct Strategy class for the worker" do
expect(subject.strategy).to be_a(klass)
end
it "should set the metric on the instance" do
expect(out_inst).to have_received(:metric=).with(subject.namespaced_metric)
end
[[:register], [:do_close], [:multi_receive, [[]] ] ].each do |method, args|
context "strategy objects" do
before do
allow(out_inst).to receive(method)
end
it "should delegate #{method} to the strategy" do
subject.send(method, *args)
if args
expect(out_inst).to have_received(method).with(*args)
else
expect(out_inst).to have_received(method).with(no_args)
end
end
end
context "strategy output instances" do
before do
allow(out_inst).to receive(method)
end
it "should delegate #{method} to the strategy" do
subject.send(method, *args)
if args
expect(out_inst).to have_received(method).with(*args)
else
expect(out_inst).to have_received(method).with(no_args)
end
end
end
end
end
end
end
end
end

View file

@ -0,0 +1,124 @@
package org.logstash.config.ir.compiler;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import static org.logstash.RubyUtil.RUBY;
@JRubyClass(name = "FakeOutClass")
public class FakeOutClass extends RubyObject {
private int multiReceiveDelay = 0;
private int multiReceiveCallCount = 0;
private int registerCallCount = 0;
private int closeCallCount = 0;
private IRubyObject multiReceiveArgs;
private IRubyObject metricArgs;
private IRubyObject outStrategy;
FakeOutClass(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
outStrategy = RUBY.newSymbol("single");
}
static FakeOutClass create() {
return new FakeOutClass(RUBY, OutputDelegatorTest.FAKE_OUT_CLASS);
}
@JRubyMethod
public IRubyObject name(final ThreadContext context) {
return RUBY.newString("example");
}
@JRubyMethod(name = "config_name")
public IRubyObject configName(final ThreadContext context) {
return RUBY.newString("dummy_plugin");
}
@JRubyMethod
public IRubyObject initialize(final ThreadContext context) {
return this;
}
@JRubyMethod
public IRubyObject concurrency(final ThreadContext context) {
return outStrategy;
}
@JRubyMethod
public IRubyObject register(final ThreadContext context) {
registerCallCount++;
return this;
}
@JRubyMethod(name = "new")
public IRubyObject newMethod(final ThreadContext context, IRubyObject args) {
return this;
}
@JRubyMethod(name = "metric=")
public IRubyObject metric(final ThreadContext context, IRubyObject args) {
this.metricArgs = args;
return this;
}
@JRubyMethod(name = "execution_context=")
public IRubyObject executionContext(final ThreadContext context, IRubyObject args) {
return this;
}
@JRubyMethod(name = "multi_receive")
public IRubyObject multiReceive(final ThreadContext context, IRubyObject args) {
multiReceiveCallCount++;
multiReceiveArgs = args;
if (multiReceiveDelay > 0) {
try {
Thread.sleep(multiReceiveDelay);
} catch (InterruptedException e) {
// do nothing
}
}
return this;
}
@JRubyMethod(name = "do_close")
public IRubyObject close(final ThreadContext context) {
closeCallCount++;
return this;
}
@JRubyMethod(name = "set_out_strategy")
public IRubyObject setOutStrategy(final ThreadContext context, IRubyObject outStrategy) {
this.outStrategy = outStrategy;
return this;
}
public IRubyObject getMetricArgs() {
return this.metricArgs;
}
public IRubyObject getMultiReceiveArgs() {
return multiReceiveArgs;
}
public int getMultiReceiveCallCount() {
return this.multiReceiveCallCount;
}
public void setMultiReceiveDelay(int delay) {
this.multiReceiveDelay = delay;
}
public int getRegisterCallCount() {
return registerCallCount;
}
public int getCloseCallCount() {
return closeCallCount;
}
}

View file

@ -0,0 +1,257 @@
package org.logstash.config.ir.compiler;
import org.assertj.core.data.Percentage;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyHash;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.java.proxies.ConcreteJavaProxy;
import org.jruby.runtime.builtin.IRubyObject;
import org.junit.Before;
import org.junit.Test;
import org.logstash.Event;
import org.logstash.config.ir.RubyEnvTestCase;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.NamespacedMetricExt;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.logstash.RubyUtil.EXECUTION_CONTEXT_CLASS;
import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS;
import static org.logstash.RubyUtil.OUTPUT_DELEGATOR_CLASS;
import static org.logstash.RubyUtil.RUBY;
public class OutputDelegatorTest extends RubyEnvTestCase {
private FakeOutClass fakeOutClass;
private NamespacedMetricExt metric;
private ExecutionContextExt executionContext;
private RubyHash pluginArgs;
private RubyArray events;
private static final int EVENT_COUNT = 7;
static RubyClass FAKE_OUT_CLASS;
static {
FAKE_OUT_CLASS = RUBY.defineClass("FakeOutClass", RUBY.getObject(), FakeOutClass::new);
FAKE_OUT_CLASS.defineAnnotatedMethods(FakeOutClass.class);
}
@Before
public void setup() {
events = RUBY.newArray(EVENT_COUNT);
for (int k = 0; k < EVENT_COUNT; k++) {
events.add(k, new Event());
}
fakeOutClass = FakeOutClass.create();
RubyArray namespaces = RubyArray.newArray(RUBY, 1);
namespaces.add(0, RubySymbol.newSymbol(RUBY, "output"));
IRubyObject metricWithCollector =
runRubyScript("require \"logstash/instrument/collector\"\n" +
"metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)");
metric = (NamespacedMetricExt) new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS)
.initialize(RUBY.getCurrentContext(), metricWithCollector, namespaces);
executionContext = new ExecutionContextExt(RUBY, EXECUTION_CONTEXT_CLASS);
pluginArgs = RubyHash.newHash(RUBY);
pluginArgs.put("id", "foo");
pluginArgs.put("arg1", "val1");
}
@Test
public void plainOutputPluginInitializesCleanly() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
}
@Test
public void plainOutputPluginPushesPluginNameToMetric() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
RubyHash metricStore = getMetricStore(new String[]{"output", "foo"});
String pluginName = getMetricStringValue(metricStore, "name");
assertEquals(fakeOutClass.configName(RUBY.getCurrentContext()).asJavaString(), pluginName);
}
@Test
public void multiReceivePassesBatch() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
assertEquals(events, fakeOutClass.getMultiReceiveArgs());
assertEquals(EVENT_COUNT, ((RubyArray) fakeOutClass.getMultiReceiveArgs()).size());
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
}
@Test
public void multiReceiveIncrementsEventCount() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
assertEquals(EVENT_COUNT, getMetricLongValue("in"));
assertEquals(EVENT_COUNT, getMetricLongValue("out"));
}
@Test
public void multiReceiveRecordsDurationInMillis() {
int delay = 100;
long millis = 0;
try {
fakeOutClass.setMultiReceiveDelay(delay);
OutputDelegatorExt outputDelegator = constructOutputDelegator();
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), events);
} catch (InterruptedException e) {
fail("Multireceive error: " + e);
}
millis = getMetricLongValue("duration_in_millis");
} finally {
fakeOutClass.setMultiReceiveDelay(0);
}
assertThat(millis).isCloseTo((long)delay, Percentage.withPercentage(10));
}
@Test
public void registersOutputPlugin() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
outputDelegator.register(RUBY.getCurrentContext());
assertEquals(1, fakeOutClass.getRegisterCallCount());
}
@Test
public void closesOutputPlugin() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
outputDelegator.doClose(RUBY.getCurrentContext());
assertEquals(1, fakeOutClass.getCloseCallCount());
}
@Test
public void singleConcurrencyStrategyIsDefault() {
OutputDelegatorExt outputDelegator = constructOutputDelegator();
IRubyObject concurrency = outputDelegator.concurrency(RUBY.getCurrentContext());
assertEquals(RUBY.newSymbol("single"), concurrency);
}
@Test
public void outputStrategyTests() {
StrategyPair[] outputStrategies = new StrategyPair[]{
new StrategyPair("shared", OutputStrategyExt.SharedOutputStrategyExt.class),
new StrategyPair("single", OutputStrategyExt.SingleOutputStrategyExt.class),
new StrategyPair("legacy", OutputStrategyExt.LegacyOutputStrategyExt.class)
};
for (StrategyPair pair : outputStrategies) {
fakeOutClass.setOutStrategy(RUBY.getCurrentContext(), pair.symbol);
OutputDelegatorExt outputDelegator = constructOutputDelegator();
// test that output strategies are properly set
IRubyObject outStrategy = outputDelegator.concurrency(RUBY.getCurrentContext());
assertEquals(pair.symbol, outStrategy);
// test that strategy classes are correctly instantiated
IRubyObject strategyClass = outputDelegator.strategy(RUBY.getCurrentContext());
assertThat(strategyClass).isInstanceOf(pair.klazz);
// test that metrics are properly set on the instance
assertEquals(outputDelegator.namespacedMetric(RUBY.getCurrentContext()), fakeOutClass.getMetricArgs());
}
}
@Test
public void outputStrategyMethodDelegationTests() {
RubySymbol[] outputStrategies = new RubySymbol[]{
RUBY.newSymbol("shared"),
RUBY.newSymbol("single"),
RUBY.newSymbol("legacy")
};
for (RubySymbol symbol : outputStrategies) {
fakeOutClass = FakeOutClass.create();
fakeOutClass.setOutStrategy(RUBY.getCurrentContext(), symbol);
OutputDelegatorExt outputDelegator = constructOutputDelegator();
outputDelegator.register(RUBY.getCurrentContext());
assertEquals(1, fakeOutClass.getRegisterCallCount());
outputDelegator.doClose(RUBY.getCurrentContext());
assertEquals(1, fakeOutClass.getCloseCallCount());
try {
outputDelegator.multiReceive(RUBY.getCurrentContext(), RUBY.newArray(0));
assertEquals(1, fakeOutClass.getMultiReceiveCallCount());
} catch (InterruptedException e) {
fail("multireceive error: " + e);
}
}
}
private static IRubyObject runRubyScript(String script) {
IRubyObject m = RUBY.evalScriptlet(script);
return m;
}
private OutputDelegatorExt constructOutputDelegator() {
OutputDelegatorExt outputDelegator = (OutputDelegatorExt)
new OutputDelegatorExt(RUBY, OUTPUT_DELEGATOR_CLASS).init(RUBY.getCurrentContext(), new IRubyObject[]{
fakeOutClass,
metric,
executionContext,
OutputStrategyExt.OutputStrategyRegistryExt.instance(RUBY.getCurrentContext(), null),
pluginArgs
});
return outputDelegator;
}
private RubyHash getMetricStore() {
return getMetricStore(new String[]{"output", "foo", "events"});
}
private RubyHash getMetricStore(String[] path) {
RubyHash metricStore = (RubyHash) metric.collector(RUBY.getCurrentContext())
.callMethod(RUBY.getCurrentContext(), "snapshot_metric")
.callMethod(RUBY.getCurrentContext(), "metric_store")
.callMethod(RUBY.getCurrentContext(), "get_with_path", new IRubyObject[]{RUBY.newString("output/foo")});
RubyHash rh = metricStore;
for (String p : path) {
rh = (RubyHash) rh.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(p));
}
return rh;
}
private String getMetricStringValue(RubyHash metricStore, String symbolName) {
ConcreteJavaProxy counter = (ConcreteJavaProxy) metricStore.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(symbolName));
RubyString value = (RubyString) counter.callMethod("value");
return value.asJavaString();
}
private long getMetricLongValue(String symbolName) {
return getMetricLongValue(getMetricStore(), symbolName);
}
private long getMetricLongValue(RubyHash metricStore, String symbolName) {
ConcreteJavaProxy counter = (ConcreteJavaProxy) metricStore.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(symbolName));
RubyFixnum count = (RubyFixnum) counter.callMethod("value");
return count.getLongValue();
}
private static class StrategyPair {
RubySymbol symbol;
Class klazz;
StrategyPair(String symbolName, Class c) {
this.symbol = RUBY.newSymbol(symbolName);
this.klazz = c;
}
}
}