diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java index b1242584a6b9..c5b80e50c830 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/SystemIndexMetadataUpgradeService.java @@ -15,10 +15,11 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.List; @@ -36,11 +37,10 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { private final SystemIndices systemIndices; private final ClusterService clusterService; - private boolean master = false; - - private volatile Map lastIndexMetadataMap = ImmutableOpenMap.of(); private volatile boolean updateTaskPending = false; + private volatile long triggeredVersion = -1L; + public SystemIndexMetadataUpgradeService(SystemIndices systemIndices, ClusterService clusterService) { this.systemIndices = systemIndices; this.clusterService = clusterService; @@ -48,29 +48,44 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { @Override public void clusterChanged(ClusterChangedEvent event) { - if (event.localNodeMaster() != master) { - this.master = event.localNodeMaster(); - } - - if (master && updateTaskPending == false) { + if (updateTaskPending == false + && event.localNodeMaster() + && (event.previousState().nodes().isLocalNodeElectedMaster() == false + || event.state().metadata().indices() != event.previousState().metadata().indices())) { final Map indexMetadataMap = event.state().metadata().indices(); - - if (lastIndexMetadataMap != indexMetadataMap) { - for (Map.Entry cursor : indexMetadataMap.entrySet()) { - if (cursor.getValue() != lastIndexMetadataMap.get(cursor.getKey())) { - IndexMetadata indexMetadata = cursor.getValue(); - boolean requiresUpdate = requiresUpdate(indexMetadata); - if (requiresUpdate) { - updateTaskPending = true; - submitUnbatchedTask( - "system_index_metadata_upgrade_service {system metadata change}", - new SystemIndexMetadataUpdateTask() - ); - break; + final var previousIndices = event.previousState().metadata().indices(); + final long triggerV = event.state().version(); + triggeredVersion = triggerV; + // Fork to the management pool to avoid blocking the cluster applier thread unnecessarily for very large index counts + // TODO: we should have a more efficient way of getting just the changed indices so that we don't have to fork here + clusterService.threadPool().executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() { + @Override + protected void doRun() { + if (triggeredVersion != triggerV) { + // don't run if another newer check task was triggered already + return; + } + for (Map.Entry cursor : indexMetadataMap.entrySet()) { + if (cursor.getValue() != previousIndices.get(cursor.getKey())) { + IndexMetadata indexMetadata = cursor.getValue(); + if (requiresUpdate(indexMetadata)) { + updateTaskPending = true; + submitUnbatchedTask( + "system_index_metadata_upgrade_service {system metadata change}", + new SystemIndexMetadataUpdateTask() + ); + break; + } } } } - } + + @Override + public void onFailure(Exception e) { + logger.error("unexpected exception on checking for metadata upgrades", e); + assert false : e; + } + }); } } @@ -124,37 +139,35 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { final List updatedMetadata = new ArrayList<>(); for (Map.Entry entry : indexMetadataMap.entrySet()) { final IndexMetadata indexMetadata = entry.getValue(); - if (indexMetadata != lastIndexMetadataMap.get(entry.getKey())) { - final boolean shouldBeSystem = shouldBeSystem(indexMetadata); - IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); - boolean updated = false; - if (shouldBeSystem != indexMetadata.isSystem()) { - builder.system(indexMetadata.isSystem() == false); - updated = true; - } - if (shouldBeSystem && isVisible(indexMetadata)) { - builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); - builder.settingsVersion(builder.settingsVersion() + 1); - updated = true; - } - if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { - for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { - if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { - builder.removeAlias(aliasMetadata.alias()); - builder.putAlias( - AliasMetadata.builder(aliasMetadata.alias()) - .filter(aliasMetadata.filter()) - .indexRouting(aliasMetadata.indexRouting()) - .isHidden(true) - .searchRouting(aliasMetadata.searchRouting()) - .writeIndex(aliasMetadata.writeIndex()) - ); - } + final boolean shouldBeSystem = shouldBeSystem(indexMetadata); + IndexMetadata.Builder builder = IndexMetadata.builder(indexMetadata); + boolean updated = false; + if (shouldBeSystem != indexMetadata.isSystem()) { + builder.system(indexMetadata.isSystem() == false); + updated = true; + } + if (shouldBeSystem && isVisible(indexMetadata)) { + builder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.SETTING_INDEX_HIDDEN, true)); + builder.settingsVersion(builder.settingsVersion() + 1); + updated = true; + } + if (shouldBeSystem && hasVisibleAlias(indexMetadata)) { + for (AliasMetadata aliasMetadata : indexMetadata.getAliases().values()) { + if (Boolean.FALSE.equals(aliasMetadata.isHidden())) { + builder.removeAlias(aliasMetadata.alias()); + builder.putAlias( + AliasMetadata.builder(aliasMetadata.alias()) + .filter(aliasMetadata.filter()) + .indexRouting(aliasMetadata.indexRouting()) + .isHidden(true) + .searchRouting(aliasMetadata.searchRouting()) + .writeIndex(aliasMetadata.writeIndex()) + ); } } - if (updated) { - updatedMetadata.add(builder.build()); - } + } + if (updated) { + updatedMetadata.add(builder.build()); } } @@ -174,7 +187,6 @@ public class SystemIndexMetadataUpgradeService implements ClusterStateListener { @Override public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - lastIndexMetadataMap = newState.metadata().indices(); updateTaskPending = false; } } diff --git a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 9506eacd94c6..00f986ba4d54 100644 --- a/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -82,6 +82,10 @@ public class ClusterService extends AbstractLifecycleComponent { this.clusterApplierService = clusterApplierService; } + public ThreadPool threadPool() { + return clusterApplierService.threadPool(); + } + public synchronized void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) { clusterApplierService.setNodeConnectionsService(nodeConnectionsService); }