From 496f81c8ca48eba8228c90f4d41c5f3b01ca22f9 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 10 Feb 2020 12:30:36 +0100 Subject: [PATCH] Changed PluginFactory to resolve id field with environment variables docs: plugin ids variable expansion cannot use secret store closes 10546 Fixes #11592 --- docs/include/filter.asciidoc | 2 + docs/include/input.asciidoc | 2 + docs/include/output.asciidoc | 2 + docs/static/keystore.asciidoc | 5 +- logstash-core/build.gradle | 2 + .../logstash/config/ir/CompiledPipeline.java | 1 - .../plugins/ConfigVariableExpander.java | 7 + .../logstash/plugins/PluginFactoryExt.java | 28 +++- .../plugins/PluginFactoryExtTest.java | 139 ++++++++++++++++++ 9 files changed, 184 insertions(+), 4 deletions(-) create mode 100644 logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java diff --git a/docs/include/filter.asciidoc b/docs/include/filter.asciidoc index 12b9596b0..b749ceefb 100644 --- a/docs/include/filter.asciidoc +++ b/docs/include/filter.asciidoc @@ -146,6 +146,8 @@ Adding a named ID in this case will help in monitoring Logstash when using the m } } +NOTE: Variable substitution in the `id` field only supports environment variables + and does not support the use of values from the secret store. ifeval::["{versioned_docs}"!="true"] [id="plugins-{type}s-{plugin}-periodic_flush"] diff --git a/docs/include/input.asciidoc b/docs/include/input.asciidoc index 83d7734dd..d0b9b5cd0 100644 --- a/docs/include/input.asciidoc +++ b/docs/include/input.asciidoc @@ -112,6 +112,8 @@ input { } --------------------------------------------------------------------------------------------------- +NOTE: Variable substitution in the `id` field only supports environment variables + and does not support the use of values from the secret store. ifeval::["{versioned_docs}"!="true"] [id="plugins-{type}s-{plugin}-tags"] diff --git a/docs/include/output.asciidoc b/docs/include/output.asciidoc index e546ce8ab..5d8090645 100644 --- a/docs/include/output.asciidoc +++ b/docs/include/output.asciidoc @@ -89,4 +89,6 @@ output { } --------------------------------------------------------------------------------------------------- +NOTE: Variable substitution in the `id` field only supports environment variables + and does not support the use of values from the secret store. diff --git a/docs/static/keystore.asciidoc b/docs/static/keystore.asciidoc index d40b5a07f..b3fc3decb 100644 --- a/docs/static/keystore.asciidoc +++ b/docs/static/keystore.asciidoc @@ -25,7 +25,10 @@ value `yourelasticsearchpassword`: Notice that the Logstash keystore differs from the Elasticsearch keystore. Whereas the Elasticsearch keystore lets you store `elasticsearch.yml` values by name, the Logstash keystore lets you specify arbitrary names that you -can reference in the Logstash configuration. +can reference in the Logstash configuration. + +NOTE: There are some configuration fields that have no secret meaning, so not every field could leverage +the secret store for variables substitution. Plugin's `id` field is a field of this kind NOTE: Referencing keystore data from `pipelines.yml` or the command line (`-e`) is not currently supported. diff --git a/logstash-core/build.gradle b/logstash-core/build.gradle index a5ec6a057..f327b8d34 100644 --- a/logstash-core/build.gradle +++ b/logstash-core/build.gradle @@ -74,6 +74,7 @@ task javaTests(type: Test) { exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' exclude '/org/logstash/plugins/NamespacedMetricImplTest.class' exclude '/org/logstash/plugins/CounterMetricImplTest.class' + exclude '/org/logstash/plugins/PluginFactoryExtTest.class' } task rubyTests(type: Test) { @@ -88,6 +89,7 @@ task rubyTests(type: Test) { include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class' include '/org/logstash/plugins/NamespacedMetricImplTest.class' include '/org/logstash/plugins/CounterMetricImplTest.class' + include '/org/logstash/plugins/PluginFactoryExtTest.class' } test { diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 207aa8a99..e919facac 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -34,7 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; diff --git a/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java index e5732b500..c664717a4 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java +++ b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java @@ -16,6 +16,13 @@ public class ConfigVariableExpander implements AutoCloseable { private SecretStore secretStore; private EnvironmentVariableProvider envVarProvider; + /** + * Creates a ConfigVariableExpander that doesn't lookup any secreted placeholder. + * */ + static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) { + return new ConfigVariableExpander(null, envVarProvider); + } + public ConfigVariableExpander(SecretStore secretStore, EnvironmentVariableProvider envVarProvider) { this.secretStore = secretStore; this.envVarProvider = envVarProvider; diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java index 7f36421fa..0e487e616 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java @@ -22,6 +22,7 @@ import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; import org.logstash.common.AbstractDeadLetterQueueWriterExt; import org.logstash.common.DLQWriterAdapter; +import org.logstash.common.EnvironmentVariableProvider; import org.logstash.common.NullDeadLetterQueueWriter; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.PipelineIR; @@ -56,6 +57,11 @@ import java.util.UUID; public final class PluginFactoryExt { + @FunctionalInterface + public interface PluginResolver { + PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name); + } + @JRubyClass(name = "PluginFactory") public static final class Plugins extends RubyBasicObject implements RubyIntegration.PluginFactory { @@ -74,6 +80,10 @@ public final class PluginFactoryExt { private RubyClass filterClass; + private ConfigVariableExpander configVariables; + + private PluginResolver pluginResolver; + @JRubyMethod(name = "filter_delegator", meta = true, required = 5) public static IRubyObject filterDelegator(final ThreadContext context, final IRubyObject recv, final IRubyObject[] args) { @@ -90,7 +100,12 @@ public final class PluginFactoryExt { } public Plugins(final Ruby runtime, final RubyClass metaClass) { + this(runtime, metaClass, PluginLookup::lookup); + } + + Plugins(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) { super(runtime, metaClass); + this.pluginResolver = pluginResolver; } @JRubyMethod(required = 4) @@ -106,10 +121,18 @@ public final class PluginFactoryExt { public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics, final PluginFactoryExt.ExecutionContext executionContext, final RubyClass filterClass) { + return this.init(lir, metrics, executionContext, filterClass, EnvironmentVariableProvider.defaultProvider()); + } + + PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics, + final PluginFactoryExt.ExecutionContext executionContext, + final RubyClass filterClass, + final EnvironmentVariableProvider envVars) { this.lir = lir; this.metrics = metrics; this.executionContext = executionContext; this.filterClass = filterClass; + this.configVariables = ConfigVariableExpander.withoutSecret(envVars); return this; } @@ -178,16 +201,17 @@ public final class PluginFactoryExt { SourceWithMetadata source, final Map args, Map pluginArgs) { final String id; - final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name); + final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name); if (type == PluginLookup.PluginType.CODEC) { id = UUID.randomUUID().toString(); } else { - id = lir.getGraph().vertices() + String unresolvedId = lir.getGraph().vertices() .filter(v -> v.getSourceWithMetadata() != null && v.getSourceWithMetadata().equalsWithoutText(source)) .findFirst() .map(Vertex::getId).orElse(null); + id = (String) configVariables.expand(unresolvedId); } if (id == null) { throw context.runtime.newRaiseException( diff --git a/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java b/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java new file mode 100644 index 000000000..a751bb392 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/plugins/PluginFactoryExtTest.java @@ -0,0 +1,139 @@ +package org.logstash.plugins; + +import co.elastic.logstash.api.*; +import org.jruby.RubyArray; +import org.jruby.RubyHash; +import org.jruby.RubyString; +import org.jruby.javasupport.JavaUtil; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Test; +import org.logstash.RubyUtil; +import org.logstash.common.IncompleteSourceWithMetadataException; +import org.logstash.common.SourceWithMetadata; +import org.logstash.config.ir.ConfigCompiler; +import org.logstash.config.ir.PipelineIR; +import org.logstash.config.ir.RubyEnvTestCase; +import org.logstash.instrument.metrics.NamespacedMetricExt; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; + +import static org.junit.Assert.assertEquals; +import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS; +import static org.logstash.RubyUtil.RUBY; +import static org.logstash.plugins.MetricTestCase.runRubyScript; + +/** + * Tests for {@link PluginFactoryExt.Plugins}. + */ +public final class PluginFactoryExtTest extends RubyEnvTestCase { + + static class MockInputPlugin implements Input { + private final String id; + + @SuppressWarnings("unused") + public MockInputPlugin(String id, Configuration config, Context ctx) { + this.id = id; + } + + @Override + public Collection> configSchema() { + return Collections.emptyList(); + } + + @Override + public String getId() { + return id; + } + + @Override + public void start(Consumer> writer) { + } + + @Override + public void stop() { + } + + @Override + public void awaitStop() throws InterruptedException { + } + } + + @Test + public void testPluginIdResolvedWithEnvironmentVariables() throws IncompleteSourceWithMetadataException { + PluginFactoryExt.PluginResolver mockPluginResolver = wrapWithSearchable(MockInputPlugin.class); + + SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("proto", "path", 1, 8, "input {mockinput{ id => \"${CUSTOM}\"}} output{mockoutput{}}"); + final PipelineIR pipelineIR = compilePipeline(sourceWithMetadata); + + PluginFactoryExt.Metrics metricsFactory = createMetricsFactory(); + PluginFactoryExt.ExecutionContext execContextFactory = createExecutionContextFactory(); + Map envVars = new HashMap<>(); + envVars.put("CUSTOM", "test"); + PluginFactoryExt.Plugins sut = new PluginFactoryExt.Plugins(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS, + mockPluginResolver); + sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get); + + RubyString pluginName = RubyUtil.RUBY.newString("mockinput"); + + // Exercise + IRubyObject pluginInstance = sut.buildInput(pluginName, sourceWithMetadata, RubyHash.newHash(RubyUtil.RUBY), + Collections.emptyMap()); + + //Verify + IRubyObject id = pluginInstance.callMethod(RUBY.getCurrentContext(), "id"); + assertEquals("Resolved config setting MUST be evaluated with substitution", envVars.get("CUSTOM"), id.toString()); + } + + @SuppressWarnings("rawtypes") + private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) { + RubyArray sourcesWithMetadata = RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby( + RubyUtil.RUBY, sourceWithMetadata)); + + return ConfigCompiler.configToPipelineIR(sourcesWithMetadata, false); + } + + private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() { + PluginFactoryExt.ExecutionContext execContextFactory = new PluginFactoryExt.ExecutionContext(RubyUtil.RUBY, + RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS); + execContextFactory.initialize(RubyUtil.RUBY.getCurrentContext(), null, null, + RubyUtil.RUBY.newString("no DLQ")); + return execContextFactory; + } + + private static PluginFactoryExt.Metrics createMetricsFactory() { + final IRubyObject metricWithCollector = + runRubyScript("require \"logstash/instrument/collector\"\n" + + "metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)"); + + NamespacedMetricExt metric = new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS) + .initialize(RUBY.getCurrentContext(), metricWithCollector, RUBY.newEmptyArray()); + + + PluginFactoryExt.Metrics metricsFactory = new PluginFactoryExt.Metrics(RubyUtil.RUBY, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS); + metricsFactory.initialize(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newString("main"), metric); + return metricsFactory; + } + + private static PluginFactoryExt.PluginResolver wrapWithSearchable(final Class pluginClass) { + return new PluginFactoryExt.PluginResolver() { + @Override + public PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name) { + return new PluginLookup.PluginClass() { + @Override + public PluginLookup.PluginLanguage language() { + return PluginLookup.PluginLanguage.JAVA; + } + + @Override + public Object klass() { + return pluginClass; + } + }; + } + }; + } +} \ No newline at end of file