JAVAFICATION: Move PluginFactory to Java

This reverts commit 219ebae452.

Fixes #9620
This commit is contained in:
Armin 2018-05-20 16:51:40 +02:00 committed by Armin Braun
parent e786ca1126
commit 5e08c4a271
10 changed files with 293 additions and 85 deletions

View file

@ -1,5 +1,4 @@
# encoding: utf-8
require "logstash/plugins/registry"
require "logstash/util/password"
require "logstash/util/safe_uri"
require "logstash/version"

View file

@ -13,7 +13,6 @@ require "logstash/pipeline_reporter"
require "logstash/instrument/collector"
require "logstash/filter_delegator"
require "logstash/queue_factory"
require "logstash/plugins/plugin_factory"
require "logstash/compiler"
require "securerandom"

View file

@ -133,6 +133,7 @@ class LogStash::Plugin
# Should I remove this now and make sure the pipeline invoke the Registry or I should wait for 6.0
# Its not really part of the public api but its used by the tests a lot to mock the plugins.
def self.lookup(type, name)
require "logstash/plugins/registry"
LogStash::PLUGIN_REGISTRY.lookup_pipeline_plugin(type, name)
end
end # class LogStash::Plugin

View file

@ -1,80 +0,0 @@
# encoding: utf-8
module LogStash
module Plugins
class PluginFactory
include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory
def self.filter_delegator(wrapper_class, filter_class, args, filter_metrics, execution_context)
filter_instance = filter_class.new(args)
id = args["id"]
filter_instance.metric = filter_metrics.namespace(id.to_sym)
filter_instance.execution_context = execution_context
wrapper_class.new(filter_instance, id)
end
def initialize(lir, metric_factory, exec_factory, filter_class)
@lir = lir
@plugins_by_id = {}
@metric_factory = metric_factory
@exec_factory = exec_factory
@filter_class = filter_class
end
def buildOutput(name, line, column, *args)
plugin("output", name, line, column, *args)
end
def buildFilter(name, line, column, *args)
plugin("filter", name, line, column, *args)
end
def buildInput(name, line, column, *args)
plugin("input", name, line, column, *args)
end
def buildCodec(name, *args)
plugin("codec", name, 0, 0, *args)
end
def plugin(plugin_type, name, line, column, *args)
# Collapse the array of arguments into a single merged hash
args = args.reduce({}, &:merge)
if plugin_type == "codec"
id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here
else
# Pull the ID from LIR to keep IDs consistent between the two representations
id = @lir.graph.vertices.filter do |v|
v.source_with_metadata &&
v.source_with_metadata.line == line &&
v.source_with_metadata.column == column
end.findFirst.get.id
end
args["id"] = id # some code pulls the id out of the args
raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" unless id
raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id]
@plugins_by_id[id] = true
# Scope plugins of type 'input' to 'inputs'
type_scoped_metric = @metric_factory.create(plugin_type)
klass = Plugin.lookup(plugin_type, name)
execution_context = @exec_factory.create(id, klass.config_name)
if plugin_type == "output"
OutputDelegator.new(klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
elsif plugin_type == "filter"
self.class.filter_delegator(@filter_class, klass, args, type_scoped_metric, execution_context)
else # input or codec plugin
plugin_instance = klass.new(args)
scoped_metric = type_scoped_metric.namespace(id.to_sym)
scoped_metric.gauge(:name, plugin_instance.config_name)
plugin_instance.metric = scoped_metric
plugin_instance.execution_context = execution_context
plugin_instance
end
end
end
end
end

View file

@ -3,6 +3,9 @@ require "rubygems/package"
require "logstash/plugin"
require "logstash/plugins/hooks_registry"
require "logstash/modules/scaffold"
require "logstash/codecs/base"
require "logstash/filters/base"
require "logstash/outputs/base"
module LogStash module Plugins
class Registry

View file

@ -1,4 +1,5 @@
# encoding: utf-8
require "forwardable"
require "logstash/util/password"
module LogStash module Util class ModulesSettingArray

View file

@ -138,6 +138,8 @@ public final class RubyUtil {
public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;
public static final RubyClass PLUGIN_FACTORY_CLASS;
public static final RubyClass LOGGER;
public static final RubyModule LOGGABLE_MODULE;
@ -146,6 +148,8 @@ public final class RubyUtil {
public static final RubyModule UTIL_MODULE;
public static final RubyClass CONFIGURATION_ERROR_CLASS;
/**
* Logstash Ruby Module.
*/
@ -350,7 +354,7 @@ public final class RubyUtil {
LOGSTASH_MODULE.defineClassUnder(
"EnvironmentError", stdErr, JRubyLogstashErrorsExt.LogstashEnvironmentError::new
);
LOGSTASH_MODULE.defineClassUnder(
CONFIGURATION_ERROR_CLASS = LOGSTASH_MODULE.defineClassUnder(
"ConfigurationError", stdErr, JRubyLogstashErrorsExt.ConfigurationError::new
);
LOGSTASH_MODULE.defineClassUnder(
@ -403,6 +407,10 @@ public final class RubyUtil {
RUBY_EVENT_CLASS.setConstant("VERSION_ONE", RUBY.newString(Event.VERSION_ONE));
RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class);
RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class);
PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new
);
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class);
RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true"));
RubyJavaIntegration.setupRubyJavaIntegration(RUBY);
}

View file

@ -1,16 +1,31 @@
package org.logstash.plugins;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyInteger;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
@ -18,6 +33,205 @@ import org.logstash.instrument.metrics.NullMetricExt;
public final class PluginFactoryExt {
@JRubyClass(name = "PluginFactory")
public static final class Plugins extends RubyBasicObject
implements RubyIntegration.PluginFactory {
private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");
private static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name");
private final Collection<String> pluginsById = new HashSet<>();
private PipelineIR lir;
private PluginFactoryExt.ExecutionContext executionContext;
private PluginFactoryExt.Metrics metrics;
private RubyClass filterClass;
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
public static IRubyObject filterDelegator(final ThreadContext context,
final IRubyObject recv, final IRubyObject[] args) {
final RubyHash arguments = (RubyHash) args[2];
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
filterInstance.callMethod(
context, "metric=",
args[3].callMethod(context, "namespace", id.intern19())
);
filterInstance.callMethod(context, "execution_context=", args[4]);
return args[0].callMethod(context, "new", new IRubyObject[]{filterInstance, id});
}
public Plugins(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(required = 4)
public Plugins initialize(final ThreadContext context, final IRubyObject[] args) {
lir = (PipelineIR) args[0].toJava(PipelineIR.class);
metrics = (PluginFactoryExt.Metrics) args[1];
executionContext = (PluginFactoryExt.ExecutionContext) args[2];
filterClass = (RubyClass) args[3];
return this;
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildInput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}
@JRubyMethod(required = 4)
public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) {
return buildInput(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
args[3]
);
}
@SuppressWarnings("unchecked")
@Override
public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return (OutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}
@JRubyMethod(required = 4)
public OutputDelegatorExt buildOutput(final ThreadContext context,
final IRubyObject[] args) {
return buildOutput(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3]
);
}
@SuppressWarnings("unchecked")
@Override
public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line,
final RubyInteger column, final IRubyObject args) {
return (FilterDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER,
name.asJavaString(), line.getIntValue(), column.getIntValue(),
(Map<String, IRubyObject>) args
);
}
@JRubyMethod(required = 4)
public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) {
return buildFilter(
(RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(),
args[3]
);
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, final IRubyObject args) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
name.asJavaString(), 0, 0, (Map<String, IRubyObject>) args
);
}
@JRubyMethod(required = 4)
public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) {
return buildCodec((RubyString) args[0], args[1]);
}
@SuppressWarnings("unchecked")
@JRubyMethod(required = 4, optional = 1)
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
return plugin(
context,
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
args[1].asJavaString(),
args[2].convertToInteger().getIntValue(),
args[3].convertToInteger().getIntValue(),
args.length > 4 ? (Map<String, IRubyObject>) args[4] : new HashMap<>()
);
}
private IRubyObject plugin(final ThreadContext context,
final PluginLookup.PluginType type, final String name, final int line, final int column,
final Map<String, IRubyObject> args) {
final String id;
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
} else {
id = lir.getGraph().vertices().filter(
v -> v.getSourceWithMetadata() != null
&& v.getSourceWithMetadata().getLine() == line
&& v.getSourceWithMetadata().getColumn() == column
).findFirst().map(Vertex::getId).orElse(null);
}
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
)
);
}
if (pluginsById.contains(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
pluginsById.add(id);
final AbstractNamespacedMetricExt typeScopedMetric =
metrics.create(context, type.rubyLabel());
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
final Map<String, Object> newArgs = new HashMap<>(args);
newArgs.put("id", id);
final RubyClass klass = (RubyClass) pluginClass.klass();
final ExecutionContextExt executionCntx = executionContext.create(
context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name")
);
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
rubyArgs.putAll(newArgs);
if (type == PluginLookup.PluginType.OUTPUT) {
return new OutputDelegatorExt(context.runtime, RubyUtil.OUTPUT_DELEGATOR_CLASS).init(
context,
new IRubyObject[]{
klass, typeScopedMetric, executionCntx,
OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null),
rubyArgs
}
);
} else if (type == PluginLookup.PluginType.FILTER) {
return filterDelegator(
context, null,
new IRubyObject[]{
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx
}
);
} else {
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
scopedMetric.gauge(context, NAME_KEY, pluginInstance.callMethod(context, "config_name"));
pluginInstance.callMethod(context, "metric=", scopedMetric);
pluginInstance.callMethod(context, "execution_context=", executionCntx);
return pluginInstance;
}
} else {
return context.nil;
}
}
}
@JRubyClass(name = "ExecutionContextFactory")
public static final class ExecutionContext extends RubyBasicObject {

View file

@ -0,0 +1,64 @@
package org.logstash.plugins;
import org.jruby.RubyString;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
/**
* Java Implementation of the plugin that is implemented by wrapping the Ruby
* {@code LogStash::Plugin} class for the Ruby plugin lookup.
*/
public final class PluginLookup {
private static final IRubyObject RUBY_REGISTRY = RubyUtil.RUBY.executeScript(
"require 'logstash/plugins/registry'\nrequire 'logstash/plugin'\nLogStash::Plugin",
""
);
private PluginLookup() {
// Utility Class
}
public static PluginLookup.PluginClass lookup(final PluginLookup.PluginType type,
final String name) {
return new PluginLookup.PluginClass() {
@Override
public PluginLookup.PluginLanguage language() {
return PluginLookup.PluginLanguage.RUBY;
}
@Override
public Object klass() {
return RUBY_REGISTRY.callMethod(
RubyUtil.RUBY.getCurrentContext(), "lookup",
new IRubyObject[]{type.rubyLabel(), RubyUtil.RUBY.newString(name)}
);
}
};
}
public interface PluginClass {
PluginLookup.PluginLanguage language();
Object klass();
}
public enum PluginLanguage {
JAVA, RUBY
}
public enum PluginType {
INPUT("input"), FILTER("filter"), OUTPUT("output"), CODEC("codec");
private final RubyString label;
PluginType(final String label) {
this.label = RubyUtil.RUBY.newString(label);
}
RubyString rubyLabel() {
return label;
}
}
}

View file

@ -29,8 +29,7 @@ public abstract class RubyEnvTestCase {
.resolve("jruby").resolve("2.3.0").toFile().getAbsolutePath();
environment.put("GEM_HOME", gems);
environment.put("GEM_PATH", gems);
loader.addPaths(root.resolve("lib").toFile().getAbsolutePath()
);
loader.addPaths(root.resolve("lib").toFile().getAbsolutePath());
}
}
}