JAVAFICATION: Port Plugin Factory to Java

Fixes #9491
This commit is contained in:
Armin 2018-04-26 11:26:28 +02:00 committed by Armin Braun
parent 47fd3a519c
commit d38e117246
25 changed files with 612 additions and 240 deletions

View file

@ -1,58 +1 @@
# encoding: utf-8 # This file is kept for backwards compatibility with plugins that include it directly.
require "logstash/instrument/null_metric"
module LogStash module Instrument
# This class acts a a proxy between the metric library and the user calls.
#
# This is the class that plugins authors will use to interact with the `MetricStore`
# It has the same public interface as `Metric` class but doesnt require to send
# the namespace on every call.
#
# @see Logstash::Instrument::Metric
class NamespacedNullMetric
attr_reader :namespace_name
# Create metric with a specific namespace
#
# @param metric [LogStash::Instrument::Metric] The metric instance to proxy
# @param namespace [Array] The namespace to use
def initialize(metric = nil, namespace_name = :null)
@metric = metric
@namespace_name = Array(namespace_name)
end
def increment(key, value = 1)
end
def decrement(key, value = 1)
end
def gauge(key, value)
end
def report_time(key, duration)
end
def time(key, &block)
if block_given?
yield
else
::LogStash::Instrument::NullMetric::NullTimedExecution
end
end
def collector
@metric.collector
end
def counter(_)
::LogStash::Instrument::NullMetric::NullGauge
end
def namespace(name)
NamespacedNullMetric.new(metric, namespace_name + Array(name))
end
private
attr_reader :metric
end
end; end

View file

@ -1,70 +1,2 @@
# encoding: utf-8 # The contents of this file have been ported to Java. It is included for for compatibility
# with plugins that directly require it.
module LogStash module Instrument
# This class is used in the context when we disable the metric collection
# for specific plugin to replace the `NamespacedMetric` class with this one
# which doesn't produce any metric to the collector.
class NullMetric
attr_reader :namespace_name, :collector
def initialize(collector = nil)
@collector = collector
end
def increment(namespace, key, value = 1)
Metric.validate_key!(key)
end
def decrement(namespace, key, value = 1)
Metric.validate_key!(key)
end
def gauge(namespace, key, value)
Metric.validate_key!(key)
end
def report_time(namespace, key, duration)
Metric.validate_key!(key)
end
# We have to manually redefine this method since it can return an
# object this object also has to be implemented as a NullObject
def time(namespace, key)
Metric.validate_key!(key)
if block_given?
yield
else
NullTimedExecution
end
end
def counter(_)
NullGauge
end
def namespace(name)
raise MetricNoNamespaceProvided if name.nil? || name.empty?
NamespacedNullMetric.new(self, name)
end
def self.validate_key!(key)
raise MetricNoKeyProvided if key.nil? || key.empty?
end
private
class NullGauge
def self.increment(_)
end
end
# Null implementation of the internal timer class
#
# @see LogStash::Instrument::TimedExecution`
class NullTimedExecution
def self.stop
0
end
end
end
end; end

View file

@ -7,8 +7,6 @@ require "logstash/inputs/base"
require "logstash/outputs/base" require "logstash/outputs/base"
require "logstash/shutdown_watcher" require "logstash/shutdown_watcher"
require "logstash/pipeline_reporter" require "logstash/pipeline_reporter"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector" require "logstash/instrument/collector"
require "logstash/queue_factory" require "logstash/queue_factory"
require "logstash/compiler" require "logstash/compiler"

View file

@ -10,8 +10,6 @@ require "logstash/inputs/base"
require "logstash/outputs/base" require "logstash/outputs/base"
require "logstash/shutdown_watcher" require "logstash/shutdown_watcher"
require "logstash/pipeline_reporter" require "logstash/pipeline_reporter"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector" require "logstash/instrument/collector"
require "logstash/filter_delegator" require "logstash/filter_delegator"
require "logstash/queue_factory" require "logstash/queue_factory"

View file

@ -1,7 +1,6 @@
# encoding: utf-8 # encoding: utf-8
require "logstash/logging" require "logstash/logging"
require "logstash/config/mixin" require "logstash/config/mixin"
require "logstash/instrument/null_metric"
require "concurrent" require "concurrent"
require "securerandom" require "securerandom"

View file

@ -2,33 +2,6 @@
module LogStash module LogStash
module Plugins module Plugins
class ExecutionContextFactory
def initialize(agent, pipeline, dlq_writer)
@agent = agent
@pipeline = pipeline
@dlq_writer = dlq_writer
end
def create(id, klass_cfg_name)
ExecutionContext.new(@pipeline, @agent, id, klass_cfg_name, @dlq_writer)
end
end
class PluginMetricFactory
def initialize(pipeline_id, metric)
@pipeline_id = pipeline_id.to_s.to_sym
@metric = metric
end
def create(plugin_type)
@metric.namespace([:stats, :pipelines, @pipeline_id, :plugins])
.namespace("#{plugin_type}s".to_sym)
end
end
class PluginFactory class PluginFactory
include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory include org.logstash.config.ir.compiler.RubyIntegration::PluginFactory

