mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
085d93650e
commit
2375ae1b17
8 changed files with 281 additions and 32 deletions
|
@ -0,0 +1,10 @@
|
|||
package org.logstash.common;
|
||||
|
||||
public interface EnvironmentVariableProvider {
|
||||
|
||||
String get(String key);
|
||||
|
||||
static EnvironmentVariableProvider defaultProvider() {
|
||||
return System::getenv;
|
||||
}
|
||||
}
|
|
@ -8,6 +8,7 @@ import org.jruby.javasupport.JavaUtil;
|
|||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.Rubyfier;
|
||||
import org.logstash.common.EnvironmentVariableProvider;
|
||||
import org.logstash.common.SourceWithMetadata;
|
||||
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
|
||||
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
|
||||
|
@ -22,11 +23,15 @@ import org.logstash.config.ir.graph.PluginVertex;
|
|||
import org.logstash.config.ir.graph.Vertex;
|
||||
import org.logstash.config.ir.imperative.PluginStatement;
|
||||
import org.logstash.ext.JrubyEventExtLibrary;
|
||||
import org.logstash.plugins.ConfigVariableExpander;
|
||||
import org.logstash.secret.store.SecretStore;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -73,13 +78,27 @@ public final class CompiledPipeline {
|
|||
*/
|
||||
private final RubyIntegration.PluginFactory pluginFactory;
|
||||
|
||||
public CompiledPipeline(final PipelineIR pipelineIR,
|
||||
final RubyIntegration.PluginFactory pluginFactory) {
|
||||
public CompiledPipeline(
|
||||
final PipelineIR pipelineIR,
|
||||
final RubyIntegration.PluginFactory pluginFactory) {
|
||||
this(pipelineIR, pluginFactory, null);
|
||||
}
|
||||
|
||||
public CompiledPipeline(
|
||||
final PipelineIR pipelineIR,
|
||||
final RubyIntegration.PluginFactory pluginFactory,
|
||||
final SecretStore secretStore) {
|
||||
this.pipelineIR = pipelineIR;
|
||||
this.pluginFactory = pluginFactory;
|
||||
inputs = setupInputs();
|
||||
filters = setupFilters();
|
||||
outputs = setupOutputs();
|
||||
try (ConfigVariableExpander cve = new ConfigVariableExpander(
|
||||
secretStore,
|
||||
EnvironmentVariableProvider.defaultProvider())) {
|
||||
inputs = setupInputs(cve);
|
||||
filters = setupFilters(cve);
|
||||
outputs = setupOutputs(cve);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public Collection<AbstractOutputDelegatorExt> outputs() {
|
||||
|
@ -106,7 +125,7 @@ public final class CompiledPipeline {
|
|||
/**
|
||||
* Sets up all outputs learned from {@link PipelineIR}.
|
||||
*/
|
||||
private Map<String, AbstractOutputDelegatorExt> setupOutputs() {
|
||||
private Map<String, AbstractOutputDelegatorExt> setupOutputs(ConfigVariableExpander cve) {
|
||||
final Collection<PluginVertex> outs = pipelineIR.getOutputPluginVertices();
|
||||
final Map<String, AbstractOutputDelegatorExt> res = new HashMap<>(outs.size());
|
||||
outs.forEach(v -> {
|
||||
|
@ -114,7 +133,7 @@ public final class CompiledPipeline {
|
|||
final SourceWithMetadata source = v.getSourceWithMetadata();
|
||||
res.put(v.getId(), pluginFactory.buildOutput(
|
||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)
|
||||
));
|
||||
});
|
||||
return res;
|
||||
|
@ -123,7 +142,7 @@ public final class CompiledPipeline {
|
|||
/**
|
||||
* Sets up all Ruby filters learnt from {@link PipelineIR}.
|
||||
*/
|
||||
private Map<String, AbstractFilterDelegatorExt> setupFilters() {
|
||||
private Map<String, AbstractFilterDelegatorExt> setupFilters(ConfigVariableExpander cve) {
|
||||
final Collection<PluginVertex> filterPlugins = pipelineIR.getFilterPluginVertices();
|
||||
final Map<String, AbstractFilterDelegatorExt> res = new HashMap<>(filterPlugins.size(), 1.0F);
|
||||
|
||||
|
@ -132,7 +151,7 @@ public final class CompiledPipeline {
|
|||
final SourceWithMetadata source = vertex.getSourceWithMetadata();
|
||||
res.put(vertex.getId(), pluginFactory.buildFilter(
|
||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def)
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve)
|
||||
));
|
||||
}
|
||||
return res;
|
||||
|
@ -141,7 +160,7 @@ public final class CompiledPipeline {
|
|||
/**
|
||||
* Sets up all Ruby inputs learnt from {@link PipelineIR}.
|
||||
*/
|
||||
private Collection<IRubyObject> setupInputs() {
|
||||
private Collection<IRubyObject> setupInputs(ConfigVariableExpander cve) {
|
||||
final Collection<PluginVertex> vertices = pipelineIR.getInputPluginVertices();
|
||||
final Collection<IRubyObject> nodes = new HashSet<>(vertices.size());
|
||||
vertices.forEach(v -> {
|
||||
|
@ -149,7 +168,7 @@ public final class CompiledPipeline {
|
|||
final SourceWithMetadata source = v.getSourceWithMetadata();
|
||||
IRubyObject o = pluginFactory.buildInput(
|
||||
RubyUtil.RUBY.newString(def.getName()), RubyUtil.RUBY.newFixnum(source.getLine()),
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def));
|
||||
RubyUtil.RUBY.newFixnum(source.getColumn()), convertArgs(def), convertJavaArgs(def, cve));
|
||||
nodes.add(o);
|
||||
});
|
||||
return nodes;
|
||||
|
@ -190,23 +209,47 @@ public final class CompiledPipeline {
|
|||
* @return Map of plugin arguments as understood by the {@link RubyIntegration.PluginFactory}
|
||||
* methods that create Java plugins
|
||||
*/
|
||||
private Map<String, Object> convertJavaArgs(final PluginDefinition def) {
|
||||
for (final Map.Entry<String, Object> entry : def.getArguments().entrySet()) {
|
||||
private Map<String, Object> convertJavaArgs(final PluginDefinition def, ConfigVariableExpander cve) {
|
||||
Map<String, Object> args = expandConfigVariables(cve, def.getArguments());
|
||||
for (final Map.Entry<String, Object> entry : args.entrySet()) {
|
||||
final Object value = entry.getValue();
|
||||
final String key = entry.getKey();
|
||||
final IRubyObject toput;
|
||||
if (value instanceof PluginStatement) {
|
||||
final PluginDefinition codec = ((PluginStatement) value).getPluginDefinition();
|
||||
Map<String, Object> codecArgs = expandConfigVariables(cve, codec.getArguments());
|
||||
toput = pluginFactory.buildCodec(
|
||||
RubyUtil.RUBY.newString(codec.getName()),
|
||||
Rubyfier.deep(RubyUtil.RUBY, codec.getArguments()),
|
||||
codec.getArguments()
|
||||
codecArgs
|
||||
);
|
||||
Codec javaCodec = (Codec)JavaUtil.unwrapJavaValue(toput);
|
||||
def.getArguments().put(key, javaCodec);
|
||||
args.put(key, javaCodec);
|
||||
}
|
||||
}
|
||||
return def.getArguments();
|
||||
return args;
|
||||
}
|
||||
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
private Map<String, Object> expandConfigVariables(ConfigVariableExpander cve, Map<String, Object> configArgs) {
|
||||
Map<String, Object> expandedConfig = new HashMap<>();
|
||||
for (Map.Entry<String, Object> e : configArgs.entrySet()) {
|
||||
if (e.getValue() instanceof List) {
|
||||
List list = (List) e.getValue();
|
||||
List<Object> expandedObjects = new ArrayList<>();
|
||||
for (Object o : list) {
|
||||
expandedObjects.add(cve.expand(o));
|
||||
}
|
||||
expandedConfig.put(e.getKey(), expandedObjects);
|
||||
} else if (e.getValue() instanceof Map) {
|
||||
expandedConfig.put(e.getKey(), expandConfigVariables(cve, (Map<String, Object>) e.getValue()));
|
||||
} else if (e.getValue() instanceof String) {
|
||||
expandedConfig.put(e.getKey(), cve.expand(e.getValue()));
|
||||
} else {
|
||||
expandedConfig.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
return expandedConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.logstash.instrument.metrics.AbstractMetricExt;
|
|||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||
import org.logstash.instrument.metrics.MetricKeys;
|
||||
import org.logstash.instrument.metrics.NullMetricExt;
|
||||
import org.logstash.secret.store.SecretStore;
|
||||
import org.logstash.secret.store.SecretStoreExt;
|
||||
|
||||
@JRubyClass(name = "AbstractPipeline")
|
||||
public class AbstractPipelineExt extends RubyBasicObject {
|
||||
|
@ -366,6 +368,22 @@ public class AbstractPipelineExt extends RubyBasicObject {
|
|||
return settings.callMethod(context, "get_value", context.runtime.newString(name));
|
||||
}
|
||||
|
||||
protected final boolean hasSetting(final ThreadContext context, final String name) {
|
||||
return settings.callMethod(context, "registered?", context.runtime.newString(name)) == context.tru;
|
||||
}
|
||||
|
||||
protected SecretStore getSecretStore(final ThreadContext context) {
|
||||
String keystoreFile = hasSetting(context, "keystore.file")
|
||||
? getSetting(context, "keystore.file").asJavaString()
|
||||
: null;
|
||||
String keystoreClassname = hasSetting(context, "keystore.classname")
|
||||
? getSetting(context, "keystore.classname").asJavaString()
|
||||
: null;
|
||||
return (keystoreFile != null && keystoreClassname != null)
|
||||
? SecretStoreExt.getIfExists(keystoreFile, keystoreClassname)
|
||||
: null;
|
||||
}
|
||||
|
||||
private AbstractNamespacedMetricExt getDlqMetric(final ThreadContext context) {
|
||||
if (dlqMetric == null) {
|
||||
dlqMetric = metric.namespace(
|
||||
|
|
|
@ -56,7 +56,8 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt {
|
|||
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
|
||||
).initialize(context, args[3], this, dlqWriter(context)),
|
||||
RubyUtil.FILTER_DELEGATOR_CLASS
|
||||
)
|
||||
),
|
||||
getSecretStore(context)
|
||||
);
|
||||
inputs = RubyArray.newArray(context.runtime, lirExecution.inputs());
|
||||
filters = RubyArray.newArray(context.runtime, lirExecution.filters());
|
||||
|
|
|
@ -0,0 +1,83 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import org.logstash.common.EnvironmentVariableProvider;
|
||||
import org.logstash.secret.SecretIdentifier;
|
||||
import org.logstash.secret.store.SecretStore;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class ConfigVariableExpander implements AutoCloseable {
|
||||
|
||||
private static String SUBSTITUTION_PLACEHOLDER_REGEX = "\\$\\{(?<name>[a-zA-Z_.][a-zA-Z0-9_.]*)(:(?<default>[^}]*))?}";
|
||||
|
||||
private Pattern substitutionPattern = Pattern.compile(SUBSTITUTION_PLACEHOLDER_REGEX);
|
||||
private SecretStore secretStore;
|
||||
private EnvironmentVariableProvider envVarProvider;
|
||||
|
||||
public ConfigVariableExpander(SecretStore secretStore, EnvironmentVariableProvider envVarProvider) {
|
||||
this.secretStore = secretStore;
|
||||
this.envVarProvider = envVarProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Replace all substitution variable references in @variable and returns the substituted value, or the
|
||||
* original value if a substitution cannot be made.
|
||||
*
|
||||
* Process following patterns : ${VAR}, ${VAR:defaultValue}
|
||||
*
|
||||
* If value matches the pattern, the following precedence applies:
|
||||
* Secret store value
|
||||
* Environment entry value
|
||||
* Default value if provided in the pattern
|
||||
* Exception raised
|
||||
*
|
||||
* If the value does not match the pattern, the 'value' param returns as-is
|
||||
*/
|
||||
public Object expand(Object value) {
|
||||
String variable;
|
||||
if (value instanceof String) {
|
||||
variable = (String) value;
|
||||
} else {
|
||||
return value;
|
||||
}
|
||||
|
||||
Matcher m = substitutionPattern.matcher(variable);
|
||||
if (m.matches()) {
|
||||
String variableName = m.group("name");
|
||||
|
||||
if (secretStore != null) {
|
||||
byte[] ssValue = secretStore.retrieveSecret(new SecretIdentifier(variableName));
|
||||
if (ssValue != null) {
|
||||
return new String(ssValue, StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
||||
if (envVarProvider != null) {
|
||||
String evValue = envVarProvider.get(variableName);
|
||||
if (evValue != null) {
|
||||
return evValue;
|
||||
}
|
||||
}
|
||||
|
||||
String defaultValue = m.group("default");
|
||||
if (defaultValue != null) {
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
throw new IllegalStateException(String.format(
|
||||
"Cannot evaluate `%s`. Replacement variable `%s` is not defined in a Logstash " +
|
||||
"secret store or an environment entry and there is no default value given.",
|
||||
variable, variableName));
|
||||
} else {
|
||||
return variable;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
// most keystore implementations will have close() methods
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
package org.logstash.common;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.logstash.plugins.ConfigVariableExpander;
|
||||
import org.logstash.secret.SecretIdentifier;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.logstash.secret.store.SecretStoreFactoryTest.MemoryStore;
|
||||
|
||||
public class ConfigVariableExpanderTest {
|
||||
|
||||
@Test
|
||||
public void testNonStringValueReturnsUnchanged() {
|
||||
long nonStringValue = 42L;
|
||||
ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap());
|
||||
Object expandedValue = cve.expand(nonStringValue);
|
||||
Assert.assertEquals(nonStringValue, expandedValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpansionWithoutVariable() throws Exception {
|
||||
String key = "foo";
|
||||
ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap());
|
||||
String expandedValue = (String) cve.expand(key);
|
||||
Assert.assertEquals(key, expandedValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleExpansion() throws Exception {
|
||||
String key = "foo";
|
||||
String val = "bar";
|
||||
ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.singletonMap(key, val));
|
||||
|
||||
String expandedValue = (String) cve.expand("${" + key + "}");
|
||||
Assert.assertEquals(val, expandedValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpansionWithDefaultValue() throws Exception {
|
||||
String key = "foo";
|
||||
String val = "bar";
|
||||
String defaultValue = "baz";
|
||||
ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap());
|
||||
|
||||
String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}");
|
||||
Assert.assertEquals(defaultValue, expandedValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExpansionWithoutValueThrows() throws Exception {
|
||||
String key = "foo";
|
||||
ConfigVariableExpander cve = getFakeCve(Collections.emptyMap(), Collections.emptyMap());
|
||||
|
||||
try {
|
||||
String expandedValue = (String) cve.expand("${" + key + "}");
|
||||
Assert.fail("Exception should have been thrown");
|
||||
} catch (IllegalStateException ise) {
|
||||
Assert.assertTrue(ise.getMessage().startsWith("Cannot evaluate"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedenceOfSecretStoreValue() throws Exception {
|
||||
String key = "foo";
|
||||
String ssVal = "ssbar";
|
||||
String evVal = "evbar";
|
||||
String defaultValue = "defaultbar";
|
||||
ConfigVariableExpander cve = getFakeCve(
|
||||
Collections.singletonMap(key, ssVal),
|
||||
Collections.singletonMap(key, evVal));
|
||||
|
||||
String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}");
|
||||
Assert.assertEquals(ssVal, expandedValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecedenceOfEnvironmentVariableValue() throws Exception {
|
||||
String key = "foo";
|
||||
String evVal = "evbar";
|
||||
String defaultValue = "defaultbar";
|
||||
ConfigVariableExpander cve = getFakeCve(
|
||||
Collections.emptyMap(),
|
||||
Collections.singletonMap(key, evVal));
|
||||
|
||||
String expandedValue = (String) cve.expand("${" + key + ":" + defaultValue + "}");
|
||||
Assert.assertEquals(evVal, expandedValue);
|
||||
}
|
||||
|
||||
private static ConfigVariableExpander getFakeCve(
|
||||
final Map<String, Object> ssValues, final Map<String, String> envVarValues) {
|
||||
|
||||
MemoryStore ms = new MemoryStore();
|
||||
for (Map.Entry<String, Object> e : ssValues.entrySet()) {
|
||||
if (e.getValue() instanceof String) {
|
||||
ms.persistSecret(new SecretIdentifier(e.getKey()),
|
||||
((String) e.getValue()).getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
return new ConfigVariableExpander(ms, envVarValues::get);
|
||||
}
|
||||
|
||||
}
|
|
@ -133,7 +133,7 @@ public class SecretStoreFactoryTest {
|
|||
/**
|
||||
* Valid alternate implementation
|
||||
*/
|
||||
static class MemoryStore implements SecretStore {
|
||||
public static class MemoryStore implements SecretStore {
|
||||
|
||||
Map<SecretIdentifier, ByteBuffer> secrets = new HashMap(1);
|
||||
|
||||
|
@ -178,7 +178,8 @@ public class SecretStoreFactoryTest {
|
|||
|
||||
@Override
|
||||
public byte[] retrieveSecret(SecretIdentifier id) {
|
||||
return secrets.get(id).array();
|
||||
ByteBuffer bb = secrets.get(id);
|
||||
return bb != null ? bb.array() : null;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -74,19 +74,6 @@ describe "Test that Logstash" do
|
|||
end
|
||||
end
|
||||
|
||||
context "will start" do
|
||||
let(:settings) {{"pipeline.id" => "main"}}
|
||||
it "with the wrong password and variables are NOT in settings" do
|
||||
test_env["LOGSTASH_KEYSTORE_PASS"] = "WRONG_PASSWRD"
|
||||
@logstash.env_variables = test_env
|
||||
@logstash.spawn_logstash("-e", "input {generator { count => 1 }} output { }", "--path.settings", settings_dir)
|
||||
try(num_retries) do
|
||||
expect(@logstash.exited?).to be(true)
|
||||
end
|
||||
expect(@logstash.exit_code).to be(0)
|
||||
end
|
||||
end
|
||||
|
||||
context "won't start " do
|
||||
let(:settings) {{"pipeline.id" => "${missing}"}}
|
||||
it "with correct password, but invalid variable " do
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue