mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
0a1a761442
commit
4ada6dbf4d
3 changed files with 76 additions and 28 deletions
|
@ -31,12 +31,12 @@ module LogStash; module Util
|
||||||
def with_queue(queue)
|
def with_queue(queue)
|
||||||
@queue = queue
|
@queue = queue
|
||||||
@queue.open
|
@queue.open
|
||||||
@closed = Concurrent::AtomicBoolean.new(false)
|
@closed = java.util.concurrent.atomic.AtomicBoolean.new(false)
|
||||||
self
|
self
|
||||||
end
|
end
|
||||||
|
|
||||||
def closed?
|
def closed?
|
||||||
@closed.true?
|
@closed.get
|
||||||
end
|
end
|
||||||
|
|
||||||
# Push an object to the queue if the queue is full
|
# Push an object to the queue if the queue is full
|
||||||
|
@ -55,7 +55,7 @@ module LogStash; module Util
|
||||||
end
|
end
|
||||||
|
|
||||||
def write_client
|
def write_client
|
||||||
WriteClient.new(self)
|
LogStash::AckedWriteClient.create(@queue, @closed)
|
||||||
end
|
end
|
||||||
|
|
||||||
def read_client()
|
def read_client()
|
||||||
|
@ -63,7 +63,7 @@ module LogStash; module Util
|
||||||
end
|
end
|
||||||
|
|
||||||
def check_closed(action)
|
def check_closed(action)
|
||||||
if closed?
|
if @closed.get
|
||||||
raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
|
raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -74,7 +74,7 @@ module LogStash; module Util
|
||||||
|
|
||||||
def close
|
def close
|
||||||
@queue.close
|
@queue.close
|
||||||
@closed.make_true
|
@closed.set(true)
|
||||||
end
|
end
|
||||||
|
|
||||||
class ReadClient
|
class ReadClient
|
||||||
|
@ -222,28 +222,5 @@ module LogStash; module Util
|
||||||
@originals.size + @generated.size
|
@originals.size + @generated.size
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class WriteClient
|
|
||||||
def initialize(queue)
|
|
||||||
@queue = queue
|
|
||||||
end
|
|
||||||
|
|
||||||
def push(event)
|
|
||||||
if @queue.closed?
|
|
||||||
raise QueueClosedError.new("Attempted to write an event to a closed AckedQueue")
|
|
||||||
end
|
|
||||||
@queue.push(event)
|
|
||||||
end
|
|
||||||
alias_method(:<<, :push)
|
|
||||||
|
|
||||||
def push_batch(batch)
|
|
||||||
if @queue.closed?
|
|
||||||
raise QueueClosedError.new("Attempted to write a batch to a closed AckedQueue")
|
|
||||||
end
|
|
||||||
batch.each do |event|
|
|
||||||
push(event)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
end end
|
end end
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.jruby.runtime.ObjectAllocator;
|
||||||
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
||||||
import org.logstash.ackedqueue.ext.RubyAckedBatch;
|
import org.logstash.ackedqueue.ext.RubyAckedBatch;
|
||||||
import org.logstash.ext.JRubyWrappedWriteClientExt;
|
import org.logstash.ext.JRubyWrappedWriteClientExt;
|
||||||
|
import org.logstash.ext.JrubyAckedWriteClientExt;
|
||||||
import org.logstash.ext.JrubyEventExtLibrary;
|
import org.logstash.ext.JrubyEventExtLibrary;
|
||||||
import org.logstash.ext.JrubyMemoryReadBatchExt;
|
import org.logstash.ext.JrubyMemoryReadBatchExt;
|
||||||
import org.logstash.ext.JrubyMemoryReadClientExt;
|
import org.logstash.ext.JrubyMemoryReadClientExt;
|
||||||
|
@ -55,6 +56,8 @@ public final class RubyUtil {
|
||||||
|
|
||||||
public static final RubyClass MEMORY_WRITE_CLIENT_CLASS;
|
public static final RubyClass MEMORY_WRITE_CLIENT_CLASS;
|
||||||
|
|
||||||
|
public static final RubyClass ACKED_WRITE_CLIENT_CLASS;
|
||||||
|
|
||||||
public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS;
|
public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
|
@ -71,6 +74,8 @@ public final class RubyUtil {
|
||||||
setupLogstashClass(JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class);
|
setupLogstashClass(JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class);
|
||||||
MEMORY_WRITE_CLIENT_CLASS =
|
MEMORY_WRITE_CLIENT_CLASS =
|
||||||
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
|
setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class);
|
||||||
|
ACKED_WRITE_CLIENT_CLASS =
|
||||||
|
setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class);
|
||||||
WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
|
WRAPPED_SYNCHRONOUS_QUEUE_CLASS =
|
||||||
setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
|
setupLogstashClass(JrubyWrappedSynchronousQueueExt::new,
|
||||||
JrubyWrappedSynchronousQueueExt.class);
|
JrubyWrappedSynchronousQueueExt.class);
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
package org.logstash.ext;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import org.jruby.Ruby;
|
||||||
|
import org.jruby.RubyClass;
|
||||||
|
import org.jruby.RubyObject;
|
||||||
|
import org.jruby.anno.JRubyClass;
|
||||||
|
import org.jruby.anno.JRubyMethod;
|
||||||
|
import org.jruby.runtime.ThreadContext;
|
||||||
|
import org.jruby.runtime.builtin.IRubyObject;
|
||||||
|
import org.logstash.RubyUtil;
|
||||||
|
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
|
||||||
|
|
||||||
|
@JRubyClass(name = "AckedWriteClient")
|
||||||
|
public final class JrubyAckedWriteClientExt extends RubyObject {
|
||||||
|
|
||||||
|
private JRubyAckedQueueExt queue;
|
||||||
|
|
||||||
|
private AtomicBoolean closed;
|
||||||
|
|
||||||
|
@JRubyMethod(meta = true, required = 2)
|
||||||
|
public static IRubyObject create(final ThreadContext context, IRubyObject recv,
|
||||||
|
final IRubyObject queue, final IRubyObject closed) {
|
||||||
|
return new JrubyAckedWriteClientExt(
|
||||||
|
context.runtime, RubyUtil.ACKED_WRITE_CLIENT_CLASS,
|
||||||
|
(JRubyAckedQueueExt) queue.toJava(
|
||||||
|
JRubyAckedQueueExt.class
|
||||||
|
),
|
||||||
|
(AtomicBoolean) closed.toJava(AtomicBoolean.class)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) {
|
||||||
|
super(runtime, metaClass);
|
||||||
|
}
|
||||||
|
|
||||||
|
private JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass,
|
||||||
|
final JRubyAckedQueueExt queue, final AtomicBoolean closed) {
|
||||||
|
super(runtime, metaClass);
|
||||||
|
this.queue = queue;
|
||||||
|
this.closed = closed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JRubyMethod(name = {"push", "<<"}, required = 1)
|
||||||
|
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) {
|
||||||
|
ensureOpen();
|
||||||
|
queue.ruby_write(context, event);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@JRubyMethod(name = "push_batch", required = 1)
|
||||||
|
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) {
|
||||||
|
ensureOpen();
|
||||||
|
for (final IRubyObject event : (Collection<JrubyEventExtLibrary.RubyEvent>) batch) {
|
||||||
|
queue.ruby_write(context, event);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void ensureOpen() {
|
||||||
|
if (closed.get()) {
|
||||||
|
throw new IllegalStateException("Tried to write to a closed queue.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Add table
Add a link
Reference in a new issue