JAVAFICATION: Port remainder of synchronous queue to Java (#8991)

* JAVAFICATION: Port WrappedSynchronousQueue.WriteClient and WrappedSynchronousQueue to Java
This commit is contained in:
Dan Hermann 2018-01-22 01:24:32 -08:00 committed by Armin
parent 42147e8e25
commit 4aa7a2c995
10 changed files with 143 additions and 61 deletions

View file

@ -4,7 +4,6 @@ require "logstash/logging"
require "logstash/plugin"
require "logstash/namespace"
require "logstash/config/mixin"
require "logstash/util/wrapped_synchronous_queue"
require "concurrent/atomic/atomic_fixnum"
class LogStash::Outputs::Base < LogStash::Plugin

View file

@ -3,7 +3,6 @@ require "fileutils"
require "logstash/event"
require "logstash/namespace"
require "logstash/util/wrapped_acked_queue"
require "logstash/util/wrapped_synchronous_queue"
module LogStash
class QueueFactory
@ -29,7 +28,7 @@ module LogStash
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
when "memory"
# memory is the legacy and default setting
LogStash::Util::WrappedSynchronousQueue.new(
LogStash::WrappedSynchronousQueue.new(
settings.get("pipeline.batch.size") * settings.get("pipeline.workers") * 2
)
else

View file

@ -1,42 +0,0 @@
# encoding: utf-8
module LogStash; module Util
class WrappedSynchronousQueue
java_import java.util.concurrent.ArrayBlockingQueue
java_import java.util.concurrent.TimeUnit
java_import org.logstash.common.LsQueueUtils
def initialize (size)
@queue = ArrayBlockingQueue.new(size)
end
attr_reader :queue
def write_client
WriteClient.new(@queue)
end
def read_client
LogStash::MemoryReadClient.new(@queue, 125, 50)
end
def close
# ignore
end
class WriteClient
def initialize(queue)
@queue = queue
end
def push(event)
@queue.put(event)
end
alias_method(:<<, :push)
def push_batch(batch)
LsQueueUtils.addAll(@queue, batch)
end
end
end
end end

View file

@ -1,6 +1,5 @@
# encoding: utf-8
require "logstash/instrument/metric"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/event"
require_relative "../../support/mocks_classes"
require "spec_helper"
@ -98,7 +97,7 @@ describe LogStash::WrappedWriteClient do
end
context "WrappedSynchronousQueue" do
let(:queue) { LogStash::Util::WrappedSynchronousQueue.new(1024) }
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
before do
read_client.set_events_metric(metric.namespace([:stats, :events]))

View file

@ -79,7 +79,7 @@ describe LogStash::QueueFactory do
it "returns a `WrappedSynchronousQueue`" do
queue = subject.create(settings)
expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue)
expect(queue).to be_kind_of(LogStash::WrappedSynchronousQueue)
queue.close
end
end

View file

@ -1,16 +1,15 @@
# encoding: utf-8
require "spec_helper"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/instrument/collector"
describe LogStash::Util::WrappedSynchronousQueue do
describe LogStash::WrappedSynchronousQueue do
subject {LogStash::Util::WrappedSynchronousQueue.new(5)}
subject {LogStash::WrappedSynchronousQueue.new(5)}
describe "queue clients" do
context "when requesting a write client" do
it "returns a client" do
expect(subject.write_client).to be_a(LogStash::Util::WrappedSynchronousQueue::WriteClient)
expect(subject.write_client).to be_a(LogStash::MemoryWriteClient)
end
end

View file

@ -14,7 +14,9 @@ import org.logstash.ext.JRubyWrappedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.ext.JrubyMemoryReadBatchExt;
import org.logstash.ext.JrubyMemoryReadClientExt;
import org.logstash.ext.JrubyMemoryWriteClientExt;
import org.logstash.ext.JrubyTimestampExtLibrary;
import org.logstash.ext.JrubyWrappedSynchronousQueueExt;
/**
* Utilities around interaction with the {@link Ruby} runtime.
@ -51,6 +53,10 @@ public final class RubyUtil {
public static final RubyClass MEMORY_READ_CLIENT_CLASS;
public static final RubyClass MEMORY_WRITE_CLIENT_CLASS;
public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS;
static {
RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
@ -63,6 +69,11 @@ public final class RubyUtil {
setupLogstashClass(JRubyWrappedWriteClientExt::new, JRubyWrappedWriteClientExt.class);
MEMORY_READ_CLIENT_CLASS =
setupLogstashClass(JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class);
MEMORY_WRITE_CLIENT_CLASS =
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
JrubyWrappedSynchronousQueueExt.class);
RUBY_EVENT_CLASS = setupLogstashClass(
JrubyEventExtLibrary.RubyEvent::new, JrubyEventExtLibrary.RubyEvent.class
);

View file

@ -29,8 +29,8 @@ public class JrubyMemoryReadClientExt extends RubyObject {
RubyUtil.RUBY.newSymbol("duration_in_millis");
private BlockingQueue queue;
private ConcurrentHashMap<Long, IRubyObject> inflightBatches;
private ConcurrentHashMap<Long, Long> inflightClocks;
private final ConcurrentHashMap<Long, IRubyObject> inflightBatches = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Long, Long> inflightClocks = new ConcurrentHashMap<>();
private int batchSize;
private long waitForNanos;
private LongCounter eventMetricOut;
@ -44,23 +44,30 @@ public class JrubyMemoryReadClientExt extends RubyObject {
super(runtime, metaClass);
}
private JrubyMemoryReadClientExt(final Ruby runtime, final RubyClass metaClass,
BlockingQueue queue, int batchSize, int waitForMillis) {
super(runtime, metaClass);
this.queue = queue;
this.batchSize = batchSize;
waitForNanos = TimeUnit.NANOSECONDS.convert(waitForMillis, TimeUnit.MILLISECONDS);
}
@JRubyMethod(name = "initialize")
@SuppressWarnings("unchecked")
public void rubyInitialize(final ThreadContext context, IRubyObject queue,
IRubyObject batchSize, IRubyObject waitForMillis) {
this.queue = (BlockingQueue) (((JavaProxy) queue).getObject());
// Note that @inflight_batches as a central mechanism for tracking inflight
// batches will fail if we have multiple read clients in the pipeline.
inflightBatches = new ConcurrentHashMap<>();
// allow the worker thread to report the execution time of the filter + output
inflightClocks = new ConcurrentHashMap<>();
this.batchSize = ((RubyNumeric) batchSize).getIntValue();
waitForNanos = TimeUnit.NANOSECONDS.convert(
((RubyNumeric) waitForMillis).getIntValue(), TimeUnit.MILLISECONDS);
}
public static JrubyMemoryReadClientExt create(BlockingQueue queue, int batchSize,
int waitForMillis) {
return new JrubyMemoryReadClientExt(RubyUtil.RUBY,
RubyUtil.MEMORY_READ_CLIENT_CLASS, queue, batchSize, waitForMillis);
}
@JRubyMethod(name = "close")
public void rubyClose(final ThreadContext context) {
// noop, for compatibility with acked queue read client

View file

@ -0,0 +1,61 @@
package org.logstash.ext;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.java.proxies.JavaProxy;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.LsQueueUtils;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
@JRubyClass(name = "MemoryWriteClient")
public class JrubyMemoryWriteClientExt extends RubyObject {
private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
public JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
private JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass,
BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
super(runtime, metaClass);
this.queue = queue;
}
public static JrubyMemoryWriteClientExt create(
BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue) {
return new JrubyMemoryWriteClientExt(RubyUtil.RUBY,
RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue);
}
@JRubyMethod(name = "initialize")
@SuppressWarnings("unchecked")
public void rubyInitialize(final ThreadContext context, IRubyObject queue) {
this.queue =
(BlockingQueue<JrubyEventExtLibrary.RubyEvent>) (((JavaProxy) queue).getObject());
}
@JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event)
throws InterruptedException {
queue.put((JrubyEventExtLibrary.RubyEvent) event);
return this;
}
@JRubyMethod(name = "push_batch", required = 1)
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch)
throws InterruptedException {
Collection<JrubyEventExtLibrary.RubyEvent> typedBatch =
(Collection<JrubyEventExtLibrary.RubyEvent>)batch;
LsQueueUtils.addAll(queue, typedBatch);
return this;
}
}

View file

@ -0,0 +1,49 @@
package org.logstash.ext;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@JRubyClass(name = "WrappedSynchronousQueue")
public class JrubyWrappedSynchronousQueueExt extends RubyObject {
private BlockingQueue<JrubyEventExtLibrary.RubyEvent> queue;
public JrubyWrappedSynchronousQueueExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
@JRubyMethod(name = "initialize")
@SuppressWarnings("unchecked")
public void rubyInitialize(final ThreadContext context, IRubyObject size) {
int typedSize = ((RubyNumeric)size).getIntValue();
this.queue = new ArrayBlockingQueue<>(typedSize);
}
@JRubyMethod(name = "write_client")
public IRubyObject getWriteClient(final ThreadContext context) {
return JrubyMemoryWriteClientExt.create(queue);
}
@JRubyMethod(name = "read_client")
public IRubyObject getReadClient(final ThreadContext context) {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
return JrubyMemoryReadClientExt.create(queue, 125, 50);
}
@JRubyMethod(name = "close")
public IRubyObject rubyClose(final ThreadContext context) {
// no op
return this;
}
}