mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Revert "Reduce Data Loss in System Indices Migration" (#121119)
This commit is contained in:
parent
635a4c21de
commit
26b23d88aa
6 changed files with 36 additions and 180 deletions
|
@ -1,5 +0,0 @@
|
|||
pr: 120168
|
||||
summary: Reduce Data Loss in System Indices Migration
|
||||
area: Infra/Core
|
||||
type: bug
|
||||
issues: []
|
5
docs/changelog/121119.yaml
Normal file
5
docs/changelog/121119.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 121119
|
||||
summary: Revert "Reduce Data Loss in System Indices Migration"
|
||||
area: Infra/Core
|
||||
type: bug
|
||||
issues: []
|
|
@ -9,17 +9,14 @@
|
|||
|
||||
package org.elasticsearch.migration;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -31,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.indices.AssociatedIndexDescriptor;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.plugins.ActionPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.plugins.PluginsService;
|
||||
import org.elasticsearch.plugins.SystemIndexPlugin;
|
||||
|
@ -54,10 +50,6 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static java.util.Collections.unmodifiableSet;
|
||||
import static org.elasticsearch.common.util.set.Sets.newHashSet;
|
||||
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -263,18 +255,12 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase
|
|||
assertThat(thisIndexStats.getTotal().getDocs().getCount(), is((long) INDEX_DOC_COUNT));
|
||||
}
|
||||
|
||||
public static class TestPlugin extends Plugin implements SystemIndexPlugin, ActionPlugin {
|
||||
public static class TestPlugin extends Plugin implements SystemIndexPlugin {
|
||||
public final AtomicReference<Function<ClusterState, Map<String, Object>>> preMigrationHook = new AtomicReference<>();
|
||||
public final AtomicReference<BiConsumer<ClusterState, Map<String, Object>>> postMigrationHook = new AtomicReference<>();
|
||||
private final BlockingActionFilter blockingActionFilter;
|
||||
|
||||
public TestPlugin() {
|
||||
blockingActionFilter = new BlockingActionFilter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ActionFilter> getActionFilters() {
|
||||
return singletonList(blockingActionFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -313,26 +299,5 @@ public abstract class AbstractFeatureMigrationIntegTest extends ESIntegTestCase
|
|||
postMigrationHook.get().accept(clusterService.state(), preUpgradeMetadata);
|
||||
listener.onResponse(true);
|
||||
}
|
||||
|
||||
public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple {
|
||||
private Set<String> blockedActions = emptySet();
|
||||
|
||||
@Override
|
||||
protected boolean apply(String action, ActionRequest request, ActionListener<?> listener) {
|
||||
if (blockedActions.contains(action)) {
|
||||
throw new ElasticsearchException("force exception on [" + action + "]");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int order() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public void blockActions(String... actions) {
|
||||
blockedActions = unmodifiableSet(newHashSet(actions));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,14 +17,11 @@ import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeAction
|
|||
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeRequest;
|
||||
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.alias.TransportIndicesAliasesAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
|
||||
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
|
@ -39,12 +36,10 @@ import org.elasticsearch.common.compress.CompressedXContent;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.indices.SystemIndexDescriptor;
|
||||
import org.elasticsearch.migration.AbstractFeatureMigrationIntegTest.TestPlugin.BlockingActionFilter;
|
||||
import org.elasticsearch.painless.PainlessPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.plugins.SystemIndexPlugin;
|
||||
import org.elasticsearch.reindex.ReindexPlugin;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.upgrades.FeatureMigrationResults;
|
||||
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
|
||||
|
||||
|
@ -277,60 +272,6 @@ public class FeatureMigrationIT extends AbstractFeatureMigrationIntegTest {
|
|||
});
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "ES-10666") // This test uncovered an existing issue
|
||||
public void testIndexBlockIsRemovedWhenAliasRequestFails() throws Exception {
|
||||
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
|
||||
ensureGreen();
|
||||
|
||||
// Block the alias request to simulate a failure
|
||||
InternalTestCluster internalTestCluster = internalCluster();
|
||||
ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName());
|
||||
BlockingActionFilter blockingActionFilter = null;
|
||||
for (ActionFilter filter : actionFilters.filters()) {
|
||||
if (filter instanceof BlockingActionFilter) {
|
||||
blockingActionFilter = (BlockingActionFilter) filter;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertNotNull("BlockingActionFilter should exist", blockingActionFilter);
|
||||
blockingActionFilter.blockActions(TransportIndicesAliasesAction.NAME);
|
||||
|
||||
// Start the migration
|
||||
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
|
||||
|
||||
// Wait till the migration fails
|
||||
assertBusy(() -> {
|
||||
GetFeatureUpgradeStatusResponse statusResp = client().execute(
|
||||
GetFeatureUpgradeStatusAction.INSTANCE,
|
||||
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
|
||||
).get();
|
||||
logger.info(Strings.toString(statusResp));
|
||||
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR));
|
||||
});
|
||||
|
||||
// Get the settings to see if the write block was removed
|
||||
var allsettings = client().admin().indices().prepareGetSettings(INTERNAL_UNMANAGED.getIndexPattern()).get().getIndexToSettings();
|
||||
var internalUnmanagedOldIndexSettings = allsettings.get(".int-unman-old");
|
||||
var writeBlock = internalUnmanagedOldIndexSettings.get(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey());
|
||||
assertThat("Write block on old index should be removed on migration ERROR status", writeBlock, equalTo("false"));
|
||||
|
||||
// Unblock the alias request
|
||||
blockingActionFilter.blockActions();
|
||||
|
||||
// Retry the migration
|
||||
client().execute(PostFeatureUpgradeAction.INSTANCE, new PostFeatureUpgradeRequest(TEST_REQUEST_TIMEOUT)).get();
|
||||
|
||||
// Ensure that the migration is successful after the alias request is unblocked
|
||||
assertBusy(() -> {
|
||||
GetFeatureUpgradeStatusResponse statusResp = client().execute(
|
||||
GetFeatureUpgradeStatusAction.INSTANCE,
|
||||
new GetFeatureUpgradeStatusRequest(TEST_REQUEST_TIMEOUT)
|
||||
).get();
|
||||
logger.info(Strings.toString(statusResp));
|
||||
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
|
||||
});
|
||||
}
|
||||
|
||||
public void testMigrationWillRunAfterError() throws Exception {
|
||||
createSystemIndexForDescriptor(INTERNAL_MANAGED);
|
||||
|
||||
|
|
|
@ -77,17 +77,6 @@ public class IndicesAliasesResponse extends AcknowledgedResponse {
|
|||
return errors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of all errors from the response. If there are no errors, an empty list is returned.
|
||||
*/
|
||||
public List<ElasticsearchException> getErrors() {
|
||||
if (errors == false) {
|
||||
return List.of();
|
||||
} else {
|
||||
return actionResults.stream().filter(a -> a.getError() != null).map(AliasActionResult::getError).toList();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a response from a list of action results. Sets the errors boolean based
|
||||
* on whether an of the individual results contain an error.
|
||||
|
@ -176,13 +165,6 @@ public class IndicesAliasesResponse extends AcknowledgedResponse {
|
|||
return new AliasActionResult(indices, action, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* The error result if the action failed, null if the action succeeded.
|
||||
*/
|
||||
public ElasticsearchException getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
private int getStatus() {
|
||||
return error == null ? 200 : error.status().getStatus();
|
||||
}
|
||||
|
|
|
@ -15,9 +15,7 @@ import org.elasticsearch.ElasticsearchException;
|
|||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
|
||||
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsClusterStateUpdateRequest;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
|
@ -34,6 +32,7 @@ import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
|
|||
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
|
||||
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedBiConsumer;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -60,7 +59,6 @@ import java.util.function.Consumer;
|
|||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction.NO_UPGRADE_REQUIRED_INDEX_VERSION;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.State.CLOSE;
|
||||
import static org.elasticsearch.core.Strings.format;
|
||||
|
||||
|
@ -450,33 +448,12 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|||
logAndThrowExceptionForFailures(bulkByScrollResponse)
|
||||
);
|
||||
} else {
|
||||
// Successful completion of reindexing. Now we need to set the alias and remove the old index.
|
||||
setAliasAndRemoveOldIndex(migrationInfo, ActionListener.wrap(aliasesResponse -> {
|
||||
if (aliasesResponse.hasErrors()) {
|
||||
var e = new ElasticsearchException("Aliases request had errors");
|
||||
for (var error : aliasesResponse.getErrors()) {
|
||||
e.addSuppressed(error);
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
logger.info(
|
||||
"Successfully migrated old index [{}] to new index [{}] from feature [{}]",
|
||||
oldIndexName,
|
||||
migrationInfo.getNextIndexName(),
|
||||
migrationInfo.getFeatureName()
|
||||
);
|
||||
delegate2.onResponse(bulkByScrollResponse);
|
||||
}, e -> {
|
||||
logger.error(
|
||||
() -> format(
|
||||
"An error occurred while changing aliases and removing the old index [%s] from feature [%s]",
|
||||
oldIndexName,
|
||||
migrationInfo.getFeatureName()
|
||||
),
|
||||
e
|
||||
);
|
||||
removeReadOnlyBlockOnReindexFailure(oldIndex, delegate2, e);
|
||||
}));
|
||||
// Successful completion of reindexing - remove read only and delete old index
|
||||
setWriteBlock(
|
||||
oldIndex,
|
||||
false,
|
||||
delegate2.delegateFailureAndWrap(setAliasAndRemoveOldIndex(migrationInfo, bulkByScrollResponse))
|
||||
);
|
||||
}
|
||||
}, e -> {
|
||||
logger.error(
|
||||
|
@ -534,7 +511,10 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|||
);
|
||||
}
|
||||
|
||||
private void setAliasAndRemoveOldIndex(SystemIndexMigrationInfo migrationInfo, ActionListener<IndicesAliasesResponse> listener) {
|
||||
private CheckedBiConsumer<ActionListener<BulkByScrollResponse>, AcknowledgedResponse, Exception> setAliasAndRemoveOldIndex(
|
||||
SystemIndexMigrationInfo migrationInfo,
|
||||
BulkByScrollResponse bulkByScrollResponse
|
||||
) {
|
||||
final IndicesAliasesRequestBuilder aliasesRequest = migrationInfo.createClient(baseClient).admin().indices().prepareAliases();
|
||||
aliasesRequest.removeIndex(migrationInfo.getCurrentIndexName());
|
||||
aliasesRequest.addAlias(migrationInfo.getNextIndexName(), migrationInfo.getCurrentIndexName());
|
||||
|
@ -553,42 +533,30 @@ public class SystemIndexMigrator extends AllocatedPersistentTask {
|
|||
);
|
||||
});
|
||||
|
||||
aliasesRequest.execute(listener);
|
||||
// Technically this callback might have a different cluster state, but it shouldn't matter - these indices shouldn't be changing
|
||||
// while we're trying to migrate them.
|
||||
return (listener, unsetReadOnlyResponse) -> aliasesRequest.execute(
|
||||
listener.delegateFailureAndWrap((l, deleteIndexResponse) -> l.onResponse(bulkByScrollResponse))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the write block on the index to the given value.
|
||||
* Makes the index readonly if it's not set as a readonly yet
|
||||
*/
|
||||
private void setWriteBlock(Index index, boolean readOnlyValue, ActionListener<AcknowledgedResponse> listener) {
|
||||
if (readOnlyValue) {
|
||||
// Setting the Block with an AddIndexBlockRequest ensures all shards have accounted for the block and all
|
||||
// in-flight writes are completed before returning.
|
||||
baseClient.admin()
|
||||
.indices()
|
||||
.addBlock(
|
||||
new AddIndexBlockRequest(WRITE, index.getName()).masterNodeTimeout(MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT),
|
||||
listener.delegateFailureAndWrap((l, response) -> {
|
||||
if (response.isAcknowledged() == false) {
|
||||
throw new ElasticsearchException("Failed to acknowledge read-only block index request");
|
||||
}
|
||||
l.onResponse(response);
|
||||
})
|
||||
);
|
||||
} else {
|
||||
// The only way to remove a Block is via a settings update.
|
||||
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), false).build();
|
||||
metadataUpdateSettingsService.updateSettings(
|
||||
new UpdateSettingsClusterStateUpdateRequest(
|
||||
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
|
||||
TimeValue.ZERO,
|
||||
readOnlySettings,
|
||||
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
|
||||
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
|
||||
index
|
||||
),
|
||||
listener
|
||||
);
|
||||
}
|
||||
final Settings readOnlySettings = Settings.builder().put(IndexMetadata.INDEX_BLOCKS_WRITE_SETTING.getKey(), readOnlyValue).build();
|
||||
|
||||
metadataUpdateSettingsService.updateSettings(
|
||||
new UpdateSettingsClusterStateUpdateRequest(
|
||||
MasterNodeRequest.INFINITE_MASTER_NODE_TIMEOUT,
|
||||
TimeValue.ZERO,
|
||||
readOnlySettings,
|
||||
UpdateSettingsClusterStateUpdateRequest.OnExisting.OVERWRITE,
|
||||
UpdateSettingsClusterStateUpdateRequest.OnStaticSetting.REJECT,
|
||||
index
|
||||
),
|
||||
listener
|
||||
);
|
||||
}
|
||||
|
||||
private void reindex(SystemIndexMigrationInfo migrationInfo, ActionListener<BulkByScrollResponse> listener) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue