mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
4251550826
commit
b53e313202
5 changed files with 73 additions and 1 deletions
52
logstash-core/lib/logstash/codecs/delegator.rb
Normal file
52
logstash-core/lib/logstash/codecs/delegator.rb
Normal file
|
@ -0,0 +1,52 @@
|
|||
module LogStash::Codecs
|
||||
class Delegator < SimpleDelegator
|
||||
def initialize(obj)
|
||||
super(obj)
|
||||
@encode_metric = LogStash::Instrument::NamespacedNullMetric.new
|
||||
@decode_metric = LogStash::Instrument::NamespacedNullMetric.new
|
||||
end
|
||||
|
||||
def class
|
||||
__getobj__.class
|
||||
end
|
||||
|
||||
def metric=(metric)
|
||||
__getobj__.metric = metric
|
||||
|
||||
__getobj__.metric.gauge(:name, __getobj__.class.config_name)
|
||||
|
||||
@encode_metric = __getobj__.metric.namespace(:encode)
|
||||
@encode_metric.counter(:writes_in)
|
||||
@encode_metric.report_time(:duration_in_millis, 0)
|
||||
|
||||
@decode_metric = __getobj__.metric.namespace(:decode)
|
||||
@decode_metric.counter(:writes_in)
|
||||
@decode_metric.counter(:out)
|
||||
@decode_metric.report_time(:duration_in_millis, 0)
|
||||
end
|
||||
|
||||
def encode(event)
|
||||
@encode_metric.increment(:writes_in)
|
||||
@encode_metric.time(:duration_in_millis) do
|
||||
__getobj__.encode(event)
|
||||
end
|
||||
end
|
||||
|
||||
def multi_encode(events)
|
||||
@encode_metric.increment(:writes_in, events.length)
|
||||
@encode_metric.time(:duration_in_millis) do
|
||||
__getobj__.multi_encode(events)
|
||||
end
|
||||
end
|
||||
|
||||
def decode(data)
|
||||
@decode_metric.increment(:writes_in)
|
||||
@decode_metric.time(:duration_in_millis) do
|
||||
__getobj__.decode(data) do |event|
|
||||
@decode_metric.increment(:out)
|
||||
yield event
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -4,6 +4,7 @@ require "logstash/util/safe_uri"
|
|||
require "logstash/version"
|
||||
require "logstash/environment"
|
||||
require "logstash/util/plugin_version"
|
||||
require "logstash/codecs/delegator"
|
||||
require "filesize"
|
||||
|
||||
LogStash::Environment.load_locale!
|
||||
|
@ -410,7 +411,7 @@ module LogStash::Config::Mixin
|
|||
case validator
|
||||
when :codec
|
||||
if value.first.is_a?(String)
|
||||
value = LogStash::Plugin.lookup("codec", value.first).new
|
||||
value = LogStash::Codecs::Delegator.new LogStash::Plugin.lookup("codec", value.first).new
|
||||
return true, value
|
||||
else
|
||||
value = value.first
|
||||
|
|
|
@ -99,6 +99,13 @@ class LogStash::Inputs::Base < LogStash::Plugin
|
|||
cloned
|
||||
end
|
||||
|
||||
def metric=(metric)
|
||||
super
|
||||
# Hack to create a new metric namespace using 'plugins' as the root
|
||||
@codec.metric = metric.root.namespace(metric.namespace_name[0...-2].push(:codecs, codec.id))
|
||||
metric
|
||||
end
|
||||
|
||||
def execution_context=(context)
|
||||
super
|
||||
# There is no easy way to propage an instance variable into the codec, because the codec
|
||||
|
|
|
@ -102,6 +102,13 @@ class LogStash::Outputs::Base < LogStash::Plugin
|
|||
self.class.concurrency
|
||||
end
|
||||
|
||||
def metric=(metric)
|
||||
super
|
||||
# Hack to create a new metric namespace using 'plugins' as the root
|
||||
@codec.metric = metric.root.namespace(metric.namespace_name[0...-2].push(:codecs, codec.id))
|
||||
metric
|
||||
end
|
||||
|
||||
def execution_context=(context)
|
||||
super
|
||||
# There is no easy way to propage an instance variable into the codec, because the codec
|
||||
|
|
|
@ -56,6 +56,11 @@ public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt {
|
|||
return getNamespaceName(context);
|
||||
}
|
||||
|
||||
@JRubyMethod(name = "root")
|
||||
public AbstractMetricExt root(final ThreadContext context) {
|
||||
return getMetric();
|
||||
}
|
||||
|
||||
protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject key,
|
||||
IRubyObject value);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue