mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
Expose Metric API to Java plugins and migrate JavaCodecDelegator to new Metric API
This commit is contained in:
parent
5eb569719c
commit
7c32f0fcf3
19 changed files with 647 additions and 60 deletions
|
@ -70,6 +70,8 @@ task javaTests(type: Test) {
|
|||
exclude '/org/logstash/config/ir/CompiledPipelineTest.class'
|
||||
exclude '/org/logstash/config/ir/compiler/OutputDelegatorTest.class'
|
||||
exclude '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
||||
exclude '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
||||
exclude '/org/logstash/plugins/CounterMetricImplTest.class'
|
||||
}
|
||||
|
||||
task rubyTests(type: Test) {
|
||||
|
@ -81,6 +83,8 @@ task rubyTests(type: Test) {
|
|||
include '/org/logstash/config/ir/CompiledPipelineTest.class'
|
||||
include '/org/logstash/config/ir/compiler/OutputDelegatorTest.class'
|
||||
include '/org/logstash/config/ir/compiler/JavaCodecDelegatorTest.class'
|
||||
include '/org/logstash/plugins/NamespacedMetricImplTest.class'
|
||||
include '/org/logstash/plugins/CounterMetricImplTest.class'
|
||||
}
|
||||
|
||||
test {
|
||||
|
|
|
@ -15,6 +15,14 @@ public interface Context {
|
|||
*/
|
||||
DeadLetterQueueWriter getDlqWriter();
|
||||
|
||||
/**
|
||||
* Provides a metric namespace scoped to the given {@code plugin} that metrics can be written to and
|
||||
* can be nested deeper with further namespaces.
|
||||
* @param plugin The plugin the metric should be scoped to
|
||||
* @return A metric scoped to the current plugin
|
||||
*/
|
||||
NamespacedMetric getMetric(Plugin plugin);
|
||||
|
||||
/**
|
||||
* Provides a {@link Logger} instance to plugins.
|
||||
* @param plugin The plugin for which the logger should be supplied.
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
package co.elastic.logstash.api;
|
||||
|
||||
/**
|
||||
* A counter metric that tracks a single counter.
|
||||
*
|
||||
* You can retrieve an instance of this class using {@link NamespacedMetric#counter(String)}.
|
||||
*/
|
||||
public interface CounterMetric {
|
||||
/**
|
||||
* Increments the counter by 1.
|
||||
*/
|
||||
void increment();
|
||||
|
||||
/**
|
||||
* Increments the counter by {@code delta}.
|
||||
*
|
||||
* @param delta amount to increment the counter by
|
||||
*/
|
||||
void increment(long delta);
|
||||
|
||||
/**
|
||||
* Gets the current value of the counter.
|
||||
*
|
||||
* @return the counter value
|
||||
*/
|
||||
long getValue();
|
||||
|
||||
/**
|
||||
* Sets the counter back to 0.
|
||||
*/
|
||||
void reset();
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package co.elastic.logstash.api;
|
||||
|
||||
/**
|
||||
* Represents a metric namespace that other namespaces can nested within.
|
||||
*/
|
||||
public interface Metric {
|
||||
/**
|
||||
* Creates a namespace under the current {@link Metric} and returns it.
|
||||
*
|
||||
* @param key namespace to traverse into
|
||||
* @return the {@code key} namespace under the current Metric
|
||||
*/
|
||||
NamespacedMetric namespace(String... key);
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package co.elastic.logstash.api;
|
||||
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Represents a nested namespace that metrics can be written into and other namespaces
|
||||
* can be nested within.
|
||||
*/
|
||||
public interface NamespacedMetric extends Metric {
|
||||
/**
|
||||
* Writes an absolute value to the {@code metric}.
|
||||
*
|
||||
* @param metric metric to write value to
|
||||
* @param value value to write
|
||||
*/
|
||||
void gauge(String metric, Object value);
|
||||
|
||||
/**
|
||||
* Creates a counter with the name {@code metric}.
|
||||
*
|
||||
* @param metric name of the counter
|
||||
* @return an instance tracking a counter metric allowing easy incrementing and resetting
|
||||
*/
|
||||
CounterMetric counter(String metric);
|
||||
|
||||
/**
|
||||
* Increment the {@code metric} metric by 1.
|
||||
*
|
||||
* @param metric metric to increment
|
||||
*/
|
||||
void increment(String metric);
|
||||
|
||||
/**
|
||||
* Increment the {@code metric} by {@code delta}.
|
||||
*
|
||||
* @param metric metric to increment
|
||||
* @param delta amount to increment by
|
||||
*/
|
||||
void increment(String metric, int delta);
|
||||
|
||||
/**
|
||||
* Times the {@code callable} and returns its value and increments the
|
||||
* {@code metric} with the time taken.
|
||||
*
|
||||
* @param metric metric to increment
|
||||
* @param callable callable to time
|
||||
* @param <T> return type of the {@code callable}
|
||||
* @return the return value from the {@code callable}
|
||||
*/
|
||||
<T> T time(String metric, Supplier<T> callable);
|
||||
|
||||
/**
|
||||
* Increments the {@code metric} by {@code duration}.
|
||||
*
|
||||
* @param metric metric to increment
|
||||
* @param duration duration to increment by
|
||||
*/
|
||||
void reportTime(String metric, long duration);
|
||||
|
||||
/**
|
||||
* Retrieves each namespace component that makes up this metric.
|
||||
*
|
||||
* @return the namespaces this metric is nested within
|
||||
*/
|
||||
String[] namespaceName();
|
||||
|
||||
/**
|
||||
* Gets Logstash's root metric namespace.
|
||||
*
|
||||
* @return the root namespace
|
||||
*/
|
||||
Metric root();
|
||||
}
|
|
@ -1,7 +1,11 @@
|
|||
package org.logstash.config.ir.compiler;
|
||||
|
||||
import co.elastic.logstash.api.Codec;
|
||||
import co.elastic.logstash.api.Context;
|
||||
import co.elastic.logstash.api.CounterMetric;
|
||||
import co.elastic.logstash.api.Event;
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import co.elastic.logstash.api.PluginConfigSpec;
|
||||
import org.jruby.RubySymbol;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
|
@ -20,45 +24,39 @@ import java.util.function.Consumer;
|
|||
|
||||
public class JavaCodecDelegator implements Codec {
|
||||
|
||||
public static final RubySymbol ENCODE_KEY = RubyUtil.RUBY.newSymbol("encode");
|
||||
public static final RubySymbol DECODE_KEY = RubyUtil.RUBY.newSymbol("decode");
|
||||
public static final RubySymbol IN_KEY = RubyUtil.RUBY.newSymbol("writes_in");
|
||||
public static final String ENCODE_KEY = "encode";
|
||||
public static final String DECODE_KEY = "decode";
|
||||
public static final String IN_KEY = "writes_in";
|
||||
|
||||
private final Codec codec;
|
||||
|
||||
protected final AbstractNamespacedMetricExt metricEncode;
|
||||
protected final CounterMetric encodeMetricIn;
|
||||
|
||||
protected final AbstractNamespacedMetricExt metricDecode;
|
||||
protected final CounterMetric encodeMetricTime;
|
||||
|
||||
protected final LongCounter encodeMetricIn;
|
||||
protected final CounterMetric decodeMetricIn;
|
||||
|
||||
protected final LongCounter encodeMetricTime;
|
||||
protected final CounterMetric decodeMetricOut;
|
||||
|
||||
protected final LongCounter decodeMetricIn;
|
||||
|
||||
protected final LongCounter decodeMetricOut;
|
||||
|
||||
protected final LongCounter decodeMetricTime;
|
||||
protected final CounterMetric decodeMetricTime;
|
||||
|
||||
|
||||
public JavaCodecDelegator(final AbstractNamespacedMetricExt metric,
|
||||
final Codec codec) {
|
||||
public JavaCodecDelegator(final Context context, final Codec codec) {
|
||||
this.codec = codec;
|
||||
|
||||
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
|
||||
final AbstractNamespacedMetricExt namespacedMetric =
|
||||
metric.namespace(context, RubyUtil.RUBY.newSymbol(codec.getId()));
|
||||
synchronized(namespacedMetric.getMetric()) {
|
||||
metricEncode = namespacedMetric.namespace(context, ENCODE_KEY);
|
||||
encodeMetricIn = LongCounter.fromRubyBase(metricEncode, IN_KEY);
|
||||
encodeMetricTime = LongCounter.fromRubyBase(metricEncode, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
final NamespacedMetric metric = context.getMetric(codec);
|
||||
|
||||
metricDecode = namespacedMetric.namespace(context, DECODE_KEY);
|
||||
decodeMetricIn = LongCounter.fromRubyBase(metricDecode, IN_KEY);
|
||||
decodeMetricOut = LongCounter.fromRubyBase(metricDecode, MetricKeys.OUT_KEY);
|
||||
decodeMetricTime = LongCounter.fromRubyBase(metricDecode, MetricKeys.DURATION_IN_MILLIS_KEY);
|
||||
synchronized(metric.root()) {
|
||||
metric.gauge(MetricKeys.NAME_KEY.asJavaString(), codec.getName());
|
||||
|
||||
namespacedMetric.gauge(context, MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(codec.getName()));
|
||||
final NamespacedMetric encodeMetric = metric.namespace(ENCODE_KEY);
|
||||
encodeMetricIn = encodeMetric.counter(IN_KEY);
|
||||
encodeMetricTime = encodeMetric.counter(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
|
||||
|
||||
final NamespacedMetric decodeMetric = metric.namespace(DECODE_KEY);
|
||||
decodeMetricIn = decodeMetric.counter(IN_KEY);
|
||||
decodeMetricOut = decodeMetric.counter(MetricKeys.OUT_KEY.asJavaString());
|
||||
decodeMetricTime = decodeMetric.counter(MetricKeys.DURATION_IN_MILLIS_KEY.asJavaString());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,8 @@ package org.logstash.plugins;
|
|||
import co.elastic.logstash.api.Context;
|
||||
import co.elastic.logstash.api.Event;
|
||||
import co.elastic.logstash.api.EventFactory;
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import co.elastic.logstash.api.Plugin;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
@ -16,8 +18,14 @@ public class ContextImpl implements Context {
|
|||
|
||||
private DeadLetterQueueWriter dlqWriter;
|
||||
|
||||
public ContextImpl(DeadLetterQueueWriter dlqWriter) {
|
||||
/**
|
||||
* This is a reference to the [stats, pipelines, *name*, plugins] metric namespace.
|
||||
*/
|
||||
private Metric pluginsScopedMetric;
|
||||
|
||||
public ContextImpl(DeadLetterQueueWriter dlqWriter, Metric metric) {
|
||||
this.dlqWriter = dlqWriter;
|
||||
this.pluginsScopedMetric = metric;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -25,6 +33,11 @@ public class ContextImpl implements Context {
|
|||
return dlqWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespacedMetric getMetric(Plugin plugin) {
|
||||
return pluginsScopedMetric.namespace(PluginLookup.PluginType.getTypeByPlugin(plugin).metricNamespace(), plugin.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Logger getLogger(Plugin plugin) {
|
||||
return LogManager.getLogger(plugin.getClass());
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.CounterMetric;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||
import org.logstash.instrument.metrics.counter.LongCounter;
|
||||
|
||||
public class CounterMetricImpl implements CounterMetric {
|
||||
private LongCounter longCounter;
|
||||
|
||||
public CounterMetricImpl(final ThreadContext threadContext,
|
||||
final AbstractNamespacedMetricExt metrics,
|
||||
final String metric) {
|
||||
this.longCounter = LongCounter.fromRubyBase(metrics, threadContext.getRuntime().newSymbol(metric));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increment() {
|
||||
this.longCounter.increment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increment(final long delta) {
|
||||
this.longCounter.increment(delta);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getValue() {
|
||||
return this.longCounter.getValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
this.longCounter.reset();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.CounterMetric;
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.RubySymbol;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.Rubyfier;
|
||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Wraps a {@link AbstractNamespacedMetricExt} that represents a nested namespace and adds
|
||||
* metrics and other namespaces to it.
|
||||
*/
|
||||
public class NamespacedMetricImpl implements NamespacedMetric {
|
||||
private final ThreadContext threadContext;
|
||||
private final AbstractNamespacedMetricExt metrics;
|
||||
|
||||
public NamespacedMetricImpl(final ThreadContext threadContext, final AbstractNamespacedMetricExt metrics) {
|
||||
this.threadContext = threadContext;
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void gauge(final String key, final Object value) {
|
||||
this.metrics.gauge(this.threadContext, this.getSymbol(key), Rubyfier.deep(this.threadContext.getRuntime(), value));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CounterMetric counter(final String metric) {
|
||||
return new CounterMetricImpl(this.threadContext, this.metrics, metric);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespacedMetric namespace(final String... key) {
|
||||
final IRubyObject[] rubyfiedKeys = Stream.of(key)
|
||||
.map(this::getSymbol)
|
||||
.toArray(IRubyObject[]::new);
|
||||
|
||||
return new NamespacedMetricImpl(
|
||||
this.threadContext,
|
||||
this.metrics.namespace(this.threadContext, RubyArray.newArray(this.threadContext.getRuntime(), rubyfiedKeys))
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increment(final String key) {
|
||||
this.metrics.increment(this.threadContext, new IRubyObject[] { this.getSymbol(key) });
|
||||
}
|
||||
|
||||
@Override
|
||||
public void increment(final String key, final int amount) {
|
||||
this.metrics.increment(this.threadContext, new IRubyObject[] {
|
||||
this.getSymbol(key),
|
||||
this.convert(amount)
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T time(final String key, final Supplier<T> callable) {
|
||||
final long start = System.nanoTime();
|
||||
final T ret = callable.get();
|
||||
final long end = System.nanoTime();
|
||||
this.reportTime(key, TimeUnit.NANOSECONDS.toMillis(end - start));
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reportTime(final String key, final long duration) {
|
||||
this.metrics.reportTime(this.threadContext, this.getSymbol(key), this.convert(duration));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] namespaceName() {
|
||||
final List<String> names = new ArrayList<>();
|
||||
|
||||
for (final Object o : this.metrics.namespaceName(this.threadContext)) {
|
||||
if (o instanceof RubyObject) {
|
||||
names.add(((RubyObject) o).to_s().toString());
|
||||
}
|
||||
}
|
||||
|
||||
return names.toArray(new String[0]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Metric root() {
|
||||
return new RootMetricImpl(this.threadContext, this.metrics.root(this.threadContext));
|
||||
}
|
||||
|
||||
private RubySymbol getSymbol(final String s) {
|
||||
return this.threadContext.getRuntime().newSymbol(s);
|
||||
}
|
||||
|
||||
private IRubyObject convert(final Object o) {
|
||||
return Rubyfier.deep(this.threadContext.getRuntime(), o);
|
||||
}
|
||||
}
|
|
@ -274,7 +274,7 @@ public final class PluginFactoryExt {
|
|||
try {
|
||||
final Constructor<Output> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
|
||||
Configuration config = new ConfigurationImpl(pluginArgs, this);
|
||||
output = ctor.newInstance(id, config, executionContext.toContext(type));
|
||||
output = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
|
||||
PluginUtil.validateConfig(output, config);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
|
||||
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
|
||||
|
@ -296,7 +296,7 @@ public final class PluginFactoryExt {
|
|||
try {
|
||||
final Constructor<Filter> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
|
||||
Configuration config = new ConfigurationImpl(pluginArgs);
|
||||
filter = ctor.newInstance(id, config, executionContext.toContext(type));
|
||||
filter = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
|
||||
PluginUtil.validateConfig(filter, config);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
|
||||
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
|
||||
|
@ -318,7 +318,7 @@ public final class PluginFactoryExt {
|
|||
try {
|
||||
final Constructor<Input> ctor = cls.getConstructor(String.class, Configuration.class, Context.class);
|
||||
Configuration config = new ConfigurationImpl(pluginArgs, this);
|
||||
input = ctor.newInstance(id, config, executionContext.toContext(type));
|
||||
input = ctor.newInstance(id, config, executionContext.toContext(type, metrics.getRoot(context)));
|
||||
PluginUtil.validateConfig(input, config);
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
|
||||
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
|
||||
|
@ -335,13 +335,14 @@ public final class PluginFactoryExt {
|
|||
}
|
||||
} else if (type == PluginLookup.PluginType.CODEC) {
|
||||
final Class<Codec> cls = (Class<Codec>) pluginClass.klass();
|
||||
Codec codec = null;
|
||||
if (cls != null) {
|
||||
try {
|
||||
final Constructor<Codec> ctor = cls.getConstructor(Configuration.class, Context.class);
|
||||
Configuration config = new ConfigurationImpl(pluginArgs);
|
||||
codec = ctor.newInstance(config, executionContext.toContext(type));
|
||||
final Context pluginContext = executionContext.toContext(type, metrics.getRoot(context));
|
||||
final Codec codec = ctor.newInstance(config, pluginContext);
|
||||
PluginUtil.validateConfig(codec, config);
|
||||
return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(pluginContext, codec));
|
||||
} catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException ex) {
|
||||
if (ex instanceof InvocationTargetException && ex.getCause() != null) {
|
||||
throw new IllegalStateException((ex).getCause());
|
||||
|
@ -350,11 +351,7 @@ public final class PluginFactoryExt {
|
|||
}
|
||||
}
|
||||
|
||||
if (codec != null) {
|
||||
return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(typeScopedMetric, codec));
|
||||
} else {
|
||||
throw new IllegalStateException("Unable to instantiate codec: " + pluginClass);
|
||||
}
|
||||
throw new IllegalStateException("Unable to instantiate codec: " + pluginClass);
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException("Unable to create plugin: " + pluginClass.toReadableString());
|
||||
|
@ -397,7 +394,7 @@ public final class PluginFactoryExt {
|
|||
);
|
||||
}
|
||||
|
||||
public Context toContext(PluginLookup.PluginType pluginType) {
|
||||
public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) {
|
||||
DeadLetterQueueWriter dlq = null;
|
||||
if (pluginType == PluginLookup.PluginType.OUTPUT) {
|
||||
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
|
||||
|
@ -413,7 +410,7 @@ public final class PluginFactoryExt {
|
|||
}
|
||||
}
|
||||
|
||||
return new ContextImpl(dlq);
|
||||
return new ContextImpl(dlq, new NamespacedMetricImpl(RubyUtil.RUBY.getCurrentContext(), metric));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -444,8 +441,7 @@ public final class PluginFactoryExt {
|
|||
return this;
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) {
|
||||
AbstractNamespacedMetricExt getRoot(final ThreadContext context) {
|
||||
return metric.namespace(
|
||||
context,
|
||||
RubyArray.newArray(
|
||||
|
@ -454,7 +450,12 @@ public final class PluginFactoryExt {
|
|||
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS
|
||||
)
|
||||
)
|
||||
).namespace(
|
||||
);
|
||||
}
|
||||
|
||||
@JRubyMethod
|
||||
public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) {
|
||||
return getRoot(context).namespace(
|
||||
context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString()))
|
||||
);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,10 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.Codec;
|
||||
import co.elastic.logstash.api.Filter;
|
||||
import co.elastic.logstash.api.Input;
|
||||
import co.elastic.logstash.api.Output;
|
||||
import co.elastic.logstash.api.Plugin;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyString;
|
||||
import org.jruby.java.proxies.JavaProxy;
|
||||
|
@ -8,6 +13,9 @@ import org.jruby.runtime.builtin.IRubyObject;
|
|||
import org.logstash.RubyUtil;
|
||||
import org.logstash.plugins.discovery.PluginRegistry;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Java Implementation of the plugin that is implemented by wrapping the Ruby
|
||||
* {@code LogStash::Plugin} class for the Ruby plugin lookup.
|
||||
|
@ -93,16 +101,48 @@ public final class PluginLookup {
|
|||
}
|
||||
|
||||
public enum PluginType {
|
||||
INPUT("input"), FILTER("filter"), OUTPUT("output"), CODEC("codec");
|
||||
INPUT("input", "inputs", Input.class),
|
||||
FILTER("filter", "filters", Filter.class),
|
||||
OUTPUT("output", "outputs", Output.class),
|
||||
CODEC("codec", "codecs", Codec.class);
|
||||
|
||||
private final RubyString label;
|
||||
private final RubyString rubyLabel;
|
||||
private final String metricNamespace;
|
||||
private final Class<? extends Plugin> pluginClass;
|
||||
|
||||
PluginType(final String label) {
|
||||
this.label = RubyUtil.RUBY.newString(label);
|
||||
PluginType(final String label, final String metricNamespace, final Class<? extends Plugin> pluginClass) {
|
||||
this.rubyLabel = RubyUtil.RUBY.newString(label);
|
||||
this.metricNamespace = metricNamespace;
|
||||
this.pluginClass = pluginClass;
|
||||
}
|
||||
|
||||
public RubyString rubyLabel() {
|
||||
return label;
|
||||
return rubyLabel;
|
||||
}
|
||||
|
||||
public String metricNamespace() {
|
||||
return metricNamespace;
|
||||
}
|
||||
|
||||
public Class<? extends Plugin> pluginClass() {
|
||||
return pluginClass;
|
||||
}
|
||||
|
||||
public static PluginType getTypeByPlugin(Plugin plugin) {
|
||||
for (final PluginType type : PluginType.values()) {
|
||||
if (type.pluginClass().isInstance(plugin)) {
|
||||
return type;
|
||||
}
|
||||
}
|
||||
|
||||
final String allowedPluginTypes = Stream.of(PluginType.values())
|
||||
.map((t) -> t.pluginClass().getName()).collect(Collectors.joining(", "));
|
||||
|
||||
throw new IllegalArgumentException(String.format(
|
||||
"Plugin [%s] does not extend one of: %s",
|
||||
plugin.getName(),
|
||||
allowedPluginTypes
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.runtime.ThreadContext;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.logstash.instrument.metrics.AbstractMetricExt;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Wraps a {@link AbstractMetricExt} and represents a "root metric" that must be
|
||||
* namespaced in order to write metrics to.
|
||||
*/
|
||||
public class RootMetricImpl implements Metric {
|
||||
private final ThreadContext threadContext;
|
||||
private final AbstractMetricExt metrics;
|
||||
|
||||
public RootMetricImpl(final ThreadContext threadContext, final AbstractMetricExt root) {
|
||||
this.threadContext = threadContext;
|
||||
this.metrics = root;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespacedMetric namespace(final String... key) {
|
||||
final IRubyObject[] rubyfiedKeys = Stream.of(key)
|
||||
.map(this.threadContext.getRuntime()::newSymbol)
|
||||
.toArray(IRubyObject[]::new);
|
||||
|
||||
return new NamespacedMetricImpl(
|
||||
this.threadContext,
|
||||
this.metrics.namespace(this.threadContext, RubyArray.newArray(this.threadContext.getRuntime(), rubyfiedKeys))
|
||||
);
|
||||
}
|
||||
}
|
|
@ -2,11 +2,14 @@ package org.logstash.config.ir.compiler;
|
|||
|
||||
import co.elastic.logstash.api.Codec;
|
||||
import co.elastic.logstash.api.Event;
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.PluginConfigSpec;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.jruby.RubyHash;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.logstash.plugins.ContextImpl;
|
||||
import org.logstash.plugins.MetricTestCase;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -19,7 +22,7 @@ import java.util.function.Consumer;
|
|||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class JavaCodecDelegatorTest extends PluginDelegatorTestCase {
|
||||
public class JavaCodecDelegatorTest extends MetricTestCase {
|
||||
private Codec codec;
|
||||
|
||||
@Before
|
||||
|
@ -31,11 +34,6 @@ public class JavaCodecDelegatorTest extends PluginDelegatorTestCase {
|
|||
super.setup();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getBaseMetricsPath() {
|
||||
return "codec/foo";
|
||||
}
|
||||
|
||||
@Test
|
||||
public void plainCodecDelegatorInitializesCleanly() {
|
||||
constructCodecDelegator();
|
||||
|
@ -44,7 +42,7 @@ public class JavaCodecDelegatorTest extends PluginDelegatorTestCase {
|
|||
@Test
|
||||
public void plainCodecPluginPushesPluginNameToMetric() {
|
||||
constructCodecDelegator();
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"codec", "foo"});
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"codecs", "foo"});
|
||||
final String pluginName = getMetricStringValue(metricStore, "name");
|
||||
|
||||
assertEquals(codec.getName(), pluginName);
|
||||
|
@ -192,7 +190,7 @@ public class JavaCodecDelegatorTest extends PluginDelegatorTestCase {
|
|||
}
|
||||
|
||||
private RubyHash getMetricStore(final String type) {
|
||||
return getMetricStore(new String[]{"codec", "foo", type});
|
||||
return getMetricStore(new String[]{"codecs", "foo", type});
|
||||
}
|
||||
|
||||
private long getMetricLongValue(final String type, final String symbolName) {
|
||||
|
@ -200,7 +198,7 @@ public class JavaCodecDelegatorTest extends PluginDelegatorTestCase {
|
|||
}
|
||||
|
||||
private JavaCodecDelegator constructCodecDelegator() {
|
||||
return new JavaCodecDelegator(metric, codec);
|
||||
return new JavaCodecDelegator(new ContextImpl(null, this.getInstance()), codec);
|
||||
}
|
||||
|
||||
private abstract class AbstractCodec implements Codec {
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.CounterMetric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class CounterMetricImplTest extends MetricTestCase {
|
||||
@Test
|
||||
public void testIncrement() {
|
||||
final NamespacedMetric namespace = this.getInstance().namespace("ayo");
|
||||
final CounterMetric metric = namespace.counter("abcdef");
|
||||
metric.increment();
|
||||
assertThat(metric.getValue()).isEqualTo(1);
|
||||
assertThat(getMetricLongValue(getMetricStore(new String[]{"ayo"}), "abcdef")).isEqualTo(1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementByAmount() {
|
||||
final NamespacedMetric namespace = this.getInstance().namespace("ayo");
|
||||
final CounterMetric metric = namespace.counter("abcdef");
|
||||
metric.increment(5);
|
||||
assertThat(metric.getValue()).isEqualTo(5);
|
||||
assertThat(getMetricLongValue(getMetricStore(new String[]{"ayo"}), "abcdef")).isEqualTo(5);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReset() {
|
||||
final NamespacedMetric namespace = this.getInstance().namespace("ayo");
|
||||
final CounterMetric metric = namespace.counter("abcdef");
|
||||
|
||||
metric.increment(1);
|
||||
assertThat(metric.getValue()).isEqualTo(1);
|
||||
assertThat(getMetricLongValue(getMetricStore(new String[]{"ayo"}), "abcdef")).isEqualTo(1);
|
||||
|
||||
metric.reset();
|
||||
assertThat(metric.getValue()).isEqualTo(0);
|
||||
assertThat(getMetricLongValue(getMetricStore(new String[]{"ayo"}), "abcdef")).isEqualTo(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import com.google.common.base.Joiner;
|
||||
import org.jruby.RubyFixnum;
|
||||
import org.jruby.RubyHash;
|
||||
import org.jruby.RubyString;
|
||||
import org.jruby.java.proxies.ConcreteJavaProxy;
|
||||
import org.jruby.runtime.builtin.IRubyObject;
|
||||
import org.junit.Before;
|
||||
import org.logstash.RubyUtil;
|
||||
import org.logstash.config.ir.RubyEnvTestCase;
|
||||
import org.logstash.execution.ExecutionContextExt;
|
||||
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
|
||||
import org.logstash.instrument.metrics.NamespacedMetricExt;
|
||||
|
||||
import static org.logstash.RubyUtil.*;
|
||||
|
||||
public abstract class MetricTestCase extends RubyEnvTestCase {
|
||||
protected AbstractNamespacedMetricExt metric;
|
||||
protected ExecutionContextExt executionContext;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
final IRubyObject metricWithCollector =
|
||||
runRubyScript("require \"logstash/instrument/collector\"\n" +
|
||||
"metricWithCollector = LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new)");
|
||||
|
||||
metric = new NamespacedMetricExt(RUBY, NAMESPACED_METRIC_CLASS)
|
||||
.initialize(RUBY.getCurrentContext(), metricWithCollector, RUBY.newEmptyArray());
|
||||
executionContext = new ExecutionContextExt(RUBY, EXECUTION_CONTEXT_CLASS);
|
||||
}
|
||||
|
||||
protected static IRubyObject runRubyScript(String script) {
|
||||
IRubyObject m = RUBY.evalScriptlet(script);
|
||||
return m;
|
||||
}
|
||||
|
||||
protected RubyHash getMetricStore(String[] path) {
|
||||
RubyHash metricStore = (RubyHash) metric.collector(RUBY.getCurrentContext())
|
||||
.callMethod(RUBY.getCurrentContext(), "snapshot_metric")
|
||||
.callMethod(RUBY.getCurrentContext(), "metric_store")
|
||||
.callMethod(RUBY.getCurrentContext(), "get_with_path", RUBY.newString(Joiner.on("/").join(path)));
|
||||
|
||||
RubyHash rh = metricStore;
|
||||
for (String p : path) {
|
||||
rh = (RubyHash) rh.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(p));
|
||||
}
|
||||
return rh;
|
||||
}
|
||||
|
||||
protected String getMetricStringValue(RubyHash metricStore, String symbolName) {
|
||||
ConcreteJavaProxy counter = (ConcreteJavaProxy) metricStore.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(symbolName));
|
||||
RubyString value = (RubyString) counter.callMethod("value");
|
||||
return value.asJavaString();
|
||||
}
|
||||
|
||||
protected long getMetricLongValue(RubyHash metricStore, String symbolName) {
|
||||
ConcreteJavaProxy counter = (ConcreteJavaProxy) metricStore.op_aref(RUBY.getCurrentContext(), RUBY.newSymbol(symbolName));
|
||||
RubyFixnum count = (RubyFixnum) counter.callMethod("value");
|
||||
return count.getLongValue();
|
||||
}
|
||||
|
||||
protected Metric getInstance() {
|
||||
return new RootMetricImpl(RubyUtil.RUBY.getCurrentContext(), this.metric);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
package org.logstash.plugins;
|
||||
|
||||
import co.elastic.logstash.api.Metric;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import org.assertj.core.data.Percentage;
|
||||
import org.jruby.RubyHash;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class NamespacedMetricImplTest extends MetricTestCase {
|
||||
@Test
|
||||
public void testGauge() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
|
||||
metrics.gauge("abc", "def");
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricStringValue(metricStore, "abc")).isEqualTo("def");
|
||||
}
|
||||
|
||||
metrics.gauge("abc", "123");
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricStringValue(metricStore, "abc")).isEqualTo("123");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrement() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
|
||||
metrics.increment("abc");
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(1);
|
||||
}
|
||||
|
||||
metrics.increment("abc");
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(2);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementWithAmount() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
|
||||
metrics.increment("abc", 2);
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(2);
|
||||
}
|
||||
|
||||
metrics.increment("abc", 3);
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(5);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTimeCallable() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
metrics.time("abc", () -> {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isCloseTo(100, Percentage.withPercentage(5));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReportTime() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
|
||||
metrics.reportTime("abc", 123);
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(123);
|
||||
}
|
||||
|
||||
metrics.reportTime("abc", 877);
|
||||
{
|
||||
final RubyHash metricStore = getMetricStore(new String[]{"test"});
|
||||
assertThat(this.getMetricLongValue(metricStore, "abc")).isEqualTo(1000);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNamespace() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
|
||||
final NamespacedMetric namespaced = metrics.namespace("abcdef");
|
||||
assertThat(namespaced.namespaceName()).containsExactly("test", "abcdef");
|
||||
|
||||
final NamespacedMetric namespaced2 = namespaced.namespace("12345", "qwerty");
|
||||
assertThat(namespaced2.namespaceName()).containsExactly("test", "abcdef", "12345", "qwerty");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoot() {
|
||||
final NamespacedMetric metrics = this.getInstance().namespace("test");
|
||||
final Metric root = metrics.root();
|
||||
final NamespacedMetric namespaced = root.namespace("someothernamespace");
|
||||
assertThat(namespaced.namespaceName()).containsExactly("someothernamespace");
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ package org.logstash.plugins;
|
|||
|
||||
import co.elastic.logstash.api.Context;
|
||||
import co.elastic.logstash.api.EventFactory;
|
||||
import co.elastic.logstash.api.NamespacedMetric;
|
||||
import co.elastic.logstash.api.Plugin;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.logstash.common.io.DeadLetterQueueWriter;
|
||||
|
@ -13,6 +14,11 @@ public class TestContext implements Context {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NamespacedMetric getMetric(final Plugin plugin) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Logger getLogger(Plugin plugin) {
|
||||
return null;
|
||||
|
|
|
@ -36,6 +36,6 @@ public class TestPluginFactory implements RubyIntegration.PluginFactory {
|
|||
|
||||
@Override
|
||||
public Codec buildDefaultCodec(String codecName) {
|
||||
return new Line(new ConfigurationImpl(Collections.emptyMap()), new ContextImpl(null));
|
||||
return new Line(new ConfigurationImpl(Collections.emptyMap()), new ContextImpl(null, null));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ public class UuidTest {
|
|||
public void testUuidWithoutRequiredConfigThrows() {
|
||||
try {
|
||||
Configuration config = new ConfigurationImpl(Collections.emptyMap());
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null, null));
|
||||
PluginUtil.validateConfig(uuid, config);
|
||||
Assert.fail("java-uuid filter without required config should have thrown exception");
|
||||
} catch (IllegalStateException ex) {
|
||||
|
@ -40,7 +40,7 @@ public class UuidTest {
|
|||
Map<String, Object> rawConfig = new HashMap<>();
|
||||
rawConfig.put(Uuid.TARGET_CONFIG.name(), targetField);
|
||||
Configuration config = new ConfigurationImpl(rawConfig);
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null, null));
|
||||
PluginUtil.validateConfig(uuid, config);
|
||||
|
||||
org.logstash.Event e = new org.logstash.Event();
|
||||
|
@ -61,7 +61,7 @@ public class UuidTest {
|
|||
rawConfig.put(Uuid.TARGET_CONFIG.name(), targetField);
|
||||
rawConfig.put(Uuid.OVERWRITE_CONFIG.name(), true);
|
||||
Configuration config = new ConfigurationImpl(rawConfig);
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null));
|
||||
Uuid uuid = new Uuid(ID, config, new ContextImpl(null, null));
|
||||
PluginUtil.validateConfig(uuid, config);
|
||||
|
||||
org.logstash.Event e = new org.logstash.Event();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue