From 87223c6701e771bf6325c3e293fd367587c5b559 Mon Sep 17 00:00:00 2001 From: Armin Date: Thu, 8 Jun 2017 10:18:34 +0200 Subject: [PATCH] #7382 Signal Queue not empty condition on every append as is standard BlockingQueue behaviour Fixes #7380 --- .../java/org/logstash/ackedqueue/Queue.java | 8 +-- .../org/logstash/ackedqueue/QueueTest.java | 49 +++++++++++++++++++ 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 00dda4704..19274cb1d 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -324,7 +324,6 @@ public class Queue implements Closeable { lock.lock(); try { - boolean wasEmpty = (firstUnreadPage() == null); // create a new head page if the current does not have sufficient space left for data to be written if (! this.headPage.hasSpace(data.length)) { @@ -356,11 +355,8 @@ public class Queue implements Closeable { long seqNum = nextSeqNum(); this.headPage.write(data, seqNum, this.checkpointMaxWrites); this.unreadCount++; - - // if the queue was empty before write, signal non emptiness - // a simple signal and not signalAll is necessary here since writing a single element - // can only really enable a single thread to read a batch - if (wasEmpty) { notEmpty.signal(); } + + notEmpty.signal(); // now check if we reached a queue full state and block here until it is not full // for the next write or the queue was closed. diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index f6d6fee42..3877aae97 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.After; import org.junit.Before; @@ -561,6 +562,54 @@ public class QueueTest { } } + @Test + public void queueStableUnderStress() throws Exception { + Settings settings = TestSettings.persistedQueueSettings(1000000, dataPath); + final ExecutorService exec = Executors.newScheduledThreadPool(2); + try (Queue queue = new Queue(settings)) { + final int count = 20_000; + final int concurrent = 2; + queue.open(); + final Future[] futures = new Future[concurrent]; + for (int c = 0; c < concurrent; ++c) { + futures[c] = exec.submit(() -> { + int i = 0; + try { + while (i < count / concurrent) { + final Batch batch = queue.readBatch(1); + for (final Queueable elem : batch.getElements()) { + if (elem != null) { + ++i; + } + } + } + return i; + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + }); + } + for (int i = 0; i < count; ++i) { + try { + final Queueable evnt = new StringElement("foo"); + queue.write(evnt); + } catch (final IOException ex) { + throw new IllegalStateException(ex); + } + } + assertThat( + Arrays.stream(futures).map(i -> { + try { + return i.get(10L, TimeUnit.SECONDS); + } catch (final InterruptedException | ExecutionException | TimeoutException ex) { + throw new IllegalStateException(ex); + } + }).reduce((x, y) -> x + y).orElse(0), + is(20_000) + ); + } + } + @Test public void testAckedCount() throws IOException { Settings settings = TestSettings.persistedQueueSettings(100, dataPath);