From 2375ae1b171370f77363d709def66e8cbe6d5ec5 Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Thu, 1 Aug 2019 16:58:03 -0500 Subject: [PATCH] Expand config variables for Java plugins Fixes #11043 --- .../common/EnvironmentVariableProvider.java | 10 ++ .../logstash/config/ir/CompiledPipeline.java | 75 ++++++++++--- .../execution/AbstractPipelineExt.java | 18 +++ .../execution/JavaBasePipelineExt.java | 3 +- .../plugins/ConfigVariableExpander.java | 83 ++++++++++++++ .../common/ConfigVariableExpanderTest.java | 106 ++++++++++++++++++ .../secret/store/SecretStoreFactoryTest.java | 5 +- qa/integration/specs/secret_store_spec.rb | 13 --- 8 files changed, 281 insertions(+), 32 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/common/EnvironmentVariableProvider.java create mode 100644 logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java create mode 100644 logstash-core/src/test/java/org/logstash/common/ConfigVariableExpanderTest.java diff --git a/logstash-core/src/main/java/org/logstash/common/EnvironmentVariableProvider.java b/logstash-core/src/main/java/org/logstash/common/EnvironmentVariableProvider.java new file mode 100644 index 000000000..b576cdf3e --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/common/EnvironmentVariableProvider.java @@ -0,0 +1,10 @@ +package org.logstash.common; + +public interface EnvironmentVariableProvider { + + String get(String key); + + static EnvironmentVariableProvider defaultProvider() { + return System::getenv; + } +} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 9cc2546ab..612865c8f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -8,6 +8,7 @@ import org.jruby.javasupport.JavaUtil; import org.jruby.runtime.builtin.IRubyObject; import org.logstash.RubyUtil; import org.logstash.Rubyfier; +import org.logstash.common.EnvironmentVariableProvider; import org.logstash.common.SourceWithMetadata; import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; @@ -22,11 +23,15 @@ import org.logstash.config.ir.graph.PluginVertex; import org.logstash.config.ir.graph.Vertex; import org.logstash.config.ir.imperative.PluginStatement; import org.logstash.ext.JrubyEventExtLibrary; +import org.logstash.plugins.ConfigVariableExpander; +import org.logstash.secret.store.SecretStore; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -73,13 +78,27 @@ public final class CompiledPipeline { */ private final RubyIntegration.PluginFactory pluginFactory; - public CompiledPipeline(final PipelineIR pipelineIR, - final RubyIntegration.PluginFactory pluginFactory) { + public CompiledPipeline( + final PipelineIR pipelineIR, + final RubyIntegration.PluginFactory pluginFactory) { + this(pipelineIR, pluginFactory, null); + } + + public CompiledPipeline( + final PipelineIR pipelineIR, + final RubyIntegration.PluginFactory pluginFactory, + final SecretStore secretStore) { this.pipelineIR = pipelineIR; this.pluginFactory = pluginFactory; - inputs = setupInputs(); - filters = setupFilters(); - outputs = setupOutputs(); + try (ConfigVariableExpander cve = new ConfigVariableExpander( + secretStore, + EnvironmentVariableProvider.defaultProvider())) { + inputs = setupInputs(cve); + filters = setupFilters(cve); + outputs = setupOutputs(cve); + } catch (Exception e) { + throw new IllegalStateException("Unable to configure plugins: " + e.getMessage()); + } } public Collection outputs() { @@ -106,7 +125,7 @@ public final class CompiledPipeline { /** * Sets up all outputs learned from {@link PipelineIR}. */ - private Map setupOutputs() { + private Map setupOutputs(ConfigVariableExpander cve) { final Collection outs = pipelineIR.getOutputPluginVertices(); final Map res = new HashMap<>(outs.size()); outs.forEach(v -> { @@ -114,7 +133,7 @@ public final class CompiledPipeline { final SourceWithMetadata source = v.getSourceWithMetadata(); res.put(v.getId(), pluginFactory.buildOutput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def) + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve) )); }); return res; @@ -123,7 +142,7 @@ public final class CompiledPipeline { /** * Sets up all Ruby filters learnt from {@link PipelineIR}. */ - private Map setupFilters() { + private Map setupFilters(ConfigVariableExpander cve) { final Collection filterPlugins = pipelineIR.getFilterPluginVertices(); final Map res = new HashMap<>(filterPlugins.size(), 1.0F); @@ -132,7 +151,7 @@ public final class CompiledPipeline { final SourceWithMetadata source = vertex.getSourceWithMetadata(); res.put(vertex.getId(), pluginFactory.buildFilter( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def) + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve) )); } return res; @@ -141,7 +160,7 @@ public final class CompiledPipeline { /** * Sets up all Ruby inputs learnt from {@link PipelineIR}. */ - private Collection setupInputs() { + private Collection setupInputs(ConfigVariableExpander cve) { final Collection vertices = pipelineIR.getInputPluginVertices(); final Collection nodes = new HashSet<>(vertices.size()); vertices.forEach(v -> { @@ -149,7 +168,7 @@ public final class CompiledPipeline { final SourceWithMetadata source = v.getSourceWithMetadata(); IRubyObject o = pluginFactory.buildInput( RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()), - RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)); + RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)); nodes.add(o); }); return nodes; @@ -190,23 +209,47 @@ public final class CompiledPipeline { * @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory} * methods that create Java plugins */ - private Map convertJavaArgs(final PluginDefinition def) { - for (final Map.Entry entry : def.getArguments().entrySet()) { + private Map convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) { + Map args = expandConfigVariables(cve, def.getArguments()); + for (final Map.Entry entry : args.entrySet()) { final Object value = entry.getValue(); final String key = entry.getKey(); final IRubyObject toput; if (value instanceof PluginStatement) { final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition(); + Map codecArgs = expandConfigVariables(cve, codec.getArguments()); toput = pluginFactory.buildCodec( RubyUtil.RUBY.newString(codec.getName()), Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()), - codec.getArguments() + codecArgs ); Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput); - def.getArguments().put(key, javaCodec); + args.put(key, javaCodec); } } - return def.getArguments(); + return args; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private Map expandConfigVariables(ConfigVariableExpander cve, Map configArgs) { + Map expandedConfig = new HashMap<>(); + for (Map.Entry e : configArgs.entrySet()) { + if (e.getValue() instanceof List) { + List list = (List) e.getValue(); + List expandedObjects = new ArrayList<>(); + for (Object o : list) { + expandedObjects.add(cve.expand(o)); + } + expandedConfig.put(e.getKey(), expandedObjects); + } else if (e.getValue() instanceof Map) { + expandedConfig.put(e.getKey(), expandConfigVariables(cve, (Map) e.getValue())); + } else if (e.getValue() instanceof String) { + expandedConfig.put(e.getKey(), cve.expand(e.getValue())); + } else { + expandedConfig.put(e.getKey(), e.getValue()); + } + } + return expandedConfig; } /** diff --git a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java index e96f0396d..2c6cd93f6 100644 --- a/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/AbstractPipelineExt.java @@ -37,6 +37,8 @@ import org.logstash.instrument.metrics.AbstractMetricExt; import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; import org.logstash.instrument.metrics.MetricKeys; import org.logstash.instrument.metrics.NullMetricExt; +import org.logstash.secret.store.SecretStore; +import org.logstash.secret.store.SecretStoreExt; @JRubyClass(name = "AbstractPipeline") public class AbstractPipelineExt extends RubyBasicObject { @@ -366,6 +368,22 @@ public class AbstractPipelineExt extends RubyBasicObject { return settings.callMethod(context, "get_value", context.runtime.newString(name)); } + protected final boolean hasSetting(final ThreadContext context, final String name) { + return settings.callMethod(context, "registered?", context.runtime.newString(name)) == context.tru; + } + + protected SecretStore getSecretStore(final ThreadContext context) { + String keystoreFile = hasSetting(context, "keystore.file") + ? getSetting(context, "keystore.file").asJavaString() + : null; + String keystoreClassname = hasSetting(context, "keystore.classname") + ? getSetting(context, "keystore.classname").asJavaString() + : null; + return (keystoreFile != null && keystoreClassname != null) + ? SecretStoreExt.getIfExists(keystoreFile, keystoreClassname) + : null; + } + private AbstractNamespacedMetricExt getDlqMetric(final ThreadContext context) { if (dlqMetric == null) { dlqMetric = metric.namespace( diff --git a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java index f8ac46672..16e576dd0 100644 --- a/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java +++ b/logstash-core/src/main/java/org/logstash/execution/JavaBasePipelineExt.java @@ -56,7 +56,8 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt { context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS ).initialize(context, args[3], this, dlqWriter(context)), RubyUtil.FILTER_DELEGATOR_CLASS - ) + ), + getSecretStore(context) ); inputs = RubyArray.newArray(context.runtime, lirExecution.inputs()); filters = RubyArray.newArray(context.runtime, lirExecution.filters()); diff --git a/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java new file mode 100644 index 000000000..c143baccd --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/plugins/ConfigVariableExpander.java @@ -0,0 +1,83 @@ +package org.logstash.plugins; + +import org.logstash.common.EnvironmentVariableProvider; +import org.logstash.secret.SecretIdentifier; +import org.logstash.secret.store.SecretStore; + +import java.nio.charset.StandardCharsets; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ConfigVariableExpander implements AutoCloseable { + + private static String SUBSTITUTION_PLACEHOLDER_REGEX = "\\$\\{(?[a-zA-Z_.][a-zA-Z0-9_.]*)(:(?[^}]*))?}"; + + private Pattern substitutionPattern = Pattern.compile(SUBSTITUTION_PLACEHOLDER_REGEX); + private SecretStore secretStore; + private EnvironmentVariableProvider envVarProvider; + + public ConfigVariableExpander(SecretStore secretStore, EnvironmentVariableProvider envVarProvider) { + this.secretStore = secretStore; + this.envVarProvider = envVarProvider; + } + + /** + * Replace all substitution variable references in @variable and returns the substituted value, or the + * original value if a substitution cannot be made. + * + * Process following patterns : ${VAR}, ${VAR:defaultValue} + * + * If value matches the pattern, the following precedence applies: + * Secret store value + * Environment entry value + * Default value if provided in the pattern + * Exception raised + * + * If the value does not match the pattern, the 'value' param returns as-is + */ + public Object expand(Object value) { + String variable; + if (value instanceof String) { + variable = (String) value; + } else { + return value; + } + + Matcher m = substitutionPattern.matcher(variable); + if (m.matches()) { + String variableName = m.group("name"); + + if (secretStore != null) { + byte[] ssValue = secretStore.retrieveSecret(new SecretIdentifier(variableName)); + if (ssValue != null) { + return new String(ssValue, StandardCharsets.UTF_8); + } + } + + if (envVarProvider != null) { + String evValue = envVarProvider.get(variableName); + if (evValue != null) { + return evValue; + } + } + + String defaultValue = m.group("default"); + if (defaultValue != null) { + return defaultValue; + } + + throw new IllegalStateException(String.format( + "Cannot evaluate `%s`. Replacement variable `%s` is not defined in a Logstash " + + "secret store or an environment entry and there is no default value given.", + variable, variableName)); + } else { + return variable; + } + } + + @Override + public void close() { + // most keystore implementations will have close() methods + + } +} diff --git a/logstash-core/src/test/java/org/logstash/common/ConfigVariableExpanderTest.java b/logstash-core/src/test/java/org/logstash/common/ConfigVariableExpanderTest.java new file mode 100644 index 000000000..d6b261373 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/ConfigVariableExpanderTest.java @@ -0,0 +1,106 @@ +package org.logstash.common; + +import org.junit.Assert; +import org.junit.Test; +import org.logstash.plugins.ConfigVariableExpander; +import org.logstash.secret.SecretIdentifier; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; + +import static org.logstash.secret.store.SecretStoreFactoryTest.MemoryStore; + +public class ConfigVariableExpanderTest { + + @Test + public void testNonStringValueReturnsUnchanged() { + long nonStringValue = 42L; + ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap()); + Object expandedValue = cve.expand(nonStringValue); + Assert.assertEquals(nonStringValue, expandedValue); + } + + @Test + public void testExpansionWithoutVariable() throws Exception { + String key = "foo"; + ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap()); + String expandedValue = (String) cve.expand(key); + Assert.assertEquals(key, expandedValue); + } + + @Test + public void testSimpleExpansion() throws Exception { + String key = "foo"; + String val = "bar"; + ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.singletonMap(key, val)); + + String expandedValue = (String) cve.expand("${" + key + "}"); + Assert.assertEquals(val, expandedValue); + } + + @Test + public void testExpansionWithDefaultValue() throws Exception { + String key = "foo"; + String val = "bar"; + String defaultValue = "baz"; + ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap()); + + String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}"); + Assert.assertEquals(defaultValue, expandedValue); + } + + @Test + public void testExpansionWithoutValueThrows() throws Exception { + String key = "foo"; + ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap()); + + try { + String expandedValue = (String) cve.expand("${" + key + "}"); + Assert.fail("Exception should have been thrown"); + } catch (IllegalStateException ise) { + Assert.assertTrue(ise.getMessage().startsWith("Cannot evaluate")); + } + } + + @Test + public void testPrecedenceOfSecretStoreValue() throws Exception { + String key = "foo"; + String ssVal = "ssbar"; + String evVal = "evbar"; + String defaultValue = "defaultbar"; + ConfigVariableExpander cve = getFakeCve( + Collections.singletonMap(key, ssVal), + Collections.singletonMap(key, evVal)); + + String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}"); + Assert.assertEquals(ssVal, expandedValue); + } + + @Test + public void testPrecedenceOfEnvironmentVariableValue() throws Exception { + String key = "foo"; + String evVal = "evbar"; + String defaultValue = "defaultbar"; + ConfigVariableExpander cve = getFakeCve( + Collections.emptyMap(), + Collections.singletonMap(key, evVal)); + + String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}"); + Assert.assertEquals(evVal, expandedValue); + } + + private static ConfigVariableExpander getFakeCve( + final Map ssValues, final Map envVarValues) { + + MemoryStore ms = new MemoryStore(); + for (Map.Entry e : ssValues.entrySet()) { + if (e.getValue() instanceof String) { + ms.persistSecret(new SecretIdentifier(e.getKey()), + ((String) e.getValue()).getBytes(StandardCharsets.UTF_8)); + } + } + return new ConfigVariableExpander(ms, envVarValues::get); + } + +} diff --git a/logstash-core/src/test/java/org/logstash/secret/store/SecretStoreFactoryTest.java b/logstash-core/src/test/java/org/logstash/secret/store/SecretStoreFactoryTest.java index c753da617..b391757cf 100644 --- a/logstash-core/src/test/java/org/logstash/secret/store/SecretStoreFactoryTest.java +++ b/logstash-core/src/test/java/org/logstash/secret/store/SecretStoreFactoryTest.java @@ -133,7 +133,7 @@ public class SecretStoreFactoryTest { /** * Valid alternate implementation */ - static class MemoryStore implements SecretStore { + public static class MemoryStore implements SecretStore { Map secrets = new HashMap(1); @@ -178,7 +178,8 @@ public class SecretStoreFactoryTest { @Override public byte[] retrieveSecret(SecretIdentifier id) { - return secrets.get(id).array(); + ByteBuffer bb = secrets.get(id); + return bb != null ? bb.array() : null; } } diff --git a/qa/integration/specs/secret_store_spec.rb b/qa/integration/specs/secret_store_spec.rb index 5f2d154c5..111e6c687 100644 --- a/qa/integration/specs/secret_store_spec.rb +++ b/qa/integration/specs/secret_store_spec.rb @@ -74,19 +74,6 @@ describe "Test that Logstash" do end end - context "will start" do - let(:settings) {{"pipeline.id" => "main"}} - it "with the wrong password and variables are NOT in settings" do - test_env["LOGSTASH_KEYSTORE_PASS"] = "WRONG_PASSWRD" - @logstash.env_variables = test_env - @logstash.spawn_logstash("-e", "input {generator { count => 1 }} output { }", "--path.settings", settings_dir) - try(num_retries) do - expect(@logstash.exited?).to be(true) - end - expect(@logstash.exit_code).to be(0) - end - end - context "won't start " do let(:settings) {{"pipeline.id" => "${missing}"}} it "with correct password, but invalid variable " do