Backport of PR #11457 to 7.x (#12057)

Simplified if..else if in PluginFactory for Java plugins part, moved to template method pattern
This commit is contained in:
Andrea Selva 2020-06-26 11:09:08 +02:00 committed by GitHub
parent c6795731f1
commit 3e380bf10f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 615 additions and 531 deletions

View file

@ -97,7 +97,7 @@ tasks.register("javaTests", Test) {
exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
exclude '/org/logstash/plugins/CounterMetricImplTest.class'
exclude '/org/logstash/plugins/PluginFactoryExtTest.class'
exclude '/org/logstash/plugins/factory/PluginFactoryExtTest.class'
}
tasks.register("rubyTests", Test) {
@ -113,7 +113,7 @@ tasks.register("rubyTests", Test) {
include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
include '/org/logstash/plugins/NamespacedMetricImplTest.class'
include '/org/logstash/plugins/CounterMetricImplTest.class'
include '/org/logstash/plugins/PluginFactoryExtTest.class'
include '/org/logstash/plugins/factory/PluginFactoryExtTest.class'
}
test {

View file

@ -46,7 +46,7 @@ module LogStash; class BasePipeline < AbstractPipeline
@plugin_factory = LogStash::Plugins::PluginFactory.new(
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, metric),
lir, LogStash::Plugins::PluginMetricsFactory.new(pipeline_id, metric),
LogStash::Plugins::ExecutionContextFactory.new(@agent, self, dlq_writer),
FilterDelegator
)

View file

@ -73,9 +73,11 @@ import org.logstash.log.LoggableExt;
import org.logstash.log.LoggerExt;
import org.logstash.log.SlowLoggerExt;
import org.logstash.plugins.HooksRegistryExt;
import org.logstash.plugins.PluginFactoryExt;
import org.logstash.plugins.UniversalPluginExt;
import org.logstash.util.UtilExt;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginMetricsFactoryExt;
import org.logstash.plugins.factory.PluginFactoryExt;
import java.util.stream.Stream;
@ -193,7 +195,7 @@ public final class RubyUtil {
public static final RubyClass EXECUTION_CONTEXT_FACTORY_CLASS;
public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;
public static final RubyClass PLUGIN_METRICS_FACTORY_CLASS;
public static final RubyClass PLUGIN_FACTORY_CLASS;
@ -259,16 +261,16 @@ public final class RubyUtil {
METRIC_SNAPSHOT_CLASS.defineAnnotatedMethods(SnapshotExt.class);
EXECUTION_CONTEXT_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"ExecutionContextFactory", RUBY.getObject(),
PluginFactoryExt.ExecutionContext::new
ExecutionContextFactoryExt::new
);
PLUGIN_METRIC_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginMetricFactory", RUBY.getObject(), PluginFactoryExt.Metrics::new
PLUGIN_METRICS_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginMetricsFactory", RUBY.getObject(), PluginMetricsFactoryExt::new
);
SHUTDOWN_WATCHER_CLASS =
setupLogstashClass(ShutdownWatcherExt::new, ShutdownWatcherExt.class);
PLUGIN_METRIC_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Metrics.class);
PLUGIN_METRICS_FACTORY_CLASS.defineAnnotatedMethods(PluginMetricsFactoryExt.class);
EXECUTION_CONTEXT_FACTORY_CLASS.defineAnnotatedMethods(
PluginFactoryExt.ExecutionContext.class
ExecutionContextFactoryExt.class
);
METRIC_EXCEPTION_CLASS = instrumentModule.defineClassUnder(
"MetricException", RUBY.getException(), MetricExt.MetricException::new
@ -546,9 +548,9 @@ public final class RubyUtil {
RUBY_EVENT_CLASS.defineAnnotatedMethods(JrubyEventExtLibrary.RubyEvent.class);
RUBY_EVENT_CLASS.defineAnnotatedConstants(JrubyEventExtLibrary.RubyEvent.class);
PLUGIN_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginFactory", RUBY.getObject(), PluginFactoryExt.Plugins::new
"PluginFactory", RUBY.getObject(), PluginFactoryExt::new
);
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Plugins.class);
PLUGIN_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.class);
UNIVERSAL_PLUGIN_CLASS =
setupLogstashClass(UniversalPluginExt::new, UniversalPluginExt.class);
EVENT_DISPATCHER_CLASS =

View file

@ -116,7 +116,7 @@ public final class CompiledPipeline {
filters = setupFilters(cve);
outputs = setupOutputs(cve);
} catch (Exception e) {
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage());
throw new IllegalStateException("Unable to configure plugins: " + e.getMessage(), e);
}
}

View file

@ -36,7 +36,9 @@ import org.logstash.common.IncompleteSourceWithMetadataException;
import org.logstash.config.ir.CompiledPipeline;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.plugins.PluginFactoryExt;
import org.logstash.plugins.factory.ExecutionContextFactoryExt;
import org.logstash.plugins.factory.PluginMetricsFactoryExt;
import org.logstash.plugins.factory.PluginFactoryExt;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
@ -67,12 +69,12 @@ public final class JavaBasePipelineExt extends AbstractPipelineExt {
initialize(context, args[0], args[1], args[2]);
lirExecution = new CompiledPipeline(
lir,
new PluginFactoryExt.Plugins(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init(
new PluginFactoryExt(context.runtime, RubyUtil.PLUGIN_FACTORY_CLASS).init(
lir,
new PluginFactoryExt.Metrics(
context.runtime, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS
new PluginMetricsFactoryExt(
context.runtime, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS
).initialize(context, pipelineId(), metric()),
new PluginFactoryExt.ExecutionContext(
new ExecutionContextFactoryExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS
).initialize(context, args[3], this, dlqWriter(context)),
RubyUtil.FILTER_DELEGATOR_CLASS

View file

@ -38,8 +38,11 @@ public class ConfigVariableExpander implements AutoCloseable {
/**
* Creates a ConfigVariableExpander that doesn't lookup any secreted placeholder.
*
* @param envVarProvider EnvironmentVariableProvider to use as source of substitutions
* @return an variable expander that uses envVarProvider as source
* */
static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) {
public static ConfigVariableExpander withoutSecret(EnvironmentVariableProvider envVarProvider) {
return new ConfigVariableExpander(null, envVarProvider);
}

View file

@ -1,486 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.plugins;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Output;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.DLQWriterAdapter;
import org.logstash.common.EnvironmentVariableProvider;
import org.logstash.common.NullDeadLetterQueueWriter;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaCodecDelegator;
import org.logstash.config.ir.compiler.JavaFilterDelegatorExt;
import org.logstash.config.ir.compiler.JavaInputDelegatorExt;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.OutputStrategyExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.NullMetricExt;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
public final class PluginFactoryExt {
@FunctionalInterface
public interface PluginResolver {
PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name);
}
@JRubyClass(name = "PluginFactory")
public static final class Plugins extends RubyBasicObject
implements RubyIntegration.PluginFactory {
private static final long serialVersionUID = 1L;
private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");
private final Collection<String> pluginsById = new HashSet<>();
private PipelineIR lir;
private PluginFactoryExt.ExecutionContext executionContext;
private PluginFactoryExt.Metrics metrics;
private RubyClass filterClass;
private ConfigVariableExpander configVariables;
private PluginResolver pluginResolver;
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
public static IRubyObject filterDelegator(final ThreadContext context,
final IRubyObject recv, final IRubyObject[] args) {
final RubyHash arguments = (RubyHash) args[2];
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
filterInstance.callMethod(
context, "metric=",
((AbstractMetricExt) args[3]).namespace(context, id.intern())
);
filterInstance.callMethod(context, "execution_context=", args[4]);
return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS)
.initialize(context, filterInstance, id);
}
public Plugins(final Ruby runtime, final RubyClass metaClass) {
this(runtime, metaClass, PluginLookup::lookup);
}
Plugins(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) {
super(runtime, metaClass);
this.pluginResolver = pluginResolver;
}
@JRubyMethod(required = 4)
public PluginFactoryExt.Plugins initialize(final ThreadContext context,
final IRubyObject[] args) {
return init(
args[0].toJava(PipelineIR.class),
(PluginFactoryExt.Metrics) args[1], (PluginFactoryExt.ExecutionContext) args[2],
(RubyClass) args[3]
);
}
public PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics,
final PluginFactoryExt.ExecutionContext executionContext,
final RubyClass filterClass) {
return this.init(lir, metrics, executionContext, filterClass, EnvironmentVariableProvider.defaultProvider());
}
PluginFactoryExt.Plugins init(final PipelineIR lir, final PluginFactoryExt.Metrics metrics,
final PluginFactoryExt.ExecutionContext executionContext,
final RubyClass filterClass,
final EnvironmentVariableProvider envVars) {
this.lir = lir;
this.metrics = metrics;
this.executionContext = executionContext;
this.filterClass = filterClass;
this.configVariables = ConfigVariableExpander.withoutSecret(envVars);
return this;
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildInput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return (AbstractOutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return (AbstractFilterDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args,
Map<String, Object> pluginArgs) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
name.asJavaString(), source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@Override
public Codec buildDefaultCodec(String codecName) {
return (Codec) JavaUtil.unwrapJavaValue(plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
codecName, null, Collections.emptyMap(), Collections.emptyMap()
));
}
@SuppressWarnings("unchecked")
@JRubyMethod(required = 3, optional = 1)
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
return plugin(
context,
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
args[1].asJavaString(),
JavaUtil.unwrapIfJavaObject(args[2]),
args.length > 3 ? (Map<String, IRubyObject>) args[3] : new HashMap<>(),
null
);
}
@SuppressWarnings("unchecked")
private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name,
SourceWithMetadata source, final Map<String, IRubyObject> args,
Map<String, Object> pluginArgs) {
final String id;
final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name);
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
} else {
String unresolvedId = lir.getGraph().vertices()
.filter(v -> v.getSourceWithMetadata() != null
&& v.getSourceWithMetadata().equalsWithoutText(source))
.findFirst()
.map(Vertex::getId).orElse(null);
id = (String) configVariables.expand(unresolvedId);
}
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Could not determine ID for %s/%s, source don't matched: %s",
type.rubyLabel().asJavaString(), name, source
)
);
}
if (pluginsById.contains(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
pluginsById.add(id);
final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel());
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
final Map<String, Object> newArgs = new HashMap<>(args);
newArgs.put("id", id);
final RubyClass klass = (RubyClass) pluginClass.klass();
final ExecutionContextExt executionCntx = executionContext.create(
context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name")
);
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
rubyArgs.putAll(newArgs);
if (type == PluginLookup.PluginType.OUTPUT) {
return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize(
context,
new IRubyObject[]{
klass, typeScopedMetric, executionCntx,
OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null),
rubyArgs
}
);
} else if (type == PluginLookup.PluginType.FILTER) {
return filterDelegator(
context, null,
new IRubyObject[]{
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx
}
);
} else {
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name"));
pluginInstance.callMethod(context, "metric=", scopedMetric);
pluginInstance.callMethod(context, "execution_context=", executionCntx);
return pluginInstance;
}
} else {
if (pluginArgs == null) {
String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." +
" The Java execution engine is required to run Java plugins.", name);
throw new IllegalStateException(err);
}
if (type == PluginLookup.PluginType.OUTPUT) {
final Class<Output> cls = (Class<Output>) pluginClass.klass();
Output output = null;
if (cls != null) {
try {
final Constructor<Output> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs, this);
output = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
PluginUtil.validateConfig(output, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
if (output != null) {
return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output);
} else {
throw new IllegalStateException("Unable to instantiate output: " + pluginClass);
}
} else if (type == PluginLookup.PluginType.FILTER) {
final Class<Filter> cls = (Class<Filter>) pluginClass.klass();
Filter filter = null;
if (cls != null) {
try {
final Constructor<Filter> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs);
filter = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
PluginUtil.validateConfig(filter, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
if (filter != null) {
return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs);
} else {
throw new IllegalStateException("Unable to instantiate filter: " + pluginClass);
}
} else if (type == PluginLookup.PluginType.INPUT) {
final Class<Input> cls = (Class<Input>) pluginClass.klass();
Input input = null;
if (cls != null) {
try {
final Constructor<Input> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs, this);
input = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
PluginUtil.validateConfig(input, config);
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
if (input != null) {
return JavaInputDelegatorExt.create((JavaBasePipelineExt) executionContext.pipeline, typeScopedMetric, input, pluginArgs);
} else {
throw new IllegalStateException("Unable to instantiate input: " + pluginClass);
}
} else if (type == PluginLookup.PluginType.CODEC) {
final Class<Codec> cls = (Class<Codec>) pluginClass.klass();
if (cls != null) {
try {
final Constructor<Codec> ctor = cls.getConstructor(Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs);
final Context pluginContext = executionContext.toContext(type, metrics.getRoot(context));
final Codec codec = ctor.newInstance(config, pluginContext);
PluginUtil.validateConfig(codec, config);
return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec));
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
throw new IllegalStateException("Unable to instantiate codec: " + pluginClass);
}
else {
throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString());
}
}
}
}
@JRubyClass(name = "ExecutionContextFactory")
public static final class ExecutionContext extends RubyBasicObject {
private static final long serialVersionUID = 1L;
private IRubyObject agent;
private IRubyObject pipeline;
private IRubyObject dlqWriter;
public ExecutionContext(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public PluginFactoryExt.ExecutionContext initialize(final ThreadContext context,
final IRubyObject agent, final IRubyObject pipeline, final IRubyObject dlqWriter) {
this.agent = agent;
this.pipeline = pipeline;
this.dlqWriter = dlqWriter;
return this;
}
@JRubyMethod
public ExecutionContextExt create(final ThreadContext context, final IRubyObject id,
final IRubyObject classConfigName) {
return new ExecutionContextExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS
).initialize(
context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter}
);
}
public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) {
DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance();
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
IRubyObject innerWriter =
((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter)
.innerWriter(RubyUtil.RUBY.getCurrentContext());
if (innerWriter != null) {
if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) {
dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class));
}
}
} else if (dlqWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) {
dlq = dlqWriter.toJava(DeadLetterQueueWriter.class);
}
return new ContextImpl(dlq, new NamespacedMetricImpl(RubyUtil.RUBY.getCurrentContext(), metric));
}
}
@JRubyClass(name = "PluginMetricFactory")
public static final class Metrics extends RubyBasicObject {
private static final long serialVersionUID = 1L;
private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins");
private RubySymbol pipelineId;
private AbstractMetricExt metric;
public Metrics(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public PluginFactoryExt.Metrics initialize(final ThreadContext context,
final IRubyObject pipelineId, final IRubyObject metrics) {
this.pipelineId = pipelineId.convertToString().intern();
if (metrics.isNil()) {
this.metric = new NullMetricExt(context.runtime, RubyUtil.NULL_METRIC_CLASS);
} else {
this.metric = (AbstractMetricExt) metrics;
}
return this;
}
AbstractNamespacedMetricExt getRoot(final ThreadContext context) {
return metric.namespace(
context,
RubyArray.newArray(
context.runtime,
Arrays.asList(
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS
)
)
);
}
@JRubyMethod
public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) {
return getRoot(context).namespace(
context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString()))
);
}
}
}

View file

@ -41,12 +41,16 @@ public class Dots implements Codec {
private final String id;
public Dots(final Configuration configuration, final Context context) {
this();
public Dots(final String id, final Configuration configuration, final Context context) {
this((id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString());
}
private Dots() {
this.id = UUID.randomUUID().toString();
public Dots(final Configuration configuration, final Context context) {
this(UUID.randomUUID().toString());
}
private Dots(String id) {
this.id = id;
}
@Override
@ -66,7 +70,7 @@ public class Dots implements Codec {
@Override
public Codec cloneCodec() {
return new Dots();
return new Dots(UUID.randomUUID().toString());
}
@Override

View file

@ -77,16 +77,26 @@ public class Line implements Codec {
/**
* Required constructor.
*
* @param id plugin id
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Line(final String id, final Configuration configuration, final Context context) {
this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG),
(id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString());
}
/*
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Line(final Configuration configuration, final Context context) {
this(context, configuration.get(DELIMITER_CONFIG), configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG));
this(null, configuration, context);
}
private Line(Context context, String delimiter, String charsetName, String format) {
private Line(Context context, String delimiter, String charsetName, String format, String id) {
this.context = context;
this.id = UUID.randomUUID().toString();
this.id = id;
this.delimiter = delimiter;
this.charset = Charset.forName(charsetName);
this.format = format;
@ -165,6 +175,6 @@ public class Line implements Codec {
@Override
public Codec cloneCodec() {
return new Line(context, delimiter, charset.name(), format);
return new Line(context, delimiter, charset.name(), format, UUID.randomUUID().toString());
}
}

View file

@ -71,16 +71,25 @@ public class Plain implements Codec {
/**
* Required constructor.
*
* @param id plugin id
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Plain(final String id, final Configuration configuration, final Context context) {
this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG),
(id != null && !id.isEmpty()) ? id : UUID.randomUUID().toString());
}
/**
* @param configuration Logstash Configuration
* @param context Logstash Context
*/
public Plain(final Configuration configuration, final Context context) {
this(context, configuration.get(CHARSET_CONFIG), configuration.get(FORMAT_CONFIG));
this(null, configuration, context);
}
private Plain(Context context, String charsetName, String format) {
private Plain(Context context, String charsetName, String format, String id) {
this.context = context;
this.id = UUID.randomUUID().toString();
this.id = id;
this.charset = Charset.forName(charsetName);
this.format = format;
decoder = charset.newDecoder();
@ -127,6 +136,6 @@ public class Plain implements Codec {
@Override
public Codec cloneCodec() {
return new Plain(context, charset.name(), format);
return new Plain(context, charset.name(), format, UUID.randomUUID().toString());
}
}

View file

@ -0,0 +1,45 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Plugin;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.ConfigurationImpl;
import org.logstash.plugins.PluginLookup;
import org.logstash.plugins.PluginUtil;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
abstract class AbstractPluginCreator<T extends Plugin> {
protected PluginFactoryExt pluginsFactory = null;
abstract IRubyObject createDelegator(String name, Map<String, Object> pluginArgs, String id,
AbstractNamespacedMetricExt typeScopedMetric,
PluginLookup.PluginClass pluginClass, Context pluginContext);
protected T instantiateAndValidate(Map<String, Object> pluginArgs, String id, Context pluginContext,
PluginLookup.PluginClass pluginClass) {
@SuppressWarnings("unchecked")
final Class<T> cls = (Class<T>) pluginClass.klass();
if (cls == null) {
throw new IllegalStateException("Unable to instantiate type: " + pluginClass);
}
try {
final Constructor<T> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
Configuration config = new ConfigurationImpl(pluginArgs, pluginsFactory);
T plugin = ctor.newInstance(id, config, pluginContext);
PluginUtil.validateConfig(plugin, config);
return plugin;
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
throw new IllegalStateException((ex).getCause());
}
throw new IllegalStateException(ex);
}
}
}

View file

@ -0,0 +1,23 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Context;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.JavaCodecDelegator;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.PluginLookup;
import java.util.Map;
class CodecPluginCreator extends AbstractPluginCreator<Codec> {
@Override
public IRubyObject createDelegator(String name, Map<String, Object> pluginArgs, String id,
AbstractNamespacedMetricExt typeScopedMetric,
PluginLookup.PluginClass pluginClass, Context pluginContext) {
Codec codec = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass);
return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec));
}
}

View file

@ -0,0 +1,77 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import org.jruby.Ruby;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.DLQWriterAdapter;
import org.logstash.common.NullDeadLetterQueueWriter;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.ContextImpl;
import org.logstash.plugins.NamespacedMetricImpl;
import org.logstash.plugins.PluginLookup;
@JRubyClass(name = "ExecutionContextFactory")
public final class ExecutionContextFactoryExt extends RubyBasicObject {
private static final long serialVersionUID = 1L;
private IRubyObject agent;
private IRubyObject pipeline;
private IRubyObject dlqWriter;
public ExecutionContextFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public ExecutionContextFactoryExt initialize(final ThreadContext context, final IRubyObject agent,
final IRubyObject pipeline, final IRubyObject dlqWriter) {
this.agent = agent;
this.pipeline = pipeline;
this.dlqWriter = dlqWriter;
return this;
}
@JRubyMethod
public ExecutionContextExt create(final ThreadContext context, final IRubyObject id,
final IRubyObject classConfigName) {
return new ExecutionContextExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS
).initialize(
context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter}
);
}
Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) {
DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance();
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
IRubyObject innerWriter =
((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter)
.innerWriter(RubyUtil.RUBY.getCurrentContext());
if (innerWriter != null) {
if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) {
dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class));
}
}
} else if (dlqWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) {
dlq = dlqWriter.toJava(DeadLetterQueueWriter.class);
}
return new ContextImpl(dlq, new NamespacedMetricImpl(RubyUtil.RUBY.getCurrentContext(), metric));
}
IRubyObject getPipeline() {
return pipeline;
}
}

View file

@ -0,0 +1,21 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Filter;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.config.ir.compiler.JavaFilterDelegatorExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.PluginLookup;
import java.util.Map;
class FilterPluginCreator extends AbstractPluginCreator<Filter> {
@Override
public IRubyObject createDelegator(String name, Map<String, Object> pluginArgs, String id,
AbstractNamespacedMetricExt typeScopedMetric,
PluginLookup.PluginClass pluginClass, Context pluginContext) {
Filter filter = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass);
return JavaFilterDelegatorExt.create(name, id, typeScopedMetric, filter, pluginArgs);
}
}

View file

@ -0,0 +1,27 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Input;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.config.ir.compiler.JavaInputDelegatorExt;
import org.logstash.execution.JavaBasePipelineExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.PluginLookup;
import java.util.Map;
class InputPluginCreator extends AbstractPluginCreator<Input> {
InputPluginCreator(PluginFactoryExt pluginsFactory) {
this.pluginsFactory = pluginsFactory;
}
@Override
public IRubyObject createDelegator(String name, Map<String, Object> pluginArgs, String id,
AbstractNamespacedMetricExt typeScopedMetric,
PluginLookup.PluginClass pluginClass, Context pluginContext) {
Input input = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass);
return JavaInputDelegatorExt.create((JavaBasePipelineExt) pluginsFactory.getExecutionContextFactory().getPipeline(),
typeScopedMetric, input, pluginArgs);
}
}

View file

@ -0,0 +1,25 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Output;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.config.ir.compiler.JavaOutputDelegatorExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.plugins.PluginLookup;
import java.util.Map;
class OutputPluginCreator extends AbstractPluginCreator<Output> {
OutputPluginCreator(PluginFactoryExt pluginsFactory) {
this.pluginsFactory = pluginsFactory;
}
@Override
public IRubyObject createDelegator(String name, Map<String, Object> pluginArgs, String id,
AbstractNamespacedMetricExt typeScopedMetric,
PluginLookup.PluginClass pluginClass, Context pluginContext) {
Output output = instantiateAndValidate(pluginArgs, id, pluginContext, pluginClass);
return JavaOutputDelegatorExt.create(name, id, typeScopedMetric, output);
}
}

View file

@ -0,0 +1,261 @@
package org.logstash.plugins.factory;
import co.elastic.logstash.api.*;
import org.jruby.*;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaUtil;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.EnvironmentVariableProvider;
import org.logstash.common.SourceWithMetadata;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.*;
import org.logstash.config.ir.graph.Vertex;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.plugins.ConfigVariableExpander;
import org.logstash.plugins.PluginLookup;
import java.util.*;
@JRubyClass(name = "PluginFactory")
public final class PluginFactoryExt extends RubyBasicObject
implements RubyIntegration.PluginFactory {
@FunctionalInterface
public interface PluginResolver {
PluginLookup.PluginClass resolve(PluginLookup.PluginType type, String name);
}
private static final long serialVersionUID = 1L;
private static final RubyString ID_KEY = RubyUtil.RUBY.newString("id");
private final Collection<String> pluginsById = new HashSet<>();
private PipelineIR lir;
private ExecutionContextFactoryExt executionContextFactory;
private PluginMetricsFactoryExt metrics;
private RubyClass filterClass;
private ConfigVariableExpander configVariables;
private PluginResolver pluginResolver;
private Map<PluginLookup.PluginType, AbstractPluginCreator<? extends Plugin>> pluginCreatorsRegistry = new HashMap<>(4);
@JRubyMethod(name = "filter_delegator", meta = true, required = 5)
public static IRubyObject filterDelegator(final ThreadContext context,
final IRubyObject recv, final IRubyObject... args) {
final RubyHash arguments = (RubyHash) args[2];
final IRubyObject filterInstance = args[1].callMethod(context, "new", arguments);
final RubyString id = (RubyString) arguments.op_aref(context, ID_KEY);
filterInstance.callMethod(
context, "metric=",
((AbstractMetricExt) args[3]).namespace(context, id.intern())
);
filterInstance.callMethod(context, "execution_context=", args[4]);
return new FilterDelegatorExt(context.runtime, RubyUtil.FILTER_DELEGATOR_CLASS)
.initialize(context, filterInstance, id);
}
public PluginFactoryExt(final Ruby runtime, final RubyClass metaClass) {
this(runtime, metaClass, PluginLookup::lookup);
}
PluginFactoryExt(final Ruby runtime, final RubyClass metaClass, PluginResolver pluginResolver) {
super(runtime, metaClass);
this.pluginResolver = pluginResolver;
}
@JRubyMethod(required = 4)
public PluginFactoryExt initialize(final ThreadContext context,
final IRubyObject[] args) {
return init(
args[0].toJava(PipelineIR.class),
(PluginMetricsFactoryExt) args[1], (ExecutionContextFactoryExt) args[2],
(RubyClass) args[3]
);
}
public PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics,
final ExecutionContextFactoryExt executionContextFactoryExt,
final RubyClass filterClass) {
return this.init(lir, metrics, executionContextFactoryExt, filterClass, EnvironmentVariableProvider.defaultProvider());
}
PluginFactoryExt init(final PipelineIR lir, final PluginMetricsFactoryExt metrics,
final ExecutionContextFactoryExt executionContextFactoryExt,
final RubyClass filterClass,
final EnvironmentVariableProvider envVars) {
this.lir = lir;
this.metrics = metrics;
this.executionContextFactory = executionContextFactoryExt;
this.filterClass = filterClass;
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.INPUT, new InputPluginCreator(this));
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.CODEC, new CodecPluginCreator());
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.FILTER, new FilterPluginCreator());
this.pluginCreatorsRegistry.put(PluginLookup.PluginType.OUTPUT, new OutputPluginCreator(this));
this.configVariables = ConfigVariableExpander.withoutSecret(envVars);
return this;
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildInput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.INPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractOutputDelegatorExt buildOutput(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return (AbstractOutputDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.OUTPUT, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public AbstractFilterDelegatorExt buildFilter(final RubyString name, SourceWithMetadata source,
final IRubyObject args, Map<String, Object> pluginArgs) {
return (AbstractFilterDelegatorExt) plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.FILTER, name.asJavaString(),
source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@SuppressWarnings("unchecked")
@Override
public IRubyObject buildCodec(final RubyString name, SourceWithMetadata source, final IRubyObject args,
Map<String, Object> pluginArgs) {
return plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
name.asJavaString(), source, (Map<String, IRubyObject>) args, pluginArgs
);
}
@Override
public Codec buildDefaultCodec(String codecName) {
return (Codec) JavaUtil.unwrapJavaValue(plugin(
RubyUtil.RUBY.getCurrentContext(), PluginLookup.PluginType.CODEC,
codecName, null, Collections.emptyMap(), Collections.emptyMap()
));
}
@SuppressWarnings("unchecked")
@JRubyMethod(required = 3, optional = 1)
public IRubyObject plugin(final ThreadContext context, final IRubyObject[] args) {
return plugin(
context,
PluginLookup.PluginType.valueOf(args[0].asJavaString().toUpperCase(Locale.ENGLISH)),
args[1].asJavaString(),
JavaUtil.unwrapIfJavaObject(args[2]),
args.length > 3 ? (Map<String, IRubyObject>) args[3] : new HashMap<>(),
null
);
}
@SuppressWarnings("unchecked")
private IRubyObject plugin(final ThreadContext context, final PluginLookup.PluginType type, final String name,
SourceWithMetadata source, final Map<String, IRubyObject> args,
Map<String, Object> pluginArgs) {
final String id = generateOrRetrievePluginId(context, type, name, source);
pluginsById.add(id);
final AbstractNamespacedMetricExt typeScopedMetric = metrics.create(context, type.rubyLabel());
final PluginLookup.PluginClass pluginClass = pluginResolver.resolve(type, name);
if (pluginClass.language() == PluginLookup.PluginLanguage.RUBY) {
final Map<String, Object> newArgs = new HashMap<>(args);
newArgs.put("id", id);
final RubyClass klass = (RubyClass) pluginClass.klass();
final ExecutionContextExt executionCntx = executionContextFactory.create(
context, RubyUtil.RUBY.newString(id), klass.callMethod(context, "config_name")
);
final RubyHash rubyArgs = RubyHash.newHash(context.runtime);
rubyArgs.putAll(newArgs);
if (type == PluginLookup.PluginType.OUTPUT) {
return new OutputDelegatorExt(context.runtime, RubyUtil.RUBY_OUTPUT_DELEGATOR_CLASS).initialize(
context,
new IRubyObject[]{
klass, typeScopedMetric, executionCntx,
OutputStrategyExt.OutputStrategyRegistryExt.instance(context, null),
rubyArgs
}
);
} else if (type == PluginLookup.PluginType.FILTER) {
return filterDelegator(
context, null,
filterClass, klass, rubyArgs, typeScopedMetric, executionCntx);
} else {
final IRubyObject pluginInstance = klass.callMethod(context, "new", rubyArgs);
final AbstractNamespacedMetricExt scopedMetric = typeScopedMetric.namespace(context, RubyUtil.RUBY.newSymbol(id));
scopedMetric.gauge(context, MetricKeys.NAME_KEY, pluginInstance.callMethod(context, "config_name"));
pluginInstance.callMethod(context, "metric=", scopedMetric);
pluginInstance.callMethod(context, "execution_context=", executionCntx);
return pluginInstance;
}
} else {
if (pluginArgs == null) {
String err = String.format("Cannot start the Java plugin '%s' in the Ruby execution engine." +
" The Java execution engine is required to run Java plugins.", name);
throw new IllegalStateException(err);
}
AbstractPluginCreator<? extends Plugin> pluginCreator = pluginCreatorsRegistry.get(type);
if (pluginCreator == null) {
throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString());
}
Context contextWithMetrics = executionContextFactory.toContext(type, metrics.getRoot(context));
return pluginCreator.createDelegator(name, pluginArgs, id, typeScopedMetric, pluginClass, contextWithMetrics);
}
}
private String generateOrRetrievePluginId(ThreadContext context, PluginLookup.PluginType type, String name,
SourceWithMetadata source) {
final String id;
if (type == PluginLookup.PluginType.CODEC) {
id = UUID.randomUUID().toString();
} else {
String unresolvedId = lir.getGraph().vertices()
.filter(v -> v.getSourceWithMetadata() != null
&& v.getSourceWithMetadata().equalsWithoutText(source))
.findFirst()
.map(Vertex::getId).orElse(null);
id = (String) configVariables.expand(unresolvedId);
}
if (id == null) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format(
"Could not determine ID for %s/%s", type.rubyLabel().asJavaString(), name
)
);
}
if (pluginsById.contains(id)) {
throw context.runtime.newRaiseException(
RubyUtil.CONFIGURATION_ERROR_CLASS,
String.format("Two plugins have the id '%s', please fix this conflict", id)
);
}
return id;
}
ExecutionContextFactoryExt getExecutionContextFactory() {
return executionContextFactory;
}
}

View file

@ -0,0 +1,61 @@
package org.logstash.plugins.factory;
import org.jruby.*;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.NullMetricExt;
import java.util.Arrays;
@JRubyClass(name = "PluginMetricsFactory")
public final class PluginMetricsFactoryExt extends RubyBasicObject {
private static final long serialVersionUID = 1L;
private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins");
private RubySymbol pipelineId;
private AbstractMetricExt metric;
public PluginMetricsFactoryExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public PluginMetricsFactoryExt initialize(final ThreadContext context,
final IRubyObject pipelineId, final IRubyObject metrics) {
this.pipelineId = pipelineId.convertToString().intern();
if (metrics.isNil()) {
this.metric = new NullMetricExt(context.runtime, RubyUtil.NULL_METRIC_CLASS);
} else {
this.metric = (AbstractMetricExt) metrics;
}
return this;
}
AbstractNamespacedMetricExt getRoot(final ThreadContext context) {
return metric.namespace(
context,
RubyArray.newArray(
context.runtime,
Arrays.asList(
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS
)
)
);
}
@JRubyMethod
public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) {
return getRoot(context).namespace(
context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString()))
);
}
}

View file

@ -51,7 +51,7 @@ public abstract class MetricTestCase extends RubyEnvTestCase {
executionContext = new ExecutionContextExt(RUBY, EXECUTION_CONTEXT_CLASS);
}
protected static IRubyObject runRubyScript(String script) {
public static IRubyObject runRubyScript(String script) {
IRubyObject m = RUBY.evalScriptlet(script);
return m;
}

View file

@ -17,8 +17,7 @@
* under the License.
*/
package org.logstash.plugins;
package org.logstash.plugins.factory;
import co.elastic.logstash.api.*;
import org.jruby.RubyHash;
@ -32,6 +31,8 @@ import org.logstash.config.ir.InvalidIRException;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.RubyEnvTestCase;
import org.logstash.instrument.metrics.NamespacedMetricExt;
import org.logstash.plugins.MetricTestCase;
import org.logstash.plugins.PluginLookup;
import java.util.Collection;
import java.util.Collections;
@ -42,10 +43,9 @@ import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
import static org.logstash.RubyUtil.NAMESPACED_METRIC_CLASS;
import static org.logstash.RubyUtil.RUBY;
import static org.logstash.plugins.MetricTestCase.runRubyScript;
/**
* Tests for {@link PluginFactoryExt.Plugins}.
* Tests for {@link PluginFactoryExt}.
*/
public final class PluginFactoryExtTest extends RubyEnvTestCase {
@ -87,11 +87,11 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
SourceWithMetadata sourceWithMetadata = new SourceWithMetadata("proto", "path", 1, 8, "input {mockinput{ id => \"${CUSTOM}\"}} output{mockoutput{}}");
final PipelineIR pipelineIR = compilePipeline(sourceWithMetadata);
PluginFactoryExt.Metrics metricsFactory = createMetricsFactory();
PluginFactoryExt.ExecutionContext execContextFactory = createExecutionContextFactory();
PluginMetricsFactoryExt metricsFactory = createMetricsFactory();
ExecutionContextFactoryExt execContextFactory = createExecutionContextFactory();
Map<String, String> envVars = new HashMap<>();
envVars.put("CUSTOM", "test");
PluginFactoryExt.Plugins sut = new PluginFactoryExt.Plugins(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS,
PluginFactoryExt sut = new PluginFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_FACTORY_CLASS,
mockPluginResolver);
sut.init(pipelineIR, metricsFactory, execContextFactory, RubyUtil.FILTER_DELEGATOR_CLASS, envVars::get);
@ -110,24 +110,24 @@ public final class PluginFactoryExtTest extends RubyEnvTestCase {
return ConfigCompiler.configToPipelineIR(Collections.singletonList(sourceWithMetadata), false);
}
private static PluginFactoryExt.ExecutionContext createExecutionContextFactory() {
PluginFactoryExt.ExecutionContext execContextFactory = new PluginFactoryExt.ExecutionContext(RubyUtil.RUBY,
private static ExecutionContextFactoryExt createExecutionContextFactory() {
ExecutionContextFactoryExt execContextFactory = new ExecutionContextFactoryExt(RubyUtil.RUBY,
RubyUtil.EXECUTION_CONTEXT_FACTORY_CLASS);
execContextFactory.initialize(RubyUtil.RUBY.getCurrentContext(), null, null,
RubyUtil.RUBY.newString("no DLQ"));
return execContextFactory;
}
private static PluginFactoryExt.Metrics createMetricsFactory() {
private static PluginMetricsFactoryExt createMetricsFactory() {
final IRubyObject metricWithCollector =
runRubyScript("require \"logstash/instrument/collector\"\n" +
MetricTestCase.runRubyScript("require \"logstash/instrument/collector\"\n" +
"metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)");
NamespacedMetricExt metric = new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS)
.initialize(RUBY.getCurrentContext(), metricWithCollector, RUBY.newEmptyArray());
PluginFactoryExt.Metrics metricsFactory = new PluginFactoryExt.Metrics(RubyUtil.RUBY, RubyUtil.PLUGIN_METRIC_FACTORY_CLASS);
PluginMetricsFactoryExt metricsFactory = new PluginMetricsFactoryExt(RubyUtil.RUBY, RubyUtil.PLUGIN_METRICS_FACTORY_CLASS);
metricsFactory.initialize(RubyUtil.RUBY.getCurrentContext(), RubyUtil.RUBY.newString("main"), metric);
return metricsFactory;
}