Run potentially expensive checks in SystemIndexMetadataUpgraderService off the applier thread (#89449)

Checking all indices for changes during all cluster state updates can become very expensive at scales of O(10k) indices or more. It takes more than 1% of the total CPU time when bootstrapping the many-shards benchmark cluster and quite a bit more than that in terms of wall-time by holding up CS application. As far as I can see we can safely move this check to the management pool.

In order to simplify the mechanics of this move, this change brings the check in-line with how we do similar checks in e.g. the snapshot service. We don't have to hold on to the last inspected map of indices here. We can simply check all indices when becoming master and then keep checking against the last applied state. This makes the logic in the CS update task that might be triggered easier to reason about and more stable as well. For the master update task this comes at the cost of making it slightly more expensive but as it should effectively never run, its performance is irrelevant here.
This commit is contained in:
Armin Braun 2022-08-27 14:41:29 +02:00 committed by GitHub
parent cdcf238787
commit 06226397fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 53 deletions

View file

@ -15,10 +15,11 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
@ -36,11 +37,10 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
private final SystemIndices systemIndices;
private final ClusterService clusterService;
private boolean master = false;
private volatile Map<String, IndexMetadata> lastIndexMetadataMap = ImmutableOpenMap.of();
private volatile boolean updateTaskPending = false;
private volatile long triggeredVersion = -1L;
public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) {
this.systemIndices = systemIndices;
this.clusterService = clusterService;
@ -48,19 +48,27 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster() != master) {
this.master = event.localNodeMaster();
}
if (master && updateTaskPending == false) {
if (updateTaskPending == false
&& event.localNodeMaster()
&& (event.previousState().nodes().isLocalNodeElectedMaster() == false
|| event.state().metadata().indices() != event.previousState().metadata().indices())) {
final Map<String, IndexMetadata> indexMetadataMap = event.state().metadata().indices();
if (lastIndexMetadataMap != indexMetadataMap) {
final var previousIndices = event.previousState().metadata().indices();
final long triggerV = event.state().version();
triggeredVersion = triggerV;
// Fork to the management pool to avoid blocking the cluster applier thread unnecessarily for very large index counts
// TODO: we should have a more efficient way of getting just the changed indices so that we don't have to fork here
clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
@Override
protected void doRun() {
if (triggeredVersion != triggerV) {
// don't run if another newer check task was triggered already
return;
}
for (Map.Entry<String, IndexMetadata> cursor : indexMetadataMap.entrySet()) {
if (cursor.getValue() != lastIndexMetadataMap.get(cursor.getKey())) {
if (cursor.getValue() != previousIndices.get(cursor.getKey())) {
IndexMetadata indexMetadata = cursor.getValue();
boolean requiresUpdate = requiresUpdate(indexMetadata);
if (requiresUpdate) {
if (requiresUpdate(indexMetadata)) {
updateTaskPending = true;
submitUnbatchedTask(
"system_index_metadata_upgrade_service {system metadata change}",
@ -71,6 +79,13 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
}
}
}
@Override
public void onFailure(Exception e) {
logger.error("unexpected exception on checking for metadata upgrades", e);
assert false : e;
}
});
}
}
@ -124,7 +139,6 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
final List<IndexMetadata> updatedMetadata = new ArrayList<>();
for (Map.Entry<String, IndexMetadata> entry : indexMetadataMap.entrySet()) {
final IndexMetadata indexMetadata = entry.getValue();
if (indexMetadata != lastIndexMetadataMap.get(entry.getKey())) {
final boolean shouldBeSystem = shouldBeSystem(indexMetadata);
IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata);
boolean updated = false;
@ -156,7 +170,6 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
updatedMetadata.add(builder.build());
}
}
}
if (updatedMetadata.isEmpty() == false) {
final Metadata.Builder builder = Metadata.builder(currentState.metadata());
@ -174,7 +187,6 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener {
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
lastIndexMetadataMap = newState.metadata().indices();
updateTaskPending = false;
}
}

View file

@ -82,6 +82,10 @@ public class ClusterService extends AbstractLifecycleComponent {
this.clusterApplierService = clusterApplierService;
}
public ThreadPool threadPool() {
return clusterApplierService.threadPool();
}
public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
clusterApplierService.setNodeConnectionsService(nodeConnectionsService);
}