From 06226397fb95c1e72693008ebf929a7066e22c64 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 27 Aug 2022 14:41:29 +0200 Subject: [PATCH] Run potentially expensive checks in SystemIndexMetadataUpgradeService off the applier thread (#89449) Checking all indices for changes during all cluster state updates can become very expensive at scales of O(10k) indices or more. It takes more than 1% of the total CPU time when bootstrapping the many-shards benchmark cluster and quite a bit more than that in terms of wall time by holding up cluster state (CS) application. As far as I can see we can safely move this check to the management pool. In order to simplify the mechanics of this move, this change brings the check in line with how we do similar checks in e.g. the snapshot service. We don't have to hold on to the last inspected map of indices here. We can simply check all indices when becoming master and then keep checking against the last applied state. This makes the logic in the CS update task that might be triggered easier to reason about and more stable as well. For the master update task this comes at the cost of making it slightly more expensive, but as it should effectively never run, its performance is irrelevant here. 
--- .../SystemIndexMetadataUpgradeService.java | 118 ++++++++++-------- .../cluster/service/ClusterService.java | 4 + 2 files changed, 69 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index b1242584a6b9..c5b80e50c830 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -15,10 +15,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; @@ -36,11 +37,10 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { private final SystemIndices systemIndices; private final ClusterService clusterService; - private boolean master = false; - - private volatile Map lastIndexMetadataMap = ImmutableOpenMap.of(); private volatile boolean updateTaskPending = false; + private volatile long triggeredVersion = -1L; + public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) { this.systemIndices = systemIndices; this.clusterService = clusterService; @@ -48,29 +48,44 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.localNodeMaster() != 
master) { - this.master = event.localNodeMaster(); - } - - if (master && updateTaskPending == false) { + if (updateTaskPending == false + && event.localNodeMaster() + && (event.previousState().nodes().isLocalNodeElectedMaster() == false + || event.state().metadata().indices() != event.previousState().metadata().indices())) { final Map indexMetadataMap = event.state().metadata().indices(); - - if (lastIndexMetadataMap != indexMetadataMap) { - for (Map.Entry cursor : indexMetadataMap.entrySet()) { - if (cursor.getValue() != lastIndexMetadataMap.get(cursor.getKey())) { - IndexMetadata indexMetadata = cursor.getValue(); - boolean requiresUpdate = requiresUpdate(indexMetadata); - if (requiresUpdate) { - updateTaskPending = true; - submitUnbatchedTask( - "system_index_metadata_upgrade_service {system metadata change}", - new SystemIndexMetadataUpdateTask() - ); - break; + final var previousIndices = event.previousState().metadata().indices(); + final long triggerV = event.state().version(); + triggeredVersion = triggerV; + // Fork to the management pool to avoid blocking the cluster applier thread unnecessarily for very large index counts + // TODO: we should have a more efficient way of getting just the changed indices so that we don't have to fork here + clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + protected void doRun() { + if (triggeredVersion != triggerV) { + // don't run if another newer check task was triggered already + return; + } + for (Map.Entry cursor : indexMetadataMap.entrySet()) { + if (cursor.getValue() != previousIndices.get(cursor.getKey())) { + IndexMetadata indexMetadata = cursor.getValue(); + if (requiresUpdate(indexMetadata)) { + updateTaskPending = true; + submitUnbatchedTask( + "system_index_metadata_upgrade_service {system metadata change}", + new SystemIndexMetadataUpdateTask() + ); + break; + } } } } - } + + @Override + public void onFailure(Exception e) { + 
logger.error("unexpected exception on checking for metadata upgrades", e); + assert false : e; + } + }); } } @@ -124,37 +139,35 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { final List updatedMetadata = new ArrayList<>(); for (Map.Entry entry : indexMetadataMap.entrySet()) { final IndexMetadata indexMetadata = entry.getValue(); - if (indexMetadata != lastIndexMetadataMap.get(entry.getKey())) { - final boolean shouldBeSystem = shouldBeSystem(indexMetadata); - IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); - boolean updated = false; - if (shouldBeSystem != indexMetadata.isSystem()) { - builder.system(indexMetadata.isSystem() == false); - updated = true; - } - if (shouldBeSystem && isVisible(indexMetadata)) { - builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); - builder.settingsVersion(builder.settingsVersion() + 1); - updated = true; - } - if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { - for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { - if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { - builder.removeAlias(aliasMetadata.alias()); - builder.putAlias( - AliasMetadata.builder(aliasMetadata.alias()) - .filter(aliasMetadata.filter()) - .indexRouting(aliasMetadata.indexRouting()) - .isHidden(true) - .searchRouting(aliasMetadata.searchRouting()) - .writeIndex(aliasMetadata.writeIndex()) - ); - } + final boolean shouldBeSystem = shouldBeSystem(indexMetadata); + IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); + boolean updated = false; + if (shouldBeSystem != indexMetadata.isSystem()) { + builder.system(indexMetadata.isSystem() == false); + updated = true; + } + if (shouldBeSystem && isVisible(indexMetadata)) { + builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); + builder.settingsVersion(builder.settingsVersion() + 1); + 
updated = true; + } + if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { + for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { + if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { + builder.removeAlias(aliasMetadata.alias()); + builder.putAlias( + AliasMetadata.builder(aliasMetadata.alias()) + .filter(aliasMetadata.filter()) + .indexRouting(aliasMetadata.indexRouting()) + .isHidden(true) + .searchRouting(aliasMetadata.searchRouting()) + .writeIndex(aliasMetadata.writeIndex()) + ); } } - if (updated) { - updatedMetadata.add(builder.build()); - } + } + if (updated) { + updatedMetadata.add(builder.build()); } } @@ -174,7 +187,6 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - lastIndexMetadataMap = newState.metadata().indices(); updateTaskPending = false; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 9506eacd94c6..00f986ba4d54 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -82,6 +82,10 @@ public class ClusterService extends AbstractLifecycleComponent { this.clusterApplierService = clusterApplierService; } + public ThreadPool threadPool() { + return clusterApplierService.threadPool(); + } + public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { clusterApplierService.setNodeConnectionsService(nodeConnectionsService); }