diff --git a/docs/changelog/103817.yaml b/docs/changelog/103817.yaml
new file mode 100644
index 000000000000..ff8978f1d377
--- /dev/null
+++ b/docs/changelog/103817.yaml
@@ -0,0 +1,6 @@
+pr: 103817
+summary: Fix deleting index during snapshot finalization
+area: Snapshot/Restore
+type: bug
+issues:
+ - 101029
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
index 8d2e15f5027d..1152cf5f03e5 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
@@ -17,17 +17,24 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.PathUtils;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.discovery.AbstractDisruptionTestCase;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoryConflictException;
@@ -36,6 +43,7 @@ import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -48,10 +56,12 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;
@@ -2060,6 +2070,106 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertSuccessful(partialFuture);
     }
 
+    public void testDeleteIndexWithOutOfOrderFinalization() {
+
+        final var indexToDelete = "index-to-delete";
+        final var indexNames = List.of(indexToDelete, "index-0", "index-1", "index-2");
+
+        for (final var indexName : indexNames) {
+            assertAcked(prepareCreate(indexName, indexSettingsNoReplicas(1)));
+        }
+
+        final var repoName = "test-repo";
+        createRepository(repoName, "fs");
+
+        // block the update-shard-snapshot-status requests so we can execute them in a specific order
+        final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
+        final Map<String, SubscribableListener<Void>> otherIndexSnapshotListeners = indexNames.stream()
+            .collect(Collectors.toMap(k -> k, k -> new SubscribableListener<>()));
+        masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
+            SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
+            (handler, request, channel, task) -> {
+                final var indexName = request.shardId().getIndexName();
+                if (indexName.equals(indexToDelete)) {
+                    handler.messageReceived(request, channel, task);
+                } else {
+                    final var listener = otherIndexSnapshotListeners.get(indexName);
+                    assertNotNull(indexName, listener);
+                    listener.addListener(
+                        ActionTestUtils.assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
+                    );
+                }
+            }
+        );
+
+        // start the snapshots, each targeting index-to-delete and one other index so we can control their finalization order
+        final var snapshotCompleters = new HashMap<String, Runnable>();
+        for (final var blockingIndex : List.of("index-0", "index-1", "index-2")) {
+            final var snapshotName = "snapshot-with-" + blockingIndex;
+            final var snapshotFuture = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
+                .setWaitForCompletion(true)
+                .setPartial(true)
+                .setIndices(indexToDelete, blockingIndex)
+                .execute();
+
+            // ensure each snapshot has really started before moving on to the next one
+            safeAwait(
+                ClusterServiceUtils.addTemporaryStateListener(
+                    internalCluster().getInstance(ClusterService.class),
+                    cs -> SnapshotsInProgress.get(cs)
+                        .forRepo(repoName)
+                        .stream()
+                        .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals(snapshotName))
+                )
+            );
+
+            snapshotCompleters.put(blockingIndex, () -> {
+                assertFalse(snapshotFuture.isDone());
+                otherIndexSnapshotListeners.get(blockingIndex).onResponse(null);
+                assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet(10, TimeUnit.SECONDS).getSnapshotInfo().state());
+            });
+        }
+
+        // set up to delete the index at a very specific moment during finalization
+        final var masterDeleteIndexService = internalCluster().getCurrentMasterNodeInstance(MetadataDeleteIndexService.class);
+        final var indexRecreatedListener = ClusterServiceUtils
+            // wait until the snapshot has entered finalization
+            .addTemporaryStateListener(
+                internalCluster().getInstance(ClusterService.class),
+                cs -> SnapshotsInProgress.get(cs)
+                    .forRepo(repoName)
+                    .stream()
+                    .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-with-index-1") && e.state().completed())
+            )
+            // execute the index deletion _directly on the master_ so it happens before the snapshot finalization executes
+            .andThen((l, ignored) -> masterDeleteIndexService.deleteIndices(new DeleteIndexClusterStateUpdateRequest(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })).indices(new Index[] { internalCluster().clusterService().state().metadata().index(indexToDelete).getIndex() })
+                .ackTimeout(TimeValue.timeValueSeconds(10))
+                .masterNodeTimeout(TimeValue.timeValueSeconds(10))))
+            // ultimately create the index again so that taking a full snapshot will pick up any missing shard gen blob, and deleting that
+            // full snapshot will clean up all dangling shard-level blobs
+            .andThen((l, ignored) -> prepareCreate(indexToDelete, indexSettingsNoReplicas(1)).execute(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })));
+
+        // release the snapshots to be finalized, in this order
+        for (final var blockingIndex : List.of("index-1", "index-2", "index-0")) {
+            snapshotCompleters.get(blockingIndex).run();
+        }
+
+        safeAwait(indexRecreatedListener);
+        masterTransportService.clearAllRules();
+
+        // create a full snapshot to verify that the repo is still ok
+        createFullSnapshot(repoName, "final-full-snapshot");
+
+        // delete the full snapshot to clean up the leftover shard-level metadata (which trips repo consistency assertions otherwise)
+        startDeleteSnapshot(repoName, "final-full-snapshot").actionGet(10, TimeUnit.SECONDS);
+    }
+
     private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
         final SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus(otherBlockedRepoName).get();
         final List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
diff --git a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
index 9129f6abd373..b459e1cfc733 100644
--- a/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
+++ b/server/src/main/java/org/elasticsearch/repositories/FinalizeSnapshotContext.java
@@ -100,7 +100,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener
     public List<ShardGeneration> getGens(IndexId indexId) {
         return shardGenerations.getOrDefault(indexId, Collections.emptyList());
     }
diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 48caafc6bfab..b8b0498d9512 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -1741,6 +1741,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             (indexId, gens) -> gens.forEach(
                 (shardId, oldGen) -> toDelete.add(
                     shardPath(indexId, shardId).buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen
+                        .toBlobNamePart()
                 )
             )
         );
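The SnapshotsService changes below thread the set of shard generations that were actually written in the repository through the finalization-failure path, instead of discarding that information. A minimal, self-contained sketch of the pattern in plain Java, using hypothetical names rather than the Elasticsearch types: the failure handler captures what is known to have been written, and an empty set (the ShardGenerations.EMPTY analogue) is passed when nothing is known to exist yet.

```java
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model, not Elasticsearch code: a finalization step whose failure handler
// receives exactly the shard generations known to have been written, so the cleanup path
// can tell written shard state apart from state that never reached the repository.
public class FinalizationFailureSketch {

    static void runStep(Set<String> writtenGenerations, Runnable step, Consumer<Set<String>> onFailure) {
        try {
            step.run();
        } catch (RuntimeException e) {
            // Hand the cleanup path only what is known to exist in the repository.
            onFailure.accept(writtenGenerations);
        }
    }

    public static void main(String[] args) {
        // Failure after the shard generation blobs were written: cleanup may reuse them.
        runStep(
            Set.of("index-0/0/gen-a"),
            () -> { throw new RuntimeException("root blob update failed"); },
            written -> System.out.println("may carry forward: " + written)
        );

        // Failure before anything was written: pass an empty set instead.
        runStep(
            Set.of(),
            () -> { throw new RuntimeException("failed early"); },
            written -> System.out.println("may carry forward: " + written)
        );
    }
}
```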
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index f973d456a6b7..bbabfca866a6 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -450,7 +450,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
             endingSnapshots.add(targetSnapshot);
             initializingClones.remove(targetSnapshot);
             logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
-            removeFailedSnapshotFromClusterState(targetSnapshot, e, null);
+            removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
         };
 
         // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone
@@ -1312,7 +1312,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
         if (entry.isClone() && entry.state() == State.FAILED) {
             logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
             if (newFinalization) {
-                removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
+                removeFailedSnapshotFromClusterState(
+                    snapshot,
+                    new SnapshotException(snapshot, entry.failure()),
+                    null,
+                    ShardGenerations.EMPTY
+                );
             }
             return;
         }
@@ -1496,7 +1501,15 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                         // a fatal like e.g. this node stopped being the master node
                         snapshotListeners.onResponse(endAndGetListenersToResolve(snapshot));
                         runNextQueuedOperation(updatedRepositoryData, repository, true);
-                    }, e -> handleFinalizationFailure(e, snapshot, repositoryData)),
+                    },
+                        e -> handleFinalizationFailure(
+                            e,
+                            snapshot,
+                            repositoryData,
+                            // we might have written the new root blob before failing here, so we must use the updated shardGenerations
+                            shardGenerations
+                        )
+                    ),
                     snInfo -> snapshotListeners.addListener(new ActionListener<>() {
                         @Override
                         public void onResponse(List<ActionListener<SnapshotInfo>> actionListeners) {
@@ -1512,11 +1525,20 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                         })
                 )
             );
-        }, e -> handleFinalizationFailure(e, snapshot, repositoryData)));
+        },
+            e -> handleFinalizationFailure(
+                e,
+                snapshot,
+                repositoryData,
+                // a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
+                // use the updated shardGenerations for all pending shard snapshots
+                shardGenerations
+            )
+        ));
         } catch (Exception e) {
             logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
             assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
-            handleFinalizationFailure(e, snapshot, repositoryData);
+            handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
         }
     }
 
@@ -1568,7 +1590,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param snapshot snapshot that failed to finalize
      * @param repositoryData current repository data for the snapshot's repository
      */
-    private void handleFinalizationFailure(Exception e, Snapshot snapshot, RepositoryData repositoryData) {
+    private void handleFinalizationFailure(
+        Exception e,
+        Snapshot snapshot,
+        RepositoryData repositoryData,
+        ShardGenerations shardGenerations
+    ) {
         if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
             // Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
             // will try ending this snapshot again
@@ -1581,7 +1608,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
             failAllListenersOnMasterFailOver(e);
         } else {
             logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
-            removeFailedSnapshotFromClusterState(snapshot, e, repositoryData);
+            removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
         }
     }
 
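The stateWithoutSnapshot change below then uses those generations as a filter: a finished shard result from the removed snapshot is only handed over to the next queued snapshot if its new generation blob actually exists. A rough, self-contained model of that filter, with hypothetical record types rather than the Elasticsearch classes:

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical model, not Elasticsearch code: only carry a finished shard result forward
// to the next queued snapshot if a new generation blob was actually written for that shard.
public class CarryForwardFilterSketch {

    record RepoShardId(String index, int shard) {}

    static Map<RepoShardId, String> carryForward(Map<RepoShardId, String> finishedShards, Set<RepoShardId> writtenGens) {
        return finishedShards.entrySet()
            .stream()
            .filter(e -> writtenGens.contains(e.getKey())) // the hasShardGen-style check
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args) {
        var deletedIndexShard = new RepoShardId("index-to-delete", 0);
        var otherShard = new RepoShardId("index-1", 0);
        // The deleted index's shard never got its generation written, so it must not be reused here.
        System.out.println(carryForward(Map.of(deletedIndexShard, "gen-a", otherShard, "gen-b"), Set.of(otherShard)));
    }
}
```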
@@ -1701,7 +1728,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param snapshot snapshot for which to remove the snapshot operation
      * @return updated cluster state
      */
-    public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
+    public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
         final SnapshotsInProgress snapshots = SnapshotsInProgress.get(state);
         ClusterState result = state;
         int indexOfEntry = -1;
@@ -1762,7 +1789,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
                     if (shardState.state() != ShardState.SUCCESS
-                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) {
+                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false
+                        || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
                         continue;
                     }
                     updatedShardAssignments = maybeAddUpdatedAssignment(
@@ -1779,7 +1807,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     .entrySet()) {
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     if (shardState.state() == ShardState.SUCCESS
-                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())) {
+                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())
+                        && shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
                         updatedShardAssignments = maybeAddUpdatedAssignment(
                             updatedShardAssignments,
                             shardState,
@@ -1862,13 +1891,18 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param repositoryData repository data if the next finalization operation on the repository should be attempted or {@code null} if
      *                       no further actions should be executed
      */
-    private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData) {
+    private void removeFailedSnapshotFromClusterState(
+        Snapshot snapshot,
+        Exception failure,
+        @Nullable RepositoryData repositoryData,
+        ShardGenerations shardGenerations
+    ) {
         assert failure != null : "Failure must be supplied";
         submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
-                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
+                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
                 assert updatedState == currentState || endingSnapshots.contains(snapshot)
                     : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
                 // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them