mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
Changed PluginFactory to resolve id field with environment variables docs: plugin ids variable expansion cannot use secret store
closes 10546 Fixes #11592
This commit is contained in:
parent
91e96b1f2e
commit
496f81c8ca
9 changed files with 184 additions and 4 deletions
|
@ -146,6 +146,8 @@ Adding a named ID in this case will help in monitoring Logstash when using the m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
NOTE: Variable substitution in the `id` field only supports environment variables
|
||||||
|
and does not support the use of values from the secret store.
|
||||||
|
|
||||||
ifeval::["{versioned_docs}"!="true"]
|
ifeval::["{versioned_docs}"!="true"]
|
||||||
[id="plugins-{type}s-{plugin}-periodic_flush"]
|
[id="plugins-{type}s-{plugin}-periodic_flush"]
|
||||||
|
|
|
@ -112,6 +112,8 @@ input {
|
||||||
}
|
}
|
||||||
---------------------------------------------------------------------------------------------------
|
---------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
NOTE: Variable substitution in the `id` field only supports environment variables
|
||||||
|
and does not support the use of values from the secret store.
|
||||||
|
|
||||||
ifeval::["{versioned_docs}"!="true"]
|
ifeval::["{versioned_docs}"!="true"]
|
||||||
[id="plugins-{type}s-{plugin}-tags"]
|
[id="plugins-{type}s-{plugin}-tags"]
|
||||||
|
|
|
@ -89,4 +89,6 @@ output {
|
||||||
}
|
}
|
||||||
---------------------------------------------------------------------------------------------------
|
---------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
NOTE: Variable substitution in the `id` field only supports environment variables
|
||||||
|
and does not support the use of values from the secret store.
|
||||||
|
|
||||||
|
|
3
docs/static/keystore.asciidoc
vendored
3
docs/static/keystore.asciidoc
vendored
|
@ -27,6 +27,9 @@ Whereas the Elasticsearch keystore lets you store `elasticsearch.yml` values by
|
||||||
name, the Logstash keystore lets you specify arbitrary names that you
|
name, the Logstash keystore lets you specify arbitrary names that you
|
||||||
can reference in the Logstash configuration.
|
can reference in the Logstash configuration.
|
||||||
|
|
||||||
|
NOTE: There are some configuration fields that have no secret meaning, so not every field could leverage
|
||||||
|
the secret store for variables substitution. Plugin's `id` field is a field of this kind
|
||||||
|
|
||||||
NOTE: Referencing keystore data from `pipelines.yml` or the command line (`-e`)
|
NOTE: Referencing keystore data from `pipelines.yml` or the command line (`-e`)
|
||||||
is not currently supported.
|
is not currently supported.
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,7 @@ task javaTests(type: Test) {
|
||||||
exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
||||||
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
||||||
exclude '/org/logstash/plugins/CounterMetricImplTest.class'
|
exclude '/org/logstash/plugins/CounterMetricImplTest.class'
|
||||||
|
exclude '/org/logstash/plugins/PluginFactoryExtTest.class'
|
||||||
}
|
}
|
||||||
|
|
||||||
task rubyTests(type: Test) {
|
task rubyTests(type: Test) {
|
||||||
|
@ -88,6 +89,7 @@ task rubyTests(type: Test) {
|
||||||
include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
||||||
include '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
include '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
||||||
include '/org/logstash/plugins/CounterMetricImplTest.class'
|
include '/org/logstash/plugins/CounterMetricImplTest.class'
|
||||||
|
include '/org/logstash/plugins/PluginFactoryExtTest.class'
|
||||||
}
|
}
|
||||||
|
|
||||||
test {
|
test {
|
||||||
|
|
|
@ -34,7 +34,6 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,13 @@ public class ConfigVariableExpander implements AutoCloseable {
|
||||||
private SecretStore secretStore;
|
private SecretStore secretStore;
|
||||||
private EnvironmentVariableProvider envVarProvider;
|
private EnvironmentVariableProvider envVarProvider;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a ConfigVariableExpander that doesn't lookup any secreted placeholder.
|
||||||
|
* */
|
||||||
|
static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) {
|
||||||
|
return new ConfigVariableExpander(null, envVarProvider);
|
||||||
|
}
|
||||||
|
|
||||||
public ConfigVariableExpander(SecretStore secretStore, EnvironmentVariableProvider envVarProvider) {
|
public ConfigVariableExpander(SecretStore secretStore, EnvironmentVariableProvider envVarProvider) {
|
||||||
this.secretStore = secretStore;
|
this.secretStore = secretStore;
|
||||||
this.envVarProvider = envVarProvider;
|
this.envVarProvider = envVarProvider;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.jruby.runtime.builtin.IRubyObject;
|
||||||
import org.logstash.RubyUtil;
|
import org.logstash.RubyUtil;
|
||||||
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
|
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
|
||||||
import org.logstash.common.DLQWriterAdapter;
|
import org.logstash.common.DLQWriterAdapter;
|
||||||
|
import org.logstash.common.EnvironmentVariableProvider;
|
||||||
import org.logstash.common.NullDeadLetterQueueWriter;
|
import org.logstash.common.NullDeadLetterQueueWriter;
|
||||||
import org.logstash.common.SourceWithMetadata;
|
import org.logstash.common.SourceWithMetadata;
|
||||||
import org.logstash.config.ir.PipelineIR;
|
import org.logstash.config.ir.PipelineIR;
|
||||||
|
@ -56,6 +57,11 @@ import java.util.UUID;
|
||||||
|
|
||||||
public final class PluginFactoryExt {
|
public final class PluginFactoryExt {
|
||||||
|
|
||||||
|
@FunctionalInterface
|
||||||
|
public interface PluginResolver {
|
||||||
|
PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name);
|
||||||
|
}
|
||||||
|
|
||||||
@JRubyClass(name = "PluginFactory")
|
@JRubyClass(name = "PluginFactory")
|
||||||
public static final class Plugins extends RubyBasicObject
|
public static final class Plugins extends RubyBasicObject
|
||||||
implements RubyIntegration.PluginFactory {
|
implements RubyIntegration.PluginFactory {
|
||||||
|
@ -74,6 +80,10 @@ public final class PluginFactoryExt {
|
||||||
|
|
||||||
private RubyClass filterClass;
|
private RubyClass filterClass;
|
||||||
|
|
||||||
|
private ConfigVariableExpander configVariables;
|
||||||
|
|
||||||
|
private PluginResolver pluginResolver;
|
||||||
|
|
||||||
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
|
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
|
||||||
public static IRubyObject filterDelegator(final ThreadContext context,
|
public static IRubyObject filterDelegator(final ThreadContext context,
|
||||||
final IRubyObject recv, final IRubyObject[] args) {
|
final IRubyObject recv, final IRubyObject[] args) {
|
||||||
|
@ -90,7 +100,12 @@ public final class PluginFactoryExt {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Plugins(final Ruby runtime, final RubyClass metaClass) {
|
public Plugins(final Ruby runtime, final RubyClass metaClass) {
|
||||||
|
this(runtime, metaClass, PluginLookup::lookup);
|
||||||
|
}
|
||||||
|
|
||||||
|
Plugins(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) {
|
||||||
super(runtime, metaClass);
|
super(runtime, metaClass);
|
||||||
|
this.pluginResolver = pluginResolver;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JRubyMethod(required = 4)
|
@JRubyMethod(required = 4)
|
||||||
|
@ -106,10 +121,18 @@ public final class PluginFactoryExt {
|
||||||
public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics,
|
public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics,
|
||||||
final PluginFactoryExt.ExecutionContext executionContext,
|
final PluginFactoryExt.ExecutionContext executionContext,
|
||||||
final RubyClass filterClass) {
|
final RubyClass filterClass) {
|
||||||
|
return this.init(lir, metrics, executionContext, filterClass, EnvironmentVariableProvider.defaultProvider());
|
||||||
|
}
|
||||||
|
|
||||||
|
PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics,
|
||||||
|
final PluginFactoryExt.ExecutionContext executionContext,
|
||||||
|
final RubyClass filterClass,
|
||||||
|
final EnvironmentVariableProvider envVars) {
|
||||||
this.lir = lir;
|
this.lir = lir;
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
this.executionContext = executionContext;
|
this.executionContext = executionContext;
|
||||||
this.filterClass = filterClass;
|
this.filterClass = filterClass;
|
||||||
|
this.configVariables = ConfigVariableExpander.withoutSecret(envVars);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,16 +201,17 @@ public final class PluginFactoryExt {
|
||||||
SourceWithMetadata source, final Map<String, IRubyObject> args,
|
SourceWithMetadata source, final Map<String, IRubyObject> args,
|
||||||
Map<String, Object> pluginArgs) {
|
Map<String, Object> pluginArgs) {
|
||||||
final String id;
|
final String id;
|
||||||
final PluginLookup.PluginClass pluginClass = PluginLookup.lookup(type, name);
|
final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name);
|
||||||
|
|
||||||
if (type == PluginLookup.PluginType.CODEC) {
|
if (type == PluginLookup.PluginType.CODEC) {
|
||||||
id = UUID.randomUUID().toString();
|
id = UUID.randomUUID().toString();
|
||||||
} else {
|
} else {
|
||||||
id = lir.getGraph().vertices()
|
String unresolvedId = lir.getGraph().vertices()
|
||||||
.filter(v -> v.getSourceWithMetadata() != null
|
.filter(v -> v.getSourceWithMetadata() != null
|
||||||
&& v.getSourceWithMetadata().equalsWithoutText(source))
|
&& v.getSourceWithMetadata().equalsWithoutText(source))
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.map(Vertex::getId).orElse(null);
|
.map(Vertex::getId).orElse(null);
|
||||||
|
id = (String) configVariables.expand(unresolvedId);
|
||||||
}
|
}
|
||||||
if (id == null) {
|
if (id == null) {
|
||||||
throw context.runtime.newRaiseException(
|
throw context.runtime.newRaiseException(
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
package org.logstash.plugins;
|
||||||
|
|
||||||
|
import co.elastic.logstash.api.*;
|
||||||
|
import org.jruby.RubyArray;
|
||||||
|
import org.jruby.RubyHash;
|
||||||
|
import org.jruby.RubyString;
|
||||||
|
import org.jruby.javasupport.JavaUtil;
|
||||||
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.logstash.RubyUtil;
|
||||||
|
import org.logstash.common.IncompleteSourceWithMetadataException;
|
||||||
|
import org.logstash.common.SourceWithMetadata;
|
||||||
|
import org.logstash.config.ir.ConfigCompiler;
|
||||||
|
import org.logstash.config.ir.PipelineIR;
|
||||||
|
import org.logstash.config.ir.RubyEnvTestCase;
|
||||||
|
import org.logstash.instrument.metrics.NamespacedMetricExt;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS;
|
||||||
|
import static org.logstash.RubyUtil.RUBY;
|
||||||
|
import static org.logstash.plugins.MetricTestCase.runRubyScript;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests for {@link PluginFactoryExt.Plugins}.
|
||||||
|
*/
|
||||||
|
public final class PluginFactoryExtTest extends RubyEnvTestCase {
|
||||||
|
|
||||||
|
static class MockInputPlugin implements Input {
|
||||||
|
private final String id;
|
||||||
|
|
||||||
|
@SuppressWarnings("unused")
|
||||||
|
public MockInputPlugin(String id, Configuration config, Context ctx) {
|
||||||
|
this.id = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<PluginConfigSpec<?>> configSchema() {
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getId() {
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start(Consumer<Map<String, Object>> writer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void awaitStop() throws InterruptedException {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPluginIdResolvedWithEnvironmentVariables() throws IncompleteSourceWithMetadataException {
|
||||||
|
PluginFactoryExt.PluginResolver mockPluginResolver = wrapWithSearchable(MockInputPlugin.class);
|
||||||
|
|
||||||
|
SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("proto", "path", 1, 8, "input {mockinput{ id => \"${CUSTOM}\"}} output{mockoutput{}}");
|
||||||
|
final PipelineIR pipelineIR = compilePipeline(sourceWithMetadata);
|
||||||
|
|
||||||
|
PluginFactoryExt.Metrics metricsFactory = createMetricsFactory();
|
||||||
|
PluginFactoryExt.ExecutionContext execContextFactory = createExecutionContextFactory();
|
||||||
|
Map<String, String> envVars = new HashMap<>();
|
||||||
|
envVars.put("CUSTOM", "test");
|
||||||
|
PluginFactoryExt.Plugins sut = new PluginFactoryExt.Plugins(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS,
|
||||||
|
mockPluginResolver);
|
||||||
|
sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get);
|
||||||
|
|
||||||
|
RubyString pluginName = RubyUtil.RUBY.newString("mockinput");
|
||||||
|
|
||||||
|
// Exercise
|
||||||
|
IRubyObject pluginInstance = sut.buildInput(pluginName, sourceWithMetadata, RubyHash.newHash(RubyUtil.RUBY),
|
||||||
|
Collections.emptyMap());
|
||||||
|
|
||||||
|
//Verify
|
||||||
|
IRubyObject id = pluginInstance.callMethod(RUBY.getCurrentContext(), "id");
|
||||||
|
assertEquals("Resolved config setting MUST be evaluated with substitution", envVars.get("CUSTOM"), id.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
private static PipelineIR compilePipeline(SourceWithMetadata sourceWithMetadata) {
|
||||||
|
RubyArray sourcesWithMetadata = RubyUtil.RUBY.newArray(JavaUtil.convertJavaToRuby(
|
||||||
|
RubyUtil.RUBY, sourceWithMetadata));
|
||||||
|
|
||||||
|
return ConfigCompiler.configToPipelineIR(sourcesWithMetadata, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() {
|
||||||
|
PluginFactoryExt.ExecutionContext execContextFactory = new PluginFactoryExt.ExecutionContext(RubyUtil.RUBY,
|
||||||
|
RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS);
|
||||||
|
execContextFactory.initialize(RubyUtil.RUBY.getCurrentContext(), null, null,
|
||||||
|
RubyUtil.RUBY.newString("no DLQ"));
|
||||||
|
return execContextFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PluginFactoryExt.Metrics createMetricsFactory() {
|
||||||
|
final IRubyObject metricWithCollector =
|
||||||
|
runRubyScript("require \"logstash/instrument/collector\"\n" +
|
||||||
|
"metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)");
|
||||||
|
|
||||||
|
NamespacedMetricExt metric = new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS)
|
||||||
|
.initialize(RUBY.getCurrentContext(), metricWithCollector, RUBY.newEmptyArray());
|
||||||
|
|
||||||
|
|
||||||
|
PluginFactoryExt.Metrics metricsFactory = new PluginFactoryExt.Metrics(RubyUtil.RUBY, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS);
|
||||||
|
metricsFactory.initialize(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newString("main"), metric);
|
||||||
|
return metricsFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PluginFactoryExt.PluginResolver wrapWithSearchable(final Class<? extends Input> pluginClass) {
|
||||||
|
return new PluginFactoryExt.PluginResolver() {
|
||||||
|
@Override
|
||||||
|
public PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name) {
|
||||||
|
return new PluginLookup.PluginClass() {
|
||||||
|
@Override
|
||||||
|
public PluginLookup.PluginLanguage language() {
|
||||||
|
return PluginLookup.PluginLanguage.JAVA;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object klass() {
|
||||||
|
return pluginClass;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue