diff --git a/server/src/main/java/org/elasticsearch/action/support/RefCountingListener.java b/server/src/main/java/org/elasticsearch/action/support/RefCountingListener.java new file mode 100644 index 000000000000..7fe6e7df1b40 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/RefCountingListener.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.core.Releasable; + +import java.util.Objects; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A mechanism to complete a listener on the completion of some (dynamic) collection of other actions. Basic usage is as follows: + * + *
+ * try (var refs = new RefCountingListener(finalListener)) {
+ *     for (var item : collection) {
+ *         runAsyncAction(item, refs.acquire()); // completes the acquired listener on completion
+ *     }
+ * }
+ * 
+ * + * The delegate listener is completed when execution leaves the try-with-resources block and every acquired reference is released. The + * {@link RefCountingListener} collects (a bounded number of) exceptions received by its subsidiary listeners, and completes the delegate + * listener with an exception if (and only if) any subsidiary listener fails. However, unlike a {@link GroupedActionListener} it leaves it + * to the caller to collect the results of successful completions by accumulating them in a data structure of its choice. Also unlike a + * {@link GroupedActionListener} there is no need to declare the number of subsidiary listeners up front: listeners can be acquired + * dynamically as needed. Finally, you can continue to acquire additional listeners even outside the try-with-resources block, perhaps in a + * separate thread, as long as there's at least one listener outstanding: + * + *
+ * try (var refs = new RefCountingListener(finalListener)) {
+ *     for (var item : collection) {
+ *         if (condition(item)) {
+ *             runAsyncAction(item, refs.acquire().map(results::add));
+ *         }
+ *     }
+ *     if (flag) {
+ *         runOneOffAsyncAction(refs.acquire().map(results::add));
+ *         return;
+ *     }
+ *     for (var item : otherCollection) {
+ *         var itemRef = refs.acquire(); // delays completion while the background action is pending
+ *         executorService.execute(() -> {
+ *             try {
+ *                 if (condition(item)) {
+ *                     runOtherAsyncAction(item, refs.acquire().map(results::add));
+ *                 }
+ *             } finally {
+ *                 itemRef.onResponse(null);
+ *             }
+ *         });
+ *     }
+ * }
+ * 
+ * + * In particular (and also unlike a {@link GroupedActionListener}) this works even if you don't acquire any extra refs at all: in that case, + * the delegate listener is completed at the end of the try-with-resources block. + */ +public final class RefCountingListener implements Releasable { + + private final ActionListener delegate; + private final RefCountingRunnable refs = new RefCountingRunnable(this::finish); + + private final AtomicReference exceptionRef = new AtomicReference<>(); + private final Semaphore exceptionPermits; + private final AtomicInteger droppedExceptionsRef = new AtomicInteger(); + + /** + * Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released. + * @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all + * the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed + * with failure then the delegate is completed with the first exception received, with other exceptions added to its + * collection of suppressed exceptions. + */ + public RefCountingListener(ActionListener delegate) { + this(10, delegate); + } + + /** + * Construct a {@link RefCountingListener} which completes {@code delegate} when all refs are released. + * @param delegate The listener to complete when all refs are released. This listener must not throw any exception on completion. If all + * the acquired listeners completed successfully then so is the delegate. If any of the acquired listeners completed + * with failure then the delegate is completed with the first exception received, with other exceptions added to its + * collection of suppressed exceptions. + * @param maxExceptions The maximum number of exceptions to accumulate on failure. + */ + public RefCountingListener(int maxExceptions, ActionListener delegate) { + if (maxExceptions <= 0) { + assert false : maxExceptions; + throw new IllegalArgumentException("maxExceptions must be positive"); + } + this.delegate = Objects.requireNonNull(delegate); + this.exceptionPermits = new Semaphore(maxExceptions); + } + + /** + * Release the original reference to this object, which commpletes the delegate {@link ActionListener} if there are no other references. + * + * It is invalid to call this method more than once. Doing so will trip an assertion if assertions are enabled, but will be ignored + * otherwise. This deviates from the contract of {@link java.io.Closeable}. + */ + @Override + public void close() { + refs.close(); + } + + private void finish() { + try { + var exception = exceptionRef.get(); + if (exception == null) { + delegate.onResponse(null); + } else { + final var droppedExceptions = droppedExceptionsRef.getAndSet(0); + if (droppedExceptions > 0) { + exception.addSuppressed(new ElasticsearchException(droppedExceptions + " further exceptions were dropped")); + } + delegate.onFailure(exception); + } + } catch (Exception e) { + assert false : e; + throw e; + } + } + + /** + * Acquire a reference to this object and return a listener which releases it. The delegate {@link ActionListener} is called when all + * its references have been released. + * + * It is invalid to call this method once all references are released. Doing so will trip an assertion if assertions are enabled, and + * will throw an {@link IllegalStateException} otherwise. + * + * It is also invalid to complete the returned listener more than once. Doing so will trip an assertion if assertions are enabled, but + * will be ignored otherwise. + */ + public ActionListener acquire() { + return new ActionListener<>() { + private final Releasable ref = refs.acquire(); + + @Override + public void onResponse(T unused) { + ref.close(); + } + + @Override + public void onFailure(Exception e) { + if (exceptionPermits.tryAcquire()) { + final var firstException = exceptionRef.compareAndExchange(null, e); + if (firstException != null && firstException != e) { + firstException.addSuppressed(e); + } + } else { + droppedExceptionsRef.incrementAndGet(); + } + ref.close(); + } + + @Override + public String toString() { + return RefCountingListener.this.toString(); + } + }; + } + + @Override + public String toString() { + return "refCounting[" + delegate + "]"; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java b/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java index 0fea6dbeb1ad..c3a5bb398972 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java +++ b/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java @@ -32,7 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * The delegate action is completed when execution leaves the try-with-resources block and every acquired reference is released. Unlike a * {@link CountDown} there is no need to declare the number of subsidiary actions up front (refs can be acquired dynamically as needed) nor * does the caller need to check for completion each time a reference is released. Moreover even outside the try-with-resources block you - * can continue to acquire additional listeners, even in a separate thread, as long as there's at least one listener outstanding: + * can continue to acquire additional references, even in a separate thread, as long as there's at least one reference outstanding: * *
  * try (var refs = new RefCountingRunnable(finalRunnable)) {
@@ -95,7 +95,11 @@ public final class RefCountingRunnable implements Releasable {
      * Acquire a reference to this object and return an action which releases it. The delegate {@link Runnable} is called when all its
      * references have been released.
      *
-     * Callers must take care to close the returned resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
+     * It is invalid to call this method once all references are released. Doing so will trip an assertion if assertions are enabled, and
+     * will throw an {@link IllegalStateException} otherwise.
+     *
+     * It is also invalid to release the acquired resource more than once. Doing so will trip an assertion if assertions are enabled, but
+     * will be ignored otherwise. This deviates from the contract of {@link java.io.Closeable}.
      */
     public Releasable acquire() {
         if (refCounted.tryIncRef()) {
@@ -116,7 +120,8 @@ public final class RefCountingRunnable implements Releasable {
     /**
      * Release the original reference to this object, which executes the delegate {@link Runnable} if there are no other references.
      *
-     * Callers must take care to close this resource exactly once. This deviates from the contract of {@link java.io.Closeable}.
+     * It is invalid to call this method more than once. Doing so will trip an assertion if assertions are enabled, but will be ignored
+     * otherwise. This deviates from the contract of {@link java.io.Closeable}.
      */
     @Override
     public void close() {
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 8ba8f3acde60..9dea01238a02 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -27,10 +27,10 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.SingleResultDeduplicator;
 import org.elasticsearch.action.StepListener;
-import org.elasticsearch.action.support.CountDownActionListener;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.ListenableActionFuture;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.cluster.ClusterState;
@@ -1422,7 +1422,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 indexMetaIdentifiers = null;
             }
 
-            final ActionListener allMetaListener = new CountDownActionListener(2 + indices.size(), ActionListener.wrap(v -> {
+            try (var allMetaListeners = new RefCountingListener(ActionListener.wrap(v -> {
                 final String slmPolicy = slmPolicy(snapshotInfo);
                 final SnapshotDetails snapshotDetails = new SnapshotDetails(
                     snapshotInfo.state(),
@@ -1445,52 +1445,53 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                         }
                     }, onUpdateFailure)
                 );
-            }, onUpdateFailure));
+            }, onUpdateFailure))) {
 
-            // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
-            // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
-            // index or global metadata will be compatible with the segments written in this snapshot as well.
-            // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
-            // that decrements the generation it points at
-            final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
-            // Write Global MetaData
-            executor.execute(
-                ActionRunnable.run(
-                    allMetaListener,
-                    () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
-                )
-            );
+                // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method
+                // will mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of
+                // the index or global metadata will be compatible with the segments written in this snapshot as well.
+                // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
+                // that decrements the generation it points at
+                final Metadata clusterMetadata = finalizeSnapshotContext.clusterMetadata();
+                // Write Global MetaData
+                executor.execute(
+                    ActionRunnable.run(
+                        allMetaListeners.acquire(),
+                        () -> GLOBAL_METADATA_FORMAT.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), compress)
+                    )
+                );
 
-            // write the index metadata for each index in the snapshot
-            for (IndexId index : indices) {
-                executor.execute(ActionRunnable.run(allMetaListener, () -> {
-                    final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
-                    if (writeIndexGens) {
-                        final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
-                        String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
-                        if (metaUUID == null) {
-                            // We don't yet have this version of the metadata so we write it
-                            metaUUID = UUIDs.base64UUID();
-                            INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
-                            indexMetaIdentifiers.put(identifiers, metaUUID);
+                // write the index metadata for each index in the snapshot
+                for (IndexId index : indices) {
+                    executor.execute(ActionRunnable.run(allMetaListeners.acquire(), () -> {
+                        final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
+                        if (writeIndexGens) {
+                            final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
+                            String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
+                            if (metaUUID == null) {
+                                // We don't yet have this version of the metadata so we write it
+                                metaUUID = UUIDs.base64UUID();
+                                INDEX_METADATA_FORMAT.write(indexMetaData, indexContainer(index), metaUUID, compress);
+                                indexMetaIdentifiers.put(identifiers, metaUUID);
+                            }
+                            indexMetas.put(index, identifiers);
+                        } else {
+                            INDEX_METADATA_FORMAT.write(
+                                clusterMetadata.index(index.getName()),
+                                indexContainer(index),
+                                snapshotId.getUUID(),
+                                compress
+                            );
                         }
-                        indexMetas.put(index, identifiers);
-                    } else {
-                        INDEX_METADATA_FORMAT.write(
-                            clusterMetadata.index(index.getName()),
-                            indexContainer(index),
-                            snapshotId.getUUID(),
-                            compress
-                        );
-                    }
-                }));
+                    }));
+                }
+                executor.execute(
+                    ActionRunnable.run(
+                        allMetaListeners.acquire(),
+                        () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
+                    )
+                );
             }
-            executor.execute(
-                ActionRunnable.run(
-                    allMetaListener,
-                    () -> SNAPSHOT_FORMAT.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), compress)
-                )
-            );
         }, onUpdateFailure);
     }
 
diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java
new file mode 100644
index 000000000000..6b899748438a
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.support;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.elasticsearch.common.util.concurrent.EsExecutors.DIRECT_EXECUTOR_SERVICE;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+
+public class RefCountingListenerTests extends ESTestCase {
+
+    public void testBasicOperation() throws InterruptedException {
+        final var executed = new AtomicBoolean();
+        final var exceptionCount = new AtomicInteger();
+        final var threads = new Thread[between(0, 3)];
+        final var exceptionLimit = Math.max(1, between(0, threads.length));
+
+        boolean async = false;
+        final var startLatch = new CountDownLatch(1);
+
+        try (var refs = new RefCountingListener(exceptionLimit, new ActionListener<>() {
+            @Override
+            public void onResponse(Void unused) {
+                assertTrue(executed.compareAndSet(false, true));
+                assertEquals(0, exceptionCount.get());
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                assertTrue(executed.compareAndSet(false, true));
+                assertThat(exceptionCount.get(), greaterThan(0));
+                Throwable[] suppressed = e.getSuppressed();
+                if (exceptionCount.get() > exceptionLimit) {
+                    assertEquals(exceptionLimit, suppressed.length);
+                    for (int i = 0; i < suppressed.length; i++) {
+                        Throwable throwable = suppressed[i];
+                        if (i == suppressed.length - 1) {
+                            assertThat(
+                                throwable.getMessage(),
+                                equalTo((exceptionCount.get() - exceptionLimit) + " further exceptions were dropped")
+                            );
+                        } else {
+                            assertThat(throwable.getMessage(), equalTo("simulated"));
+                        }
+                    }
+                } else {
+                    assertEquals(exceptionCount.get() - 1, suppressed.length);
+                    for (Throwable throwable : suppressed) {
+                        assertThat(throwable.getMessage(), equalTo("simulated"));
+                    }
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "test listener";
+            }
+        })) {
+            assertEquals("refCounting[test listener]", refs.toString());
+            var listener = refs.acquire();
+            assertThat(listener.toString(), containsString("refCounting[test listener]"));
+            listener.onResponse(null);
+
+            for (int i = 0; i < threads.length; i++) {
+                if (randomBoolean()) {
+                    async = true;
+                    var ref = refs.acquire();
+                    threads[i] = new Thread(() -> {
+                        try {
+                            assertTrue(startLatch.await(10, TimeUnit.SECONDS));
+                        } catch (InterruptedException e) {
+                            throw new AssertionError(e);
+                        }
+                        assertFalse(executed.get());
+                        if (randomBoolean()) {
+                            ref.onResponse(null);
+                        } else {
+                            exceptionCount.incrementAndGet();
+                            ref.onFailure(new ElasticsearchException("simulated"));
+                        }
+                    });
+                }
+            }
+
+            assertFalse(executed.get());
+        }
+
+        assertNotEquals(async, executed.get());
+
+        for (Thread thread : threads) {
+            if (thread != null) {
+                thread.start();
+            }
+        }
+
+        startLatch.countDown();
+
+        for (Thread thread : threads) {
+            if (thread != null) {
+                thread.join();
+            }
+        }
+
+        assertTrue(executed.get());
+    }
+
+    @SuppressWarnings("resource")
+    public void testNullCheck() {
+        expectThrows(NullPointerException.class, () -> new RefCountingListener(between(1, 10), null));
+    }
+
+    public void testValidation() {
+        final var callCount = new AtomicInteger();
+        final var refs = new RefCountingListener(Integer.MAX_VALUE, ActionListener.wrap(callCount::incrementAndGet));
+        refs.close();
+        assertEquals(1, callCount.get());
+
+        for (int i = between(1, 5); i > 0; i--) {
+            final ThrowingRunnable throwingRunnable;
+            final String expectedMessage;
+            if (randomBoolean()) {
+                throwingRunnable = refs::acquire;
+                expectedMessage = RefCountingRunnable.ALREADY_CLOSED_MESSAGE;
+            } else {
+                throwingRunnable = refs::close;
+                expectedMessage = "already closed";
+            }
+
+            assertEquals(expectedMessage, expectThrows(AssertionError.class, throwingRunnable).getMessage());
+            assertEquals(1, callCount.get());
+        }
+    }
+
+    public void testJavaDocExample() {
+        final var flag = new AtomicBoolean();
+        runExample(ActionListener.wrap(() -> assertTrue(flag.compareAndSet(false, true))));
+        assertTrue(flag.get());
+    }
+
+    private void runExample(ActionListener finalListener) {
+        final var collection = randomList(10, Object::new);
+        final var otherCollection = randomList(10, Object::new);
+        final var flag = randomBoolean();
+        @SuppressWarnings("UnnecessaryLocalVariable")
+        final var executorService = DIRECT_EXECUTOR_SERVICE;
+        final var results = new ArrayList<>();
+
+        try (var refs = new RefCountingListener(finalListener)) {
+            for (var item : collection) {
+                if (condition(item)) {
+                    runAsyncAction(item, refs.acquire().map(results::add));
+                }
+            }
+            if (flag) {
+                runOneOffAsyncAction(refs.acquire().map(results::add));
+                return;
+            }
+            for (var item : otherCollection) {
+                var itemRef = refs.acquire(); // delays completion while the background action is pending
+                executorService.execute(() -> {
+                    try {
+                        if (condition(item)) {
+                            runOtherAsyncAction(item, refs.acquire().map(results::add));
+                        }
+                    } finally {
+                        itemRef.onResponse(null);
+                    }
+                });
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private boolean condition(Object item) {
+        return randomBoolean();
+    }
+
+    @SuppressWarnings("unused")
+    private void runAsyncAction(Object item, ActionListener listener) {
+        listener.onResponse(null);
+    }
+
+    @SuppressWarnings("unused")
+    private void runOtherAsyncAction(Object item, ActionListener listener) {
+        listener.onResponse(null);
+    }
+
+    private void runOneOffAsyncAction(ActionListener listener) {
+        listener.onResponse(null);
+    }
+}
diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java
index 8acabd1a8c20..73d2ed5efd41 100644
--- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java
+++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/common/SparseFileTracker.java
@@ -9,7 +9,7 @@ package org.elasticsearch.blobcache.common;
 
 import org.elasticsearch.Assertions;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.GroupedActionListener;
+import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.core.Nullable;
 
 import java.util.ArrayList;
@@ -243,30 +243,7 @@ public class SparseFileTracker {
                     .collect(Collectors.toList());
         }
 
-        // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
-        // there is no risk of concurrent modification.
-
-        switch (requiredRanges.size()) {
-            case 0 ->
-                // no need to wait for the gaps to be filled, the listener can be executed immediately
-                wrappedListener.onResponse(null);
-            case 1 -> {
-                final Range requiredRange = requiredRanges.get(0);
-                requiredRange.completionListener.addListener(
-                    wrappedListener.map(progress -> null),
-                    Math.min(requiredRange.completionListener.end, subRange.end())
-                );
-            }
-            default -> {
-                final GroupedActionListener groupedActionListener = new GroupedActionListener<>(
-                    requiredRanges.size(),
-                    wrappedListener.map(progress -> null)
-                );
-                requiredRanges.forEach(
-                    r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, subRange.end()))
-                );
-            }
-        }
+        subscribeToCompletionListeners(requiredRanges, subRange.end(), wrappedListener);
 
         return Collections.unmodifiableList(gaps);
     }
@@ -332,31 +309,32 @@ public class SparseFileTracker {
             assert invariant();
         }
 
+        subscribeToCompletionListeners(pendingRanges, range.end(), wrappedListener);
+        return true;
+    }
+
+    private void subscribeToCompletionListeners(List requiredRanges, long rangeEnd, ActionListener listener) {
         // NB we work with ranges outside the mutex here, but only to interact with their completion listeners which are `final` so
         // there is no risk of concurrent modification.
-
-        switch (pendingRanges.size()) {
-            case 0 -> wrappedListener.onResponse(null);
+        switch (requiredRanges.size()) {
+            case 0 ->
+                // no need to wait for the gaps to be filled, the listener can be executed immediately
+                listener.onResponse(null);
             case 1 -> {
-                final Range pendingRange = pendingRanges.get(0);
-                pendingRange.completionListener.addListener(
-                    wrappedListener.map(progress -> null),
-                    Math.min(pendingRange.completionListener.end, range.end())
+                final Range requiredRange = requiredRanges.get(0);
+                requiredRange.completionListener.addListener(
+                    listener.map(progress -> null),
+                    Math.min(requiredRange.completionListener.end, rangeEnd)
                 );
-                return true;
             }
             default -> {
-                final GroupedActionListener groupedActionListener = new GroupedActionListener<>(
-                    pendingRanges.size(),
-                    wrappedListener.map(progress -> null)
-                );
-                pendingRanges.forEach(
-                    r -> r.completionListener.addListener(groupedActionListener, Math.min(r.completionListener.end, range.end()))
-                );
-                return true;
+                try (var listeners = new RefCountingListener(listener)) {
+                    for (Range range : requiredRanges) {
+                        range.completionListener.addListener(listeners.acquire(), Math.min(range.completionListener.end, rangeEnd));
+                    }
+                }
             }
         }
-        return true;
     }
 
     private ActionListener wrapWithAssertions(ActionListener listener) {