mirror of
https://github.com/elastic/logstash.git
synced 2025-04-25 07:07:54 -04:00
parent
c9700220bd
commit
66f0b52318
1 changed files with 31 additions and 31 deletions
|
@ -19,6 +19,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.hamcrest.CoreMatchers.equalTo;
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
@ -473,26 +474,24 @@ public class QueueTest {
|
||||||
|
|
||||||
// allow 10 elements per page but only 100 events in total
|
// allow 10 elements per page but only 100 events in total
|
||||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||||
TestQueue q = new TestQueue(settings);
|
try (TestQueue q = new TestQueue(settings)) {
|
||||||
q.open();
|
q.open();
|
||||||
|
// should be able to write 90 events (9 pages) before getting full
|
||||||
int ELEMENT_COUNT = 90; // should be able to write 90 events (9 pages) before getting full
|
final long ELEMENT_COUNT = 90;
|
||||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||||
long seqNum = q.write(element);
|
q.write(element);
|
||||||
}
|
}
|
||||||
|
|
||||||
assertThat(q.isFull(), is(false));
|
assertThat(q.isFull(), is(false));
|
||||||
|
|
||||||
// we expect this next write call to block so let's wrap it in a Future
|
// we expect this next write call to block so let's wrap it in a Future
|
||||||
Callable<Long> write = () -> {
|
Callable<Long> write = () -> q.write(element);
|
||||||
return q.write(element);
|
|
||||||
};
|
|
||||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
|
||||||
Future<Long> future = executor.submit(write);
|
Future<Long> future = executor.submit(write);
|
||||||
assertThat(future.isDone(), is(false));
|
assertThat(future.isDone(), is(false));
|
||||||
|
|
||||||
while (!q.isFull()) { Thread.sleep(10); }
|
while (!q.isFull()) {
|
||||||
|
Thread.sleep(10);
|
||||||
|
}
|
||||||
assertThat(q.isFull(), is(true));
|
assertThat(q.isFull(), is(true));
|
||||||
|
|
||||||
Batch b = q.readBatch(10); // read 1 page (10 events)
|
Batch b = q.readBatch(10); // read 1 page (10 events)
|
||||||
|
@ -501,10 +500,11 @@ public class QueueTest {
|
||||||
while (q.isFull()) { Thread.sleep(10); }
|
while (q.isFull()) { Thread.sleep(10); }
|
||||||
assertThat(q.isFull(), is(false));
|
assertThat(q.isFull(), is(false));
|
||||||
|
|
||||||
// will not complete because write will not unblock until the page is purge with a batch close/acking.
|
assertThat(future.get(), is(ELEMENT_COUNT + 1));
|
||||||
assertThat(future.isDone(), is(false));
|
} finally {
|
||||||
|
executor.shutdownNow();
|
||||||
q.close();
|
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 5000)
|
@Test(timeout = 5000)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue