mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
882acda239
commit
9a467776a8
5 changed files with 14 additions and 13 deletions
|
@ -9,7 +9,7 @@ public class FileSettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private int queueMaxSizeInBytes;
|
||||
private long queueMaxSizeInBytes;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
private int checkpointMaxWrites;
|
||||
|
@ -44,7 +44,7 @@ public class FileSettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(int size) {
|
||||
public Settings setQueueMaxSizeInBytes(long size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ public class FileSettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public int getQueueMaxSizeInBytes() { return queueMaxSizeInBytes; }
|
||||
public long getQueueMaxSizeInBytes() { return queueMaxSizeInBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
|
|
|
@ -8,7 +8,7 @@ public class MemorySettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private int queueMaxSizeInBytes;
|
||||
private long queueMaxSizeInBytes;
|
||||
private final String dirPath;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
|
@ -52,7 +52,7 @@ public class MemorySettings implements Settings {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(int size) {
|
||||
public Settings setQueueMaxSizeInBytes(long size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ public class MemorySettings implements Settings {
|
|||
return this.dirPath;
|
||||
}
|
||||
|
||||
public int getQueueMaxSizeInBytes() { return this.queueMaxSizeInBytes; }
|
||||
public long getQueueMaxSizeInBytes() { return this.queueMaxSizeInBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
|
|
|
@ -46,7 +46,7 @@ public class Queue implements Closeable {
|
|||
private final CheckpointIO checkpointIO;
|
||||
private final PageIOFactory pageIOFactory;
|
||||
private final int pageCapacity;
|
||||
private final int maxSize;
|
||||
private final long maxSizeInBytes;
|
||||
private final String dirPath;
|
||||
private final int maxUnread;
|
||||
private final int checkpointMaxAcks;
|
||||
|
@ -79,10 +79,10 @@ public class Queue implements Closeable {
|
|||
);
|
||||
}
|
||||
|
||||
public Queue(String dirPath, int pageCapacity, int maxSize, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
public Queue(String dirPath, int pageCapacity, long maxSizeInBytes, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
this.dirPath = dirPath;
|
||||
this.pageCapacity = pageCapacity;
|
||||
this.maxSize = maxSize;
|
||||
this.maxSizeInBytes = maxSizeInBytes;
|
||||
this.checkpointIO = checkpointIO;
|
||||
this.pageIOFactory = pageIOFactory;
|
||||
this.elementClass = elementClass;
|
||||
|
@ -300,7 +300,7 @@ public class Queue implements Closeable {
|
|||
public boolean isFull() {
|
||||
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside syncronized
|
||||
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
|
||||
return (((this.maxUnread > 0) ? this.unreadCount >= this.maxUnread : false) || (this.currentSize >= this.maxSize));
|
||||
return (((this.maxUnread > 0) ? this.unreadCount >= this.maxUnread : false) || (this.currentSize >= this.maxSizeInBytes));
|
||||
}
|
||||
|
||||
// @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing)
|
||||
|
@ -491,6 +491,7 @@ public class Queue implements Closeable {
|
|||
|
||||
// remove page data file regardless if it is the first or a middle tail page to free resources
|
||||
result.page.purge();
|
||||
this.currentSize -= result.page.getPageIO().getCapacity();
|
||||
|
||||
if (result.index == 0) {
|
||||
// if this is the first page also remove checkpoint file
|
||||
|
|
|
@ -12,7 +12,7 @@ public interface Settings {
|
|||
|
||||
Settings setCapacity(int capacity);
|
||||
|
||||
Settings setQueueMaxSizeInBytes(int size);
|
||||
Settings setQueueMaxSizeInBytes(long size);
|
||||
|
||||
Settings setMaxUnread(int maxUnread);
|
||||
|
||||
|
@ -32,7 +32,7 @@ public interface Settings {
|
|||
|
||||
int getCapacity();
|
||||
|
||||
int getQueueMaxSizeInBytes();
|
||||
long getQueueMaxSizeInBytes();
|
||||
|
||||
int getMaxUnread();
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ public class TestSettings {
|
|||
return s;
|
||||
}
|
||||
|
||||
public static Settings getSettings(int capacity, int size) {
|
||||
public static Settings getSettings(int capacity, long size) {
|
||||
MemoryCheckpointIO.clearSources();
|
||||
Settings s = new MemorySettings();
|
||||
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue