mirror of
https://github.com/elastic/logstash.git
synced 2025-04-23 22:27:21 -04:00
rename queueMaxSizeInBytes to queueMaxBytes and currentSize to currentByteSize
Fixes #6297
This commit is contained in:
parent
ce608b0921
commit
265d45f3ce
12 changed files with 38 additions and 38 deletions
|
@ -67,14 +67,14 @@ public class JrubyAckedQueueExtLibrary implements Library {
|
|||
int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
|
||||
int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
|
||||
int checkpointMaxInterval = RubyFixnum.num2int(args[5]);
|
||||
long queueMaxSizeInBytes = RubyFixnum.num2long(args[6]);
|
||||
long queueMaxBytes = RubyFixnum.num2long(args[6]);
|
||||
|
||||
Settings s = new FileSettings(args[0].asJavaString());
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new MmapPageIO(pageNum, size, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new FileCheckpointIO(source);
|
||||
s.setCapacity(capacity);
|
||||
s.setMaxUnread(maxUnread);
|
||||
s.setQueueMaxSizeInBytes(queueMaxSizeInBytes);
|
||||
s.setQueueMaxBytes(queueMaxBytes);
|
||||
s.setCheckpointMaxAcks(checkpointMaxAcks);
|
||||
s.setCheckpointMaxWrites(checkpointMaxWrites);
|
||||
s.setCheckpointMaxInterval(checkpointMaxInterval);
|
||||
|
|
|
@ -64,14 +64,14 @@ public class JrubyAckedQueueMemoryExtLibrary implements Library {
|
|||
|
||||
int capacity = RubyFixnum.num2int(args[1]);
|
||||
int maxUnread = RubyFixnum.num2int(args[2]);
|
||||
long queueMaxSizeInBytes = RubyFixnum.num2long(args[3]);
|
||||
long queueMaxBytes = RubyFixnum.num2long(args[3]);
|
||||
|
||||
Settings s = new MemorySettings(args[0].asJavaString());
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
s.setCapacity(capacity);
|
||||
s.setMaxUnread(maxUnread);
|
||||
s.setQueueMaxSizeInBytes(queueMaxSizeInBytes);
|
||||
s.setQueueMaxBytes(queueMaxBytes);
|
||||
s.setElementIOFactory(pageIOFactory);
|
||||
s.setCheckpointIOFactory(checkpointIOFactory);
|
||||
s.setElementClass(Event.class);
|
||||
|
|
|
@ -42,7 +42,7 @@ module LogStash
|
|||
Setting::String.new("http.environment", "production"),
|
||||
Setting::String.new("queue.type", "memory", true, ["persisted", "memory", "memory_acked"]),
|
||||
Setting::Bytes.new("queue.page_capacity", "250mb"),
|
||||
Setting::Bytes.new("queue.max_size", "1024mb"),
|
||||
Setting::Bytes.new("queue.max_bytes", "1024mb"),
|
||||
Setting::Numeric.new("queue.max_events", 0), # 0 is unlimited
|
||||
Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
|
||||
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
|
||||
|
|
|
@ -118,7 +118,7 @@ module LogStash; class Pipeline
|
|||
def build_queue_from_settings
|
||||
queue_type = settings.get("queue.type")
|
||||
queue_page_capacity = settings.get("queue.page_capacity")
|
||||
queue_max_size = settings.get("queue.max_size")
|
||||
queue_max_bytes = settings.get("queue.max_bytes")
|
||||
queue_max_events = settings.get("queue.max_events")
|
||||
checkpoint_max_acks = settings.get("queue.checkpoint.acks")
|
||||
checkpoint_max_writes = settings.get("queue.checkpoint.writes")
|
||||
|
@ -126,14 +126,14 @@ module LogStash; class Pipeline
|
|||
|
||||
if queue_type == "memory_acked"
|
||||
# memory_acked is used in tests/specs
|
||||
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_size)
|
||||
LogStash::Util::WrappedAckedQueue.create_memory_based("", queue_page_capacity, queue_max_events, queue_max_bytes)
|
||||
elsif queue_type == "memory"
|
||||
# memory is the legacy and default setting
|
||||
LogStash::Util::WrappedSynchronousQueue.new()
|
||||
elsif queue_type == "persisted"
|
||||
# persisted is the disk based acked queue
|
||||
queue_path = settings.get("path.queue")
|
||||
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_size)
|
||||
LogStash::Util::WrappedAckedQueue.create_file_based(queue_path, queue_page_capacity, queue_max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, queue_max_bytes)
|
||||
else
|
||||
raise(ConfigurationError, "invalid queue.type setting")
|
||||
end
|
||||
|
|
|
@ -19,15 +19,15 @@ module LogStash; module Util
|
|||
class QueueClosedError < ::StandardError; end
|
||||
class NotImplementedError < ::StandardError; end
|
||||
|
||||
def self.create_memory_based(path, capacity, max_events, max_size)
|
||||
def self.create_memory_based(path, capacity, max_events, max_bytes)
|
||||
self.allocate.with_queue(
|
||||
LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_size)
|
||||
LogStash::AckedMemoryQueue.new(path, capacity, max_events, max_bytes)
|
||||
)
|
||||
end
|
||||
|
||||
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_size)
|
||||
def self.create_file_based(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
|
||||
self.allocate.with_queue(
|
||||
LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_size)
|
||||
LogStash::AckedQueue.new(path, capacity, max_events, checkpoint_max_writes, checkpoint_max_acks, checkpoint_max_interval, max_bytes)
|
||||
)
|
||||
end
|
||||
|
||||
|
|
|
@ -78,7 +78,7 @@ describe LogStash::Pipeline do
|
|||
let(:worker_thread_count) { 8 } # 1 4 8
|
||||
let(:number_of_events) { 100_000 }
|
||||
let(:page_capacity) { 1 * 1024 * 512 } # 1 128
|
||||
let(:max_size) { 1024 * 1024 * 1024 } # 1 gb
|
||||
let(:max_bytes) { 1024 * 1024 * 1024 } # 1 gb
|
||||
let(:queue_type) { "persisted" } # "memory" "memory_acked"
|
||||
let(:times) { [] }
|
||||
|
||||
|
@ -96,7 +96,7 @@ describe LogStash::Pipeline do
|
|||
allow(pipeline_workers_setting).to receive(:default).and_return(worker_thread_count)
|
||||
pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) }
|
||||
pipeline_settings_obj.set("queue.page_capacity", page_capacity)
|
||||
pipeline_settings_obj.set("queue.max_size", max_size)
|
||||
pipeline_settings_obj.set("queue.max_bytes", max_bytes)
|
||||
Thread.new do
|
||||
# make sure we have received all the generated events
|
||||
while counting_output.event_count < number_of_events do
|
||||
|
@ -123,7 +123,7 @@ describe LogStash::Pipeline do
|
|||
expect(_metric[:out].value).to eq(number_of_events)
|
||||
STDOUT.puts " queue.type: #{pipeline_settings_obj.get("queue.type")}"
|
||||
STDOUT.puts " queue.page_capacity: #{pipeline_settings_obj.get("queue.page_capacity") / 1024}KB"
|
||||
STDOUT.puts " queue.max_size: #{pipeline_settings_obj.get("queue.max_size") / 1024}KB"
|
||||
STDOUT.puts " queue.max_bytes: #{pipeline_settings_obj.get("queue.max_bytes") / 1024}KB"
|
||||
STDOUT.puts " workers: #{worker_thread_count}"
|
||||
STDOUT.puts " events: #{number_of_events}"
|
||||
STDOUT.puts " took: #{times.first}s"
|
||||
|
|
|
@ -450,7 +450,7 @@ describe LogStash::Pipeline do
|
|||
allow(settings).to receive(:get).with("queue.type").and_return("memory")
|
||||
allow(settings).to receive(:get).with("queue.page_capacity").and_return(1024 * 1024)
|
||||
allow(settings).to receive(:get).with("queue.max_events").and_return(250)
|
||||
allow(settings).to receive(:get).with("queue.max_size").and_return(1024 * 1024 * 1024)
|
||||
allow(settings).to receive(:get).with("queue.max_bytes").and_return(1024 * 1024 * 1024)
|
||||
allow(settings).to receive(:get).with("queue.checkpoint.acks").and_return(1024)
|
||||
allow(settings).to receive(:get).with("queue.checkpoint.writes").and_return(1024)
|
||||
allow(settings).to receive(:get).with("queue.checkpoint.interval").and_return(1000)
|
||||
|
|
|
@ -9,7 +9,7 @@ public class FileSettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private long queueMaxSizeInBytes;
|
||||
private long queueMaxBytes;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
private int checkpointMaxWrites;
|
||||
|
@ -44,8 +44,8 @@ public class FileSettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(long size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
public Settings setQueueMaxBytes(long size) {
|
||||
this.queueMaxBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -114,7 +114,7 @@ public class FileSettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public long getQueueMaxSizeInBytes() { return queueMaxSizeInBytes; }
|
||||
public long getQueueMaxBytes() { return queueMaxBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
|
|
|
@ -8,7 +8,7 @@ public class MemorySettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private long queueMaxSizeInBytes;
|
||||
private long queueMaxBytes;
|
||||
private final String dirPath;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
|
@ -52,8 +52,8 @@ public class MemorySettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(long size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
public Settings setQueueMaxBytes(long size) {
|
||||
this.queueMaxBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -115,7 +115,7 @@ public class MemorySettings implements Settings {
|
|||
return this.dirPath;
|
||||
}
|
||||
|
||||
public long getQueueMaxSizeInBytes() { return this.queueMaxSizeInBytes; }
|
||||
public long getQueueMaxBytes() { return this.queueMaxBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
|
|
|
@ -41,12 +41,12 @@ public class Queue implements Closeable {
|
|||
|
||||
protected volatile long unreadCount;
|
||||
|
||||
protected volatile long currentSize;
|
||||
protected volatile long currentByteSize;
|
||||
|
||||
private final CheckpointIO checkpointIO;
|
||||
private final PageIOFactory pageIOFactory;
|
||||
private final int pageCapacity;
|
||||
private final long maxSizeInBytes;
|
||||
private final long maxBytes;
|
||||
private final String dirPath;
|
||||
private final int maxUnread;
|
||||
private final int checkpointMaxAcks;
|
||||
|
@ -68,7 +68,7 @@ public class Queue implements Closeable {
|
|||
this(
|
||||
settings.getDirPath(),
|
||||
settings.getCapacity(),
|
||||
settings.getQueueMaxSizeInBytes(),
|
||||
settings.getQueueMaxBytes(),
|
||||
settings.getCheckpointIOFactory().build(settings.getDirPath()),
|
||||
settings.getPageIOFactory(),
|
||||
settings.getElementClass(),
|
||||
|
@ -79,10 +79,10 @@ public class Queue implements Closeable {
|
|||
);
|
||||
}
|
||||
|
||||
public Queue(String dirPath, int pageCapacity, long maxSizeInBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
public Queue(String dirPath, int pageCapacity, long maxBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
this.dirPath = dirPath;
|
||||
this.pageCapacity = pageCapacity;
|
||||
this.maxSizeInBytes = maxSizeInBytes;
|
||||
this.maxBytes = maxBytes;
|
||||
this.checkpointIO = checkpointIO;
|
||||
this.pageIOFactory = pageIOFactory;
|
||||
this.elementClass = elementClass;
|
||||
|
@ -94,7 +94,7 @@ public class Queue implements Closeable {
|
|||
this.checkpointMaxWrites = checkpointMaxWrites;
|
||||
this.checkpointMaxInterval = checkpointMaxInterval;
|
||||
this.unreadCount = 0;
|
||||
this.currentSize = 0;
|
||||
this.currentByteSize = 0;
|
||||
|
||||
// retrieve the deserialize method
|
||||
try {
|
||||
|
@ -153,7 +153,7 @@ public class Queue implements Closeable {
|
|||
|
||||
PageIO headPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
this.headPage = new HeadPage(headCheckpoint, this, headPageIO);
|
||||
this.currentSize += headPageIO.getCapacity();
|
||||
this.currentByteSize += headPageIO.getCapacity();
|
||||
|
||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||
this.headPage.checkpoint();
|
||||
|
@ -206,7 +206,7 @@ public class Queue implements Closeable {
|
|||
this.tailPages.add(p);
|
||||
this.unreadTailPages.add(p);
|
||||
this.unreadCount += p.unreadCount();
|
||||
this.currentSize += pageIO.getCapacity();
|
||||
this.currentByteSize += pageIO.getCapacity();
|
||||
|
||||
// for now deactivate all tail pages, we will only reactivate the first one at the end
|
||||
pageIO.deactivate();
|
||||
|
@ -225,7 +225,7 @@ public class Queue implements Closeable {
|
|||
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||
this.headPage = new HeadPage(pageNum, this, headPageIO);
|
||||
this.headPage.forceCheckpoint();
|
||||
this.currentSize += headPageIO.getCapacity();
|
||||
this.currentByteSize += headPageIO.getCapacity();
|
||||
}
|
||||
|
||||
// @param element the Queueable object to write to the queue
|
||||
|
@ -300,7 +300,7 @@ public class Queue implements Closeable {
|
|||
public boolean isFull() {
|
||||
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside syncronized
|
||||
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
|
||||
if ((this.maxSizeInBytes > 0) && this.currentSize >= this.maxSizeInBytes) {
|
||||
if ((this.maxBytes > 0) && this.currentByteSize >= this.maxBytes) {
|
||||
return true;
|
||||
} else {
|
||||
return ((this.maxUnread > 0) && this.unreadCount >= this.maxUnread);
|
||||
|
@ -495,7 +495,7 @@ public class Queue implements Closeable {
|
|||
|
||||
// remove page data file regardless if it is the first or a middle tail page to free resources
|
||||
result.page.purge();
|
||||
this.currentSize -= result.page.getPageIO().getCapacity();
|
||||
this.currentByteSize -= result.page.getPageIO().getCapacity();
|
||||
|
||||
if (result.index == 0) {
|
||||
// if this is the first page also remove checkpoint file
|
||||
|
|
|
@ -12,7 +12,7 @@ public interface Settings {
|
|||
|
||||
Settings setCapacity(int capacity);
|
||||
|
||||
Settings setQueueMaxSizeInBytes(long size);
|
||||
Settings setQueueMaxBytes(long size);
|
||||
|
||||
Settings setMaxUnread(int maxUnread);
|
||||
|
||||
|
@ -32,7 +32,7 @@ public interface Settings {
|
|||
|
||||
int getCapacity();
|
||||
|
||||
long getQueueMaxSizeInBytes();
|
||||
long getQueueMaxBytes();
|
||||
|
||||
int getMaxUnread();
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ public class TestSettings {
|
|||
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
s.setCapacity(capacity);
|
||||
s.setQueueMaxSizeInBytes(size);
|
||||
s.setQueueMaxBytes(size);
|
||||
s.setElementIOFactory(pageIOFactory);
|
||||
s.setCheckpointIOFactory(checkpointIOFactory);
|
||||
s.setElementClass(StringElement.class);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue