diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 8ef076c4a..c3eea14a8 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -1,5 +1,4 @@ # encoding: utf-8 -require "logstash/plugins/registry" require "logstash/util/password" require "logstash/util/safe_uri" require "logstash/version" diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index a8044b1b7..354512f79 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -13,7 +13,6 @@ require "logstash/pipeline_reporter" require "logstash/instrument/collector" require "logstash/filter_delegator" require "logstash/queue_factory" -require "logstash/plugins/plugin_factory" require "logstash/compiler" require "securerandom" diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index 84d19c2b7..59fa8a9dc 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -133,6 +133,7 @@ class LogStash::Plugin # Should I remove this now and make sure the pipeline invoke the Registry or I should wait for 6.0 # Its not really part of the public api but its used by the tests a lot to mock the plugins. def self.lookup(type, name) + require "logstash/plugins/registry" LogStash::PLUGIN_REGISTRY.lookup_pipeline_plugin(type, name) end end # class LogStash::Plugin diff --git a/logstash-core/lib/logstash/plugins/plugin_factory.rb b/logstash-core/lib/logstash/plugins/plugin_factory.rb deleted file mode 100644 index e74646f04..000000000 --- a/logstash-core/lib/logstash/plugins/plugin_factory.rb +++ /dev/null @@ -1,80 +0,0 @@ -# encoding: utf-8 - -module LogStash - module Plugins - class PluginFactory - include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory - - def self.filter_delegator(wrapper_class, filter_class, args, filter_metrics, execution_context) - filter_instance = filter_class.new(args) - id = args["id"] - filter_instance.metric = filter_metrics.namespace(id.to_sym) - filter_instance.execution_context = execution_context - wrapper_class.new(filter_instance, id) - end - - def initialize(lir, metric_factory, exec_factory, filter_class) - @lir = lir - @plugins_by_id = {} - @metric_factory = metric_factory - @exec_factory = exec_factory - @filter_class = filter_class - end - - def buildOutput(name, line, column, *args) - plugin("output", name, line, column, *args) - end - - def buildFilter(name, line, column, *args) - plugin("filter", name, line, column, *args) - end - - def buildInput(name, line, column, *args) - plugin("input", name, line, column, *args) - end - - def buildCodec(name, *args) - plugin("codec", name, 0, 0, *args) - end - - def plugin(plugin_type, name, line, column, *args) - # Collapse the array of arguments into a single merged hash - args = args.reduce({}, &:merge) - - if plugin_type == "codec" - id = SecureRandom.uuid # codecs don't really use their IDs for metrics, so we can use anything here - else - # Pull the ID from LIR to keep IDs consistent between the two representations - id = @lir.graph.vertices.filter do |v| - v.source_with_metadata && - v.source_with_metadata.line == line && - v.source_with_metadata.column == column - end.findFirst.get.id - end - args["id"] = id # some code pulls the id out of the args - - raise ConfigurationError, "Could not determine ID for #{plugin_type}/#{plugin_name}" unless id - raise ConfigurationError, "Two plugins have the id '#{id}', please fix this conflict" if @plugins_by_id[id] - - @plugins_by_id[id] = true - # Scope plugins of type 'input' to 'inputs' - type_scoped_metric = @metric_factory.create(plugin_type) - klass = Plugin.lookup(plugin_type, name) - execution_context = @exec_factory.create(id, klass.config_name) - - if plugin_type == "output" - OutputDelegator.new(klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args) - elsif plugin_type == "filter" - self.class.filter_delegator(@filter_class, klass, args, type_scoped_metric, execution_context) - else # input or codec plugin - plugin_instance = klass.new(args) - scoped_metric = type_scoped_metric.namespace(id.to_sym) - scoped_metric.gauge(:name, plugin_instance.config_name) - plugin_instance.metric = scoped_metric - plugin_instance.execution_context = execution_context - plugin_instance - end - end - end - end -end diff --git a/logstash-core/lib/logstash/plugins/registry.rb b/logstash-core/lib/logstash/plugins/registry.rb index a84d9cbc1..cd3e20ab6 100644 --- a/logstash-core/lib/logstash/plugins/registry.rb +++ b/logstash-core/lib/logstash/plugins/registry.rb @@ -3,6 +3,9 @@ require "rubygems/package" require "logstash/plugin" require "logstash/plugins/hooks_registry" require "logstash/modules/scaffold" +require "logstash/codecs/base" +require "logstash/filters/base" +require "logstash/outputs/base" module LogStash module Plugins class Registry diff --git a/logstash-core/lib/logstash/util/modules_setting_array.rb b/logstash-core/lib/logstash/util/modules_setting_array.rb index 85def77c5..5b4b2da8c 100644 --- a/logstash-core/lib/logstash/util/modules_setting_array.rb +++ b/logstash-core/lib/logstash/util/modules_setting_array.rb @@ -1,4 +1,5 @@ # encoding: utf-8 +require "forwardable" require "logstash/util/password" module LogStash module Util class ModulesSettingArray diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 682a10abc..8f1bae0f0 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -138,6 +138,8 @@ public final class RubyUtil { public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS; + public static final RubyClass PLUGIN_FACTORY_CLASS; + public static final RubyClass LOGGER; public static final RubyModule LOGGABLE_MODULE; @@ -146,6 +148,8 @@ public final class RubyUtil { public static final RubyModule UTIL_MODULE; + public static final RubyClass CONFIGURATION_ERROR_CLASS; + /** * Logstash Ruby Module. */ @@ -350,7 +354,7 @@ public final class RubyUtil { LOGSTASH_MODULE.defineClassUnder( "EnvironmentError", stdErr, JRubyLogstashErrorsExt.LogstashEnvironmentError::new ); - LOGSTASH_MODULE.defineClassUnder( + CONFIGURATION_ERROR_CLASS = LOGSTASH_MODULE.defineClassUnder( "ConfigurationError", stdErr, JRubyLogstashErrorsExt.ConfigurationError::new ); LOGSTASH_MODULE.defineClassUnder( @@ -403,6 +407,10 @@ public final class RubyUtil { RUBY_EVENT_CLASS.setConstant("VERSION_ONE", RUBY.newString(Event.VERSION_ONE)); RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class); RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class); + PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder( + "PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new + ); + PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class); RUBY.getGlobalVariables().set("$LS_JARS_LOADED", RUBY.newString("true")); RubyJavaIntegration.setupRubyJavaIntegration(RUBY); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java index 046c3b2bf..a2ba0f2ce 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java @@ -1,16 +1,31 @@ package org.logstash.plugins; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Locale; +import java.util.Map; +import java.util.UUID; import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyBasicObject; import org.jruby.RubyClass; +import org.jruby.RubyHash; +import org.jruby.RubyInteger; +import org.jruby.RubyString; import org.jruby.RubySymbol; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; +import org.logstash.config.ir.PipelineIR; +import org.logstash.config.ir.compiler.FilterDelegatorExt; +import org.logstash.config.ir.compiler.OutputDelegatorExt; +import org.logstash.config.ir.compiler.OutputStrategyExt; +import org.logstash.config.ir.compiler.RubyIntegration; +import org.logstash.config.ir.graph.Vertex; import org.logstash.execution.ExecutionContextExt; import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; @@ -18,6 +33,205 @@ import org.logstash.instrument.metrics.NullMetricExt; public final class PluginFactoryExt { + @JRubyClass(name = "PluginFactory") + public static final class Plugins extends RubyBasicObject + implements RubyIntegration.PluginFactory { + + private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id"); + + private static final RubySymbol NAME_KEY = RubyUtil.RUBY.newSymbol("name"); + + private final Collection pluginsById = new HashSet<>(); + + private PipelineIR lir; + + private PluginFactoryExt.ExecutionContext executionContext; + + private PluginFactoryExt.Metrics metrics; + + private RubyClass filterClass; + + @JRubyMethod(name = "filter_delegator", meta = true, required = 5) + public static IRubyObject filterDelegator(final ThreadContext context, + final IRubyObject recv, final IRubyObject[] args) { + final RubyHash arguments = (RubyHash) args[2]; + final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); + final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); + filterInstance.callMethod( + context, "metric=", + args[3].callMethod(context, "namespace", id.intern19()) + ); + filterInstance.callMethod(context, "execution_context=", args[4]); + return args[0].callMethod(context, "new", new IRubyObject[]{filterInstance, id}); + } + + public Plugins(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod(required = 4) + public Plugins initialize(final ThreadContext context, final IRubyObject[] args) { + lir = (PipelineIR) args[0].toJava(PipelineIR.class); + metrics = (PluginFactoryExt.Metrics) args[1]; + executionContext = (PluginFactoryExt.ExecutionContext) args[2]; + filterClass = (RubyClass) args[3]; + return this; + } + + @SuppressWarnings("unchecked") + @Override + public IRubyObject buildInput(final RubyString name, final RubyInteger line, + final RubyInteger column, final IRubyObject args) { + return plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, + name.asJavaString(), line.getIntValue(), column.getIntValue(), + (Map) args + ); + } + + @JRubyMethod(required = 4) + public IRubyObject buildInput(final ThreadContext context, final IRubyObject[] args) { + return buildInput( + (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), + args[3] + ); + } + + @SuppressWarnings("unchecked") + @Override + public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger line, + final RubyInteger column, final IRubyObject args) { + return (OutputDelegatorExt) plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, + name.asJavaString(), line.getIntValue(), column.getIntValue(), + (Map) args + ); + } + + @JRubyMethod(required = 4) + public OutputDelegatorExt buildOutput(final ThreadContext context, + final IRubyObject[] args) { + return buildOutput( + (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), args[3] + ); + } + + @SuppressWarnings("unchecked") + @Override + public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, + final RubyInteger column, final IRubyObject args) { + return (FilterDelegatorExt) plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, + name.asJavaString(), line.getIntValue(), column.getIntValue(), + (Map) args + ); + } + + @JRubyMethod(required = 4) + public IRubyObject buildFilter(final ThreadContext context, final IRubyObject[] args) { + return buildFilter( + (RubyString) args[0], args[1].convertToInteger(), args[2].convertToInteger(), + args[3] + ); + } + + @SuppressWarnings("unchecked") + @Override + public IRubyObject buildCodec(final RubyString name, final IRubyObject args) { + return plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, + name.asJavaString(), 0, 0, (Map) args + ); + } + + @JRubyMethod(required = 4) + public IRubyObject buildCodec(final ThreadContext context, final IRubyObject[] args) { + return buildCodec((RubyString) args[0], args[1]); + } + + @SuppressWarnings("unchecked") + @JRubyMethod(required = 4, optional = 1) + public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { + return plugin( + context, + PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), + args[1].asJavaString(), + args[2].convertToInteger().getIntValue(), + args[3].convertToInteger().getIntValue(), + args.length > 4 ? (Map) args[4] : new HashMap<>() + ); + } + + private IRubyObject plugin(final ThreadContext context, + final PluginLookup.PluginType type, final String name, final int line, final int column, + final Map args) { + final String id; + if (type == PluginLookup.PluginType.CODEC) { + id = UUID.randomUUID().toString(); + } else { + id = lir.getGraph().vertices().filter( + v -> v.getSourceWithMetadata() != null + && v.getSourceWithMetadata().getLine() == line + && v.getSourceWithMetadata().getColumn() == column + ).findFirst().map(Vertex::getId).orElse(null); + } + if (id == null) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name + ) + ); + } + if (pluginsById.contains(id)) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format("Two plugins have the id '%s', please fix this conflict", id) + ); + } + pluginsById.add(id); + final AbstractNamespacedMetricExt typeScopedMetric = + metrics.create(context, type.rubyLabel()); + final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name); + if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) { + final Map newArgs = new HashMap<>(args); + newArgs.put("id", id); + final RubyClass klass = (RubyClass) pluginClass.klass(); + final ExecutionContextExt executionCntx = executionContext.create( + context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name") + ); + final RubyHash rubyArgs = RubyHash.newHash(context.runtime); + rubyArgs.putAll(newArgs); + if (type == PluginLookup.PluginType.OUTPUT) { + return new OutputDelegatorExt(context.runtime, RubyUtil.OUTPUT_DELEGATOR_CLASS).init( + context, + new IRubyObject[]{ + klass, typeScopedMetric, executionCntx, + OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null), + rubyArgs + } + ); + } else if (type == PluginLookup.PluginType.FILTER) { + return filterDelegator( + context, null, + new IRubyObject[]{ + filterClass, klass, rubyArgs, typeScopedMetric, executionCntx + } + ); + } else { + final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); + final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); + scopedMetric.gauge(context, NAME_KEY, pluginInstance.callMethod(context, "config_name")); + pluginInstance.callMethod(context, "metric=", scopedMetric); + pluginInstance.callMethod(context, "execution_context=", executionCntx); + return pluginInstance; + } + } else { + return context.nil; + } + } + } + @JRubyClass(name = "ExecutionContextFactory") public static final class ExecutionContext extends RubyBasicObject { diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java b/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java new file mode 100644 index 000000000..b3f91b41a --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginLookup.java @@ -0,0 +1,64 @@ +package org.logstash.plugins; + +import org.jruby.RubyString; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; + +/** + * Java Implementation of the plugin that is implemented by wrapping the Ruby + * {@code LogStash::Plugin} class for the Ruby plugin lookup. + */ +public final class PluginLookup { + + private static final IRubyObject RUBY_REGISTRY = RubyUtil.RUBY.executeScript( + "require 'logstash/plugins/registry'\nrequire 'logstash/plugin'\nLogStash::Plugin", + "" + ); + + private PluginLookup() { + // Utility Class + } + + public static PluginLookup.PluginClass lookup(final PluginLookup.PluginType type, + final String name) { + return new PluginLookup.PluginClass() { + @Override + public PluginLookup.PluginLanguage language() { + return PluginLookup.PluginLanguage.RUBY; + } + + @Override + public Object klass() { + return RUBY_REGISTRY.callMethod( + RubyUtil.RUBY.getCurrentContext(), "lookup", + new IRubyObject[]{type.rubyLabel(), RubyUtil.RUBY.newString(name)} + ); + } + }; + } + + public interface PluginClass { + + PluginLookup.PluginLanguage language(); + + Object klass(); + } + + public enum PluginLanguage { + JAVA, RUBY + } + + public enum PluginType { + INPUT("input"), FILTER("filter"), OUTPUT("output"), CODEC("codec"); + + private final RubyString label; + + PluginType(final String label) { + this.label = RubyUtil.RUBY.newString(label); + } + + RubyString rubyLabel() { + return label; + } + } +} diff --git a/logstash-core/src/test/java/org/logstash/config/ir/RubyEnvTestCase.java b/logstash-core/src/test/java/org/logstash/config/ir/RubyEnvTestCase.java index b8bba2f28..58606916d 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/RubyEnvTestCase.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/RubyEnvTestCase.java @@ -29,8 +29,7 @@ public abstract class RubyEnvTestCase { .resolve("jruby").resolve("2.3.0").toFile().getAbsolutePath(); environment.put("GEM_HOME", gems); environment.put("GEM_PATH", gems); - loader.addPaths(root.resolve("lib").toFile().getAbsolutePath() - ); + loader.addPaths(root.resolve("lib").toFile().getAbsolutePath()); } } }