mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
5438d7e908
commit
2f6a472f52
6 changed files with 102 additions and 2 deletions
|
@ -9,6 +9,7 @@ public class FileSettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private int queueMaxSizeInBytes;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
private int checkpointMaxWrites;
|
||||
|
@ -42,6 +43,12 @@ public class FileSettings implements Settings {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(int size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings setCapacity(int capacity) {
|
||||
this.capacity = capacity;
|
||||
|
@ -106,6 +113,9 @@ public class FileSettings implements Settings {
|
|||
return dirForFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getQueueMaxSizeInBytes() { return queueMaxSizeInBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
return capacity;
|
||||
|
|
|
@ -8,6 +8,7 @@ public class MemorySettings implements Settings {
|
|||
private PageIOFactory pageIOFactory;
|
||||
private Class elementClass;
|
||||
private int capacity;
|
||||
private int queueMaxSizeInBytes;
|
||||
private final String dirPath;
|
||||
private int maxUnread;
|
||||
private int checkpointMaxAcks;
|
||||
|
@ -50,6 +51,12 @@ public class MemorySettings implements Settings {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings setQueueMaxSizeInBytes(int size) {
|
||||
this.queueMaxSizeInBytes = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings setMaxUnread(int maxUnread) {
|
||||
this.maxUnread = maxUnread;
|
||||
|
@ -108,6 +115,8 @@ public class MemorySettings implements Settings {
|
|||
return this.dirPath;
|
||||
}
|
||||
|
||||
public int getQueueMaxSizeInBytes() { return this.queueMaxSizeInBytes; }
|
||||
|
||||
@Override
|
||||
public int getCapacity() {
|
||||
return this.capacity;
|
||||
|
|
|
@ -41,9 +41,12 @@ public class Queue implements Closeable {
|
|||
|
||||
protected volatile long unreadCount;
|
||||
|
||||
protected volatile long currentSize;
|
||||
|
||||
private final CheckpointIO checkpointIO;
|
||||
private final PageIOFactory pageIOFactory;
|
||||
private final int pageCapacity;
|
||||
private final int maxSize;
|
||||
private final String dirPath;
|
||||
private final int maxUnread;
|
||||
private final int checkpointMaxAcks;
|
||||
|
@ -65,6 +68,7 @@ public class Queue implements Closeable {
|
|||
this(
|
||||
settings.getDirPath(),
|
||||
settings.getCapacity(),
|
||||
settings.getQueueMaxSizeInBytes(),
|
||||
settings.getCheckpointIOFactory().build(settings.getDirPath()),
|
||||
settings.getPageIOFactory(),
|
||||
settings.getElementClass(),
|
||||
|
@ -75,9 +79,10 @@ public class Queue implements Closeable {
|
|||
);
|
||||
}
|
||||
|
||||
public Queue(String dirPath, int pageCapacity, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
public Queue(String dirPath, int pageCapacity, int maxSize, CheckpointIO checkpointIO, PageIOFactory pageIOFactory, Class elementClass, int maxUnread, int checkpointMaxWrites, int checkpointMaxAcks, int checkpointMaxInterval) {
|
||||
this.dirPath = dirPath;
|
||||
this.pageCapacity = pageCapacity;
|
||||
this.maxSize = maxSize;
|
||||
this.checkpointIO = checkpointIO;
|
||||
this.pageIOFactory = pageIOFactory;
|
||||
this.elementClass = elementClass;
|
||||
|
@ -89,6 +94,7 @@ public class Queue implements Closeable {
|
|||
this.checkpointMaxWrites = checkpointMaxWrites;
|
||||
this.checkpointMaxInterval = checkpointMaxInterval;
|
||||
this.unreadCount = 0;
|
||||
this.currentSize = 0;
|
||||
|
||||
// retrieve the deserialize method
|
||||
try {
|
||||
|
@ -135,6 +141,7 @@ public class Queue implements Closeable {
|
|||
Checkpoint tailCheckpoint = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum));
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||
|
||||
add(tailCheckpoint, pageIO);
|
||||
}
|
||||
|
||||
|
@ -153,6 +160,7 @@ public class Queue implements Closeable {
|
|||
// head page is non-empty, transform it into a tail page and create a new empty head page
|
||||
|
||||
PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||
|
||||
TailPage p = new TailPage(headCheckpoint, this, pageIO);
|
||||
p.checkpoint();
|
||||
add(headCheckpoint, pageIO);
|
||||
|
@ -197,6 +205,7 @@ public class Queue implements Closeable {
|
|||
this.tailPages.add(p);
|
||||
this.unreadTailPages.add(p);
|
||||
this.unreadCount += p.unreadCount();
|
||||
this.currentSize += pageIO.getCapacity();
|
||||
|
||||
// for now deactivate all tail pages, we will only reactivate the first one at the end
|
||||
pageIO.deactivate();
|
||||
|
@ -255,6 +264,7 @@ public class Queue implements Closeable {
|
|||
|
||||
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
|
||||
this.unreadCount++;
|
||||
this.currentSize += data.length;
|
||||
|
||||
// if the queue was empty before write, signal non emptiness
|
||||
if (wasEmpty) { notEmpty.signal(); }
|
||||
|
@ -290,7 +300,7 @@ public class Queue implements Closeable {
|
|||
public boolean isFull() {
|
||||
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside syncronized
|
||||
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
|
||||
return (this.maxUnread > 0) ? this.unreadCount >= this.maxUnread : false;
|
||||
return (((this.maxUnread > 0) ? this.unreadCount >= this.maxUnread : false) || (this.currentSize >= this.maxSize));
|
||||
}
|
||||
|
||||
// @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing)
|
||||
|
|
|
@ -12,6 +12,8 @@ public interface Settings {
|
|||
|
||||
Settings setCapacity(int capacity);
|
||||
|
||||
Settings setQueueMaxSizeInBytes(int size);
|
||||
|
||||
Settings setMaxUnread(int maxUnread);
|
||||
|
||||
Settings setCheckpointMaxAcks(int checkpointMaxAcks);
|
||||
|
@ -30,6 +32,8 @@ public interface Settings {
|
|||
|
||||
int getCapacity();
|
||||
|
||||
int getQueueMaxSizeInBytes();
|
||||
|
||||
int getMaxUnread();
|
||||
|
||||
int getCheckpointMaxAcks();
|
||||
|
|
|
@ -390,4 +390,58 @@ public class QueueTest {
|
|||
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void reachMaxSizeTest() throws IOException, InterruptedException, ExecutionException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
Settings settings = TestSettings.getSettings(256, 1000); // allow 10 elements per page but only 1024 bytes in total
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 99; // should be able to write 100 events before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
}
|
||||
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
|
||||
Thread.sleep(1);
|
||||
assertThat(q.isFull(), is(true));
|
||||
// spin wait until data is written and write blocks
|
||||
//while (!q.isFull()) { Thread.sleep(1); }
|
||||
/*
|
||||
|
||||
|
||||
// read one element, which will unblock the last write
|
||||
Batch b = q.nonBlockReadBatch(1);
|
||||
assertThat(b, is(notNullValue()));
|
||||
assertThat(b.getElements().size(), is(equalTo(1)));
|
||||
b.close();
|
||||
|
||||
// future result is the blocked write seqNum for the second element
|
||||
assertThat(future.get(), is(equalTo(2L + i)));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
// all batches are acked, no tail pages should exist
|
||||
assertThat(q.getTailPages().size(), is(equalTo(0)));
|
||||
|
||||
// the last read unblocked the last write so some elements (1 unread and maybe some acked) should be in the head page
|
||||
assertThat(q.getHeadPage().getElementCount() > 0L, is(true));
|
||||
assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L)));
|
||||
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||
*/
|
||||
}
|
||||
|
||||
}
|
|
@ -20,6 +20,19 @@ public class TestSettings {
|
|||
return s;
|
||||
}
|
||||
|
||||
public static Settings getSettings(int capacity, int size) {
|
||||
MemoryCheckpointIO.clearSources();
|
||||
Settings s = new MemorySettings();
|
||||
PageIOFactory pageIOFactory = (pageNum, pageSize, path) -> new ByteBufferPageIO(pageNum, pageSize, path);
|
||||
CheckpointIOFactory checkpointIOFactory = (source) -> new MemoryCheckpointIO(source);
|
||||
s.setCapacity(capacity);
|
||||
s.setQueueMaxSizeInBytes(size);
|
||||
s.setElementIOFactory(pageIOFactory);
|
||||
s.setCheckpointIOFactory(checkpointIOFactory);
|
||||
s.setElementClass(StringElement.class);
|
||||
return s;
|
||||
}
|
||||
|
||||
public static Settings getSettingsCheckpointFilePageMemory(int capacity, String folder) {
|
||||
Settings s = new FileSettings(folder);
|
||||
PageIOFactory pageIOFactory = (pageNum, size, path) -> new ByteBufferPageIO(pageNum, size, path);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue