diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index 3877aae97..860275480 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -563,51 +563,13 @@ public class QueueTest { } @Test - public void queueStableUnderStress() throws Exception { - Settings settings = TestSettings.persistedQueueSettings(1000000, dataPath); - final ExecutorService exec = Executors.newScheduledThreadPool(2); - try (Queue queue = new Queue(settings)) { - final int count = 20_000; - final int concurrent = 2; - queue.open(); - final Future[] futures = new Future[concurrent]; - for (int c = 0; c < concurrent; ++c) { - futures[c] = exec.submit(() -> { - int i = 0; - try { - while (i < count / concurrent) { - final Batch batch = queue.readBatch(1); - for (final Queueable elem : batch.getElements()) { - if (elem != null) { - ++i; - } - } - } - return i; - } catch (final IOException ex) { - throw new IllegalStateException(ex); - } - }); - } - for (int i = 0; i < count; ++i) { - try { - final Queueable evnt = new StringElement("foo"); - queue.write(evnt); - } catch (final IOException ex) { - throw new IllegalStateException(ex); - } - } - assertThat( - Arrays.stream(futures).map(i -> { - try { - return i.get(10L, TimeUnit.SECONDS); - } catch (final InterruptedException | ExecutionException | TimeoutException ex) { - throw new IllegalStateException(ex); - } - }).reduce((x, y) -> x + y).orElse(0), - is(20_000) - ); - } + public void queueStableUnderStressHugeCapacity() throws Exception { + stableUnderStress(100_000); + } + + @Test + public void queueStableUnderStressLowCapacity() throws Exception { + stableUnderStress(50); } @Test @@ -755,4 +717,51 @@ public class QueueTest { assertThat(q.getPersistedByteSize(), is(0L)); } } + + private void stableUnderStress(final int capacity) throws IOException { + Settings settings = TestSettings.persistedQueueSettings(capacity, dataPath); + final ExecutorService exec = Executors.newScheduledThreadPool(2); + try (Queue queue = new Queue(settings)) { + final int count = 20_000; + final int concurrent = 2; + queue.open(); + final Future[] futures = new Future[concurrent]; + for (int c = 0; c < concurrent; ++c) { + futures[c] = exec.submit(() -> { + int i = 0; + try { + while (i < count / concurrent) { + final Batch batch = queue.readBatch(1); + for (final Queueable elem : batch.getElements()) { + if (elem != null) { + ++i; + } + } + } + return i; + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + }); + } + for (int i = 0; i < count; ++i) { + try { + final Queueable evnt = new StringElement("foo"); + queue.write(evnt); + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + } + assertThat( + Arrays.stream(futures).map(i -> { + try { + return i.get(2L, TimeUnit.MINUTES); + } catch (final InterruptedException | ExecutionException | TimeoutException ex) { + throw new IllegalStateException(ex); + } + }).reduce((x, y) -> x + y).orElse(0), + is(20_000) + ); + } + } }