View file

@ -1,7 +1,6 @@
# encoding: utf-8 # encoding: utf-8
require "spec_helper" require "spec_helper"
require "logstash/filter_delegator" require "logstash/filter_delegator"
require "logstash/instrument/null_metric"
require "logstash/event" require "logstash/event"
require "support/shared_contexts" require "support/shared_contexts"
@ -18,11 +17,10 @@ describe LogStash::FilterDelegator do
let(:config) do let(:config) do
{ "host" => "127.0.0.1", "id" => filter_id } { "host" => "127.0.0.1", "id" => filter_id }
end end
let(:collector) { [] }
let(:counter_in) { MockGauge.new } let(:counter_in) { MockGauge.new }
let(:counter_out) { MockGauge.new } let(:counter_out) { MockGauge.new }
let(:counter_time) { MockGauge.new } let(:counter_time) { MockGauge.new }
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) } let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(nil, :null) }
let(:events) { [LogStash::Event.new, LogStash::Event.new] } let(:events) { [LogStash::Event.new, LogStash::Event.new] }
before :each do before :each do

View file

@ -1,6 +1,4 @@
# encoding: utf-8 # encoding: utf-8
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/null_metric"
require_relative "../../support/matchers" require_relative "../../support/matchers"
require "spec_helper" require "spec_helper"
@ -16,7 +14,7 @@ describe LogStash::Instrument::NamespacedNullMetric do
end end
it "returns a TimedException when we call without a block" do it "returns a TimedException when we call without a block" do
expect(subject.time(:duration_ms)).to be(LogStash::Instrument::NullMetric::NullTimedExecution) expect(subject.time(:duration_ms)).to be_kind_of(LogStash::Instrument::NullMetric::NullTimedExecution)
end end
it "returns the value of the block" do it "returns the value of the block" do

View file

@ -1,6 +1,4 @@
# encoding: utf-8 # encoding: utf-8
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_metric"
require_relative "../../support/shared_examples" require_relative "../../support/shared_examples"
require_relative "../../support/matchers" require_relative "../../support/matchers"
require "spec_helper" require "spec_helper"
@ -8,8 +6,7 @@ require "spec_helper"
describe LogStash::Instrument::NullMetric do describe LogStash::Instrument::NullMetric do
let(:key) { "test" } let(:key) { "test" }
let(:collector) { [] } subject { LogStash::Instrument::NullMetric.new(nil) }
subject { LogStash::Instrument::NullMetric.new(collector) }
it "defines the same interface as `Metric`" do it "defines the same interface as `Metric`" do
expect(described_class).to implement_interface_of(LogStash::Instrument::Metric) expect(described_class).to implement_interface_of(LogStash::Instrument::Metric)

View file

@ -1,7 +1,6 @@
# encoding: utf-8 # encoding: utf-8
require "spec_helper" require "spec_helper"
require "logstash/filter_delegator" require "logstash/filter_delegator"
require "logstash/instrument/null_metric"
require "logstash/event" require "logstash/event"
require "support/shared_contexts" require "support/shared_contexts"
@ -20,7 +19,6 @@ describe LogStash::JavaFilterDelegator do
let(:config) do let(:config) do
{ "host" => "127.0.0.1", "id" => filter_id } { "host" => "127.0.0.1", "id" => filter_id }
end end
let(:collector) { [] }
let(:metric) { let(:metric) {
LogStash::Instrument::NamespacedMetric.new( LogStash::Instrument::NamespacedMetric.new(
LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter] LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter]

View file

@ -3,7 +3,6 @@ require "spec_helper"
require_relative "../../support/helpers" require_relative "../../support/helpers"
require_relative "../../support/matchers" require_relative "../../support/matchers"
require "logstash/pipeline_action/create" require "logstash/pipeline_action/create"
require "logstash/instrument/null_metric"
require "logstash/inputs/generator" require "logstash/inputs/generator"
describe LogStash::PipelineAction::Create do describe LogStash::PipelineAction::Create do

View file

@ -3,7 +3,6 @@ require "spec_helper"
require_relative "../../support/helpers" require_relative "../../support/helpers"
require_relative "../../support/matchers" require_relative "../../support/matchers"
require "logstash/pipeline_action/reload" require "logstash/pipeline_action/reload"
require "logstash/instrument/null_metric"
describe LogStash::PipelineAction::Reload do describe LogStash::PipelineAction::Reload do
let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) } let(:metric) { LogStash::Instrument::NullMetric.new(LogStash::Instrument::Collector.new) }

View file

@ -3,7 +3,6 @@ require "spec_helper"
require_relative "../../support/helpers" require_relative "../../support/helpers"
require "logstash/pipeline_action/stop" require "logstash/pipeline_action/stop"
require "logstash/pipeline" require "logstash/pipeline"
require "logstash/instrument/null_metric"
describe LogStash::PipelineAction::Stop do describe LogStash::PipelineAction::Stop do
let(:pipeline_config) { "input { generator {} } output { null {} }" } let(:pipeline_config) { "input { generator {} } output { null {} }" }

