From 3e380bf10f5d591725b93b5d7f7302fe157cb447 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Fri, 26 Jun 2020 11:09:08 +0200 Subject: [PATCH] Backport of PR #11457 to 7.x (#12057) Simplified if..else if in PluginFactory for Java plugins part, moved to template method pattern --- logstash-core/build.gradle | 4 +- logstash-core/lib/logstash/pipeline.rb | 2 +- .../src/main/java/org/logstash/RubyUtil.java | 20 +- .../logstash/config/ir/CompiledPipeline.java | 2 +- .../execution/JavaBasePipelineExt.java | 12 +- .../plugins/ConfigVariableExpander.java | 5 +- .../logstash/plugins/PluginFactoryExt.java | 486 ------------------ .../org/logstash/plugins/codecs/Dots.java | 14 +- .../org/logstash/plugins/codecs/Line.java | 18 +- .../org/logstash/plugins/codecs/Plain.java | 17 +- .../factory/AbstractPluginCreator.java | 45 ++ .../plugins/factory/CodecPluginCreator.java | 23 + .../factory/ExecutionContextFactoryExt.java | 77 +++ .../plugins/factory/FilterPluginCreator.java | 21 + .../plugins/factory/InputPluginCreator.java | 27 + .../plugins/factory/OutputPluginCreator.java | 25 + .../plugins/factory/PluginFactoryExt.java | 261 ++++++++++ .../factory/PluginMetricsFactoryExt.java | 61 +++ .../org/logstash/plugins/MetricTestCase.java | 2 +- .../{ => factory}/PluginFactoryExtTest.java | 24 +- 20 files changed, 615 insertions(+), 531 deletions(-) delete mode 100644 logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/AbstractPluginCreator.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/CodecPluginCreator.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/FilterPluginCreator.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/InputPluginCreator.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/OutputPluginCreator.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/factory/PluginMetricsFactoryExt.java rename logstash-core/src/test/java/org/logstash/plugins/{ => factory}/PluginFactoryExtTest.java (85%) diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index 4d308557b..5c837b2e6 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -97,7 +97,7 @@ tasks.register("javaTests", Test) { exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' exclude '/org/logstash/plugins/NamespacedMetricImplTest.class' exclude '/org/logstash/plugins/CounterMetricImplTest.class' - exclude '/org/logstash/plugins/PluginFactoryExtTest.class' + exclude '/org/logstash/plugins/factory/PluginFactoryExtTest.class' } tasks.register("rubyTests", Test) { @@ -113,7 +113,7 @@ tasks.register("rubyTests", Test) { include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' include '/org/logstash/plugins/NamespacedMetricImplTest.class' include '/org/logstash/plugins/CounterMetricImplTest.class' - include '/org/logstash/plugins/PluginFactoryExtTest.class' + include '/org/logstash/plugins/factory/PluginFactoryExtTest.class' } test { diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 766d1be49..3afc80677 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -46,7 +46,7 @@ module LogStash; class BasePipeline < AbstractPipeline @plugin_factory = LogStash::Plugins::PluginFactory.new( # use NullMetric if called in the BasePipeline context otherwise use the @metric value - lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, metric), + lir, LogStash::Plugins::PluginMetricsFactory.new(pipeline_id, metric), LogStash::Plugins::ExecutionContextFactory.new(@agent, self, dlq_writer), FilterDelegator ) diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index d5c8ed817..d80267923 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -73,9 +73,11 @@ import org.logstash.log.LoggableExt; import org.logstash.log.LoggerExt; import org.logstash.log.SlowLoggerExt; import org.logstash.plugins.HooksRegistryExt; -import org.logstash.plugins.PluginFactoryExt; import org.logstash.plugins.UniversalPluginExt; import org.logstash.util.UtilExt; +import org.logstash.plugins.factory.ExecutionContextFactoryExt; +import org.logstash.plugins.factory.PluginMetricsFactoryExt; +import org.logstash.plugins.factory.PluginFactoryExt; import java.util.stream.Stream; @@ -193,7 +195,7 @@ public final class RubyUtil { public static final RubyClass EXECUTION_CONTEXT_FACTORY_CLASS; - public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS; + public static final RubyClass PLUGIN_METRICS_FACTORY_CLASS; public static final RubyClass PLUGIN_FACTORY_CLASS; @@ -259,16 +261,16 @@ public final class RubyUtil { METRIC_SNAPSHOT_CLASS.defineAnnotatedMethods(SnapshotExt.class); EXECUTION_CONTEXT_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder( "ExecutionContextFactory", RUBY.getObject(), - PluginFactoryExt.ExecutionContext::new + ExecutionContextFactoryExt::new ); - PLUGIN_METRIC_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder( - "PluginMetricFactory", RUBY.getObject(), PluginFactoryExt.Metrics::new + PLUGIN_METRICS_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder( + "PluginMetricsFactory", RUBY.getObject(), PluginMetricsFactoryExt::new ); SHUTDOWN_WATCHER_CLASS = setupLogstashClass(ShutdownWatcherExt::new, ShutdownWatcherExt.class); - PLUGIN_METRIC_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Metrics.class); + PLUGIN_METRICS_FACTORY_CLASS.defineAnnotatedMethods(PluginMetricsFactoryExt.class); EXECUTION_CONTEXT_FACTORY_CLASS.defineAnnotatedMethods( - PluginFactoryExt.ExecutionContext.class + ExecutionContextFactoryExt.class ); METRIC_EXCEPTION_CLASS = instrumentModule.defineClassUnder( "MetricException", RUBY.getException(), MetricExt.MetricException::new @@ -546,9 +548,9 @@ public final class RubyUtil { RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class); RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class); PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder( - "PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new + "PluginFactory", RUBY.getObject(), PluginFactoryExt::new ); - PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class); + PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.class); UNIVERSAL_PLUGIN_CLASS = setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class); EVENT_DISPATCHER_CLASS = diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 3ed4ad9c4..b351c0612 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -116,7 +116,7 @@ public final class CompiledPipeline { filters = setupFilters(cve); outputs = setupOutputs(cve); } catch (Exception e) { - throw new IllegalStateException("Unable to configure plugins: " + e.getMessage()); + throw new IllegalStateException("Unable to configure plugins: " + e.getMessage(), e); } } diff --git a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java index 75a78ad70..aea003dde 100644 --- a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java @@ -36,7 +36,9 @@ import org.logstash.common.IncompleteSourceWithMetadataException; import org.logstash.config.ir.CompiledPipeline; import org.logstash.execution.queue.QueueWriter; import org.logstash.ext.JRubyWrappedWriteClientExt; -import org.logstash.plugins.PluginFactoryExt; +import org.logstash.plugins.factory.ExecutionContextFactoryExt; +import org.logstash.plugins.factory.PluginMetricsFactoryExt; +import org.logstash.plugins.factory.PluginFactoryExt; import java.security.NoSuchAlgorithmException; import java.util.Collection; @@ -67,12 +69,12 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt { initialize(context, args[0], args[1], args[2]); lirExecution = new CompiledPipeline( lir, - new PluginFactoryExt.Plugins(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init( + new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init( lir, - new PluginFactoryExt.Metrics( - context.runtime, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS + new PluginMetricsFactoryExt( + context.runtime, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS ).initialize(context, pipelineId(), metric()), - new PluginFactoryExt.ExecutionContext( + new ExecutionContextFactoryExt( context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS ).initialize(context, args[3], this, dlqWriter(context)), RubyUtil.FILTER_DELEGATOR_CLASS diff --git a/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java index 3dd4d86d0..fb1247f3c 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java +++ b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java @@ -38,8 +38,11 @@ public class ConfigVariableExpander implements AutoCloseable { /** * Creates a ConfigVariableExpander that doesn't lookup any secreted placeholder. + * + * @param envVarProvider EnvironmentVariableProvider to use as source of substitutions + * @return an variable expander that uses envVarProvider as source * */ - static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) { + public static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) { return new ConfigVariableExpander(null, envVarProvider); } diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java deleted file mode 100644 index fabe2150b..000000000 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ /dev/null @@ -1,486 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.logstash.plugins; - -import co.elastic.logstash.api.Codec; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Context; -import co.elastic.logstash.api.DeadLetterQueueWriter; -import co.elastic.logstash.api.Filter; -import co.elastic.logstash.api.Input; -import co.elastic.logstash.api.Output; -import org.jruby.Ruby; -import org.jruby.RubyArray; -import org.jruby.RubyBasicObject; -import org.jruby.RubyClass; -import org.jruby.RubyHash; -import org.jruby.RubyString; -import org.jruby.RubySymbol; -import org.jruby.anno.JRubyClass; -import org.jruby.anno.JRubyMethod; -import org.jruby.javasupport.JavaUtil; -import org.jruby.runtime.ThreadContext; -import org.jruby.runtime.builtin.IRubyObject; -import org.logstash.RubyUtil; -import org.logstash.common.AbstractDeadLetterQueueWriterExt; -import org.logstash.common.DLQWriterAdapter; -import org.logstash.common.EnvironmentVariableProvider; -import org.logstash.common.NullDeadLetterQueueWriter; -import org.logstash.common.SourceWithMetadata; -import org.logstash.config.ir.PipelineIR; -import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; -import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; -import org.logstash.config.ir.compiler.FilterDelegatorExt; -import org.logstash.config.ir.compiler.JavaCodecDelegator; -import org.logstash.config.ir.compiler.JavaFilterDelegatorExt; -import org.logstash.config.ir.compiler.JavaInputDelegatorExt; -import org.logstash.config.ir.compiler.JavaOutputDelegatorExt; -import org.logstash.config.ir.compiler.OutputDelegatorExt; -import org.logstash.config.ir.compiler.OutputStrategyExt; -import org.logstash.config.ir.compiler.RubyIntegration; -import org.logstash.config.ir.graph.Vertex; -import org.logstash.execution.ExecutionContextExt; -import org.logstash.execution.JavaBasePipelineExt; -import org.logstash.instrument.metrics.AbstractMetricExt; -import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; -import org.logstash.instrument.metrics.MetricKeys; -import org.logstash.instrument.metrics.NullMetricExt; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Locale; -import java.util.Map; -import java.util.UUID; - -public final class PluginFactoryExt { - - @FunctionalInterface - public interface PluginResolver { - PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name); - } - - @JRubyClass(name = "PluginFactory") - public static final class Plugins extends RubyBasicObject - implements RubyIntegration.PluginFactory { - - private static final long serialVersionUID = 1L; - - private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id"); - - private final Collection pluginsById = new HashSet<>(); - - private PipelineIR lir; - - private PluginFactoryExt.ExecutionContext executionContext; - - private PluginFactoryExt.Metrics metrics; - - private RubyClass filterClass; - - private ConfigVariableExpander configVariables; - - private PluginResolver pluginResolver; - - @JRubyMethod(name = "filter_delegator", meta = true, required = 5) - public static IRubyObject filterDelegator(final ThreadContext context, - final IRubyObject recv, final IRubyObject[] args) { - final RubyHash arguments = (RubyHash) args[2]; - final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); - final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); - filterInstance.callMethod( - context, "metric=", - ((AbstractMetricExt) args[3]).namespace(context, id.intern()) - ); - filterInstance.callMethod(context, "execution_context=", args[4]); - return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS) - .initialize(context, filterInstance, id); - } - - public Plugins(final Ruby runtime, final RubyClass metaClass) { - this(runtime, metaClass, PluginLookup::lookup); - } - - Plugins(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) { - super(runtime, metaClass); - this.pluginResolver = pluginResolver; - } - - @JRubyMethod(required = 4) - public PluginFactoryExt.Plugins initialize(final ThreadContext context, - final IRubyObject[] args) { - return init( - args[0].toJava(PipelineIR.class), - (PluginFactoryExt.Metrics) args[1], (PluginFactoryExt.ExecutionContext) args[2], - (RubyClass) args[3] - ); - } - - public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics, - final PluginFactoryExt.ExecutionContext executionContext, - final RubyClass filterClass) { - return this.init(lir, metrics, executionContext, filterClass, EnvironmentVariableProvider.defaultProvider()); - } - - PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics, - final PluginFactoryExt.ExecutionContext executionContext, - final RubyClass filterClass, - final EnvironmentVariableProvider envVars) { - this.lir = lir; - this.metrics = metrics; - this.executionContext = executionContext; - this.filterClass = filterClass; - this.configVariables = ConfigVariableExpander.withoutSecret(envVars); - return this; - } - - @SuppressWarnings("unchecked") - @Override - public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { - return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(), - source, (Map) args, pluginArgs - ); - } - - @SuppressWarnings("unchecked") - @Override - public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { - return (AbstractOutputDelegatorExt) plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(), - source, (Map) args, pluginArgs - ); - } - - @SuppressWarnings("unchecked") - @Override - public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, - final IRubyObject args, Map pluginArgs) { - return (AbstractFilterDelegatorExt) plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(), - source, (Map) args, pluginArgs - ); - } - - @SuppressWarnings("unchecked") - @Override - public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, - Map pluginArgs) { - return plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - name.asJavaString(), source, (Map) args, pluginArgs - ); - } - - @Override - public Codec buildDefaultCodec(String codecName) { - return (Codec) JavaUtil.unwrapJavaValue(plugin( - RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, - codecName, null, Collections.emptyMap(), Collections.emptyMap() - )); - } - - @SuppressWarnings("unchecked") - @JRubyMethod(required = 3, optional = 1) - public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { - return plugin( - context, - PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), - args[1].asJavaString(), - JavaUtil.unwrapIfJavaObject(args[2]), - args.length > 3 ? (Map) args[3] : new HashMap<>(), - null - ); - } - @SuppressWarnings("unchecked") - private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, - SourceWithMetadata source, final Map args, - Map pluginArgs) { - final String id; - final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); - - if (type == PluginLookup.PluginType.CODEC) { - id = UUID.randomUUID().toString(); - } else { - String unresolvedId = lir.getGraph().vertices() - .filter(v -> v.getSourceWithMetadata() != null - && v.getSourceWithMetadata().equalsWithoutText(source)) - .findFirst() - .map(Vertex::getId).orElse(null); - id = (String) configVariables.expand(unresolvedId); - } - if (id == null) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format("Could not determine ID for %s/%s, source don't matched: %s", - type.rubyLabel().asJavaString(), name, source - ) - ); - } - if (pluginsById.contains(id)) { - throw context.runtime.newRaiseException( - RubyUtil.CONFIGURATION_ERROR_CLASS, - String.format("Two plugins have the id '%s', please fix this conflict", id) - ); - } - pluginsById.add(id); - final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel()); - - if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) { - - final Map newArgs = new HashMap<>(args); - newArgs.put("id", id); - final RubyClass klass = (RubyClass) pluginClass.klass(); - final ExecutionContextExt executionCntx = executionContext.create( - context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name") - ); - final RubyHash rubyArgs = RubyHash.newHash(context.runtime); - rubyArgs.putAll(newArgs); - if (type == PluginLookup.PluginType.OUTPUT) { - return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize( - context, - new IRubyObject[]{ - klass, typeScopedMetric, executionCntx, - OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null), - rubyArgs - } - ); - } else if (type == PluginLookup.PluginType.FILTER) { - return filterDelegator( - context, null, - new IRubyObject[]{ - filterClass, klass, rubyArgs, typeScopedMetric, executionCntx - } - ); - } else { - final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); - final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); - scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name")); - pluginInstance.callMethod(context, "metric=", scopedMetric); - pluginInstance.callMethod(context, "execution_context=", executionCntx); - return pluginInstance; - } - } else { - if (pluginArgs == null) { - String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." + - " The Java execution engine is required to run Java plugins.", name); - throw new IllegalStateException(err); - } - - if (type == PluginLookup.PluginType.OUTPUT) { - final Class cls = (Class) pluginClass.klass(); - Output output = null; - if (cls != null) { - try { - final Constructor ctor = cls.getConstructor(String.class, Configuration.class, Context.class); - Configuration config = new ConfigurationImpl(pluginArgs, this); - output = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context))); - PluginUtil.validateConfig(output, config); - } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { - if (ex instanceof InvocationTargetException && ex.getCause() != null) { - throw new IllegalStateException((ex).getCause()); - } - throw new IllegalStateException(ex); - } - } - - if (output != null) { - return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output); - } else { - throw new IllegalStateException("Unable to instantiate output: " + pluginClass); - } - } else if (type == PluginLookup.PluginType.FILTER) { - final Class cls = (Class) pluginClass.klass(); - Filter filter = null; - if (cls != null) { - try { - final Constructor ctor = cls.getConstructor(String.class, Configuration.class, Context.class); - Configuration config = new ConfigurationImpl(pluginArgs); - filter = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context))); - PluginUtil.validateConfig(filter, config); - } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { - if (ex instanceof InvocationTargetException && ex.getCause() != null) { - throw new IllegalStateException((ex).getCause()); - } - throw new IllegalStateException(ex); - } - } - - if (filter != null) { - return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs); - } else { - throw new IllegalStateException("Unable to instantiate filter: " + pluginClass); - } - } else if (type == PluginLookup.PluginType.INPUT) { - final Class cls = (Class) pluginClass.klass(); - Input input = null; - if (cls != null) { - try { - final Constructor ctor = cls.getConstructor(String.class, Configuration.class, Context.class); - Configuration config = new ConfigurationImpl(pluginArgs, this); - input = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context))); - PluginUtil.validateConfig(input, config); - } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { - if (ex instanceof InvocationTargetException && ex.getCause() != null) { - throw new IllegalStateException((ex).getCause()); - } - throw new IllegalStateException(ex); - } - } - - if (input != null) { - return JavaInputDelegatorExt.create((JavaBasePipelineExt) executionContext.pipeline, typeScopedMetric, input, pluginArgs); - } else { - throw new IllegalStateException("Unable to instantiate input: " + pluginClass); - } - } else if (type == PluginLookup.PluginType.CODEC) { - final Class cls = (Class) pluginClass.klass(); - if (cls != null) { - try { - final Constructor ctor = cls.getConstructor(Configuration.class, Context.class); - Configuration config = new ConfigurationImpl(pluginArgs); - final Context pluginContext = executionContext.toContext(type, metrics.getRoot(context)); - final Codec codec = ctor.newInstance(config, pluginContext); - PluginUtil.validateConfig(codec, config); - return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec)); - } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { - if (ex instanceof InvocationTargetException && ex.getCause() != null) { - throw new IllegalStateException((ex).getCause()); - } - throw new IllegalStateException(ex); - } - } - - throw new IllegalStateException("Unable to instantiate codec: " + pluginClass); - } - else { - throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString()); - } - } - } - } - - @JRubyClass(name = "ExecutionContextFactory") - public static final class ExecutionContext extends RubyBasicObject { - - private static final long serialVersionUID = 1L; - - private IRubyObject agent; - - private IRubyObject pipeline; - - private IRubyObject dlqWriter; - - public ExecutionContext(final Ruby runtime, final RubyClass metaClass) { - super(runtime, metaClass); - } - - @JRubyMethod - public PluginFactoryExt.ExecutionContext initialize(final ThreadContext context, - final IRubyObject agent, final IRubyObject pipeline, final IRubyObject dlqWriter) { - this.agent = agent; - this.pipeline = pipeline; - this.dlqWriter = dlqWriter; - return this; - } - - @JRubyMethod - public ExecutionContextExt create(final ThreadContext context, final IRubyObject id, - final IRubyObject classConfigName) { - return new ExecutionContextExt( - context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS - ).initialize( - context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter} - ); - } - - public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) { - DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance(); - if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) { - IRubyObject innerWriter = - ((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter) - .innerWriter(RubyUtil.RUBY.getCurrentContext()); - if (innerWriter != null) { - if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) { - dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class)); - } - } - } else if (dlqWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) { - dlq = dlqWriter.toJava(DeadLetterQueueWriter.class); - } - - return new ContextImpl(dlq, new NamespacedMetricImpl(RubyUtil.RUBY.getCurrentContext(), metric)); - } - } - - @JRubyClass(name = "PluginMetricFactory") - public static final class Metrics extends RubyBasicObject { - - private static final long serialVersionUID = 1L; - - private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins"); - - private RubySymbol pipelineId; - - private AbstractMetricExt metric; - - public Metrics(final Ruby runtime, final RubyClass metaClass) { - super(runtime, metaClass); - } - - @JRubyMethod - public PluginFactoryExt.Metrics initialize(final ThreadContext context, - final IRubyObject pipelineId, final IRubyObject metrics) { - this.pipelineId = pipelineId.convertToString().intern(); - if (metrics.isNil()) { - this.metric = new NullMetricExt(context.runtime, RubyUtil.NULL_METRIC_CLASS); - } else { - this.metric = (AbstractMetricExt) metrics; - } - return this; - } - - AbstractNamespacedMetricExt getRoot(final ThreadContext context) { - return metric.namespace( - context, - RubyArray.newArray( - context.runtime, - Arrays.asList( - MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS - ) - ) - ); - } - - @JRubyMethod - public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) { - return getRoot(context).namespace( - context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString())) - ); - } - } -} diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java index 307944409..3326161d0 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Dots.java @@ -41,12 +41,16 @@ public class Dots implements Codec { private final String id; - public Dots(final Configuration configuration, final Context context) { - this(); + public Dots(final String id, final Configuration configuration, final Context context) { + this((id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); } - private Dots() { - this.id = UUID.randomUUID().toString(); + public Dots(final Configuration configuration, final Context context) { + this(UUID.randomUUID().toString()); + } + + private Dots(String id) { + this.id = id; } @Override @@ -66,7 +70,7 @@ public class Dots implements Codec { @Override public Codec cloneCodec() { - return new Dots(); + return new Dots(UUID.randomUUID().toString()); } @Override diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java index d62981aaa..e42c5fa9f 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Line.java @@ -77,16 +77,26 @@ public class Line implements Codec { /** * Required constructor. * + * @param id plugin id + * @param configuration Logstash Configuration + * @param context Logstash Context + */ + public Line(final String id, final Configuration configuration, final Context context) { + this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG), + (id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); + } + + /* * @param configuration Logstash Configuration * @param context Logstash Context */ public Line(final Configuration configuration, final Context context) { - this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG)); + this(null, configuration, context); } - private Line(Context context, String delimiter, String charsetName, String format) { + private Line(Context context, String delimiter, String charsetName, String format, String id) { this.context = context; - this.id = UUID.randomUUID().toString(); + this.id = id; this.delimiter = delimiter; this.charset = Charset.forName(charsetName); this.format = format; @@ -165,6 +175,6 @@ public class Line implements Codec { @Override public Codec cloneCodec() { - return new Line(context, delimiter, charset.name(), format); + return new Line(context, delimiter, charset.name(), format, UUID.randomUUID().toString()); } } diff --git a/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java b/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java index 026aba044..6ea5afd14 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java +++ b/logstash-core/src/main/java/org/logstash/plugins/codecs/Plain.java @@ -71,16 +71,25 @@ public class Plain implements Codec { /** * Required constructor. * + * @param id plugin id + * @param configuration Logstash Configuration + * @param context Logstash Context + */ + public Plain(final String id, final Configuration configuration, final Context context) { + this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG), + (id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString()); + } + /** * @param configuration Logstash Configuration * @param context Logstash Context */ public Plain(final Configuration configuration, final Context context) { - this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG)); + this(null, configuration, context); } - private Plain(Context context, String charsetName, String format) { + private Plain(Context context, String charsetName, String format, String id) { this.context = context; - this.id = UUID.randomUUID().toString(); + this.id = id; this.charset = Charset.forName(charsetName); this.format = format; decoder = charset.newDecoder(); @@ -127,6 +136,6 @@ public class Plain implements Codec { @Override public Codec cloneCodec() { - return new Plain(context, charset.name(), format); + return new Plain(context, charset.name(), format, UUID.randomUUID().toString()); } } diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/AbstractPluginCreator.java b/logstash-core/src/main/java/org/logstash/plugins/factory/AbstractPluginCreator.java new file mode 100644 index 000000000..4685a01bd --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/AbstractPluginCreator.java @@ -0,0 +1,45 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Plugin; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.ConfigurationImpl; +import org.logstash.plugins.PluginLookup; +import org.logstash.plugins.PluginUtil; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; + +abstract class AbstractPluginCreator { + + protected PluginFactoryExt pluginsFactory = null; + + abstract IRubyObject createDelegator(String name, Map pluginArgs, String id, + AbstractNamespacedMetricExt typeScopedMetric, + PluginLookup.PluginClass pluginClass, Context pluginContext); + + protected T instantiateAndValidate(Map pluginArgs, String id, Context pluginContext, + PluginLookup.PluginClass pluginClass) { + @SuppressWarnings("unchecked") + final Class cls = (Class) pluginClass.klass(); + if (cls == null) { + throw new IllegalStateException("Unable to instantiate type: " + pluginClass); + } + + try { + final Constructor ctor = cls.getConstructor(String.class, Configuration.class, Context.class); + Configuration config = new ConfigurationImpl(pluginArgs, pluginsFactory); + T plugin = ctor.newInstance(id, config, pluginContext); + PluginUtil.validateConfig(plugin, config); + return plugin; + } catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) { + if (ex instanceof InvocationTargetException && ex.getCause() != null) { + throw new IllegalStateException((ex).getCause()); + } + throw new IllegalStateException(ex); + } + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/CodecPluginCreator.java b/logstash-core/src/main/java/org/logstash/plugins/factory/CodecPluginCreator.java new file mode 100644 index 000000000..66f12a83c --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/CodecPluginCreator.java @@ -0,0 +1,23 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Codec; +import co.elastic.logstash.api.Context; +import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.config.ir.compiler.JavaCodecDelegator; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.PluginLookup; + +import java.util.Map; + +class CodecPluginCreator extends AbstractPluginCreator { + + @Override + public IRubyObject createDelegator(String name, Map pluginArgs, String id, + AbstractNamespacedMetricExt typeScopedMetric, + PluginLookup.PluginClass pluginClass, Context pluginContext) { + Codec codec = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass); + return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec)); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java new file mode 100644 index 000000000..d72ef4a5c --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/ExecutionContextFactoryExt.java @@ -0,0 +1,77 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.DeadLetterQueueWriter; +import org.jruby.Ruby; +import org.jruby.RubyBasicObject; +import org.jruby.RubyClass; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.common.AbstractDeadLetterQueueWriterExt; +import org.logstash.common.DLQWriterAdapter; +import org.logstash.common.NullDeadLetterQueueWriter; +import org.logstash.execution.ExecutionContextExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.ContextImpl; +import org.logstash.plugins.NamespacedMetricImpl; +import org.logstash.plugins.PluginLookup; + +@JRubyClass(name = "ExecutionContextFactory") +public final class ExecutionContextFactoryExt extends RubyBasicObject { + + private static final long serialVersionUID = 1L; + + private IRubyObject agent; + + private IRubyObject pipeline; + + private IRubyObject dlqWriter; + + public ExecutionContextFactoryExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public ExecutionContextFactoryExt initialize(final ThreadContext context, final IRubyObject agent, + final IRubyObject pipeline, final IRubyObject dlqWriter) { + this.agent = agent; + this.pipeline = pipeline; + this.dlqWriter = dlqWriter; + return this; + } + + @JRubyMethod + public ExecutionContextExt create(final ThreadContext context, final IRubyObject id, + final IRubyObject classConfigName) { + return new ExecutionContextExt( + context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS + ).initialize( + context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter} + ); + } + + Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) { + DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance(); + if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) { + IRubyObject innerWriter = + ((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter) + .innerWriter(RubyUtil.RUBY.getCurrentContext()); + if (innerWriter != null) { + if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) { + dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class)); + } + } + } else if (dlqWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) { + dlq = dlqWriter.toJava(DeadLetterQueueWriter.class); + } + + return new ContextImpl(dlq, new NamespacedMetricImpl(RubyUtil.RUBY.getCurrentContext(), metric)); + } + + IRubyObject getPipeline() { + return pipeline; + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/FilterPluginCreator.java b/logstash-core/src/main/java/org/logstash/plugins/factory/FilterPluginCreator.java new file mode 100644 index 000000000..5d98f7d4f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/FilterPluginCreator.java @@ -0,0 +1,21 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Filter; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.config.ir.compiler.JavaFilterDelegatorExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.PluginLookup; + +import java.util.Map; + +class FilterPluginCreator extends AbstractPluginCreator { + + @Override + public IRubyObject createDelegator(String name, Map pluginArgs, String id, + AbstractNamespacedMetricExt typeScopedMetric, + PluginLookup.PluginClass pluginClass, Context pluginContext) { + Filter filter = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass); + return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/InputPluginCreator.java b/logstash-core/src/main/java/org/logstash/plugins/factory/InputPluginCreator.java new file mode 100644 index 000000000..66929065b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/InputPluginCreator.java @@ -0,0 +1,27 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Input; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.config.ir.compiler.JavaInputDelegatorExt; +import org.logstash.execution.JavaBasePipelineExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.PluginLookup; + +import java.util.Map; + +class InputPluginCreator extends AbstractPluginCreator { + + InputPluginCreator(PluginFactoryExt pluginsFactory) { + this.pluginsFactory = pluginsFactory; + } + + @Override + public IRubyObject createDelegator(String name, Map pluginArgs, String id, + AbstractNamespacedMetricExt typeScopedMetric, + PluginLookup.PluginClass pluginClass, Context pluginContext) { + Input input = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass); + return JavaInputDelegatorExt.create((JavaBasePipelineExt) pluginsFactory.getExecutionContextFactory().getPipeline(), + typeScopedMetric, input, pluginArgs); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/OutputPluginCreator.java b/logstash-core/src/main/java/org/logstash/plugins/factory/OutputPluginCreator.java new file mode 100644 index 000000000..25039546f --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/OutputPluginCreator.java @@ -0,0 +1,25 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Output; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.config.ir.compiler.JavaOutputDelegatorExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.plugins.PluginLookup; + +import java.util.Map; + +class OutputPluginCreator extends AbstractPluginCreator { + + OutputPluginCreator(PluginFactoryExt pluginsFactory) { + this.pluginsFactory = pluginsFactory; + } + + @Override + public IRubyObject createDelegator(String name, Map pluginArgs, String id, + AbstractNamespacedMetricExt typeScopedMetric, + PluginLookup.PluginClass pluginClass, Context pluginContext) { + Output output = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass); + return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java new file mode 100644 index 000000000..a226e57d5 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginFactoryExt.java @@ -0,0 +1,261 @@ +package org.logstash.plugins.factory; + +import co.elastic.logstash.api.*; +import org.jruby.*; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.common.EnvironmentVariableProvider; +import org.logstash.common.SourceWithMetadata; +import org.logstash.config.ir.PipelineIR; +import org.logstash.config.ir.compiler.*; +import org.logstash.config.ir.graph.Vertex; +import org.logstash.execution.ExecutionContextExt; +import org.logstash.instrument.metrics.AbstractMetricExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.instrument.metrics.MetricKeys; +import org.logstash.plugins.ConfigVariableExpander; +import org.logstash.plugins.PluginLookup; + +import java.util.*; + +@JRubyClass(name = "PluginFactory") +public final class PluginFactoryExt extends RubyBasicObject + implements RubyIntegration.PluginFactory { + + @FunctionalInterface + public interface PluginResolver { + PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name); + } + + private static final long serialVersionUID = 1L; + + private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id"); + + private final Collection pluginsById = new HashSet<>(); + + private PipelineIR lir; + + private ExecutionContextFactoryExt executionContextFactory; + + private PluginMetricsFactoryExt metrics; + + private RubyClass filterClass; + + private ConfigVariableExpander configVariables; + + private PluginResolver pluginResolver; + + private Map> pluginCreatorsRegistry = new HashMap<>(4); + + @JRubyMethod(name = "filter_delegator", meta = true, required = 5) + public static IRubyObject filterDelegator(final ThreadContext context, + final IRubyObject recv, final IRubyObject... args) { + final RubyHash arguments = (RubyHash) args[2]; + final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments); + final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY); + filterInstance.callMethod( + context, "metric=", + ((AbstractMetricExt) args[3]).namespace(context, id.intern()) + ); + filterInstance.callMethod(context, "execution_context=", args[4]); + return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS) + .initialize(context, filterInstance, id); + } + + public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) { + this(runtime, metaClass, PluginLookup::lookup); + } + + PluginFactoryExt(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) { + super(runtime, metaClass); + this.pluginResolver = pluginResolver; + } + + @JRubyMethod(required = 4) + public PluginFactoryExt initialize(final ThreadContext context, + final IRubyObject[] args) { + return init( + args[0].toJava(PipelineIR.class), + (PluginMetricsFactoryExt) args[1], (ExecutionContextFactoryExt) args[2], + (RubyClass) args[3] + ); + } + + public PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, + final ExecutionContextFactoryExt executionContextFactoryExt, + final RubyClass filterClass) { + return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider()); + } + + PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics, + final ExecutionContextFactoryExt executionContextFactoryExt, + final RubyClass filterClass, + final EnvironmentVariableProvider envVars) { + this.lir = lir; + this.metrics = metrics; + this.executionContextFactory = executionContextFactoryExt; + this.filterClass = filterClass; + this.pluginCreatorsRegistry.put(PluginLookup.PluginType.INPUT, new InputPluginCreator(this)); + this.pluginCreatorsRegistry.put(PluginLookup.PluginType.CODEC, new CodecPluginCreator()); + this.pluginCreatorsRegistry.put(PluginLookup.PluginType.FILTER, new FilterPluginCreator()); + this.pluginCreatorsRegistry.put(PluginLookup.PluginType.OUTPUT, new OutputPluginCreator(this)); + this.configVariables = ConfigVariableExpander.withoutSecret(envVars); + return this; + } + + @SuppressWarnings("unchecked") + @Override + public IRubyObject buildInput(final RubyString name, SourceWithMetadata source, + final IRubyObject args, Map pluginArgs) { + return plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(), + source, (Map) args, pluginArgs + ); + } + + @SuppressWarnings("unchecked") + @Override + public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source, + final IRubyObject args, Map pluginArgs) { + return (AbstractOutputDelegatorExt) plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(), + source, (Map) args, pluginArgs + ); + } + + @SuppressWarnings("unchecked") + @Override + public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source, + final IRubyObject args, Map pluginArgs) { + return (AbstractFilterDelegatorExt) plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(), + source, (Map) args, pluginArgs + ); + } + + @SuppressWarnings("unchecked") + @Override + public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args, + Map pluginArgs) { + return plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, + name.asJavaString(), source, (Map) args, pluginArgs + ); + } + + @Override + public Codec buildDefaultCodec(String codecName) { + return (Codec) JavaUtil.unwrapJavaValue(plugin( + RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC, + codecName, null, Collections.emptyMap(), Collections.emptyMap() + )); + } + + @SuppressWarnings("unchecked") + @JRubyMethod(required = 3, optional = 1) + public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) { + return plugin( + context, + PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)), + args[1].asJavaString(), + JavaUtil.unwrapIfJavaObject(args[2]), + args.length > 3 ? (Map) args[3] : new HashMap<>(), + null + ); + } + + @SuppressWarnings("unchecked") + private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name, + SourceWithMetadata source, final Map args, + Map pluginArgs) { + final String id = generateOrRetrievePluginId(context, type, name, source); + pluginsById.add(id); + final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel()); + + final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); + if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) { + + final Map newArgs = new HashMap<>(args); + newArgs.put("id", id); + final RubyClass klass = (RubyClass) pluginClass.klass(); + final ExecutionContextExt executionCntx = executionContextFactory.create( + context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name") + ); + final RubyHash rubyArgs = RubyHash.newHash(context.runtime); + rubyArgs.putAll(newArgs); + if (type == PluginLookup.PluginType.OUTPUT) { + return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize( + context, + new IRubyObject[]{ + klass, typeScopedMetric, executionCntx, + OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null), + rubyArgs + } + ); + } else if (type == PluginLookup.PluginType.FILTER) { + return filterDelegator( + context, null, + filterClass, klass, rubyArgs, typeScopedMetric, executionCntx); + } else { + final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs); + final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id)); + scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name")); + pluginInstance.callMethod(context, "metric=", scopedMetric); + pluginInstance.callMethod(context, "execution_context=", executionCntx); + return pluginInstance; + } + } else { + if (pluginArgs == null) { + String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." + + " The Java execution engine is required to run Java plugins.", name); + throw new IllegalStateException(err); + } + + AbstractPluginCreator pluginCreator = pluginCreatorsRegistry.get(type); + if (pluginCreator == null) { + throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString()); + } + + Context contextWithMetrics = executionContextFactory.toContext(type, metrics.getRoot(context)); + return pluginCreator.createDelegator(name, pluginArgs, id, typeScopedMetric, pluginClass, contextWithMetrics); + } + } + + private String generateOrRetrievePluginId(ThreadContext context, PluginLookup.PluginType type, String name, + SourceWithMetadata source) { + final String id; + if (type == PluginLookup.PluginType.CODEC) { + id = UUID.randomUUID().toString(); + } else { + String unresolvedId = lir.getGraph().vertices() + .filter(v -> v.getSourceWithMetadata() != null + && v.getSourceWithMetadata().equalsWithoutText(source)) + .findFirst() + .map(Vertex::getId).orElse(null); + id = (String) configVariables.expand(unresolvedId); + } + if (id == null) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format( + "Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name + ) + ); + } + if (pluginsById.contains(id)) { + throw context.runtime.newRaiseException( + RubyUtil.CONFIGURATION_ERROR_CLASS, + String.format("Two plugins have the id '%s', please fix this conflict", id) + ); + } + return id; + } + + ExecutionContextFactoryExt getExecutionContextFactory() { + return executionContextFactory; + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/factory/PluginMetricsFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginMetricsFactoryExt.java new file mode 100644 index 000000000..18a8aab33 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/factory/PluginMetricsFactoryExt.java @@ -0,0 +1,61 @@ +package org.logstash.plugins.factory; + +import org.jruby.*; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.instrument.metrics.AbstractMetricExt; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.instrument.metrics.MetricKeys; +import org.logstash.instrument.metrics.NullMetricExt; + +import java.util.Arrays; + +@JRubyClass(name = "PluginMetricsFactory") +public final class PluginMetricsFactoryExt extends RubyBasicObject { + + private static final long serialVersionUID = 1L; + + private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins"); + + private RubySymbol pipelineId; + + private AbstractMetricExt metric; + + public PluginMetricsFactoryExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public PluginMetricsFactoryExt initialize(final ThreadContext context, + final IRubyObject pipelineId, final IRubyObject metrics) { + this.pipelineId = pipelineId.convertToString().intern(); + if (metrics.isNil()) { + this.metric = new NullMetricExt(context.runtime, RubyUtil.NULL_METRIC_CLASS); + } else { + this.metric = (AbstractMetricExt) metrics; + } + return this; + } + + AbstractNamespacedMetricExt getRoot(final ThreadContext context) { + return metric.namespace( + context, + RubyArray.newArray( + context.runtime, + Arrays.asList( + MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS + ) + ) + ); + } + + @JRubyMethod + public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) { + return getRoot(context).namespace( + context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString())) + ); + } +} diff --git a/logstash-core/src/test/java/org/logstash/plugins/MetricTestCase.java b/logstash-core/src/test/java/org/logstash/plugins/MetricTestCase.java index 2aad692a4..fbf92213a 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/MetricTestCase.java +++ b/logstash-core/src/test/java/org/logstash/plugins/MetricTestCase.java @@ -51,7 +51,7 @@ public abstract class MetricTestCase extends RubyEnvTestCase { executionContext = new ExecutionContextExt(RUBY, EXECUTION_CONTEXT_CLASS); } - protected static IRubyObject runRubyScript(String script) { + public static IRubyObject runRubyScript(String script) { IRubyObject m = RUBY.evalScriptlet(script); return m; } diff --git a/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java similarity index 85% rename from logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java rename to logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java index f97c21d1b..ded3049a6 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/factory/PluginFactoryExtTest.java @@ -17,8 +17,7 @@ * under the License. */ - -package org.logstash.plugins; +package org.logstash.plugins.factory; import co.elastic.logstash.api.*; import org.jruby.RubyHash; @@ -32,6 +31,8 @@ import org.logstash.config.ir.InvalidIRException; import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.RubyEnvTestCase; import org.logstash.instrument.metrics.NamespacedMetricExt; +import org.logstash.plugins.MetricTestCase; +import org.logstash.plugins.PluginLookup; import java.util.Collection; import java.util.Collections; @@ -42,10 +43,9 @@ import java.util.function.Consumer; import static org.junit.Assert.assertEquals; import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS; import static org.logstash.RubyUtil.RUBY; -import static org.logstash.plugins.MetricTestCase.runRubyScript; /** - * Tests for {@link PluginFactoryExt.Plugins}. + * Tests for {@link PluginFactoryExt}. */ public final class PluginFactoryExtTest extends RubyEnvTestCase { @@ -87,11 +87,11 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase { SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("proto", "path", 1, 8, "input {mockinput{ id => \"${CUSTOM}\"}} output{mockoutput{}}"); final PipelineIR pipelineIR = compilePipeline(sourceWithMetadata); - PluginFactoryExt.Metrics metricsFactory = createMetricsFactory(); - PluginFactoryExt.ExecutionContext execContextFactory = createExecutionContextFactory(); + PluginMetricsFactoryExt metricsFactory = createMetricsFactory(); + ExecutionContextFactoryExt execContextFactory = createExecutionContextFactory(); Map envVars = new HashMap<>(); envVars.put("CUSTOM", "test"); - PluginFactoryExt.Plugins sut = new PluginFactoryExt.Plugins(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS, + PluginFactoryExt sut = new PluginFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS, mockPluginResolver); sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get); @@ -110,24 +110,24 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase { return ConfigCompiler.configToPipelineIR(Collections.singletonList(sourceWithMetadata), false); } - private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() { - PluginFactoryExt.ExecutionContext execContextFactory = new PluginFactoryExt.ExecutionContext(RubyUtil.RUBY, + private static ExecutionContextFactoryExt createExecutionContextFactory() { + ExecutionContextFactoryExt execContextFactory = new ExecutionContextFactoryExt(RubyUtil.RUBY, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS); execContextFactory.initialize(RubyUtil.RUBY.getCurrentContext(), null, null, RubyUtil.RUBY.newString("no DLQ")); return execContextFactory; } - private static PluginFactoryExt.Metrics createMetricsFactory() { + private static PluginMetricsFactoryExt createMetricsFactory() { final IRubyObject metricWithCollector = - runRubyScript("require \"logstash/instrument/collector\"\n" + + MetricTestCase.runRubyScript("require \"logstash/instrument/collector\"\n" + "metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)"); NamespacedMetricExt metric = new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS) .initialize(RUBY.getCurrentContext(), metricWithCollector, RUBY.newEmptyArray()); - PluginFactoryExt.Metrics metricsFactory = new PluginFactoryExt.Metrics(RubyUtil.RUBY, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS); + PluginMetricsFactoryExt metricsFactory = new PluginMetricsFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS); metricsFactory.initialize(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newString("main"), metric); return metricsFactory; }