diff --git a/logstash-core/lib/logstash/codecs/delegator.rb b/logstash-core/lib/logstash/codecs/delegator.rb new file mode 100644 index 000000000..2cbb440c5 --- /dev/null +++ b/logstash-core/lib/logstash/codecs/delegator.rb @@ -0,0 +1,52 @@ +module LogStash::Codecs + class Delegator < SimpleDelegator + def initialize(obj) + super(obj) + @encode_metric = LogStash::Instrument::NamespacedNullMetric.new + @decode_metric = LogStash::Instrument::NamespacedNullMetric.new + end + + def class + __getobj__.class + end + + def metric=(metric) + __getobj__.metric = metric + + __getobj__.metric.gauge(:name, __getobj__.class.config_name) + + @encode_metric = __getobj__.metric.namespace(:encode) + @encode_metric.counter(:writes_in) + @encode_metric.report_time(:duration_in_millis, 0) + + @decode_metric = __getobj__.metric.namespace(:decode) + @decode_metric.counter(:writes_in) + @decode_metric.counter(:out) + @decode_metric.report_time(:duration_in_millis, 0) + end + + def encode(event) + @encode_metric.increment(:writes_in) + @encode_metric.time(:duration_in_millis) do + __getobj__.encode(event) + end + end + + def multi_encode(events) + @encode_metric.increment(:writes_in, events.length) + @encode_metric.time(:duration_in_millis) do + __getobj__.multi_encode(events) + end + end + + def decode(data) + @decode_metric.increment(:writes_in) + @decode_metric.time(:duration_in_millis) do + __getobj__.decode(data) do |event| + @decode_metric.increment(:out) + yield event + end + end + end + end +end diff --git a/logstash-core/lib/logstash/config/mixin.rb b/logstash-core/lib/logstash/config/mixin.rb index 30fb44d75..621ef9e0c 100644 --- a/logstash-core/lib/logstash/config/mixin.rb +++ b/logstash-core/lib/logstash/config/mixin.rb @@ -4,6 +4,7 @@ require "logstash/util/safe_uri" require "logstash/version" require "logstash/environment" require "logstash/util/plugin_version" +require "logstash/codecs/delegator" require "filesize" LogStash::Environment.load_locale! @@ -410,7 +411,7 @@ module LogStash::Config::Mixin case validator when :codec if value.first.is_a?(String) - value = LogStash::Plugin.lookup("codec", value.first).new + value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new return true, value else value = value.first diff --git a/logstash-core/lib/logstash/inputs/base.rb b/logstash-core/lib/logstash/inputs/base.rb index 2a8ee97e7..50878ffff 100644 --- a/logstash-core/lib/logstash/inputs/base.rb +++ b/logstash-core/lib/logstash/inputs/base.rb @@ -99,6 +99,13 @@ class LogStash::Inputs::Base < LogStash::Plugin cloned end + def metric=(metric) + super + # Hack to create a new metric namespace using 'plugins' as the root + @codec.metric = metric.root.namespace(metric.namespace_name[0...-2].push(:codecs, codec.id)) + metric + end + def execution_context=(context) super # There is no easy way to propage an instance variable into the codec, because the codec diff --git a/logstash-core/lib/logstash/outputs/base.rb b/logstash-core/lib/logstash/outputs/base.rb index 1efc8b079..34abe972c 100644 --- a/logstash-core/lib/logstash/outputs/base.rb +++ b/logstash-core/lib/logstash/outputs/base.rb @@ -102,6 +102,13 @@ class LogStash::Outputs::Base < LogStash::Plugin self.class.concurrency end + def metric=(metric) + super + # Hack to create a new metric namespace using 'plugins' as the root + @codec.metric = metric.root.namespace(metric.namespace_name[0...-2].push(:codecs, codec.id)) + metric + end + def execution_context=(context) super # There is no easy way to propage an instance variable into the codec, because the codec diff --git a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java index 414b05b99..5d1051e82 100644 --- a/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java +++ b/logstash-core/src/main/java/org/logstash/instrument/metrics/AbstractNamespacedMetricExt.java @@ -56,6 +56,11 @@ public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt { return getNamespaceName(context); } + @JRubyMethod(name = "root") + public AbstractMetricExt root(final ThreadContext context) { + return getMetric(); + } + protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject key, IRubyObject value);