mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
MINOR: Align behaviour of acked queue Ruby wrappers with behaviour of in memory queue wrappers
Fixes #9003
This commit is contained in:
parent
9bca2bf303
commit
28a2d1d48d
2 changed files with 11 additions and 31 deletions
|
@ -55,12 +55,6 @@ module LogStash; module Util
|
|||
end
|
||||
alias_method(:<<, :push)
|
||||
|
||||
# Block for X millis
|
||||
def poll(millis)
|
||||
check_closed("read")
|
||||
@queue.read_batch(1, millis).get_elements.first
|
||||
end
|
||||
|
||||
def read_batch(size, wait)
|
||||
check_closed("read a batch")
|
||||
@queue.read_batch(size, wait)
|
||||
|
@ -141,7 +135,7 @@ module LogStash; module Util
|
|||
# create a new empty batch
|
||||
# @return [ReadBatch] a new empty read batch
|
||||
def new_batch
|
||||
ReadBatch.new(@queue, @batch_size, @wait_for)
|
||||
ReadBatch.new(@queue, 0, 0)
|
||||
end
|
||||
|
||||
def read_batch
|
||||
|
@ -149,8 +143,7 @@ module LogStash; module Util
|
|||
raise QueueClosedError.new("Attempt to take a batch from a closed AckedQueue")
|
||||
end
|
||||
|
||||
batch = new_batch
|
||||
batch.read_next
|
||||
batch = ReadBatch.new(@queue, @batch_size, @wait_for)
|
||||
start_metrics(batch)
|
||||
batch
|
||||
end
|
||||
|
@ -191,23 +184,9 @@ module LogStash; module Util
|
|||
|
||||
class ReadBatch
|
||||
def initialize(queue, size, wait)
|
||||
@queue = queue
|
||||
@size = size
|
||||
@wait = wait
|
||||
|
||||
@originals = Hash.new
|
||||
|
||||
# TODO: disabled for https://github.com/elastic/logstash/issues/6055 - will have to properly refactor
|
||||
# @cancelled = Hash.new
|
||||
|
||||
@generated = Hash.new
|
||||
@acked_batch = nil
|
||||
end
|
||||
|
||||
def read_next
|
||||
@acked_batch = @queue.read_batch(@size, @wait)
|
||||
return if @acked_batch.nil?
|
||||
@acked_batch.get_elements.each { |e| @originals[e] = true }
|
||||
@acked_batch = queue.read_batch(size, wait)
|
||||
@originals = @acked_batch.nil? ? Hash.new : @acked_batch.get_elements
|
||||
end
|
||||
|
||||
def close
|
||||
|
@ -229,8 +208,6 @@ module LogStash; module Util
|
|||
end
|
||||
|
||||
def each(&blk)
|
||||
# below the checks for @cancelled.include?(e) have been replaced by e.cancelled?
|
||||
# TODO: for https://github.com/elastic/logstash/issues/6055 = will have to properly refactor
|
||||
@originals.each do |e, _|
|
||||
blk.call(e) unless e.cancelled?
|
||||
end
|
||||
|
|
|
@ -2,8 +2,8 @@ package org.logstash.ackedqueue.ext;
|
|||
|
||||
import java.io.IOException;
|
||||
import org.jruby.Ruby;
|
||||
import org.jruby.RubyArray;
|
||||
import org.jruby.RubyClass;
|
||||
import org.jruby.RubyHash;
|
||||
import org.jruby.RubyObject;
|
||||
import org.jruby.anno.JRubyClass;
|
||||
import org.jruby.anno.JRubyMethod;
|
||||
|
@ -32,9 +32,12 @@ public final class RubyAckedBatch extends RubyObject {
|
|||
|
||||
@JRubyMethod(name = "get_elements")
|
||||
public IRubyObject ruby_get_elements(ThreadContext context) {
|
||||
RubyArray result = context.runtime.newArray();
|
||||
this.batch.getElements().forEach(e -> result.add(
|
||||
JrubyEventExtLibrary.RubyEvent.newRubyEvent(context.runtime, (Event) e)));
|
||||
final RubyHash result = RubyHash.newHash(context.runtime);
|
||||
this.batch.getElements().forEach(e -> result.put(
|
||||
JrubyEventExtLibrary.RubyEvent.newRubyEvent(context.runtime, (Event) e),
|
||||
context.tru
|
||||
)
|
||||
);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue