mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 22:57:16 -04:00
parent
9a467776a8
commit
245e206e83
2 changed files with 64 additions and 12 deletions
|
@ -153,6 +153,7 @@ public class Queue implements Closeable {
|
||||||
|
|
||||||
PageIO headPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
PageIO headPageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath);
|
||||||
this.headPage = new HeadPage(headCheckpoint, this, headPageIO);
|
this.headPage = new HeadPage(headCheckpoint, this, headPageIO);
|
||||||
|
this.currentSize += headPageIO.getCapacity();
|
||||||
|
|
||||||
// but checkpoint it to update the firstUnackedPageNum if it changed
|
// but checkpoint it to update the firstUnackedPageNum if it changed
|
||||||
this.headPage.checkpoint();
|
this.headPage.checkpoint();
|
||||||
|
@ -224,7 +225,7 @@ public class Queue implements Closeable {
|
||||||
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
PageIO headPageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath);
|
||||||
this.headPage = new HeadPage(pageNum, this, headPageIO);
|
this.headPage = new HeadPage(pageNum, this, headPageIO);
|
||||||
this.headPage.forceCheckpoint();
|
this.headPage.forceCheckpoint();
|
||||||
|
this.currentSize += headPageIO.getCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
// @param element the Queueable object to write to the queue
|
// @param element the Queueable object to write to the queue
|
||||||
|
@ -264,7 +265,6 @@ public class Queue implements Closeable {
|
||||||
|
|
||||||
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
|
this.headPage.write(data, seqNum, this.checkpointMaxWrites);
|
||||||
this.unreadCount++;
|
this.unreadCount++;
|
||||||
this.currentSize += data.length;
|
|
||||||
|
|
||||||
// if the queue was empty before write, signal non emptiness
|
// if the queue was empty before write, signal non emptiness
|
||||||
if (wasEmpty) { notEmpty.signal(); }
|
if (wasEmpty) { notEmpty.signal(); }
|
||||||
|
@ -300,7 +300,11 @@ public class Queue implements Closeable {
|
||||||
public boolean isFull() {
|
public boolean isFull() {
|
||||||
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside syncronized
|
// TODO: I am not sure if having unreadCount as volatile is sufficient here. all unreadCount updates are done inside syncronized
|
||||||
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
|
// TODO: sections, I believe that to only read the value here, having it as volatile is sufficient?
|
||||||
return (((this.maxUnread > 0) ? this.unreadCount >= this.maxUnread : false) || (this.currentSize >= this.maxSizeInBytes));
|
if ((this.maxSizeInBytes > 0) && this.currentSize >= this.maxSizeInBytes) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return ((this.maxUnread > 0) && this.unreadCount >= this.maxUnread);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing)
|
// @param seqNum the element sequence number upper bound for which persistence should be garanteed (by fsync'ing)
|
||||||
|
|
|
@ -390,16 +390,19 @@ public class QueueTest {
|
||||||
assertThat(q.unreadCount, is(equalTo(1L)));
|
assertThat(q.unreadCount, is(equalTo(1L)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test(timeout = 5000)
|
||||||
public void reachMaxSizeTest() throws IOException, InterruptedException, ExecutionException {
|
public void reachMaxSizeTest() throws IOException, InterruptedException, ExecutionException {
|
||||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||||
|
|
||||||
Settings settings = TestSettings.getSettings(256, 1000); // allow 10 elements per page but only 1024 bytes in total
|
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||||
|
|
||||||
|
// allow 10 elements per page but only 100 events in total
|
||||||
|
Settings settings = TestSettings.getSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||||
|
|
||||||
TestQueue q = new TestQueue(settings);
|
TestQueue q = new TestQueue(settings);
|
||||||
q.open();
|
q.open();
|
||||||
|
|
||||||
int ELEMENT_COUNT = 99; // should be able to write 99 events before getting full
|
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||||
long seqNum = q.write(element);
|
long seqNum = q.write(element);
|
||||||
}
|
}
|
||||||
|
@ -414,7 +417,8 @@ public class QueueTest {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
Future<Long> future = executor.submit(write);
|
Future<Long> future = executor.submit(write);
|
||||||
|
|
||||||
Thread.sleep(1);
|
while (!q.isFull()) { Thread.sleep(1); }
|
||||||
|
|
||||||
assertThat(q.isFull(), is(true));
|
assertThat(q.isFull(), is(true));
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
|
@ -425,12 +429,15 @@ public class QueueTest {
|
||||||
|
|
||||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||||
|
|
||||||
Settings settings = TestSettings.getSettings(256, 1000); // allow 10 elements per page but only 1024 bytes in total
|
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||||
|
|
||||||
|
// allow 10 elements per page but only 100 events in total
|
||||||
|
Settings settings = TestSettings.getSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||||
|
|
||||||
TestQueue q = new TestQueue(settings);
|
TestQueue q = new TestQueue(settings);
|
||||||
q.open();
|
q.open();
|
||||||
|
|
||||||
int ELEMENT_COUNT = 99; // should be able to write 99 events before getting full
|
int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full
|
||||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||||
long seqNum = q.write(element);
|
long seqNum = q.write(element);
|
||||||
}
|
}
|
||||||
|
@ -445,18 +452,59 @@ public class QueueTest {
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
Future<Long> future = executor.submit(write);
|
Future<Long> future = executor.submit(write);
|
||||||
|
|
||||||
Thread.sleep(1);
|
while (!q.isFull()) { Thread.sleep(1); }
|
||||||
|
|
||||||
assertThat(q.isFull(), is(true));
|
assertThat(q.isFull(), is(true));
|
||||||
|
|
||||||
Batch b = q.readBatch(99);
|
Batch b = q.readBatch(9); // read 1 page (10 events)
|
||||||
|
b.close(); // purge 1 page
|
||||||
|
|
||||||
// spin wait until data is written and write blocks
|
// spin wait until data is written and write blocks
|
||||||
while (!q.isFull()) { Thread.sleep(1); }
|
while (q.isFull()) { Thread.sleep(1); }
|
||||||
|
|
||||||
assertThat(q.isFull(), is(false));
|
assertThat(q.isFull(), is(false));
|
||||||
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 5000)
|
||||||
|
public void queueStillFullAfterPartialPageAckTest() throws IOException, InterruptedException, ExecutionException {
|
||||||
|
|
||||||
|
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||||
|
|
||||||
|
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||||
|
|
||||||
|
// allow 10 elements per page but only 100 events in total
|
||||||
|
Settings settings = TestSettings.getSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||||
|
|
||||||
|
TestQueue q = new TestQueue(settings);
|
||||||
|
q.open();
|
||||||
|
|
||||||
|
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||||
|
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||||
|
long seqNum = q.write(element);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(q.isFull(), is(false));
|
||||||
|
|
||||||
|
// we expect this next write call to block so let's wrap it in a Future
|
||||||
|
Callable<Long> write = () -> {
|
||||||
|
return q.write(element);
|
||||||
|
};
|
||||||
|
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
|
Future<Long> future = executor.submit(write);
|
||||||
|
|
||||||
|
while (!q.isFull()) { Thread.sleep(1); }
|
||||||
|
|
||||||
|
assertThat(q.isFull(), is(true));
|
||||||
|
|
||||||
|
Batch b = q.readBatch(9); // read 90% of page (9 events)
|
||||||
|
b.close(); // this should not purge a page
|
||||||
|
|
||||||
|
assertThat(q.isFull(), is(true)); // queue should still be full
|
||||||
|
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Add table
Add a link
Reference in a new issue