add Ruby wiring for the queue.max_size setting

Fixes #6297
Author: Joao Duarte, 2016-11-25 18:32:35 +00:00 (committed by João Duarte)
Parent: 461949ba13
Commit: ce608b0921
7 changed files with 21 additions and 11 deletions


@@ -57,22 +57,24 @@ public class JrubyAckedQueueExtLibrary implements Library {
         }
         // def initialize
-        @JRubyMethod(name = "initialize", optional = 6)
+        @JRubyMethod(name = "initialize", optional = 7)
         public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args)
         {
-            args = Arity.scanArgs(context.runtime, args, 6, 0);
+            args = Arity.scanArgs(context.runtime, args, 7, 0);
             int capacity = RubyFixnum.num2int(args[1]);
             int maxUnread = RubyFixnum.num2int(args[2]);
             int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
             int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
             int checkpointMaxInterval = RubyFixnum.num2int(args[5]);
+            long queueMaxSizeInBytes = RubyFixnum.num2long(args[6]);
             Settings s = new FileSettings(args[0].asJavaString());
             PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
             CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
             s.setCapacity(capacity);
             s.setMaxUnread(maxUnread);
+            s.setQueueMaxSizeInBytes(queueMaxSizeInBytes);
             s.setCheckpointMaxAcks(checkpointMaxAcks);
             s.setCheckpointMaxWrites(checkpointMaxWrites);
             s.setCheckpointMaxInterval(checkpointMaxInterval);

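For orientation, a hedged sketch of the Ruby-side call this extension now accepts, mirroring the seven-argument call wired into wrapped_acked_queue.rb further down in this commit; the values are illustrative, not defaults asserted by this change:

    # Sketch only: mirrors LogStash::AckedQueue.new as wired in wrapped_acked_queue.rb below.
    # Run under JRuby with logstash-core loaded; all values are illustrative.
    queue = LogStash::AckedQueue.new(
      "/tmp/pq-example",    # queue directory
      250 * 1024 * 1024,    # page capacity in bytes
      0,                    # max unread events, 0 = unlimited
      1024,                 # checkpoint max writes
      1024,                 # checkpoint max acks
      1000,                 # checkpoint max interval
      1024 * 1024 * 1024    # queue.max_size in bytes, the new seventh argument
    )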

@@ -57,19 +57,21 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library {
         }
         // def initialize
-        @JRubyMethod(name = "initialize", optional = 3)
+        @JRubyMethod(name = "initialize", optional = 4)
        public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args)
         {
-            args = Arity.scanArgs(context.runtime, args, 3, 0);
+            args = Arity.scanArgs(context.runtime, args, 4, 0);
             int capacity = RubyFixnum.num2int(args[1]);
             int maxUnread = RubyFixnum.num2int(args[2]);
+            long queueMaxSizeInBytes = RubyFixnum.num2long(args[3]);
             Settings s = new MemorySettings(args[0].asJavaString());
             PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
             CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
             s.setCapacity(capacity);
             s.setMaxUnread(maxUnread);
+            s.setQueueMaxSizeInBytes(queueMaxSizeInBytes);
             s.setElementIOFactory(pageIOFactory);
             s.setCheckpointIOFactory(checkpointIOFactory);
             s.setElementClass(Event.class);

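The in-memory variant gains the analogous fourth argument. A minimal sketch of the constructor shape, following the four-argument call wired into wrapped_acked_queue.rb below; values are arbitrary:

    # Sketch only: mirrors LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_size).
    queue = LogStash::AckedMemoryQueue.new(
      "",                   # path; the pipeline passes an empty string for memory_acked
      1 * 1024 * 512,       # page capacity in bytes
      0,                    # max unread events, 0 = unlimited
      256 * 1024 * 1024     # queue.max_size in bytes, the new fourth argument
    )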

@@ -42,6 +42,7 @@ module LogStash
     Setting::String.new("http.environment", "production"),
     Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
     Setting::Bytes.new("queue.page_capacity", "250mb"),
+    Setting::Bytes.new("queue.max_size", "1024mb"),
     Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited
     Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
     Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited

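Because queue.max_size is registered as a Setting::Bytes, it takes human-readable byte strings such as the "1024mb" default and can presumably be set from logstash.yml alongside the other queue.* keys (e.g. queue.max_size: 4gb). A minimal sketch of reading and overriding it programmatically; LogStash::SETTINGS as the registry object and its clone helper are assumptions here, while the set/get calls match the spec usage further down:

    # Sketch only; LogStash::SETTINGS.clone as the entry point is an assumption.
    settings = LogStash::SETTINGS.clone
    settings.set("queue.max_size", "4gb")             # Setting::Bytes coerces byte strings like the "1024mb" default
    max_size_in_bytes = settings.get("queue.max_size")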

@@ -118,21 +118,22 @@ module LogStash; class Pipeline
   def build_queue_from_settings
     queue_type = settings.get("queue.type")
     queue_page_capacity = settings.get("queue.page_capacity")
-    max_events = settings.get("queue.max_events")
+    queue_max_size = settings.get("queue.max_size")
+    queue_max_events = settings.get("queue.max_events")
     checkpoint_max_acks = settings.get("queue.checkpoint.acks")
     checkpoint_max_writes = settings.get("queue.checkpoint.writes")
     checkpoint_max_interval = settings.get("queue.checkpoint.interval")
     if queue_type == "memory_acked"
       # memory_acked is used in tests/specs
-      LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, max_events)
+      LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_size)
     elsif queue_type == "memory"
       # memory is the legacy and default setting
       LogStash::Util::WrappedSynchronousQueue.new()
     elsif queue_type == "persisted"
       # persisted is the disk based acked queue
       queue_path = settings.get("path.queue")
-      LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
+      LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_size)
     else
       raise(ConfigurationError, "invalid queue.type setting")
     end

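To make the wiring concrete: inside build_queue_from_settings above, the "persisted" branch with only defaults in play boils down to the call sketched below, annotated with the defaults registered in environment.rb earlier in this commit (path.queue is installation-specific and left symbolic):

    # Annotated sketch of the persisted branch; comments give the registered defaults.
    LogStash::Util::WrappedAckedQueue.create_file_based(
      settings.get("path.queue"),                # installation-specific
      settings.get("queue.page_capacity"),       # "250mb" default
      settings.get("queue.max_events"),          # 0 = unlimited
      settings.get("queue.checkpoint.writes"),   # 1024 default
      settings.get("queue.checkpoint.acks"),     # 1024 default
      settings.get("queue.checkpoint.interval"),
      settings.get("queue.max_size")             # "1024mb" default, added by this commit
    )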

@@ -19,15 +19,15 @@ module LogStash; module Util
     class QueueClosedError < ::StandardError; end
     class NotImplementedError < ::StandardError; end
-    def self.create_memory_based(path, capacity, size)
+    def self.create_memory_based(path, capacity, max_events, max_size)
       self.allocate.with_queue(
-        LogStash::AckedMemoryQueue.new(path, capacity, size)
+        LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_size)
       )
     end
-    def self.create_file_based(path, capacity, size, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
+    def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_size)
       self.allocate.with_queue(
-        LogStash::AckedQueue.new(path, capacity, size, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval)
+        LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_size)
       )
     end

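A short usage sketch of the updated factory, matching how the pipeline builds the memory_acked queue; the byte values are arbitrary:

    # Illustrative values only; the signature is create_memory_based(path, capacity, max_events, max_size).
    memory_queue = LogStash::Util::WrappedAckedQueue.create_memory_based(
      "",                   # path; the pipeline passes an empty string for memory_acked
      250 * 1024 * 1024,    # capacity: bytes per page
      0,                    # max_events, 0 = unlimited
      1024 * 1024 * 1024    # max_size: overall queue cap in bytes, the new argument
    )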

@@ -78,6 +78,7 @@ describe LogStash::Pipeline do
   let(:worker_thread_count) { 8 } # 1 4 8
   let(:number_of_events) { 100_000 }
   let(:page_capacity) { 1 * 1024 * 512 } # 1 128
+  let(:max_size) { 1024 * 1024 * 1024 } # 1 gb
   let(:queue_type) { "persisted" } # "memory" "memory_acked"
   let(:times) { [] }
@@ -95,6 +96,7 @@ describe LogStash::Pipeline do
     allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)
     pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
     pipeline_settings_obj.set("queue.page_capacity", page_capacity)
+    pipeline_settings_obj.set("queue.max_size", max_size)
     Thread.new do
       # make sure we have received all the generated events
       while counting_output.event_count < number_of_events do
@@ -121,6 +123,7 @@ describe LogStash::Pipeline do
     expect(_metric[:out].value).to eq(number_of_events)
     STDOUT.puts " queue.type: #{pipeline_settings_obj.get("queue.type")}"
     STDOUT.puts " queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
+    STDOUT.puts " queue.max_size: #{pipeline_settings_obj.get("queue.max_size") / 1024}KB"
     STDOUT.puts " workers: #{worker_thread_count}"
     STDOUT.puts " events: #{number_of_events}"
     STDOUT.puts " took: #{times.first}s"


@@ -450,6 +450,7 @@ describe LogStash::Pipeline do
     allow(settings).to receive(:get).with("queue.type").and_return("memory")
     allow(settings).to receive(:get).with("queue.page_capacity").and_return(1024 * 1024)
     allow(settings).to receive(:get).with("queue.max_events").and_return(250)
+    allow(settings).to receive(:get).with("queue.max_size").and_return(1024 * 1024 * 1024)
     allow(settings).to receive(:get).with("queue.checkpoint.acks").and_return(1024)
     allow(settings).to receive(:get).with("queue.checkpoint.writes").and_return(1024)
     allow(settings).to receive(:get).with("queue.checkpoint.interval").and_return(1000)