diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index 2c6790667..3e43501a4 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -115,6 +115,7 @@ module LogStash :events => stats[:events], :plugins => { :inputs => plugin_stats(stats, :inputs), + :codecs => plugin_stats(stats, :codecs), :filters => plugin_stats(stats, :filters), :outputs => plugin_stats(stats, :outputs) }, diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java new file mode 100644 index 000000000..f47e4068b --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/JavaCodecDelegator.java @@ -0,0 +1,130 @@ +package org.logstash.config.ir.compiler; + +import co.elastic.logstash.api.Codec; +import co.elastic.logstash.api.Event; +import co.elastic.logstash.api.PluginConfigSpec; +import org.jruby.RubySymbol; +import org.jruby.runtime.ThreadContext; +import org.logstash.RubyUtil; +import org.logstash.instrument.metrics.AbstractNamespacedMetricExt; +import org.logstash.instrument.metrics.MetricKeys; +import org.logstash.instrument.metrics.counter.LongCounter; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +public class JavaCodecDelegator implements Codec { + + public static final RubySymbol ENCODE_KEY = RubyUtil.RUBY.newSymbol("encode"); + public static final RubySymbol DECODE_KEY = RubyUtil.RUBY.newSymbol("decode"); + public static final RubySymbol IN_KEY = RubyUtil.RUBY.newSymbol("writes_in"); + + private final String configName; + + private final String id; + + private final Codec codec; + + protected final AbstractNamespacedMetricExt metricEncode; + + protected final AbstractNamespacedMetricExt metricDecode; + + protected final LongCounter encodeMetricIn; + + protected final LongCounter encodeMetricTime; + + protected final LongCounter decodeMetricIn; + + protected final LongCounter decodeMetricOut; + + protected final LongCounter decodeMetricTime; + + + public JavaCodecDelegator(final String configName, final String id, + final AbstractNamespacedMetricExt metric, + final Codec codec) { + this.configName = configName; + this.id = id; + this.codec = codec; + + final ThreadContext context = RubyUtil.RUBY.getCurrentContext(); + final AbstractNamespacedMetricExt namespacedMetric = + metric.namespace(context, RubyUtil.RUBY.newSymbol(codec.getId())); + synchronized(namespacedMetric.getMetric()) { + metricEncode = namespacedMetric.namespace(context, ENCODE_KEY); + encodeMetricIn = LongCounter.fromRubyBase(metricEncode, IN_KEY); + encodeMetricTime = LongCounter.fromRubyBase(metricEncode, MetricKeys.DURATION_IN_MILLIS_KEY); + + metricDecode = namespacedMetric.namespace(context, DECODE_KEY); + decodeMetricIn = LongCounter.fromRubyBase(metricDecode, IN_KEY); + decodeMetricOut = LongCounter.fromRubyBase(metricDecode, MetricKeys.OUT_KEY); + decodeMetricTime = LongCounter.fromRubyBase(metricDecode, MetricKeys.DURATION_IN_MILLIS_KEY); + + namespacedMetric.gauge(context, MetricKeys.NAME_KEY, RubyUtil.RUBY.newString(configName)); + } + } + + @Override + public void decode(final ByteBuffer buffer, final Consumer> eventConsumer) { + decodeMetricIn.increment(); + + final long start = System.nanoTime(); + + codec.decode(buffer, (event) -> { + decodeMetricOut.increment(); + eventConsumer.accept(event); + }); + + decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)); + } + + @Override + public void flush(final ByteBuffer buffer, final Consumer> eventConsumer) { + decodeMetricIn.increment(); + + final long start = System.nanoTime(); + + codec.flush(buffer, (event) -> { + decodeMetricOut.increment(); + eventConsumer.accept(event); + }); + + decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)); + } + + @Override + public boolean encode(final Event event, final ByteBuffer buffer) throws EncodeException { + encodeMetricIn.increment(); + + final long start = System.nanoTime(); + + final boolean ret = codec.encode(event, buffer); + + decodeMetricTime.increment(TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)); + + return ret; + } + + @Override + public Codec cloneCodec() { + return codec.cloneCodec(); + } + + @Override + public Collection> configSchema() { + return codec.configSchema(); + } + + @Override + public String getName() { + return codec.getName(); + } + + @Override + public String getId() { + return codec.getId(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java index 1176058fa..790c9b0e5 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java +++ b/logstash-core/src/main/java/org/logstash/plugins/PluginFactoryExt.java @@ -26,6 +26,7 @@ import org.logstash.config.ir.PipelineIR; import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt; import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt; import org.logstash.config.ir.compiler.FilterDelegatorExt; +import org.logstash.config.ir.compiler.JavaCodecDelegator; import org.logstash.config.ir.compiler.JavaFilterDelegatorExt; import org.logstash.config.ir.compiler.JavaInputDelegatorExt; import org.logstash.config.ir.compiler.JavaOutputDelegatorExt; @@ -338,7 +339,7 @@ public final class PluginFactoryExt { } if (codec != null) { - return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, codec); + return JavaUtil.convertJavaToRuby(RubyUtil.RUBY, new JavaCodecDelegator(name, id, typeScopedMetric, codec)); } else { throw new IllegalStateException("Unable to instantiate codec: " + pluginClass); }