JAVAFICATION: Port wrapped write client to Java

Fixes #8977
Armin authored on 2018-01-17 15:54:01 +01:00, committed by Armin Braun
parent 71f6b362ec
commit edd8844670
7 changed files with 159 additions and 84 deletions
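
At a glance, the change is an API-shape change for callers: the old Ruby LogStash::Instrument::WrappedWriteClient was built from the pipeline and plugin objects, while the Java-backed replacement is registered as LogStash::WrappedWriteClient and is built from the pipeline id and plugin id as symbols. A minimal sketch of the two call shapes (write_client, pipeline, metric, plugin, and the example ids are stand-ins taken from the diffs and spec below):

    # Before this commit (Ruby implementation): pipeline and plugin objects are passed in.
    LogStash::Instrument::WrappedWriteClient.new(write_client, pipeline, metric, plugin)

    # After this commit (Java implementation, registered under the LogStash module):
    # the pipeline id and plugin id are passed as symbols instead.
    LogStash::WrappedWriteClient.new(write_client, :main, metric, :"1234myid")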


@@ -1,68 +0,0 @@
# encoding: utf-8
module LogStash module Instrument
class WrappedWriteClient
def initialize(write_client, pipeline, metric, plugin)
@write_client = write_client
pipeline_id = pipeline.pipeline_id.to_s.to_sym
plugin_type = "#{plugin.class.plugin_type}s".to_sym
@events_metrics = metric.namespace([:stats, :events])
@pipeline_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :events])
@plugin_events_metrics = metric.namespace([:stats, :pipelines, pipeline_id, :plugins, plugin_type, plugin.id.to_sym, :events])
@events_metrics_counter = @events_metrics.counter(:in)
@events_metrics_time = @events_metrics.counter(:queue_push_duration_in_millis)
@pipeline_metrics_counter = @pipeline_metrics.counter(:in)
@pipeline_metrics_time = @pipeline_metrics.counter(:queue_push_duration_in_millis)
@plugin_events_metrics_counter = @plugin_events_metrics.counter(:out)
@plugin_events_metrics_time = @plugin_events_metrics.counter(:queue_push_duration_in_millis)
define_initial_metrics_values
end
def get_new_batch
[]
end
def push(event)
increment_counters(1)
start_time = java.lang.System.nano_time
result = @write_client.push(event)
report_execution_time(start_time)
result
end
alias_method(:<<, :push)
def push_batch(batch)
increment_counters(batch.size)
start_time = java.lang.System.nano_time
result = @write_client.push_batch(batch)
report_execution_time(start_time)
result
end
private
def increment_counters(size)
@events_metrics_counter.increment(size)
@pipeline_metrics_counter.increment(size)
@plugin_events_metrics_counter.increment(size)
end
def report_execution_time(start_time)
execution_time = (java.lang.System.nano_time - start_time) / 1_000_000
@events_metrics_time.increment(execution_time)
@pipeline_metrics_time.increment(execution_time)
@plugin_events_metrics_time.increment(execution_time)
end
def define_initial_metrics_values
@events_metrics_counter.increment(0)
@pipeline_metrics_counter.increment(0)
@plugin_events_metrics_counter.increment(0)
@events_metrics_time.increment(0)
@pipeline_metrics_time.increment(0)
@plugin_events_metrics_time.increment(0)
end
end
end end


@@ -16,7 +16,6 @@ require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/dead_letter_queue_manager"
require "logstash/output_delegator"
require "logstash/java_filter_delegator"
@@ -454,7 +453,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
begin
input_queue_client = wrapped_write_client(plugin)
input_queue_client = wrapped_write_client(plugin.id.to_sym)
plugin.run(input_queue_client)
rescue => e
if plugin.stop?
@@ -685,10 +684,10 @@ module LogStash; class JavaPipeline < JavaBasePipeline
@drain_queue ? !@filter_queue_client.empty? : false
end
def wrapped_write_client(plugin)
def wrapped_write_client(plugin_id)
#need to ensure that metrics are initialized one plugin at a time, else a race condition can exist.
@mutex.synchronize do
LogStash::Instrument::WrappedWriteClient.new(@input_queue_client, self, metric, plugin)
LogStash::WrappedWriteClient.new(@input_queue_client, @pipeline_id.to_s.to_sym, metric, plugin_id)
end
end
end; end


@@ -17,7 +17,6 @@ require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/dead_letter_queue_manager"
require "logstash/output_delegator"
require "logstash/filter_delegator"
@@ -512,7 +511,7 @@ module LogStash; class Pipeline < BasePipeline
def inputworker(plugin)
Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
begin
input_queue_client = wrapped_write_client(plugin)
input_queue_client = wrapped_write_client(plugin.id.to_sym)
plugin.run(input_queue_client)
rescue => e
if plugin.stop?
@@ -747,10 +746,10 @@ module LogStash; class Pipeline < BasePipeline
@drain_queue ? !@filter_queue_client.empty? : false
end
def wrapped_write_client(plugin)
def wrapped_write_client(plugin_id)
#need to ensure that metrics are initialized one plugin at a time, else a race condition can exist.
@mutex.synchronize do
LogStash::Instrument::WrappedWriteClient.new(@input_queue_client, self, metric, plugin)
LogStash::WrappedWriteClient.new(@input_queue_client, @pipeline_id.to_s.to_sym, metric, plugin_id)
end
end
end; end


@@ -1,22 +1,19 @@
# encoding: utf-8
require "logstash/instrument/metric"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/event"
require_relative "../../support/mocks_classes"
require "spec_helper"
describe LogStash::Instrument::WrappedWriteClient do
describe LogStash::WrappedWriteClient do
let!(:write_client) { queue.write_client }
let!(:read_client) { queue.read_client }
let(:pipeline) { double("pipeline", :pipeline_id => :main) }
let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }
let(:plugin) { LogStash::Inputs::DummyInput.new({ "id" => myid }) }
let(:event) { LogStash::Event.new }
let(:myid) { "1234myid" }
let(:myid) { "1234myid".to_sym }
subject { described_class.new(write_client, pipeline, metric, plugin) }
subject { described_class.new(write_client, :main, metric, myid) }
def threaded_read_client
Thread.new do
@@ -81,7 +78,7 @@ describe LogStash::Instrument::WrappedWriteClient do
end
it "record input `out`" do
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid.to_sym][:events][:out].value).to eq(1)
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid][:events][:out].value).to eq(1)
end
context "recording of the duration of pushing to the queue" do
@@ -94,7 +91,7 @@ describe LogStash::Instrument::WrappedWriteClient do
end
it "records at the `plugin` level" do
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid.to_sym][:events][:queue_push_duration_in_millis].value).to be_kind_of(Integer)
expect(snapshot_metric[:pipelines][:main][:plugins][:inputs][myid][:events][:queue_push_duration_in_millis].value).to be_kind_of(Integer)
end
end
end


@@ -2,6 +2,7 @@
require "logstash/outputs/base"
require "logstash/config/source_loader"
require "logstash/inputs/base"
require "logstash/filters/base"
require "thread"
module LogStash


@@ -10,6 +10,7 @@ import org.jruby.exceptions.RaiseException;
import org.jruby.runtime.ObjectAllocator;
import org.logstash.ackedqueue.ext.AbstractJRubyQueue;
import org.logstash.ackedqueue.ext.RubyAckedBatch;
import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyMemoryReadBatchExt;
import org.logstash.ext.JrubyTimestampExtLibrary;
@@ -45,6 +46,8 @@ public final class RubyUtil {
public static final RubyClass MEMORY_READ_BATCH_CLASS;
public static final RubyClass WRAPPED_WRITE_CLIENT_CLASS;
static {
RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
@@ -53,6 +56,8 @@
);
MEMORY_READ_BATCH_CLASS =
setupLogstashClass(JrubyMemoryReadBatchExt::new, JrubyMemoryReadBatchExt.class);
WRAPPED_WRITE_CLIENT_CLASS =
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);


@@ -0,0 +1,142 @@
package org.logstash.ext;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.internal.runtime.methods.DynamicMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
@JRubyClass(name = "WrappedWriteClient")
public final class JRubyWrappedWriteClientExt extends RubyObject {
private static final RubySymbol PUSH_DURATION_KEY =
RubyUtil.RUBY.newSymbol("queue_push_duration_in_millis");
private static final RubySymbol IN_KEY = RubyUtil.RUBY.newSymbol("in");
private DynamicMethod pushOne;
private DynamicMethod pushBatch;
private IRubyObject writeClient;
private IRubyObject eventsMetricsCounter;
private IRubyObject eventsMetricsTime;
private IRubyObject pipelineMetricsCounter;
private IRubyObject pipelineMetricsTime;
private IRubyObject pluginMetricsCounter;
private IRubyObject pluginMetricsTime;
public JRubyWrappedWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize", optional = 4)
public IRubyObject ruby_initialize(final ThreadContext context, final IRubyObject[] args) {
this.writeClient = args[0];
final String pipelineId = args[1].asJavaString();
final IRubyObject metric = args[2];
final IRubyObject pluginId = args[3];
final IRubyObject eventsMetrics = getMetric(metric, "stats", "events");
eventsMetricsCounter = getCounter(eventsMetrics, IN_KEY);
eventsMetricsTime = getCounter(eventsMetrics, PUSH_DURATION_KEY);
final IRubyObject pipelineMetrics =
getMetric(metric, "stats", "pipelines", pipelineId, "events");
pipelineMetricsCounter = getCounter(pipelineMetrics, IN_KEY);
pipelineMetricsTime = getCounter(pipelineMetrics, PUSH_DURATION_KEY);
final IRubyObject pluginMetrics = getMetric(
metric, "stats", "pipelines", pipelineId, "plugins", "inputs",
pluginId.asJavaString(), "events"
);
pluginMetricsCounter = getCounter(pluginMetrics, context.runtime.newSymbol("out"));
pluginMetricsTime = getCounter(pluginMetrics, PUSH_DURATION_KEY);
final RubyClass writerClass = writeClient.getMetaClass();
pushOne = writerClass.searchMethod("push");
pushBatch = writerClass.searchMethod("push_batch");
return this;
}
@JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject push(final ThreadContext context, final IRubyObject event) {
final long start = System.nanoTime();
incrementCounters(context, 1L);
final IRubyObject res = pushOne.call(
context, writeClient, RubyUtil.WRAPPED_WRITE_CLIENT_CLASS, "push", event
);
incrementTimers(context, start);
return res;
}
@SuppressWarnings("unchecked")
@JRubyMethod(name = "push_batch", required = 1)
public IRubyObject pushBatch(final ThreadContext context, final IRubyObject batch) {
final long start = System.nanoTime();
incrementCounters(context, (long) ((Collection<IRubyObject>) batch).size());
final IRubyObject res = pushBatch.call(
context, writeClient, RubyUtil.WRAPPED_WRITE_CLIENT_CLASS, "push_batch", batch
);
incrementTimers(context, start);
return res;
}
/**
* @param context Ruby {@link ThreadContext}
* @return Empty {@link RubyArray}
* @deprecated This method exists for backwards compatibility only, it does not do anything but
* return an empty {@link RubyArray}.
*/
@Deprecated
@JRubyMethod(name = "get_new_batch")
public IRubyObject newBatch(final ThreadContext context) {
return context.runtime.newArray();
}
private void incrementCounters(final ThreadContext context, final long count) {
final IRubyObject increment = context.runtime.newFixnum(count);
eventsMetricsCounter.callMethod(context, "increment", increment);
pipelineMetricsCounter.callMethod(context, "increment", increment);
pluginMetricsCounter.callMethod(context, "increment", increment);
}
private void incrementTimers(final ThreadContext context, final long start) {
// Report the elapsed time in milliseconds, matching the Ruby
// implementation's division of the nanosecond delta by 1_000_000.
final IRubyObject increment = context.runtime.newFixnum(
TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
);
eventsMetricsTime.callMethod(context, "increment", increment);
pipelineMetricsTime.callMethod(context, "increment", increment);
pluginMetricsTime.callMethod(context, "increment", increment);
}
private static IRubyObject getMetric(final IRubyObject base, final String... keys) {
return base.callMethod(
RubyUtil.RUBY.getCurrentContext(), "namespace", toSymbolArray(keys)
);
}
private static IRubyObject toSymbolArray(final String... strings) {
final IRubyObject[] res = new IRubyObject[strings.length];
for (int i = 0; i < strings.length; ++i) {
res[i] = RubyUtil.RUBY.newSymbol(strings[i]);
}
return RubyUtil.RUBY.newArray(res);
}
private static IRubyObject getCounter(final IRubyObject metric, final RubySymbol key) {
final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
final IRubyObject counter = metric.callMethod(context, "counter", key);
counter.callMethod(context, "increment", context.runtime.newFixnum(0));
return counter;
}
}
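
The new class keeps the old Ruby client's surface: push and << for single events, push_batch for collections, a deprecated get_new_batch, and counters pre-initialized to zero at construction. A minimal usage sketch from the Ruby side, assuming write_client, metric, and event are created the way the spec above creates them:

    # Assumed setup: write_client, metric and event as in the spec above.
    client = LogStash::WrappedWriteClient.new(write_client, :main, metric, :"1234myid")

    client.push(event)                 # also available as: client << event
    client.push_batch([event, event])

    # Each call increments the :in counters and :queue_push_duration_in_millis timers
    # under [:stats, :events] and [:stats, :pipelines, :main, :events]; the plugin-level
    # namespace [:stats, :pipelines, :main, :plugins, :inputs, :"1234myid", :events]
    # uses :out for its counter, as the spec asserts.

Internally, the Java class resolves the underlying write client's push and push_batch methods once in initialize and invokes them directly on every call, so the wrapper adds only the metric bookkeeping; durations are reported in milliseconds, as in the Ruby original.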