diff --git a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb index 5f28bb753..680f86126 100644 --- a/logstash-core/lib/logstash/util/wrapped_acked_queue.rb +++ b/logstash-core/lib/logstash/util/wrapped_acked_queue.rb @@ -31,12 +31,12 @@ module LogStash; module Util def with_queue(queue) @queue = queue @queue.open - @closed = Concurrent::AtomicBoolean.new(false) + @closed = java.util.concurrent.atomic.AtomicBoolean.new(false) self end def closed? - @closed.true? + @closed.get end # Push an object to the queue if the queue is full @@ -55,7 +55,7 @@ module LogStash; module Util end def write_client - WriteClient.new(self) + LogStash::AckedWriteClient.create(@queue, @closed) end def read_client() @@ -63,7 +63,7 @@ module LogStash; module Util end def check_closed(action) - if closed? + if @closed.get raise QueueClosedError.new("Attempted to #{action} on a closed AckedQueue") end end @@ -74,7 +74,7 @@ module LogStash; module Util def close @queue.close - @closed.make_true + @closed.set(true) end class ReadClient @@ -222,28 +222,5 @@ module LogStash; module Util @originals.size + @generated.size end end - - class WriteClient - def initialize(queue) - @queue = queue - end - - def push(event) - if @queue.closed? - raise QueueClosedError.new("Attempted to write an event to a closed AckedQueue") - end - @queue.push(event) - end - alias_method(:<<, :push) - - def push_batch(batch) - if @queue.closed? - raise QueueClosedError.new("Attempted to write a batch to a closed AckedQueue") - end - batch.each do |event| - push(event) - end - end - end end end end diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 4d407a12a..fe9776bf7 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -11,6 +11,7 @@ import org.jruby.runtime.ObjectAllocator; import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; import org.logstash.ackedqueue.ext.RubyAckedBatch; import org.logstash.ext.JRubyWrappedWriteClientExt; +import org.logstash.ext.JrubyAckedWriteClientExt; import org.logstash.ext.JrubyEventExtLibrary; import org.logstash.ext.JrubyMemoryReadBatchExt; import org.logstash.ext.JrubyMemoryReadClientExt; @@ -55,6 +56,8 @@ public final class RubyUtil { public static final RubyClass MEMORY_WRITE_CLIENT_CLASS; + public static final RubyClass ACKED_WRITE_CLIENT_CLASS; + public static final RubyClass WRAPPED_SYNCHRONOUS_QUEUE_CLASS; static { @@ -71,6 +74,8 @@ public final class RubyUtil { setupLogstashClass(JrubyMemoryReadClientExt::new, JrubyMemoryReadClientExt.class); MEMORY_WRITE_CLIENT_CLASS = setupLogstashClass(JrubyMemoryWriteClientExt::new, JrubyMemoryWriteClientExt.class); + ACKED_WRITE_CLIENT_CLASS = + setupLogstashClass(JrubyAckedWriteClientExt::new, JrubyAckedWriteClientExt.class); WRAPPED_SYNCHRONOUS_QUEUE_CLASS = setupLogstashClass(JrubyWrappedSynchronousQueueExt::new, JrubyWrappedSynchronousQueueExt.class); diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java new file mode 100644 index 000000000..666ec7668 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java @@ -0,0 +1,66 @@ +package org.logstash.ext; + +import java.util.Collection; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jruby.Ruby; +import org.jruby.RubyClass; +import org.jruby.RubyObject; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; + +@JRubyClass(name = "AckedWriteClient") +public final class JrubyAckedWriteClientExt extends RubyObject { + + private JRubyAckedQueueExt queue; + + private AtomicBoolean closed; + + @JRubyMethod(meta = true, required = 2) + public static IRubyObject create(final ThreadContext context, IRubyObject recv, + final IRubyObject queue, final IRubyObject closed) { + return new JrubyAckedWriteClientExt( + context.runtime, RubyUtil.ACKED_WRITE_CLIENT_CLASS, + (JRubyAckedQueueExt) queue.toJava( + JRubyAckedQueueExt.class + ), + (AtomicBoolean) closed.toJava(AtomicBoolean.class) + ); + } + + public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + private JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass, + final JRubyAckedQueueExt queue, final AtomicBoolean closed) { + super(runtime, metaClass); + this.queue = queue; + this.closed = closed; + } + + @JRubyMethod(name = {"push", "<<"}, required = 1) + public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) { + ensureOpen(); + queue.ruby_write(context, event); + return this; + } + + @JRubyMethod(name = "push_batch", required = 1) + public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) { + ensureOpen(); + for (final IRubyObject event : (Collection) batch) { + queue.ruby_write(context, event); + } + return this; + } + + private void ensureOpen() { + if (closed.get()) { + throw new IllegalStateException("Tried to write to a closed queue."); + } + } +}