Anonymize AbstractRefCounted (#77208)

Today `AbstractRefCounted` has a `name` field which is only used to
construct the exception message when calling `incRef()` after it's been
closed. This isn't really necessary, the stack trace will identify the
reference in question and give loads more useful detail besides. It's
also slightly irksome to have to name every single implementation.

This commit drops the name and the constructor parameter, and also
introduces a handy factory method for use when there's no extra state
needed and you just want to run a method or lambda when all references
are released.
This commit is contained in:
David Turner 2021-09-03 07:59:44 +01:00
parent a479181143
commit 7c513a75c0
22 changed files with 89 additions and 132 deletions

View file

@ -11,17 +11,13 @@ package org.elasticsearch.core;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A basic RefCounted implementation that is initialized with a
* ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count
* A basic {@link RefCounted} implementation that is initialized with a ref count of 1 and calls {@link #closeInternal()} once it reaches
* a 0 ref count.
*/
public abstract class AbstractRefCounted implements RefCounted {
private final AtomicInteger refCount = new AtomicInteger(1);
private final String name;
public static final String ALREADY_CLOSED_MESSAGE = "already closed, can't increment ref count";
public AbstractRefCounted(String name) {
this.name = name;
}
private final AtomicInteger refCount = new AtomicInteger(1);
@Override
public final void incRef() {
@ -63,14 +59,16 @@ public abstract class AbstractRefCounted implements RefCounted {
}
/**
* Called whenever the ref count is incremented or decremented. Can be implemented by implementations to a record of access to the
* instance for debugging purposes.
* Called whenever the ref count is incremented or decremented. Can be overridden to record access to the instance for debugging
* purposes.
*/
protected void touch() {
}
protected void alreadyClosed() {
throw new IllegalStateException(name + " is already closed can't increment refCount current count [" + refCount.get() + "]");
final int currentRefCount = refCount.get();
assert currentRefCount == 0 : currentRefCount;
throw new IllegalStateException(ALREADY_CLOSED_MESSAGE);
}
/**
@ -80,15 +78,21 @@ public abstract class AbstractRefCounted implements RefCounted {
return this.refCount.get();
}
/** gets the name of this instance */
public String getName() {
return name;
}
/**
* Method that is invoked once the reference count reaches zero.
* Implementations of this method must handle all exceptions and may not throw any exceptions.
*/
protected abstract void closeInternal();
/**
* Construct an {@link AbstractRefCounted} which runs the given {@link Runnable} when all references are released.
*/
public static AbstractRefCounted of(Runnable onClose) {
return new AbstractRefCounted() {
@Override
protected void closeInternal() {
onClose.run();
}
};
}
}

View file

@ -11,7 +11,6 @@ import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.test.ESTestCase;
import org.hamcrest.Matchers;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -20,7 +19,8 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class RefCountedTests extends ESTestCase {
public void testRefCount() throws IOException {
public void testRefCount() {
MyRefCounted counted = new MyRefCounted();
int incs = randomIntBetween(1, 100);
@ -56,12 +56,9 @@ public class RefCountedTests extends ESTestCase {
counted.decRef();
assertFalse(counted.tryIncRef());
try {
counted.incRef();
fail(" expected exception");
} catch (IllegalStateException ex) {
assertThat(ex.getMessage(), equalTo("test is already closed can't increment refCount current count [0]"));
}
assertThat(
expectThrows(IllegalStateException.class, counted::incRef).getMessage(),
equalTo(AbstractRefCounted.ALREADY_CLOSED_MESSAGE));
try {
counted.ensureOpen();
@ -77,9 +74,7 @@ public class RefCountedTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1);
final CopyOnWriteArrayList<Exception> exceptions = new CopyOnWriteArrayList<>();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread() {
@Override
public void run() {
threads[i] = new Thread(() -> {
try {
latch.await();
for (int j = 0; j < 10000; j++) {
@ -93,13 +88,12 @@ public class RefCountedTests extends ESTestCase {
} catch (Exception e) {
exceptions.add(e);
}
}
};
});
threads[i].start();
}
latch.countDown();
for (int i = 0; i < threads.length; i++) {
threads[i].join();
for (Thread thread : threads) {
thread.join();
}
counted.decRef();
try {
@ -110,17 +104,12 @@ public class RefCountedTests extends ESTestCase {
}
assertThat(counted.refCount(), is(0));
assertThat(exceptions, Matchers.emptyIterable());
}
private final class MyRefCounted extends AbstractRefCounted {
private static final class MyRefCounted extends AbstractRefCounted {
private final AtomicBoolean closed = new AtomicBoolean(false);
MyRefCounted() {
super("test");
}
@Override
protected void closeInternal() {
this.closed.set(true);

View file

@ -65,7 +65,6 @@ public class Page implements Releasable {
private final Releasable closeable;
private RefCountedCloseable(Releasable closeable) {
super("byte array page");
this.closeable = closeable;
}

View file

@ -85,11 +85,9 @@ public final class SharedGroupFactory {
private static class RefCountedGroup extends AbstractRefCounted {
public static final String NAME = "ref-counted-event-loop-group";
private final EventLoopGroup eventLoopGroup;
private RefCountedGroup(EventLoopGroup eventLoopGroup) {
super(NAME);
this.eventLoopGroup = eventLoopGroup;
}

View file

@ -22,7 +22,6 @@ public class AmazonEc2Reference extends AbstractRefCounted implements Releasable
private final AmazonEC2 client;
AmazonEc2Reference(AmazonEC2 client) {
super("AWS_EC2_CLIENT");
this.client = client;
}

View file

@ -24,7 +24,6 @@ public class AmazonS3Reference extends AbstractRefCounted implements Releasable
private final AmazonS3 client;
AmazonS3Reference(AmazonS3 client) {
super("AWS_S3_CLIENT");
this.client = client;
}

View file

@ -86,11 +86,9 @@ public final class NioGroupFactory {
private static class RefCountedNioGroup extends AbstractRefCounted implements NioGroup {
public static final String NAME = "ref-counted-nio-group";
private final NioSelectorGroup nioGroup;
private RefCountedNioGroup(NioSelectorGroup nioGroup) {
super(NAME);
this.nioGroup = nioGroup;
}

View file

@ -216,7 +216,6 @@ public final class ReleasableBytesReference implements RefCounted, Releasable, B
private final Releasable releasable;
RefCountedReleasable(Releasable releasable) {
super("bytes-reference");
this.releasable = releasable;
}

View file

@ -178,7 +178,6 @@ public abstract class CancellableSingleObjectCache<Input, Key, Value> {
private final CancellationChecks cancellationChecks = new CancellationChecks();
CachedItem(Key key) {
super("cached item");
this.key = key;
incRef(); // start with a refcount of 2 so we're not closed while adding the first listener
this.future.addListener(new ActionListener<Value>() {

View file

@ -81,12 +81,7 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final AtomicLong totalChannelsAccepted = new AtomicLong();
private final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final PlainActionFuture<Void> allClientsClosedListener = PlainActionFuture.newFuture();
private final RefCounted refCounted = new AbstractRefCounted("abstract-http-server-transport") {
@Override
protected void closeInternal() {
allClientsClosedListener.onResponse(null);
}
};
private final RefCounted refCounted = AbstractRefCounted.of(() -> allClientsClosedListener.onResponse(null));
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final HttpClientStatsTracker httpClientStatsTracker;

View file

@ -145,13 +145,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ShardLock shardLock;
private final OnClose onClose;
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
protected void closeInternal() {
// close us once we are done
Store.this.closeInternal();
}
};
private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::closeInternal); // close us once we are done
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);

View file

@ -310,9 +310,7 @@ public class IndicesService extends AbstractLifecycleComponent
// avoid closing these resources while ongoing requests are still being processed, we use a
// ref count which will only close them when both this service and all index services are
// actually closed
indicesRefCount = new AbstractRefCounted("indices") {
@Override
protected void closeInternal() {
indicesRefCount = AbstractRefCounted.of(() -> {
try {
IOUtils.close(
analysisRegistry,
@ -326,8 +324,7 @@ public class IndicesService extends AbstractLifecycleComponent
} finally {
closeLatch.countDown();
}
}
};
});
final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
nodeWriteDanglingIndicesInfo = WRITE_DANGLING_INDICES_INFO_SETTING.get(settings);

View file

@ -39,7 +39,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class MultiFileWriter extends AbstractRefCounted implements Releasable {
public MultiFileWriter(Store store, RecoveryState.Index indexState, String tempFilePrefix, Logger logger, Runnable ensureOpen) {
super("multi_file_writer");
this.store = store;
this.indexState = indexState;
this.tempFilePrefix = tempFilePrefix;

View file

@ -96,7 +96,6 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
DiscoveryNode sourceNode,
SnapshotFilesProvider snapshotFilesProvider,
PeerRecoveryTargetService.RecoveryListener listener) {
super("recovery_status");
this.cancellableThreads = new CancellableThreads();
this.recoveryId = idGenerator.incrementAndGet();
this.listener = listener;

View file

@ -66,12 +66,7 @@ public class ReaderContext implements Releasable {
this.singleSession = singleSession;
this.keepAlive = new AtomicLong(keepAliveInMillis);
this.lastAccessTime = new AtomicLong(nowInMillis());
this.refCounted = new AbstractRefCounted("reader_context") {
@Override
protected void closeInternal() {
doClose();
}
};
this.refCounted = AbstractRefCounted.of(this::doClose);
}
public void validate(TransportRequest request) {

View file

@ -37,21 +37,8 @@ public class ClusterConnectionManager implements ConnectionManager {
private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
private final ConcurrentMap<DiscoveryNode, ListenableFuture<Void>> pendingConnections = ConcurrentCollections.newConcurrentMap();
private final AbstractRefCounted connectingRefCounter = new AbstractRefCounted("connection manager") {
@Override
protected void closeInternal() {
Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
closeLatch.countDown();
}
};
private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
private final Transport transport;
private final ConnectionProfile defaultProfile;
private final AtomicBoolean closing = new AtomicBoolean(false);
@ -237,6 +224,19 @@ public class ClusterConnectionManager implements ConnectionManager {
}
}
private void pendingConnectionsComplete() {
final Iterator<Map.Entry<DiscoveryNode, Transport.Connection>> iterator = connectedNodes.entrySet().iterator();
while (iterator.hasNext()) {
final Map.Entry<DiscoveryNode, Transport.Connection> next = iterator.next();
try {
IOUtils.closeWhileHandlingException(next.getValue());
} finally {
iterator.remove();
}
}
closeLatch.countDown();
}
private void internalOpenConnection(DiscoveryNode node, ConnectionProfile connectionProfile,
ActionListener<Transport.Connection> listener) {
transport.openConnection(node, connectionProfile, listener.map(connection -> {

View file

@ -43,6 +43,7 @@ import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
@ -350,7 +351,7 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
} catch (AlreadyClosedException ex) {
throw ex;
} catch (IllegalStateException ex) {
assertEquals("reader_context is already closed can't increment refCount current count [0]", ex.getMessage());
assertEquals(AbstractRefCounted.ALREADY_CLOSED_MESSAGE, ex.getMessage());
} catch (SearchContextMissingException ex) {
// that's fine
}

View file

@ -265,7 +265,6 @@ public class MockNioTransport extends TcpTransport {
private final Releasable releasable;
LeakAwareRefCounted(Releasable releasable) {
super("leak-aware-ref-counted");
this.releasable = releasable;
leak = LeakTracker.INSTANCE.track(releasable);
}

View file

@ -192,7 +192,6 @@ public class CcrRestoreSourceService extends AbstractLifecycleComponent implemen
private RestoreSession(String sessionUUID, IndexShard indexShard, Engine.IndexCommitRef commitRef,
Scheduler.Cancellable timeoutTask) {
super("restore-session");
this.sessionUUID = sessionUUID;
this.indexShard = indexShard;
this.commitRef = commitRef;

View file

@ -64,21 +64,7 @@ public class CacheFile {
* for it. Once this instance has been evicted, all listeners notified and all {@link FileChannelReference} for it released,
* it makes sure to delete the physical file backing this cache.
*/
private final AbstractRefCounted refCounter = new AbstractRefCounted("CacheFile") {
@Override
protected void closeInternal() {
assert evicted.get();
assert assertNoPendingListeners();
try {
Files.deleteIfExists(file);
} catch (IOException e) {
// nothing to do but log failures here since closeInternal could be called from anywhere and must not throw
logger.warn(() -> new ParameterizedMessage("Failed to delete [{}]", file), e);
} finally {
listener.onCacheFileDelete(CacheFile.this);
}
}
};
private final AbstractRefCounted refCounter = AbstractRefCounted.of(this::deleteFile);
private final SparseFileTracker tracker;
private final CacheKey cacheKey;
@ -115,7 +101,6 @@ public class CacheFile {
private final FileChannel fileChannel;
FileChannelReference() throws IOException {
super("FileChannel[" + file + "]");
this.fileChannel = FileChannel.open(file, OPEN_OPTIONS);
refCounter.incRef();
}
@ -527,4 +512,17 @@ public class CacheFile {
}
return Collections.emptySortedSet();
}
private void deleteFile() {
assert evicted.get();
assert assertNoPendingListeners();
try {
Files.deleteIfExists(file);
} catch (IOException e) {
// nothing to do but log failures here since closeInternal could be called from anywhere and must not throw
logger.warn(() -> new ParameterizedMessage("Failed to delete [{}]", file), e);
} finally {
listener.onCacheFileDelete(CacheFile.this);
}
}
}

View file

@ -693,7 +693,6 @@ public class FrozenCacheService implements Releasable {
volatile int sharedBytesPos = -1;
CacheFileRegion(RegionKey regionKey, long regionSize) {
super("CacheFileRegion");
this.regionKey = regionKey;
assert regionSize > 0L;
tracker = new SparseFileTracker("file", regionSize);

View file

@ -54,7 +54,6 @@ public class SharedBytes extends AbstractRefCounted {
SharedBytes(int numRegions, long regionSize, NodeEnvironment environment, IntConsumer writeBytes, IntConsumer readBytes)
throws IOException {
super("shared-bytes");
this.numRegions = numRegions;
this.regionSize = regionSize;
final long fileSize = numRegions * regionSize;
@ -161,7 +160,6 @@ public class SharedBytes extends AbstractRefCounted {
private final long pageStart;
private IO(final int sharedBytesPos) {
super("shared-bytes-io");
this.sharedBytesPos = sharedBytesPos;
pageStart = getPhysicalOffset(sharedBytesPos);
}