diff --git a/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java b/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java index 04cd4375a42b..ca5704fa9866 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/core/AbstractRefCounted.java @@ -19,6 +19,7 @@ import java.util.Objects; public abstract class AbstractRefCounted implements RefCounted { public static final String ALREADY_CLOSED_MESSAGE = "already closed, can't increment ref count"; + public static final String INVALID_DECREF_MESSAGE = "invalid decRef call: already closed"; private static final VarHandle VH_REFCOUNT_FIELD; @@ -63,7 +64,7 @@ public abstract class AbstractRefCounted implements RefCounted { public final boolean decRef() { touch(); int i = (int) VH_REFCOUNT_FIELD.getAndAdd(this, -1); - assert i > 0 : "invalid decRef call: already closed"; + assert i > 0 : INVALID_DECREF_MESSAGE; if (i == 1) { try { closeInternal(); diff --git a/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java b/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java index 49c030609951..1f725ac48a16 100644 --- a/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java +++ b/libs/core/src/main/java/org/elasticsearch/core/RefCounted.java @@ -62,4 +62,16 @@ public interface RefCounted { * @return whether there are currently any active references to this object. */ boolean hasReferences(); + + /** + * Similar to {@link #incRef()} except that it also asserts that it managed to acquire the ref, for use in situations where it is a bug + * if all refs have been released. + */ + default void mustIncRef() { + if (tryIncRef()) { + return; + } + assert false : AbstractRefCounted.ALREADY_CLOSED_MESSAGE; + incRef(); // throws an ISE + } } diff --git a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java index db8e157d1576..5fedb357fff8 100644 --- a/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java +++ b/modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java @@ -180,15 +180,11 @@ public class GeoIpDownloaderTests extends ESTestCase { public void testIndexChunksNoData() throws IOException { client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var flushResponse = mock(FlushResponse.class); - when(flushResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(flushResponse); + flushResponseActionListener.onResponse(mock(FlushResponse.class)); }); client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var refreshResponse = mock(RefreshResponse.class); - when(refreshResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(refreshResponse); + flushResponseActionListener.onResponse(mock(RefreshResponse.class)); }); InputStream empty = new ByteArrayInputStream(new byte[0]); @@ -198,15 +194,11 @@ public class GeoIpDownloaderTests extends ESTestCase { public void testIndexChunksMd5Mismatch() { client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var flushResponse = mock(FlushResponse.class); - when(flushResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(flushResponse); + flushResponseActionListener.onResponse(mock(FlushResponse.class)); }); client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var refreshResponse = mock(RefreshResponse.class); - when(refreshResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(refreshResponse); + flushResponseActionListener.onResponse(mock(RefreshResponse.class)); }); IOException exception = expectThrows( @@ -238,21 +230,15 @@ public class GeoIpDownloaderTests extends ESTestCase { assertEquals("test", source.get("name")); assertArrayEquals(chunksData[chunk], (byte[]) source.get("data")); assertEquals(chunk + 15, source.get("chunk")); - var indexResponse = mock(IndexResponse.class); - when(indexResponse.hasReferences()).thenReturn(true); - listener.onResponse(indexResponse); + listener.onResponse(mock(IndexResponse.class)); }); client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var flushResponse = mock(FlushResponse.class); - when(flushResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(flushResponse); + flushResponseActionListener.onResponse(mock(FlushResponse.class)); }); client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener flushResponseActionListener) -> { assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices()); - var refreshResponse = mock(RefreshResponse.class); - when(refreshResponse.hasReferences()).thenReturn(true); - flushResponseActionListener.onResponse(refreshResponse); + flushResponseActionListener.onResponse(mock(RefreshResponse.class)); }); InputStream big = new ByteArrayInputStream(bigArray); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java index e33bfbff141b..3a8c35fffafa 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Service.java @@ -135,7 +135,7 @@ class S3Service implements Closeable { return existing; } final AmazonS3Reference clientReference = new AmazonS3Reference(buildClient(clientSettings)); - clientReference.incRef(); + clientReference.mustIncRef(); clientsCache = Maps.copyMapWithAddedEntry(clientsCache, clientSettings, clientReference); return clientReference; } diff --git a/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java b/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java index d05f698749a3..8dcc801f10c3 100644 --- a/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java +++ b/server/src/main/java/org/elasticsearch/action/support/RefCountingRunnable.java @@ -63,7 +63,6 @@ import org.elasticsearch.core.Releasables; public final class RefCountingRunnable implements Releasable { private static final Logger logger = LogManager.getLogger(RefCountingRunnable.class); - static final String ALREADY_CLOSED_MESSAGE = "already closed, cannot acquire or release any further refs"; private final RefCounted refCounted; @@ -86,14 +85,11 @@ public final class RefCountingRunnable implements Releasable { * will be ignored otherwise. This deviates from the contract of {@link java.io.Closeable}. */ public Releasable acquire() { - if (refCounted.tryIncRef()) { - // All refs are considered equal so there's no real need to allocate a new object here, although note that this deviates - // (subtly) from the docs for Closeable#close() which indicate that it should be idempotent. But only if assertions are - // disabled, and if assertions are enabled then we are asserting that we never double-close these things anyway. - return Releasables.assertOnce(this); - } - assert false : ALREADY_CLOSED_MESSAGE; - throw new IllegalStateException(ALREADY_CLOSED_MESSAGE); + refCounted.mustIncRef(); + // All refs are considered equal so there's no real need to allocate a new object here, although note that this deviates (subtly) + // from the docs for Closeable#close() which indicate that it should be idempotent. But only if assertions are disabled, and if + // assertions are enabled then we are asserting that we never double-close these things anyway. + return Releasables.assertOnce(this); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java index 194b4852c16d..19c7561ccdb1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java @@ -228,7 +228,7 @@ public abstract class TransportBroadcastByNodeAction< @Override protected void doExecute(Task task, Request request, ActionListener listener) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - request.incRef(); + request.mustIncRef(); executor.execute(ActionRunnable.wrapReleasing(listener, request::decRef, l -> doExecuteForked(task, request, listener))); } @@ -474,7 +474,7 @@ public abstract class TransportBroadcastByNodeAction< } NodeRequest(Request indicesLevelRequest, List shards, String nodeId) { - indicesLevelRequest.incRef(); + indicesLevelRequest.mustIncRef(); this.indicesLevelRequest = indicesLevelRequest; this.shards = shards; this.nodeId = nodeId; diff --git a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java index 71964d737e8d..b771f6cc512d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/TransportMasterNodeAction.java @@ -169,7 +169,7 @@ public abstract class TransportMasterNodeAction { CachedItem(Key key) { this.key = key; - incRef(); // start with a refcount of 2 so we're not closed while adding the first listener + mustIncRef(); // start with a refcount of 2 so we're not closed while adding the first listener this.future.addListener(new ActionListener<>() { @Override public void onResponse(Value value) { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java index d6ac42a9211c..34236b957dea 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/ThrottledIterator.java @@ -88,7 +88,7 @@ public class ThrottledIterator implements Releasable { } } try (var itemRefs = new ItemRefCounted()) { - itemRefs.incRef(); + itemRefs.mustIncRef(); itemConsumer.accept(Releasables.releaseOnce(itemRefs::decRef), item); } catch (Exception e) { logger.error(Strings.format("exception when processing [%s] with [%s]", item, itemConsumer), e); @@ -108,7 +108,7 @@ public class ThrottledIterator implements Releasable { private boolean isRecursive = true; ItemRefCounted() { - refs.incRef(); + refs.mustIncRef(); } @Override diff --git a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java index 419cbf97dfa0..4d6a66b6ec07 100644 --- a/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java +++ b/server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java @@ -223,7 +223,7 @@ public class ClusterConnectionManager implements ConnectionManager { IOUtils.closeWhileHandlingException(conn); } else { logger.debug("connected to node [{}]", node); - managerRefs.incRef(); + managerRefs.mustIncRef(); try { connectionListener.onNodeConnected(node, conn); } finally { diff --git a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java index 5d47c79abfd6..168621313972 100644 --- a/server/src/main/java/org/elasticsearch/transport/InboundHandler.java +++ b/server/src/main/java/org/elasticsearch/transport/InboundHandler.java @@ -293,7 +293,7 @@ public class InboundHandler { private void handleRequestForking(T request, RequestHandlerRegistry reg, TransportChannel channel) { boolean success = false; - request.incRef(); + request.mustIncRef(); try { reg.getExecutor().execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() { @Override @@ -381,7 +381,7 @@ public class InboundHandler { // no need to provide a buffer release here, we never escape the buffer when handling directly doHandleResponse(handler, remoteAddress, stream, inboundMessage.getHeader(), () -> {}); } else { - inboundMessage.incRef(); + inboundMessage.mustIncRef(); // release buffer once we deserialize the message, but have a fail-safe in #onAfter below in case that didn't work out final Releasable releaseBuffer = Releasables.releaseOnce(inboundMessage::decRef); executor.execute(new ForkingResponseHandlerRunnable(handler, null, threadPool) { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index aa28f8a76b58..ecd4ec6e4fc1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -65,7 +65,7 @@ public final class TransportActionProxy { @Override public void handleResponse(TransportResponse response) { try { - response.incRef(); + response.mustIncRef(); channel.sendResponse(response); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index 7550435ac0bb..8e6c1e67fcd1 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1013,7 +1013,7 @@ public class TransportService extends AbstractLifecycleComponent } } else { boolean success = false; - request.incRef(); + request.mustIncRef(); try { executor.execute(threadPool.getThreadContext().preserveContextWithTracing(new AbstractRunnable() { @Override @@ -1479,7 +1479,7 @@ public class TransportService extends AbstractLifecycleComponent if (executor == EsExecutors.DIRECT_EXECUTOR_SERVICE) { processResponse(handler, response); } else { - response.incRef(); + response.mustIncRef(); executor.execute(new ForkingResponseHandlerRunnable(handler, null, threadPool) { @Override protected void doRun() { diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java index fe1d45e6a500..6e2e984c060f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RefCountingListenerTests.java @@ -11,6 +11,7 @@ package org.elasticsearch.action.support; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ReachabilityChecker; @@ -174,10 +175,10 @@ public class RefCountingListenerTests extends ESTestCase { final String expectedMessage; if (randomBoolean()) { throwingRunnable = refs::acquire; - expectedMessage = RefCountingRunnable.ALREADY_CLOSED_MESSAGE; + expectedMessage = AbstractRefCounted.ALREADY_CLOSED_MESSAGE; } else { throwingRunnable = refs::close; - expectedMessage = "invalid decRef call: already closed"; + expectedMessage = AbstractRefCounted.INVALID_DECREF_MESSAGE; } assertEquals(expectedMessage, expectThrows(AssertionError.class, throwingRunnable).getMessage()); diff --git a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java index b5ccc4f50969..1018b073adb9 100644 --- a/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/RefCountingRunnableTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.test.ESTestCase; @@ -166,10 +167,10 @@ public class RefCountingRunnableTests extends ESTestCase { final String expectedMessage; if (randomBoolean()) { throwingRunnable = randomBoolean() ? refs::acquire : refs::acquireListener; - expectedMessage = RefCountingRunnable.ALREADY_CLOSED_MESSAGE; + expectedMessage = AbstractRefCounted.ALREADY_CLOSED_MESSAGE; } else { throwingRunnable = refs::close; - expectedMessage = "invalid decRef call: already closed"; + expectedMessage = AbstractRefCounted.INVALID_DECREF_MESSAGE; } assertEquals(expectedMessage, expectThrows(AssertionError.class, throwingRunnable).getMessage()); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java index 05d6eca0d021..eb85323caf5a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/DisruptableMockTransport.java @@ -150,7 +150,7 @@ public abstract class DisruptableMockTransport extends MockTransport { assert destinationTransport.getLocalNode().equals(getLocalNode()) == false : "non-local message from " + getLocalNode() + " to itself"; - request.incRef(); + request.mustIncRef(); destinationTransport.execute(new RebootSensitiveRunnable() { @Override diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java index 166624a77a35..d0efe69e8ac4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/persistence/BatchedDocumentsIteratorTests.java @@ -140,9 +140,7 @@ public class BatchedDocumentsIteratorTests extends ESTestCase { doAnswer(invocationOnMock -> { ActionListener listener = (ActionListener) invocationOnMock.getArguments()[2]; wasScrollCleared = true; - var clearScrollResponse = mock(ClearScrollResponse.class); - when(clearScrollResponse.hasReferences()).thenReturn(true); - listener.onResponse(clearScrollResponse); + listener.onResponse(mock(ClearScrollResponse.class)); return null; }).when(client).execute(eq(ClearScrollAction.INSTANCE), any(), any()); } @@ -173,7 +171,6 @@ public class BatchedDocumentsIteratorTests extends ESTestCase { protected SearchResponse createSearchResponseWithHits(String... hits) { SearchHits searchHits = createHits(hits); SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.hasReferences()).thenReturn(true); when(searchResponse.getScrollId()).thenReturn(SCROLL_ID); when(searchResponse.getHits()).thenReturn(searchHits); return searchResponse; diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java index 123814ec38c6..53dd31fe4679 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/transport/SecurityServerTransportInterceptor.java @@ -543,7 +543,7 @@ public class SecurityServerTransportInterceptor implements TransportInterceptor AbstractRunnable getReceiveRunnable(T request, TransportChannel channel, Task task) { final Runnable releaseRequest = new RunOnce(request::decRef); - request.incRef(); + request.mustIncRef(); return new AbstractRunnable() { @Override public boolean isForceExecution() { diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java index 623c72ee93c2..87c7c6e9748f 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherServiceTests.java @@ -163,7 +163,6 @@ public class WatcherServiceTests extends ESTestCase { // response setup, successful refresh response RefreshResponse refreshResponse = mock(RefreshResponse.class); - when(refreshResponse.hasReferences()).thenReturn(true); when(refreshResponse.getSuccessfulShards()).thenReturn( clusterState.getMetadata().getIndices().get(Watch.INDEX).getNumberOfShards() ); diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 385ee448e2b4..01547b898e4b 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -210,7 +210,6 @@ public class TriggeredWatchStoreTests extends ESTestCase { SearchResponse searchResponse1 = mock(SearchResponse.class); when(searchResponse1.getSuccessfulShards()).thenReturn(1); when(searchResponse1.getTotalShards()).thenReturn(1); - when(searchResponse1.hasReferences()).thenReturn(true); BytesArray source = new BytesArray("{}"); SearchHit hit = new SearchHit(0, "first_foo"); hit.version(1L); @@ -513,7 +512,6 @@ public class TriggeredWatchStoreTests extends ESTestCase { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); when(refreshResponse.getSuccessfulShards()).thenReturn(successful); - when(refreshResponse.hasReferences()).thenReturn(true); return refreshResponse; } }