View file

@ -4,7 +4,6 @@ require_relative "../support/helpers"
require_relative "../support/matchers" require_relative "../support/matchers"
require "logstash/state_resolver" require "logstash/state_resolver"
require "logstash/config/pipeline_config" require "logstash/config/pipeline_config"
require "logstash/instrument/null_metric"
require "logstash/pipeline" require "logstash/pipeline"
require "ostruct" require "ostruct"
require "digest" require "digest"

View file

@ -84,8 +84,8 @@ describe LogStash::WrappedSynchronousQueue do
context "when writing to the queue" do context "when writing to the queue" do
before :each do before :each do
read_client.set_events_metric(LogStash::Instrument::NamespacedNullMetric.new([], :null)) read_client.set_events_metric(LogStash::Instrument::NamespacedNullMetric.new(nil, :null))
read_client.set_pipeline_metric(LogStash::Instrument::NamespacedNullMetric.new([], :null)) read_client.set_pipeline_metric(LogStash::Instrument::NamespacedNullMetric.new(nil, :null))
end end
it "appends batches to the queue" do it "appends batches to the queue" do

View file

@ -26,8 +26,14 @@ import org.logstash.ext.JrubyMemoryReadClientExt;
import org.logstash.ext.JrubyMemoryWriteClientExt; import org.logstash.ext.JrubyMemoryWriteClientExt;
import org.logstash.ext.JrubyTimestampExtLibrary; import org.logstash.ext.JrubyTimestampExtLibrary;
import org.logstash.ext.JrubyWrappedSynchronousQueueExt; import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.AbstractSimpleMetricExt;
import org.logstash.instrument.metrics.MetricExt; import org.logstash.instrument.metrics.MetricExt;
import org.logstash.instrument.metrics.NamespacedMetricExt; import org.logstash.instrument.metrics.NamespacedMetricExt;
import org.logstash.instrument.metrics.NullMetricExt;
import org.logstash.instrument.metrics.NullNamespacedMetricExt;
import org.logstash.plugins.PluginFactoryExt;
/** /**
* Utilities around interaction with the {@link Ruby} runtime. * Utilities around interaction with the {@link Ruby} runtime.
@ -87,10 +93,22 @@ public final class RubyUtil {
public static final RubyClass BUFFERED_TOKENIZER; public static final RubyClass BUFFERED_TOKENIZER;
public static final RubyClass ABSTRACT_METRIC_CLASS;
public static final RubyClass ABSTRACT_SIMPLE_METRIC_CLASS;
public static final RubyClass ABSTRACT_NAMESPACED_METRIC_CLASS;
public static final RubyClass METRIC_CLASS; public static final RubyClass METRIC_CLASS;
public static final RubyClass NULL_METRIC_CLASS;
public static final RubyClass NULL_COUNTER_CLASS;
public static final RubyClass NAMESPACED_METRIC_CLASS; public static final RubyClass NAMESPACED_METRIC_CLASS;
public static final RubyClass NULL_NAMESPACED_METRIC_CLASS;
public static final RubyClass METRIC_EXCEPTION_CLASS; public static final RubyClass METRIC_EXCEPTION_CLASS;
public static final RubyClass METRIC_NO_KEY_PROVIDED_CLASS; public static final RubyClass METRIC_NO_KEY_PROVIDED_CLASS;
@ -101,6 +119,8 @@ public final class RubyUtil {
public static final RubyClass TIMED_EXECUTION_CLASS; public static final RubyClass TIMED_EXECUTION_CLASS;
public static final RubyClass NULL_TIMED_EXECUTION_CLASS;
public static final RubyClass ABSTRACT_DLQ_WRITER_CLASS; public static final RubyClass ABSTRACT_DLQ_WRITER_CLASS;
public static final RubyClass DUMMY_DLQ_WRITER_CLASS; public static final RubyClass DUMMY_DLQ_WRITER_CLASS;
@ -111,6 +131,10 @@ public final class RubyUtil {
public static final RubyClass BUG_CLASS; public static final RubyClass BUG_CLASS;
public static final RubyClass EXECUTION_CONTEXT_FACTORY_CLASS;
public static final RubyClass PLUGIN_METRIC_FACTORY_CLASS;
/** /**
* Logstash Ruby Module. * Logstash Ruby Module.
*/ */
@ -118,6 +142,8 @@ public final class RubyUtil {
private static final RubyModule OUTPUT_DELEGATOR_STRATEGIES; private static final RubyModule OUTPUT_DELEGATOR_STRATEGIES;
private static final RubyModule PLUGINS_MODULE;
static { static {
RUBY = Ruby.getGlobalRuntime(); RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash"); LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
@ -125,8 +151,20 @@ public final class RubyUtil {
"Inputs", "Outputs", "Filters", "Search", "Config", "File", "Web", "PluginMixins", "Inputs", "Outputs", "Filters", "Search", "Config", "File", "Web", "PluginMixins",
"PluginManager", "Api", "Modules" "PluginManager", "Api", "Modules"
).forEach(module -> RUBY.defineModuleUnder(module, LOGSTASH_MODULE)); ).forEach(module -> RUBY.defineModuleUnder(module, LOGSTASH_MODULE));
PLUGINS_MODULE = RUBY.defineModuleUnder("Plugins", LOGSTASH_MODULE);
final RubyModule instrumentModule = final RubyModule instrumentModule =
RUBY.defineModuleUnder("Instrument", LOGSTASH_MODULE); RUBY.defineModuleUnder("Instrument", LOGSTASH_MODULE);
EXECUTION_CONTEXT_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"ExecutionContextFactory", RUBY.getObject(),
PluginFactoryExt.ExecutionContext::new
);
PLUGIN_METRIC_FACTORY_CLASS = PLUGINS_MODULE.defineClassUnder(
"PluginMetricFactory", RUBY.getObject(), PluginFactoryExt.Metrics::new
);
PLUGIN_METRIC_FACTORY_CLASS.defineAnnotatedMethods(PluginFactoryExt.Metrics.class);
EXECUTION_CONTEXT_FACTORY_CLASS.defineAnnotatedMethods(
PluginFactoryExt.ExecutionContext.class
);
METRIC_EXCEPTION_CLASS = instrumentModule.defineClassUnder( METRIC_EXCEPTION_CLASS = instrumentModule.defineClassUnder(
"MetricException", RUBY.getException(), MetricExt.MetricException::new "MetricException", RUBY.getException(), MetricExt.MetricException::new
); );
@ -141,17 +179,50 @@ public final class RubyUtil {
"MetricNoNamespaceProvided", METRIC_EXCEPTION_CLASS, "MetricNoNamespaceProvided", METRIC_EXCEPTION_CLASS,
MetricExt.MetricNoNamespaceProvided::new MetricExt.MetricNoNamespaceProvided::new
); );
METRIC_CLASS ABSTRACT_METRIC_CLASS = instrumentModule.defineClassUnder(
= instrumentModule.defineClassUnder("Metric", RUBY.getObject(), MetricExt::new); "AbstractMetric", RUBY.getObject(),
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
ABSTRACT_NAMESPACED_METRIC_CLASS = instrumentModule.defineClassUnder(
"AbstractNamespacedMetric", ABSTRACT_METRIC_CLASS,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
ABSTRACT_SIMPLE_METRIC_CLASS = instrumentModule.defineClassUnder(
"AbstractSimpleMetric", ABSTRACT_METRIC_CLASS,
ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR
);
METRIC_CLASS = instrumentModule.defineClassUnder(
"Metric", ABSTRACT_SIMPLE_METRIC_CLASS, MetricExt::new
);
NULL_METRIC_CLASS = instrumentModule.defineClassUnder(
"NullMetric", ABSTRACT_SIMPLE_METRIC_CLASS, NullMetricExt::new
);
TIMED_EXECUTION_CLASS = METRIC_CLASS.defineClassUnder( TIMED_EXECUTION_CLASS = METRIC_CLASS.defineClassUnder(
"TimedExecution", RUBY.getObject(), MetricExt.TimedExecution::new "TimedExecution", RUBY.getObject(), MetricExt.TimedExecution::new
); );
NAMESPACED_METRIC_CLASS = instrumentModule.defineClassUnder( NULL_TIMED_EXECUTION_CLASS = NULL_METRIC_CLASS.defineClassUnder(
"NamespacedMetric", RUBY.getObject(), NamespacedMetricExt::new "NullTimedExecution", RUBY.getObject(), NullMetricExt.NullTimedExecution::new
); );
NULL_COUNTER_CLASS = METRIC_CLASS.defineClassUnder(
"NullCounter", RUBY.getObject(), NullNamespacedMetricExt.NullCounter::new
);
NAMESPACED_METRIC_CLASS = instrumentModule.defineClassUnder(
"NamespacedMetric", ABSTRACT_NAMESPACED_METRIC_CLASS, NamespacedMetricExt::new
);
NULL_NAMESPACED_METRIC_CLASS = instrumentModule.defineClassUnder(
"NamespacedNullMetric", ABSTRACT_NAMESPACED_METRIC_CLASS,
NullNamespacedMetricExt::new
);
ABSTRACT_METRIC_CLASS.defineAnnotatedMethods(AbstractMetricExt.class);
ABSTRACT_SIMPLE_METRIC_CLASS.defineAnnotatedMethods(AbstractSimpleMetricExt.class);
ABSTRACT_NAMESPACED_METRIC_CLASS.defineAnnotatedMethods(AbstractNamespacedMetricExt.class);
METRIC_CLASS.defineAnnotatedMethods(MetricExt.class); METRIC_CLASS.defineAnnotatedMethods(MetricExt.class);
NULL_METRIC_CLASS.defineAnnotatedMethods(NullMetricExt.class);
NAMESPACED_METRIC_CLASS.defineAnnotatedMethods(NamespacedMetricExt.class); NAMESPACED_METRIC_CLASS.defineAnnotatedMethods(NamespacedMetricExt.class);
NULL_NAMESPACED_METRIC_CLASS.defineAnnotatedMethods(NullNamespacedMetricExt.class);
TIMED_EXECUTION_CLASS.defineAnnotatedMethods(MetricExt.TimedExecution.class); TIMED_EXECUTION_CLASS.defineAnnotatedMethods(MetricExt.TimedExecution.class);
NULL_TIMED_EXECUTION_CLASS.defineAnnotatedMethods(NullMetricExt.NullTimedExecution.class);
NULL_COUNTER_CLASS.defineAnnotatedMethods(NullNamespacedMetricExt.NullCounter.class);
final RubyModule util = LOGSTASH_MODULE.defineModuleUnder("Util"); final RubyModule util = LOGSTASH_MODULE.defineModuleUnder("Util");
ABSTRACT_DLQ_WRITER_CLASS = util.defineClassUnder( ABSTRACT_DLQ_WRITER_CLASS = util.defineClassUnder(
"AbstractDeadLetterQueueWriterExt", RUBY.getObject(), "AbstractDeadLetterQueueWriterExt", RUBY.getObject(),

View file

@ -24,7 +24,7 @@ public final class ExecutionContextExt extends RubyObject {
} }
@JRubyMethod(required = 5) @JRubyMethod(required = 5)
public IRubyObject initialize(final ThreadContext context, public ExecutionContextExt initialize(final ThreadContext context,
final IRubyObject[] args) { final IRubyObject[] args) {
pipeline = args[0]; pipeline = args[0];
agent = args[1]; agent = args[1];

View file

@ -0,0 +1,34 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "AbstractMetric")
public abstract class AbstractMetricExt extends RubyObject {
public AbstractMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public final AbstractNamespacedMetricExt namespace(final ThreadContext context,
final IRubyObject name) {
return createNamespaced(context, name);
}
@JRubyMethod
public final IRubyObject collector(final ThreadContext context) {
return getCollector(context);
}
protected abstract AbstractNamespacedMetricExt createNamespaced(
ThreadContext context, IRubyObject name
);
protected abstract IRubyObject getCollector(ThreadContext context);
}

View file

@ -0,0 +1,71 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "AbstractNamespacedMetric")
public abstract class AbstractNamespacedMetricExt extends AbstractMetricExt {
AbstractNamespacedMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public IRubyObject counter(final ThreadContext context, final IRubyObject key) {
return getCounter(context, key);
}
@JRubyMethod
public IRubyObject gauge(final ThreadContext context, final IRubyObject key,
final IRubyObject value) {
return getGauge(context, key, value);
}
@JRubyMethod(required = 1, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
return doIncrement(context, args);
}
@JRubyMethod(required = 1, optional = 1)
public IRubyObject decrement(final ThreadContext context, final IRubyObject[] args) {
return doDecrement(context, args);
}
@JRubyMethod
public IRubyObject time(final ThreadContext context, final IRubyObject key, final Block block) {
return doTime(context, key, block);
}
@JRubyMethod(name = "report_time")
public IRubyObject reportTime(final ThreadContext context, final IRubyObject key,
final IRubyObject duration) {
return doReportTime(context, key, duration);
}
@JRubyMethod(name = "namespace_name")
public RubyArray namespaceName(final ThreadContext context) {
return getNamespaceName(context);
}
protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject key,
IRubyObject value);
protected abstract RubyArray getNamespaceName(ThreadContext context);
protected abstract IRubyObject getCounter(ThreadContext context, IRubyObject key);
protected abstract IRubyObject doTime(ThreadContext context, IRubyObject key, Block block);
protected abstract IRubyObject doReportTime(ThreadContext context,
IRubyObject key, IRubyObject duration);
protected abstract IRubyObject doIncrement(ThreadContext context, IRubyObject[] args);
protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args);
}

View file

@ -0,0 +1,58 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
@JRubyClass(name = "AbstractSimpleMetric")
public abstract class AbstractSimpleMetricExt extends AbstractMetricExt {
AbstractSimpleMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(required = 2, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
return doIncrement(context, args);
}
@JRubyMethod(required = 2, optional = 1)
public IRubyObject decrement(final ThreadContext context, final IRubyObject[] args) {
return doDecrement(context, args);
}
@JRubyMethod
public IRubyObject gauge(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject value) {
return getGauge(context, namespace, key, value);
}
@JRubyMethod(name = "report_time")
public IRubyObject reportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {
return doReportTime(context, namespace, key, duration);
}
@JRubyMethod
public IRubyObject time(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final Block block) {
return doTime(context, namespace, key, block);
}
protected abstract IRubyObject doDecrement(ThreadContext context, IRubyObject[] args);
protected abstract IRubyObject doIncrement(ThreadContext context, IRubyObject[] args);
protected abstract IRubyObject getGauge(ThreadContext context, IRubyObject namespace,
IRubyObject key, IRubyObject value);
protected abstract IRubyObject doReportTime(ThreadContext context, IRubyObject namespace,
IRubyObject key, IRubyObject duration);
protected abstract IRubyObject doTime(ThreadContext context, IRubyObject namespace,
IRubyObject key, Block block);
}

View file

@ -18,7 +18,7 @@ import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil; import org.logstash.RubyUtil;
@JRubyClass(name = "Metric") @JRubyClass(name = "Metric")
public final class MetricExt extends RubyObject { public final class MetricExt extends AbstractSimpleMetricExt {
public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter"); public static final RubySymbol COUNTER = RubyUtil.RUBY.newSymbol("counter");
@ -60,20 +60,6 @@ public final class MetricExt extends RubyObject {
return this; return this;
} }
@JRubyMethod
public IRubyObject collector(final ThreadContext context) {
return collector;
}
@JRubyMethod(required = 2, optional = 1)
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 2) {
return increment(context, args[0], args[1]);
} else {
return increment(context, args[0], args[1], args[2]);
}
}
public IRubyObject increment(final ThreadContext context, final IRubyObject namespace, public IRubyObject increment(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key) { final IRubyObject key) {
return increment(context, namespace, key, ONE); return increment(context, namespace, key, ONE);
@ -87,15 +73,6 @@ public final class MetricExt extends RubyObject {
); );
} }
@JRubyMethod(required = 2, optional = 1)
public IRubyObject decrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 2) {
return decrement(context, args[0], args[1], ONE);
} else {
return decrement(context, args[0], args[1], args[2]);
}
}
public IRubyObject decrement(final ThreadContext context, final IRubyObject namespace, public IRubyObject decrement(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key) { final IRubyObject key) {
return decrement(context, namespace, key, ONE); return decrement(context, namespace, key, ONE);
@ -109,8 +86,31 @@ public final class MetricExt extends RubyObject {
); );
} }
@JRubyMethod @Override
public IRubyObject gauge(final ThreadContext context, final IRubyObject namespace, protected IRubyObject doDecrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 2) {
return decrement(context, args[0], args[1], ONE);
} else {
return decrement(context, args[0], args[1], args[2]);
}
}
@Override
protected IRubyObject getCollector(final ThreadContext context) {
return collector;
}
@Override
protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 2) {
return increment(context, args[0], args[1]);
} else {
return increment(context, args[0], args[1], args[2]);
}
}
@Override
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject value) { final IRubyObject key, final IRubyObject value) {
MetricExt.validateKey(context, null, key); MetricExt.validateKey(context, null, key);
return collector.callMethod( return collector.callMethod(
@ -118,8 +118,8 @@ public final class MetricExt extends RubyObject {
); );
} }
@JRubyMethod(name = "report_time") @Override
public IRubyObject reportTime(final ThreadContext context, final IRubyObject namespace, protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) { final IRubyObject key, final IRubyObject duration) {
MetricExt.validateKey(context, null, key); MetricExt.validateKey(context, null, key);
return collector.callMethod( return collector.callMethod(
@ -127,8 +127,8 @@ public final class MetricExt extends RubyObject {
); );
} }
@JRubyMethod @Override
public IRubyObject time(final ThreadContext context, final IRubyObject namespace, protected IRubyObject doTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final Block block) { final IRubyObject key, final Block block) {
MetricExt.validateKey(context, null, key); MetricExt.validateKey(context, null, key);
if (!block.isGiven()) { if (!block.isGiven()) {
@ -144,8 +144,9 @@ public final class MetricExt extends RubyObject {
return res; return res;
} }
@JRubyMethod @Override
public NamespacedMetricExt namespace(final ThreadContext context, final IRubyObject name) { protected NamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS); validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return NamespacedMetricExt.create( return NamespacedMetricExt.create(
this, this,

View file

@ -3,7 +3,6 @@ package org.logstash.instrument.metrics;
import org.jruby.Ruby; import org.jruby.Ruby;
import org.jruby.RubyArray; import org.jruby.RubyArray;
import org.jruby.RubyClass; import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod; import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block; import org.jruby.runtime.Block;
@ -13,7 +12,7 @@ import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil; import org.logstash.RubyUtil;
@JRubyClass(name = "NamespacedMetric") @JRubyClass(name = "NamespacedMetric")
public final class NamespacedMetricExt extends RubyObject { public final class NamespacedMetricExt extends AbstractNamespacedMetricExt {
private RubyArray namespaceName; private RubyArray namespaceName;
@ -44,26 +43,26 @@ public final class NamespacedMetricExt extends RubyObject {
return this; return this;
} }
@JRubyMethod @Override
public IRubyObject collector(final ThreadContext context) { protected IRubyObject getCollector(final ThreadContext context) {
return metric.collector(context); return metric.collector(context);
} }
@JRubyMethod @Override
public IRubyObject counter(final ThreadContext context, final IRubyObject key) { protected IRubyObject getCounter(final ThreadContext context, final IRubyObject key) {
return collector(context).callMethod( return collector(context).callMethod(
context, "get", new IRubyObject[]{namespaceName, key, MetricExt.COUNTER} context, "get", new IRubyObject[]{namespaceName, key, MetricExt.COUNTER}
); );
} }
@JRubyMethod @Override
public IRubyObject gauge(final ThreadContext context, final IRubyObject key, protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key,
final IRubyObject value) { final IRubyObject value) {
return metric.gauge(context, namespaceName, key, value); return metric.gauge(context, namespaceName, key, value);
} }
@JRubyMethod(required = 1, optional = 1) @Override
public IRubyObject increment(final ThreadContext context, final IRubyObject[] args) { protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 1) { if (args.length == 1) {
return metric.increment(context, namespaceName, args[0]); return metric.increment(context, namespaceName, args[0]);
} else { } else {
@ -71,8 +70,8 @@ public final class NamespacedMetricExt extends RubyObject {
} }
} }
@JRubyMethod(required = 1, optional = 1) @Override
public IRubyObject decrement(final ThreadContext context, final IRubyObject[] args) { protected IRubyObject doDecrement(final ThreadContext context, final IRubyObject[] args) {
if (args.length == 1) { if (args.length == 1) {
return metric.decrement(context, namespaceName, args[0]); return metric.decrement(context, namespaceName, args[0]);
} else { } else {
@ -80,24 +79,25 @@ public final class NamespacedMetricExt extends RubyObject {
} }
} }
@JRubyMethod @Override
public IRubyObject time(final ThreadContext context, final IRubyObject key, final Block block) { protected IRubyObject doTime(final ThreadContext context, final IRubyObject key,
final Block block) {
return metric.time(context, namespaceName, key, block); return metric.time(context, namespaceName, key, block);
} }
@JRubyMethod(name = "report_time") protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject key,
public IRubyObject reportTime(final ThreadContext context, final IRubyObject key,
final IRubyObject duration) { final IRubyObject duration) {
return metric.reportTime(context, namespaceName, key, duration); return metric.reportTime(context, namespaceName, key, duration);
} }
@JRubyMethod(name = "namespace_name") @Override
public RubyArray namespaceName(final ThreadContext context) { protected RubyArray getNamespaceName(final ThreadContext context) {
return namespaceName; return namespaceName;
} }
@JRubyMethod @Override
public NamespacedMetricExt namespace(final ThreadContext context, final IRubyObject name) { protected NamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
MetricExt.validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS); MetricExt.validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return create(this.metric, (RubyArray) namespaceName.op_plus( return create(this.metric, (RubyArray) namespaceName.op_plus(
name instanceof RubyArray ? name : RubyArray.newArray(context.runtime, name) name instanceof RubyArray ? name : RubyArray.newArray(context.runtime, name)

View file

@ -0,0 +1,100 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@JRubyClass(name = "NullMetric")
public final class NullMetricExt extends AbstractSimpleMetricExt {
private IRubyObject collector;
public NullMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(optional = 1)
public IRubyObject initialize(final ThreadContext context, final IRubyObject[] collector) {
if (collector.length == 0) {
this.collector = context.nil;
} else {
this.collector = collector[0];
}
return this;
}
@Override
protected IRubyObject getCollector(final ThreadContext context) {
return collector;
}
@Override
protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
MetricExt.validateKey(context, null, args[1]);
return context.nil;
}
@Override
protected IRubyObject doDecrement(final ThreadContext context, final IRubyObject[] args) {
MetricExt.validateKey(context, null, args[1]);
return context.nil;
}
@Override
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject value) {
MetricExt.validateKey(context, null, key);
return context.nil;
}
@Override
protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final IRubyObject duration) {
MetricExt.validateKey(context, null, key);
return context.nil;
}
@Override
protected IRubyObject doTime(final ThreadContext context, final IRubyObject namespace,
final IRubyObject key, final Block block) {
MetricExt.validateKey(context, null, key);
if (!block.isGiven()) {
return NullMetricExt.NullTimedExecution.INSTANCE;
}
return block.call(context);
}
@Override
protected AbstractNamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
MetricExt.validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return NullNamespacedMetricExt.create(
this,
name instanceof RubyArray ? (RubyArray) name : RubyArray.newArray(context.runtime, name)
);
}
@JRubyClass(name = "NullTimedExecution")
public static final class NullTimedExecution extends RubyObject {
private static final NullMetricExt.NullTimedExecution INSTANCE =
new NullMetricExt.NullTimedExecution(RubyUtil.RUBY, RubyUtil.NULL_TIMED_EXECUTION_CLASS);
public NullTimedExecution(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public RubyFixnum stop(final ThreadContext context) {
return RubyFixnum.newFixnum(context.runtime, 0L);
}
}
}

View file

@ -0,0 +1,117 @@
package org.logstash.instrument.metrics;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@JRubyClass(name = "NamespacedNullMetric", parent = "AbstractNamespacedMetric")
public final class NullNamespacedMetricExt extends AbstractNamespacedMetricExt {
private static final RubySymbol NULL = RubyUtil.RUBY.newSymbol("null");
private RubyArray namespaceName;
private NullMetricExt metric;
public static AbstractNamespacedMetricExt create(final NullMetricExt metric,
final RubyArray namespaceName) {
final NullNamespacedMetricExt res =
new NullNamespacedMetricExt(RubyUtil.RUBY, RubyUtil.NULL_NAMESPACED_METRIC_CLASS);
res.metric = metric;
res.namespaceName = namespaceName;
return res;
}
public NullNamespacedMetricExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(optional = 2)
public NullNamespacedMetricExt initialize(final ThreadContext context,
final IRubyObject[] args) {
this.metric = args.length > 0 && !args[0].isNil() ? (NullMetricExt) args[0] : null;
final IRubyObject namespaceName = args.length == 2 ? args[1] : NULL;
if (namespaceName instanceof RubyArray) {
this.namespaceName = (RubyArray) namespaceName;
} else {
this.namespaceName = RubyArray.newArray(context.runtime, namespaceName);
}
return this;
}
@Override
protected IRubyObject getCollector(final ThreadContext context) {
return metric.collector(context);
}
@Override
protected IRubyObject getCounter(final ThreadContext context, final IRubyObject key) {
return NullNamespacedMetricExt.NullCounter.INSTANCE;
}
@Override
protected IRubyObject getGauge(final ThreadContext context, final IRubyObject key,
final IRubyObject value) {
return context.nil;
}
@Override
protected IRubyObject doIncrement(final ThreadContext context, final IRubyObject[] args) {
return context.nil;
}
@Override
protected IRubyObject doDecrement(final ThreadContext context, final IRubyObject[] args) {
return context.nil;
}
@Override
protected IRubyObject doTime(final ThreadContext context, final IRubyObject key,
final Block block) {
return metric.time(context, namespaceName, key, block);
}
@Override
protected IRubyObject doReportTime(final ThreadContext context, final IRubyObject key,
final IRubyObject duration) {
return context.nil;
}
@Override
protected RubyArray getNamespaceName(final ThreadContext context) {
return namespaceName;
}
@Override
protected AbstractNamespacedMetricExt createNamespaced(final ThreadContext context,
final IRubyObject name) {
MetricExt.validateName(context, name, RubyUtil.METRIC_NO_NAMESPACE_PROVIDED_CLASS);
return create(this.metric, (RubyArray) namespaceName.op_plus(
name instanceof RubyArray ? name : RubyArray.newArray(context.runtime, name)
));
}
@JRubyClass(name = "NullCounter")
public static final class NullCounter extends RubyObject {
public static final NullNamespacedMetricExt.NullCounter INSTANCE =
new NullNamespacedMetricExt.NullCounter(RubyUtil.RUBY, RubyUtil.NULL_COUNTER_CLASS);
public NullCounter(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public IRubyObject increment(final ThreadContext context, final IRubyObject value) {
return context.nil;
}
}
}

View file

@ -0,0 +1,90 @@
package org.logstash.plugins;
import java.util.Arrays;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
public final class PluginFactoryExt {
@JRubyClass(name = "ExecutionContextFactory")
public static final class ExecutionContext extends RubyBasicObject {
private IRubyObject agent;
private IRubyObject pipeline;
private IRubyObject dlqWriter;
public ExecutionContext(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public PluginFactoryExt.ExecutionContext initialize(final ThreadContext context,
final IRubyObject agent, final IRubyObject pipeline, final IRubyObject dlqWriter) {
this.agent = agent;
this.pipeline = pipeline;
this.dlqWriter = dlqWriter;
return this;
}
@JRubyMethod
public ExecutionContextExt create(final ThreadContext context, final IRubyObject id,
final IRubyObject classConfigName) {
return new ExecutionContextExt(
context.runtime, RubyUtil.EXECUTION_CONTEXT_CLASS
).initialize(
context, new IRubyObject[]{pipeline, agent, id, classConfigName, dlqWriter}
);
}
}
@JRubyClass(name = "PluginMetricFactory")
public static final class Metrics extends RubyBasicObject {
private static final RubySymbol STATS = RubyUtil.RUBY.newSymbol("stats");
private static final RubySymbol PIPELINES = RubyUtil.RUBY.newSymbol("pipelines");
private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins");
private RubySymbol pipelineId;
private AbstractMetricExt metric;
public Metrics(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod
public PluginFactoryExt.Metrics initialize(final ThreadContext context,
final IRubyObject pipelineId, final IRubyObject metrics) {
this.pipelineId = pipelineId.convertToString().intern19();
this.metric = (AbstractMetricExt) metrics;
return this;
}
@JRubyMethod
public AbstractNamespacedMetricExt create(final ThreadContext context, final IRubyObject pluginType) {
return metric.namespace(
context,
RubyArray.newArray(
context.runtime, Arrays.asList(STATS, PIPELINES, pipelineId, PLUGINS)
)
).namespace(
context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString()))
);
}
}
}