From 26b23d88aa77b43a48cd6a121519c5abc136818f Mon Sep 17 00:00:00 2001 From: John Verwolf Date: Tue, 28 Jan 2025 17:49:38 -0800 Subject: [PATCH] Revert "Reduce Data Loss in System Indices Migration" (#121119) --- docs/changelog/120168.yaml | 5 - docs/changelog/121119.yaml | 5 + .../AbstractFeatureMigrationIntegTest.java | 37 +------- .../migration/FeatureMigrationIT.java | 59 ------------ .../indices/alias/IndicesAliasesResponse.java | 18 ---- .../upgrades/SystemIndexMigrator.java | 92 ++++++------------- 6 files changed, 36 insertions(+), 180 deletions(-) delete mode 100644 docs/changelog/120168.yaml create mode 100644 docs/changelog/121119.yaml diff --git a/docs/changelog/120168.yaml b/docs/changelog/120168.yaml deleted file mode 100644 index d4bb32189516..000000000000 --- a/docs/changelog/120168.yaml +++ /dev/null @@ -1,5 +0,0 @@ -pr: 120168 -summary: Reduce Data Loss in System Indices Migration -area: Infra/Core -type: bug -issues: [] diff --git a/docs/changelog/121119.yaml b/docs/changelog/121119.yaml new file mode 100644 index 000000000000..ad05011affbb --- /dev/null +++ b/docs/changelog/121119.yaml @@ -0,0 +1,5 @@ +pr: 121119 +summary: Revert "Reduce Data Loss in System Indices Migration" +area: Infra/Core +type: bug +issues: [] diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java index 84e45024b69f..860d63000f12 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/AbstractFeatureMigrationIntegTest.java @@ -9,17 +9,14 @@ package org.elasticsearch.migration; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.support.ActionFilter; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; @@ -31,7 +28,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.AssociatedIndexDescriptor; import org.elasticsearch.indices.SystemIndexDescriptor; -import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.SystemIndexPlugin; @@ -54,10 +50,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; -import static java.util.Collections.emptySet; -import static java.util.Collections.singletonList; -import static java.util.Collections.unmodifiableSet; -import static org.elasticsearch.common.util.set.Sets.newHashSet; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; @@ -263,18 +255,12 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT)); } - public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin { + public static class TestPlugin extends Plugin implements SystemIndexPlugin { public final AtomicReference>> preMigrationHook = new AtomicReference<>(); public final AtomicReference>> postMigrationHook = new AtomicReference<>(); - private final BlockingActionFilter blockingActionFilter; public TestPlugin() { - blockingActionFilter = new BlockingActionFilter(); - } - @Override - public List getActionFilters() { - return singletonList(blockingActionFilter); } @Override @@ -313,26 +299,5 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata); listener.onResponse(true); } - - public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple { - private Set blockedActions = emptySet(); - - @Override - protected boolean apply(String action, ActionRequest request, ActionListener listener) { - if (blockedActions.contains(action)) { - throw new ElasticsearchException("force exception on [" + action + "]"); - } - return true; - } - - @Override - public int order() { - return 0; - } - - public void blockActions(String... actions) { - blockedActions = unmodifiableSet(newHashSet(actions)); - } - } } } diff --git a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java index ee95ce551382..cdf817a6b17b 100644 --- a/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java @@ -17,14 +17,11 @@ import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeAction import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest; import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse; import org.elasticsearch.action.admin.indices.alias.Alias; -import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction; import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.ActionFilter; -import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; @@ -39,12 +36,10 @@ import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.indices.SystemIndexDescriptor; -import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter; import org.elasticsearch.painless.PainlessPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.SystemIndexPlugin; import org.elasticsearch.reindex.ReindexPlugin; -import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.upgrades.FeatureMigrationResults; import org.elasticsearch.upgrades.SingleFeatureMigrationResult; @@ -277,60 +272,6 @@ public class FeatureMigrationIT extends AbstractFeatureMigrationIntegTest { }); } - @AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue - public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception { - createSystemIndexForDescriptor(INTERNAL_UNMANAGED); - ensureGreen(); - - // Block the alias request to simulate a failure - InternalTestCluster internalTestCluster = internalCluster(); - ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName()); - BlockingActionFilter blockingActionFilter = null; - for (ActionFilter filter : actionFilters.filters()) { - if (filter instanceof BlockingActionFilter) { - blockingActionFilter = (BlockingActionFilter) filter; - break; - } - } - assertNotNull("BlockingActionFilter should exist", blockingActionFilter); - blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME); - - // Start the migration - client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); - - // Wait till the migration fails - assertBusy(() -> { - GetFeatureUpgradeStatusResponse statusResp = client().execute( - GetFeatureUpgradeStatusAction.INSTANCE, - new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT) - ).get(); - logger.info(Strings.toString(statusResp)); - assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR)); - }); - - // Get the settings to see if the write block was removed - var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings(); - var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old"); - var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey()); - assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false")); - - // Unblock the alias request - blockingActionFilter.blockActions(); - - // Retry the migration - client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get(); - - // Ensure that the migration is successful after the alias request is unblocked - assertBusy(() -> { - GetFeatureUpgradeStatusResponse statusResp = client().execute( - GetFeatureUpgradeStatusAction.INSTANCE, - new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT) - ).get(); - logger.info(Strings.toString(statusResp)); - assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED)); - }); - } - public void testMigrationWillRunAfterError() throws Exception { createSystemIndexForDescriptor(INTERNAL_MANAGED); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java index 071e9b42752c..69ab9f57d2be 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/alias/IndicesAliasesResponse.java @@ -77,17 +77,6 @@ public class IndicesAliasesResponse extends AcknowledgedResponse { return errors; } - /** - * Get a list of all errors from the response. If there are no errors, an empty list is returned. - */ - public List getErrors() { - if (errors == false) { - return List.of(); - } else { - return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList(); - } - } - /** * Build a response from a list of action results. Sets the errors boolean based * on whether an of the individual results contain an error. @@ -176,13 +165,6 @@ public class IndicesAliasesResponse extends AcknowledgedResponse { return new AliasActionResult(indices, action, null); } - /** - * The error result if the action failed, null if the action succeeded. - */ - public ElasticsearchException getError() { - return error; - } - private int getStatus() { return error == null ? 200 : error.status().getStatus(); } diff --git a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java index cdd466c567e8..186618f3662f 100644 --- a/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java +++ b/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrator.java @@ -15,9 +15,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder; -import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; -import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -34,6 +32,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; @@ -60,7 +59,6 @@ import java.util.function.Consumer; import java.util.stream.Collectors; import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION; -import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE; import static org.elasticsearch.core.Strings.format; @@ -450,33 +448,12 @@ public class SystemIndexMigrator extends AllocatedPersistentTask { logAndThrowExceptionForFailures(bulkByScrollResponse) ); } else { - // Successful completion of reindexing. Now we need to set the alias and remove the old index. - setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> { - if (aliasesResponse.hasErrors()) { - var e = new ElasticsearchException("Aliases request had errors"); - for (var error : aliasesResponse.getErrors()) { - e.addSuppressed(error); - } - throw e; - } - logger.info( - "Successfully migrated old index [{}] to new index [{}] from feature [{}]", - oldIndexName, - migrationInfo.getNextIndexName(), - migrationInfo.getFeatureName() - ); - delegate2.onResponse(bulkByScrollResponse); - }, e -> { - logger.error( - () -> format( - "An error occurred while changing aliases and removing the old index [%s] from feature [%s]", - oldIndexName, - migrationInfo.getFeatureName() - ), - e - ); - removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e); - })); + // Successful completion of reindexing - remove read only and delete old index + setWriteBlock( + oldIndex, + false, + delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse)) + ); } }, e -> { logger.error( @@ -534,7 +511,10 @@ public class SystemIndexMigrator extends AllocatedPersistentTask { ); } - private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) { + private CheckedBiConsumer, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex( + SystemIndexMigrationInfo migrationInfo, + BulkByScrollResponse bulkByScrollResponse + ) { final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases(); aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName()); aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName()); @@ -553,42 +533,30 @@ public class SystemIndexMigrator extends AllocatedPersistentTask { ); }); - aliasesRequest.execute(listener); + // Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing + // while we're trying to migrate them. + return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute( + listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse)) + ); } /** - * Sets the write block on the index to the given value. + * Makes the index readonly if it's not set as a readonly yet */ private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener listener) { - if (readOnlyValue) { - // Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all - // in-flight writes are completed before returning. - baseClient.admin() - .indices() - .addBlock( - new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT), - listener.delegateFailureAndWrap((l, response) -> { - if (response.isAcknowledged() == false) { - throw new ElasticsearchException("Failed to acknowledge read-only block index request"); - } - l.onResponse(response); - }) - ); - } else { - // The only way to remove a Block is via a settings update. - final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build(); - metadataUpdateSettingsService.updateSettings( - new UpdateSettingsClusterStateUpdateRequest( - MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, - TimeValue.ZERO, - readOnlySettings, - UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, - UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT, - index - ), - listener - ); - } + final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build(); + + metadataUpdateSettingsService.updateSettings( + new UpdateSettingsClusterStateUpdateRequest( + MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT, + TimeValue.ZERO, + readOnlySettings, + UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE, + UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT, + index + ), + listener + ); } private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener listener) {