mirror of
https://github.com/elastic/logstash.git
synced 2025-04-24 06:37:19 -04:00
parent
9fe4e9b3b7
commit
71dc852b9d
1 changed files with 32 additions and 47 deletions
|
@ -14,6 +14,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
@ -28,13 +29,26 @@ import static org.hamcrest.MatcherAssert.assertThat;
|
|||
import static org.junit.Assert.fail;
|
||||
|
||||
public class QueueTest {
|
||||
@Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
private ExecutorService executor;
|
||||
|
||||
private String dataPath;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
dataPath = temporaryFolder.newFolder("data").getPath();
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
executor.shutdownNow();
|
||||
if (!executor.awaitTermination(2L, TimeUnit.MINUTES)) {
|
||||
throw new IllegalStateException("Failed to shut down Executor");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -339,12 +353,7 @@ public class QueueTest {
|
|||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
|
||||
// we expect the next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> {
|
||||
return q.write(element);
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
Future<Long> future = executor.submit(() -> q.write(element));
|
||||
|
||||
while (!q.isFull()) {
|
||||
// spin wait until data is written and write blocks
|
||||
|
@ -360,8 +369,6 @@ public class QueueTest {
|
|||
// future result is the blocked write seqNum for the second element
|
||||
assertThat(future.get(), is(2L + i));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
// since we did not ack and pages hold a single item
|
||||
|
@ -391,10 +398,7 @@ public class QueueTest {
|
|||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
Future<Long> future = executor.submit(() -> q.write(element));
|
||||
|
||||
// spin wait until data is written and write blocks
|
||||
while (!q.isFull()) {
|
||||
|
@ -409,8 +413,6 @@ public class QueueTest {
|
|||
// future result is the blocked write seqNum for the second element
|
||||
assertThat(future.get(), is(2L + i));
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
// all batches are acked, no tail pages should exist
|
||||
|
@ -436,22 +438,17 @@ public class QueueTest {
|
|||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
q.write(element);
|
||||
}
|
||||
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
executor.submit(() -> q.write(element));
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
assertThat(q.isFull(), is(true));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -464,7 +461,6 @@ public class QueueTest {
|
|||
|
||||
// allow 10 elements per page but only 100 events in total
|
||||
Settings settings = TestSettings.volatileQueueSettings(singleElementCapacity * 10, singleElementCapacity * 100);
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
// should be able to write 90 events (9 pages) before getting full
|
||||
|
@ -475,8 +471,7 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
Future<Long> future = executor.submit(write);
|
||||
Future<Long> future = executor.submit(() -> q.write(element));
|
||||
assertThat(future.isDone(), is(false));
|
||||
|
||||
while (!q.isFull()) {
|
||||
|
@ -491,9 +486,6 @@ public class QueueTest {
|
|||
assertThat(q.isFull(), is(false));
|
||||
|
||||
assertThat(future.get(), is(ELEMENT_COUNT + 1));
|
||||
} finally {
|
||||
executor.shutdownNow();
|
||||
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -509,8 +501,8 @@ public class QueueTest {
|
|||
q.open();
|
||||
int ELEMENT_COUNT =
|
||||
90; // should be able to write 90 events (9 pages) before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
q.write(element);
|
||||
}
|
||||
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
@ -520,9 +512,7 @@ public class QueueTest {
|
|||
Batch b = q.readBatch(10);
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
Future<Long> future = executor.submit(() -> q.write(element));
|
||||
assertThat(future.isDone(), is(false));
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
|
@ -533,8 +523,6 @@ public class QueueTest {
|
|||
b.close(); // purge 1 page
|
||||
|
||||
assertThat(future.get(), is(ELEMENT_COUNT + 1L));
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -552,16 +540,13 @@ public class QueueTest {
|
|||
|
||||
int ELEMENT_COUNT = 90; // should be able to write 99 events before getting full
|
||||
for (int i = 0; i < ELEMENT_COUNT; i++) {
|
||||
long seqNum = q.write(element);
|
||||
q.write(element);
|
||||
}
|
||||
|
||||
assertThat(q.isFull(), is(false));
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
Callable<Long> write = () -> q.write(element);
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
Future<Long> future = executor.submit(write);
|
||||
executor.submit(() -> q.write(element));
|
||||
while (!q.isFull()) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
|
@ -571,8 +556,6 @@ public class QueueTest {
|
|||
b.close(); // this should not purge a page
|
||||
|
||||
assertThat(q.isFull(), is(true)); // queue should still be full
|
||||
|
||||
executor.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -631,13 +614,15 @@ public class QueueTest {
|
|||
@Test(timeout = 5000)
|
||||
public void concurrentWritesTest() throws IOException, InterruptedException, ExecutionException {
|
||||
|
||||
final int WRITER_COUNT = 5;
|
||||
|
||||
final ExecutorService executorService = Executors.newFixedThreadPool(WRITER_COUNT);
|
||||
// very small pages to maximize page creation
|
||||
Settings settings = TestSettings.volatileQueueSettings(100);
|
||||
try (TestQueue q = new TestQueue(settings)) {
|
||||
q.open();
|
||||
|
||||
int ELEMENT_COUNT = 10000;
|
||||
int WRITER_COUNT = 5;
|
||||
AtomicInteger element_num = new AtomicInteger(0);
|
||||
|
||||
// we expect this next write call to block so let's wrap it in a Future
|
||||
|
@ -650,9 +635,8 @@ public class QueueTest {
|
|||
};
|
||||
|
||||
List<Future<Integer>> futures = new ArrayList<>();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(WRITER_COUNT);
|
||||
for (int i = 0; i < WRITER_COUNT; i++) {
|
||||
futures.add(executor.submit(writer));
|
||||
futures.add(executorService.submit(writer));
|
||||
}
|
||||
|
||||
int BATCH_SIZE = 10;
|
||||
|
@ -671,8 +655,9 @@ public class QueueTest {
|
|||
|
||||
assertThat(q.getTailPages().isEmpty(), is(true));
|
||||
assertThat(q.isFullyAcked(), is(true));
|
||||
|
||||
executor.shutdown();
|
||||
} finally {
|
||||
executorService.shutdownNow();
|
||||
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue