mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 14:47:19 -04:00
parent
584a3860a0
commit
32777c682b
2 changed files with 52 additions and 7 deletions
|
@ -45,7 +45,6 @@ public class Queue implements Closeable {
|
|||
protected final List<TailPage> unreadTailPages;
|
||||
|
||||
protected volatile long unreadCount;
|
||||
|
||||
protected volatile long currentByteSize;
|
||||
|
||||
private final CheckpointIO checkpointIO;
|
||||
|
@ -331,7 +330,7 @@ public class Queue implements Closeable {
|
|||
this.unreadCount++;
|
||||
|
||||
// if the queue was empty before write, signal non emptiness
|
||||
if (wasEmpty) { notEmpty.signal(); }
|
||||
if (wasEmpty) { notEmpty.signalAll(); }
|
||||
|
||||
// now check if we reached a queue full state and block here until it is not full
|
||||
// for the next write or the queue was closed.
|
||||
|
@ -481,7 +480,7 @@ public class Queue implements Closeable {
|
|||
this.unreadCount -= b.size();
|
||||
|
||||
if (p.isFullyRead()) { removeUnreadPage(p); }
|
||||
if (wasFull) { notFull.signal(); }
|
||||
if (wasFull) { notFull.signalAll(); }
|
||||
|
||||
return b;
|
||||
}
|
||||
|
@ -565,6 +564,8 @@ public class Queue implements Closeable {
|
|||
|
||||
// cleanup fully acked tail page
|
||||
if (result.page.isFullyAcked()) {
|
||||
boolean wasFull = isFull();
|
||||
|
||||
this.tailPages.remove(result.index);
|
||||
|
||||
// remove page data file regardless if it is the first or a middle tail page to free resources
|
||||
|
@ -583,6 +584,8 @@ public class Queue implements Closeable {
|
|||
this.tailPages.remove(0);
|
||||
}
|
||||
}
|
||||
|
||||
if (wasFull) { notFull.signalAll(); }
|
||||
}
|
||||
|
||||
this.headPage.checkpoint();
|
||||
|
|
|
@ -464,7 +464,7 @@ public class QueueTest {
|
|||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException {
|
||||
public void ackingMakesQueueNotFullAgainTest() throws IOException, InterruptedException, ExecutionException {
|
||||
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
|
@ -487,22 +487,64 @@ public class QueueTest {
|
|||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
while (!q.isFull()) { Thread.sleep(10); }
|
||||
|
||||
assertThat(q.isFull(), is(true));
|
||||
|
||||
Batch b = q.readBatch(10); // read 1 page (10 events)
|
||||
b.close(); // purge 1 page
|
||||
|
||||
// spin wait until data is written and write blocks
|
||||
while (q.isFull()) { Thread.sleep(10); }
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// will not complete because write will not unblock until the page is purge with a batch close/acking.
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
q.close();
|
||||
}
|
||||
|
||||
@Test(timeout = 5000)
|
||||
public void resumeWriteOnNoLongerFullQueueTest() throws IOException, InterruptedException, ExecutionException {
|
||||
Queueable element = new StringElement("0123456789"); // 10 bytes
|
||||
|
||||
int singleElementCapacity = ByteBufferPageIO.HEADER_SIZE + ByteBufferPageIO._persistedByteCount(element.serialize().length);
|
||||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
|
||||
TestQueue q = new TestQueue(settings);
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
}
|
||||
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// read 1 page (10 events) here while not full yet so that the read will not singal the not full state
|
||||
// we want the batch closing below to signal the not full state
|
||||
Batch b = q.readBatch(10);
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
while (!q.isFull()) { Thread.sleep(10); }
|
||||
assertThat(q.isFull(), is(true));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
b.close(); // purge 1 page
|
||||
|
||||
assertThat(future.get(), is(equalTo(ELEMENT_COUNT + 1L)));
|
||||
|
||||
executor.shutdown();
|
||||
q.close();
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue