From b5843e4b98d2856310b871ef03e2db09927e4b6a Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 11 Oct 2023 07:03:31 +0100 Subject: [PATCH] Encapsulate snapshots deletion process (#100617) Introduces the `SnapshotsDeletion` class which encapsulates the process of deleting some collection of snapshots. In particular this class gives us somewhere to store various deletion-wide data which significantly reduces the length of some argument lists. Relates #100568 --- .../test/repository_url/10_basic.yml | 4 +- .../SharedClusterSnapshotRestoreIT.java | 2 +- .../blobstore/BlobStoreRepository.java | 846 +++++++++--------- 3 files changed, 443 insertions(+), 409 deletions(-) diff --git a/modules/repository-url/src/yamlRestTest/resources/rest-api-spec/test/repository_url/10_basic.yml b/modules/repository-url/src/yamlRestTest/resources/rest-api-spec/test/repository_url/10_basic.yml index 4508dacbfe7e..01152a5930f4 100644 --- a/modules/repository-url/src/yamlRestTest/resources/rest-api-spec/test/repository_url/10_basic.yml +++ b/modules/repository-url/src/yamlRestTest/resources/rest-api-spec/test/repository_url/10_basic.yml @@ -167,7 +167,7 @@ teardown: - match: {count: 3} - do: - catch: /cannot delete snapshot from a readonly repository/ + catch: /repository is readonly/ snapshot.delete: repository: repository-url snapshot: snapshot-two @@ -229,7 +229,7 @@ teardown: - match: {count: 3} - do: - catch: /cannot delete snapshot from a readonly repository/ + catch: /repository is readonly/ snapshot.delete: repository: repository-file snapshot: snapshot-one diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 7fa59f0b47b6..71d036cc6b0f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1050,7 +1050,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertRequestBuilderThrows( client.admin().cluster().prepareDeleteSnapshot("readonly-repo", "test-snap"), RepositoryException.class, - "cannot delete snapshot from a readonly repository" + "repository is readonly" ); logger.info("--> try making another snapshot"); diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9a2d53312d57..98d725b9d136 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -836,431 +836,465 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp IndexVersion repositoryFormatIndexVersion, SnapshotDeleteListener listener ) { - if (isReadOnly()) { - listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); - } else { - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - final Map rootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT); - final RepositoryData repositoryData = safeRepositoryData(repositoryDataGeneration, rootBlobs); - // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never - // delete an index that was created by another master node after writing this index-N blob. 
- final Map foundIndices = blobStore().blobContainer(indicesPath()) - .children(OperationPurpose.SNAPSHOT); - doDeleteShardSnapshots( - snapshotIds, - repositoryDataGeneration, - foundIndices, - rootBlobs, - repositoryData, - repositoryFormatIndexVersion, - listener - ); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e)); - } - }); - } - } - - /** - * The result of removing a snapshot from a shard folder in the repository. - * - * @param indexId Index that the snapshot was removed from - * @param shardId Shard id that the snapshot was removed from - * @param newGeneration Id of the new index-${uuid} blob that does not include the snapshot any more - * @param blobsToDelete Blob names in the shard directory that have become unreferenced in the new shard generation - */ - private record ShardSnapshotMetaDeleteResult( - IndexId indexId, - int shardId, - ShardGeneration newGeneration, - Collection blobsToDelete - ) {} - - // --------------------------------------------------------------------------------------------------------------------------------- - // The overall flow of execution - - /** - * After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation - * and then has all now unreferenced blobs in it deleted. - * - * @param snapshotIds SnapshotIds to delete - * @param originalRepositoryDataGeneration {@link RepositoryData} generation at the start of the process. - * @param originalIndexContainers All index containers at the start of the operation, obtained by listing the repository - * contents. - * @param originalRootBlobs All blobs found at the root of the repository at the start of the operation, obtained by - * listing the repository contents. - * @param originalRepositoryData {@link RepositoryData} at the start of the operation. 
- * @param repositoryFormatIndexVersion The minimum {@link IndexVersion} of the nodes in the cluster and the snapshots remaining in - * the repository. - * @param listener Listener to invoke once finished - */ - private void doDeleteShardSnapshots( - Collection snapshotIds, - long originalRepositoryDataGeneration, - Map originalIndexContainers, - Map originalRootBlobs, - RepositoryData originalRepositoryData, - IndexVersion repositoryFormatIndexVersion, - SnapshotDeleteListener listener - ) { - if (SnapshotsService.useShardGenerations(repositoryFormatIndexVersion)) { - // First write the new shard state metadata (with the removed snapshot) and compute deletion targets - final ListenableFuture> writeShardMetaDataAndComputeDeletesStep = - new ListenableFuture<>(); - writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, originalRepositoryData, true, writeShardMetaDataAndComputeDeletesStep); - // Once we have put the new shard-level metadata into place, we can update the repository metadata as follows: - // 1. Remove the snapshots from the list of existing snapshots - // 2. Update the index shard generations of all updated shard folders - // - // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created - // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only - // written if all shard paths have been successfully updated. 
- final ListenableFuture writeUpdatedRepoDataStep = new ListenableFuture<>(); - writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(shardDeleteResults -> { - final ShardGenerations.Builder builder = ShardGenerations.builder(); - for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults) { - builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); - } - final RepositoryData newRepositoryData = originalRepositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen( - newRepositoryData, - originalRepositoryDataGeneration, - repositoryFormatIndexVersion, - Function.identity(), - ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) - ); - }, listener::onFailure)); - // Once we have updated the repository, run the clean-ups - writeUpdatedRepoDataStep.addListener(ActionListener.wrap(newRepositoryData -> { - listener.onRepositoryDataWritten(newRepositoryData); - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - try (var refs = new RefCountingRunnable(listener::onDone)) { - cleanupUnlinkedRootAndIndicesBlobs( - snapshotIds, - originalIndexContainers, - originalRootBlobs, - newRepositoryData, - refs.acquireListener() - ); - cleanupUnlinkedShardLevelBlobs( - originalRepositoryData, - snapshotIds, - writeShardMetaDataAndComputeDeletesStep.result(), - refs.acquireListener() - ); - } - }, listener::onFailure)); - } else { - // Write the new repository data first (with the removed snapshot), using no shard generations - writeIndexGen( - originalRepositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY), - originalRepositoryDataGeneration, - repositoryFormatIndexVersion, - Function.identity(), - ActionListener.wrap(newRepositoryData -> { - try (var refs = new RefCountingRunnable(() -> { - listener.onRepositoryDataWritten(newRepositoryData); - listener.onDone(); - })) { - // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion - 
cleanupUnlinkedRootAndIndicesBlobs( - snapshotIds, - originalIndexContainers, - originalRootBlobs, - newRepositoryData, - refs.acquireListener() - ); - - // writeIndexGen finishes on master-service thread so must fork here. - threadPool.executor(ThreadPool.Names.SNAPSHOT) - .execute( - ActionRunnable.wrap( - refs.acquireListener(), - l0 -> writeUpdatedShardMetaDataAndComputeDeletes( - snapshotIds, - originalRepositoryData, - false, - l0.delegateFailure( - (l, deleteResults) -> cleanupUnlinkedShardLevelBlobs( - originalRepositoryData, - snapshotIds, - deleteResults, - l - ) - ) - ) - ) - ); - } - }, listener::onFailure) - ); - } - } - - // --------------------------------------------------------------------------------------------------------------------------------- - // Updating the shard-level metadata and accumulating results - - // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up. - private void writeUpdatedShardMetaDataAndComputeDeletes( - Collection snapshotIds, - RepositoryData originalRepositoryData, - boolean useShardGenerations, - ActionListener> onAllShardsCompleted - ) { - - final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - final List indices = originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds); - - if (indices.isEmpty()) { - onAllShardsCompleted.onResponse(Collections.emptyList()); - return; - } - - // Listener that flattens out the delete results for each index - final ActionListener> deleteIndexMetadataListener = new GroupedActionListener<>( - indices.size(), - onAllShardsCompleted.map(res -> res.stream().flatMap(Collection::stream).toList()) - ); - - for (IndexId indexId : indices) { - final Set snapshotsWithIndex = Set.copyOf(originalRepositoryData.getSnapshots(indexId)); - final Set survivingSnapshots = snapshotsWithIndex.stream() - .filter(id -> snapshotIds.contains(id) == false) - .collect(Collectors.toSet()); - final 
ListenableFuture> shardCountListener = new ListenableFuture<>(); - final Collection indexMetaGenerations = snapshotIds.stream() - .filter(snapshotsWithIndex::contains) - .map(id -> originalRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)) - .collect(Collectors.toSet()); - final ActionListener allShardCountsListener = new GroupedActionListener<>( - indexMetaGenerations.size(), - shardCountListener - ); - final BlobContainer indexContainer = indexContainer(indexId); - for (String indexMetaGeneration : indexMetaGenerations) { - executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { - try { - return INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry) - .getNumberOfShards(); - } catch (Exception ex) { - logger.warn( - () -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), - ex - ); - // Just invoke the listener without any shard generations to count it down, this index will be cleaned up - // by the stale data cleanup in the end. - // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just - // ignoring it and letting the cleanup deal with it. 
- return null; - } - })); + createSnapshotsDeletion(snapshotIds, repositoryDataGeneration, repositoryFormatIndexVersion, new ActionListener<>() { + @Override + public void onResponse(SnapshotsDeletion snapshotsDeletion) { + snapshotsDeletion.runDelete(listener); } - // ----------------------------------------------------------------------------------------------------------------------------- - // Determining the shard count + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); + } - shardCountListener.addListener(deleteIndexMetadataListener.delegateFailureAndWrap((delegate, counts) -> { - final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0); - if (shardCount == 0) { - delegate.onResponse(null); - return; - } - // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index - final ActionListener allShardsListener = new GroupedActionListener<>(shardCount, delegate); - for (int i = 0; i < shardCount; i++) { - final int shardId = i; - executor.execute(new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - final BlobContainer shardContainer = shardContainer(indexId, shardId); - final Set originalShardBlobs = shardContainer.listBlobs(OperationPurpose.SNAPSHOT).keySet(); - final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; - final long newGen; - if (useShardGenerations) { - newGen = -1L; - blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( - originalShardBlobs, - shardContainer, - originalRepositoryData.shardGenerations().getShardGen(indexId, shardId) - ).v1(); - } else { - Tuple tuple = buildBlobStoreIndexShardSnapshots( - originalShardBlobs, - shardContainer - ); - newGen = tuple.v2() + 1; - blobStoreIndexShardSnapshots = tuple.v1(); - } - allShardsListener.onResponse( - deleteFromShardSnapshotMeta( - survivingSnapshots, - indexId, - shardId, - snapshotIds, - shardContainer, - originalShardBlobs, - 
blobStoreIndexShardSnapshots, - newGen - ) - ); - } - - @Override - public void onFailure(Exception ex) { - logger.warn( - () -> format("%s failed to delete shard data for shard [%s][%s]", snapshotIds, indexId.getName(), shardId), - ex - ); - // Just passing null here to count down the listener instead of failing it, the stale data left behind - // here will be retried in the next delete or repository cleanup - allShardsListener.onResponse(null); - } - }); - } + private void createSnapshotsDeletion( + Collection snapshotIds, + long repositoryDataGeneration, + IndexVersion repositoryFormatIndexVersion, + ActionListener listener + ) { + if (isReadOnly()) { + listener.onFailure(new RepositoryException(metadata.name(), "repository is readonly")); + } else { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + final var originalRootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT); + return new SnapshotsDeletion( + snapshotIds, + repositoryDataGeneration, + repositoryFormatIndexVersion, + originalRootBlobs, + blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT), + safeRepositoryData(repositoryDataGeneration, originalRootBlobs) + ); })); } } - // ----------------------------------------------------------------------------------------------------------------------------- - // Updating each shard - /** - * Delete snapshot from shard level metadata. - * - * @param indexGeneration generation to write the new shard level level metadata to. If negative a uuid id shard generation should be - * used + *

+ * Represents the process of deleting some collection of snapshots within this repository, which since 7.6.0 looks like this: + *

+ *
    + *
  • Write a new {@link BlobStoreIndexShardSnapshots} for each affected shard, and compute the blobs to delete.
  • + *
  • Update the {@link RepositoryData} to remove references to deleted snapshots/indices and point to the new + * {@link BlobStoreIndexShardSnapshots} files.
  • + *
  • Clean up any now-unreferenced blobs.
  • + *
+ *

+ * Until the {@link RepositoryData} is updated there should be no other activities in the repository, and in particular the root + * blob must not change until it is updated by this deletion and {@link SnapshotDeleteListener#onRepositoryDataWritten} is called. + *

*/ - private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( - Set survivingSnapshots, - IndexId indexId, - int shardId, - Collection snapshotIds, - BlobContainer shardContainer, - Set originalShardBlobs, - BlobStoreIndexShardSnapshots snapshots, - long indexGeneration - ) { - // Build a list of snapshots that should be preserved - final BlobStoreIndexShardSnapshots updatedSnapshots = snapshots.withRetainedSnapshots(survivingSnapshots); - ShardGeneration writtenGeneration = null; - try { - if (updatedSnapshots.snapshots().isEmpty()) { - return new ShardSnapshotMetaDeleteResult(indexId, shardId, ShardGenerations.DELETED_SHARD_GEN, originalShardBlobs); + class SnapshotsDeletion { + + /** + * The IDs of the snapshots to delete. + */ + private final Collection snapshotIds; + + /** + * The {@link RepositoryData} generation at the start of the process, to ensure that the {@link RepositoryData} does not change + * while the new {@link BlobStoreIndexShardSnapshots} are being written. + */ + private final long originalRepositoryDataGeneration; + + /** + * The minimum {@link IndexVersion} of the nodes in the cluster and the snapshots remaining in the repository. The repository must + * remain readable by all node versions which support this {@link IndexVersion}. + */ + private final IndexVersion repositoryFormatIndexVersion; + + /** + * Whether the {@link #repositoryFormatIndexVersion} is new enough to support naming {@link BlobStoreIndexShardSnapshots} blobs with + * UUIDs (i.e. does not need to remain compatible with versions before v7.6.0). Older repositories use (unsafe) numeric indices for + * these blobs instead. + */ + private final boolean useShardGenerations; + + /** + * All blobs in the repository root at the start of the operation, obtained by listing the repository contents. 
Note that this may + * include some blobs which are no longer referenced by the current {@link RepositoryData}, but which have not yet been removed by + * the cleanup that follows an earlier deletion. This cleanup may still be ongoing (we do not wait for it to complete before + * starting the next repository operation) or it may have failed before completion (it could have been running on a different node, + * which crashed for unrelated reasons) so we track all the blobs here and clean them up again at the end. + */ + private final Map originalRootBlobs; + + /** + * All index containers at the start of the operation, obtained by listing the repository contents. Note that this may include some + * containers which are no longer referenced by the current {@link RepositoryData}, but which have not yet been removed by + * the cleanup that follows an earlier deletion. This cleanup may or may not still be ongoing (it could have been running on a + * different node, which died before completing it) so we track all the blobs here and clean them up again at the end. + */ + private final Map originalIndexContainers; + + /** + * The {@link RepositoryData} at the start of the operation, obtained after verifying that {@link #originalRootBlobs} contains no + * {@link RepositoryData} blob newer than the one identified by {@link #originalRepositoryDataGeneration}. + */ + private final RepositoryData originalRepositoryData; + + /** + * Executor to use for all repository interactions. 
+ */ + private final Executor snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + + SnapshotsDeletion( + Collection snapshotIds, + long originalRepositoryDataGeneration, + IndexVersion repositoryFormatIndexVersion, + Map originalRootBlobs, + Map originalIndexContainers, + RepositoryData originalRepositoryData + ) { + this.snapshotIds = snapshotIds; + this.originalRepositoryDataGeneration = originalRepositoryDataGeneration; + this.repositoryFormatIndexVersion = repositoryFormatIndexVersion; + this.useShardGenerations = SnapshotsService.useShardGenerations(repositoryFormatIndexVersion); + this.originalRootBlobs = originalRootBlobs; + this.originalIndexContainers = originalIndexContainers; + this.originalRepositoryData = originalRepositoryData; + } + + /** + * The result of removing a snapshot from a shard folder in the repository. + * + * @param indexId Index that the snapshot was removed from + * @param shardId Shard id that the snapshot was removed from + * @param newGeneration Id of the new index-${uuid} blob that does not include the snapshot any more + * @param blobsToDelete Blob names in the shard directory that have become unreferenced in the new shard generation + */ + private record ShardSnapshotMetaDeleteResult( + IndexId indexId, + int shardId, + ShardGeneration newGeneration, + Collection blobsToDelete + ) {} + + // --------------------------------------------------------------------------------------------------------------------------------- + // The overall flow of execution + + private void runDelete(SnapshotDeleteListener listener) { + if (useShardGenerations) { + // First write the new shard state metadata (with the removed snapshot) and compute deletion targets + final ListenableFuture> writeShardMetaDataAndComputeDeletesStep = + new ListenableFuture<>(); + writeUpdatedShardMetaDataAndComputeDeletes(writeShardMetaDataAndComputeDeletesStep); + // Once we have put the new shard-level metadata into place, we can update the repository 
metadata as follows: + // 1. Remove the snapshots from the list of existing snapshots + // 2. Update the index shard generations of all updated shard folders + // + // Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created + // index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only + // written if all shard paths have been successfully updated. + final ListenableFuture writeUpdatedRepoDataStep = new ListenableFuture<>(); + writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(shardDeleteResults -> { + final ShardGenerations.Builder builder = ShardGenerations.builder(); + for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults) { + builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); + } + final RepositoryData newRepositoryData = originalRepositoryData.removeSnapshots(snapshotIds, builder.build()); + writeIndexGen( + newRepositoryData, + originalRepositoryDataGeneration, + repositoryFormatIndexVersion, + Function.identity(), + ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure) + ); + }, listener::onFailure)); + // Once we have updated the repository, run the clean-ups + writeUpdatedRepoDataStep.addListener(ActionListener.wrap(newRepositoryData -> { + listener.onRepositoryDataWritten(newRepositoryData); + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + try (var refs = new RefCountingRunnable(listener::onDone)) { + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + cleanupUnlinkedShardLevelBlobs(writeShardMetaDataAndComputeDeletesStep.result(), refs.acquireListener()); + } + }, listener::onFailure)); } else { - if (indexGeneration < 0L) { - writtenGeneration = ShardGeneration.newGeneration(); - INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration.toBlobNamePart(), compress); - } else { - writtenGeneration 
= new ShardGeneration(indexGeneration); - writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots, Collections.emptyMap()); - } - final Set survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet()); - return new ShardSnapshotMetaDeleteResult( - indexId, - shardId, - writtenGeneration, - unusedBlobs(originalShardBlobs, survivingSnapshotUUIDs, updatedSnapshots) + // Write the new repository data first (with the removed snapshot), using no shard generations + writeIndexGen( + originalRepositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY), + originalRepositoryDataGeneration, + repositoryFormatIndexVersion, + Function.identity(), + ActionListener.wrap(newRepositoryData -> { + try (var refs = new RefCountingRunnable(() -> { + listener.onRepositoryDataWritten(newRepositoryData); + listener.onDone(); + })) { + // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion + cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener()); + + // writeIndexGen finishes on master-service thread so must fork here. 
+ snapshotExecutor.execute( + ActionRunnable.wrap( + refs.acquireListener(), + l0 -> writeUpdatedShardMetaDataAndComputeDeletes( + l0.delegateFailure((l, shardDeleteResults) -> cleanupUnlinkedShardLevelBlobs(shardDeleteResults, l)) + ) + ) + ); + } + }, listener::onFailure) ); } - } catch (IOException e) { - throw new RepositoryException( - metadata.name(), - "Failed to finalize snapshot deletion " - + snapshotIds - + " with shard index [" - + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(writtenGeneration.toBlobNamePart()) - + "]", - e - ); } - } - // Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all - // temporary blobs - private static List unusedBlobs( - Set originalShardBlobs, - Set survivingSnapshotUUIDs, - BlobStoreIndexShardSnapshots updatedSnapshots - ) { - return originalShardBlobs.stream() - .filter( - blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX) - || (blob.startsWith(SNAPSHOT_PREFIX) - && blob.endsWith(".dat") - && survivingSnapshotUUIDs.contains( - blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()) - ) == false) - || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) - || FsBlobContainer.isTempBlobName(blob) - ) - .toList(); - } + // --------------------------------------------------------------------------------------------------------------------------------- + // Updating the shard-level metadata and accumulating results - // --------------------------------------------------------------------------------------------------------------------------------- - // Cleaning up dangling blobs + // updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up. + private void writeUpdatedShardMetaDataAndComputeDeletes( + ActionListener> onAllShardsCompleted + ) { - /** - * Delete any dangling blobs in the repository root (i.e. 
{@link RepositoryData}, {@link SnapshotInfo} and {@link Metadata} blobs) - * as well as any containers for indices that are now completely unreferenced. - */ - private void cleanupUnlinkedRootAndIndicesBlobs( - Collection snapshotIds, - Map originalIndexContainers, - Map originalRootBlobs, - RepositoryData newRepositoryData, - ActionListener listener - ) { - cleanupStaleBlobs(snapshotIds, originalIndexContainers, originalRootBlobs, newRepositoryData, listener.map(ignored -> null)); - } + final List indices = originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds); - private void cleanupUnlinkedShardLevelBlobs( - RepositoryData originalRepositoryData, - Collection snapshotIds, - Collection shardDeleteResults, - ActionListener listener - ) { - final Iterator filesToDelete = resolveFilesToDelete(originalRepositoryData, snapshotIds, shardDeleteResults); - if (filesToDelete.hasNext() == false) { - listener.onResponse(null); - return; - } - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { - try { - deleteFromContainer(blobContainer(), filesToDelete); - l.onResponse(null); - } catch (Exception e) { - logger.warn(() -> format("%s Failed to delete some blobs during snapshot delete", snapshotIds), e); - throw e; + if (indices.isEmpty()) { + onAllShardsCompleted.onResponse(Collections.emptyList()); + return; } - })); - } - private Iterator resolveFilesToDelete( - RepositoryData oldRepositoryData, - Collection snapshotIds, - Collection deleteResults - ) { - final String basePath = basePath().buildAsString(); - final int basePathLen = basePath.length(); - final Map> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots( - snapshotIds - ); - return Stream.concat(deleteResults.stream().flatMap(shardResult -> { - final String shardPath = shardPath(shardResult.indexId, shardResult.shardId).buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); - }), 
indexMetaGenerations.entrySet().stream().flatMap(entry -> { - final String indexContainerPath = indexPath(entry.getKey()).buildAsString(); - return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id)); - })).map(absolutePath -> { - assert absolutePath.startsWith(basePath); - return absolutePath.substring(basePathLen); - }).iterator(); + // Listener that flattens out the delete results for each index + final ActionListener> deleteIndexMetadataListener = new GroupedActionListener<>( + indices.size(), + onAllShardsCompleted.map(res -> res.stream().flatMap(Collection::stream).toList()) + ); + + for (IndexId indexId : indices) { + final Set snapshotsWithIndex = Set.copyOf(originalRepositoryData.getSnapshots(indexId)); + final Set survivingSnapshots = snapshotsWithIndex.stream() + .filter(id -> snapshotIds.contains(id) == false) + .collect(Collectors.toSet()); + final ListenableFuture> shardCountListener = new ListenableFuture<>(); + final Collection indexMetaGenerations = snapshotIds.stream() + .filter(snapshotsWithIndex::contains) + .map(id -> originalRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)) + .collect(Collectors.toSet()); + final ActionListener allShardCountsListener = new GroupedActionListener<>( + indexMetaGenerations.size(), + shardCountListener + ); + final BlobContainer indexContainer = indexContainer(indexId); + for (String indexMetaGeneration : indexMetaGenerations) { + snapshotExecutor.execute(ActionRunnable.supply(allShardCountsListener, () -> { + try { + return INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry) + .getNumberOfShards(); + } catch (Exception ex) { + logger.warn( + () -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()), + ex + ); + // Just invoke the listener without any shard generations to count it down, this index will be cleaned up + // by the stale data cleanup in the end. 
+ // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just + // ignoring it and letting the cleanup deal with it. + return null; + } + })); + } + + // ------------------------------------------------------------------------------------------------------------------------- + // Determining the shard count + + shardCountListener.addListener(deleteIndexMetadataListener.delegateFailureAndWrap((delegate, counts) -> { + final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0); + if (shardCount == 0) { + delegate.onResponse(null); + return; + } + // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index + final ActionListener allShardsListener = new GroupedActionListener<>( + shardCount, + delegate + ); + for (int i = 0; i < shardCount; i++) { + final int shardId = i; + snapshotExecutor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + final BlobContainer shardContainer = shardContainer(indexId, shardId); + final Set originalShardBlobs = shardContainer.listBlobs(OperationPurpose.SNAPSHOT).keySet(); + final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots; + final long newGen; + if (useShardGenerations) { + newGen = -1L; + blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots( + originalShardBlobs, + shardContainer, + originalRepositoryData.shardGenerations().getShardGen(indexId, shardId) + ).v1(); + } else { + Tuple tuple = buildBlobStoreIndexShardSnapshots( + originalShardBlobs, + shardContainer + ); + newGen = tuple.v2() + 1; + blobStoreIndexShardSnapshots = tuple.v1(); + } + allShardsListener.onResponse( + deleteFromShardSnapshotMeta( + survivingSnapshots, + indexId, + shardId, + snapshotIds, + shardContainer, + originalShardBlobs, + blobStoreIndexShardSnapshots, + newGen + ) + ); + } + + @Override + public void onFailure(Exception ex) { + logger.warn( + () -> format( + "%s failed to delete 
shard data for shard [%s][%s]", + snapshotIds, + indexId.getName(), + shardId + ), + ex + ); + // Just passing null here to count down the listener instead of failing it, the stale data left behind + // here will be retried in the next delete or repository cleanup + allShardsListener.onResponse(null); + } + }); + } + })); + } + } + + // ----------------------------------------------------------------------------------------------------------------------------- + // Updating each shard + + /** + * Delete snapshots from shard level metadata, keeping the entries for the given {@code survivingSnapshots}. + * + * @param indexGeneration generation to write the new shard level metadata to. If negative, a UUID shard generation should + * be used instead + */ + private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta( + Set survivingSnapshots, + IndexId indexId, + int shardId, + Collection snapshotIds, + BlobContainer shardContainer, + Set originalShardBlobs, + BlobStoreIndexShardSnapshots snapshots, + long indexGeneration + ) { + // Build a list of snapshots that should be preserved + final BlobStoreIndexShardSnapshots updatedSnapshots = snapshots.withRetainedSnapshots(survivingSnapshots); + ShardGeneration writtenGeneration = null; + try { + if (updatedSnapshots.snapshots().isEmpty()) { + return new ShardSnapshotMetaDeleteResult(indexId, shardId, ShardGenerations.DELETED_SHARD_GEN, originalShardBlobs); + } else { + if (indexGeneration < 0L) { + writtenGeneration = ShardGeneration.newGeneration(); + INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration.toBlobNamePart(), compress); + } else { + writtenGeneration = new ShardGeneration(indexGeneration); + writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots, Collections.emptyMap()); + } + final Set survivingSnapshotUUIDs = survivingSnapshots.stream() + .map(SnapshotId::getUUID) + .collect(Collectors.toSet()); + return new ShardSnapshotMetaDeleteResult( + indexId, + shardId, + writtenGeneration, +
unusedBlobs(originalShardBlobs, survivingSnapshotUUIDs, updatedSnapshots) + ); + } + } catch (IOException e) { + throw new RepositoryException( + metadata.name(), + "Failed to finalize snapshot deletion " + + snapshotIds + + " with shard index [" + + INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(writtenGeneration.toBlobNamePart()) + + "]", + e + ); + } + } + + // Unused blobs are all previous index- blobs, the meta-blobs of snapshots that do not survive, the data-blobs that are not + // referenced by the new index-, and all temporary blobs + private static List unusedBlobs( + Set originalShardBlobs, + Set survivingSnapshotUUIDs, + BlobStoreIndexShardSnapshots updatedSnapshots + ) { + return originalShardBlobs.stream() + .filter( + blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX) + || (blob.startsWith(SNAPSHOT_PREFIX) + && blob.endsWith(".dat") + && survivingSnapshotUUIDs.contains( + blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length()) + ) == false) + || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null) + || FsBlobContainer.isTempBlobName(blob) + ) + .toList(); + } + + // --------------------------------------------------------------------------------------------------------------------------------- + // Cleaning up dangling blobs + + /** + * Delete any dangling blobs in the repository root (i.e. {@link RepositoryData}, {@link SnapshotInfo} and {@link Metadata} blobs) + * as well as any containers for indices that are now completely unreferenced.
+ */ + private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData, ActionListener listener) { + cleanupStaleBlobs(snapshotIds, originalIndexContainers, originalRootBlobs, newRepositoryData, listener.map(ignored -> null)); + } + + private void cleanupUnlinkedShardLevelBlobs( + Collection shardDeleteResults, + ActionListener listener + ) { + final Iterator filesToDelete = resolveFilesToDelete(shardDeleteResults); + if (filesToDelete.hasNext() == false) { + listener.onResponse(null); + return; + } + snapshotExecutor.execute(ActionRunnable.wrap(listener, l -> { + try { + deleteFromContainer(blobContainer(), filesToDelete); + l.onResponse(null); + } catch (Exception e) { + logger.warn(() -> format("%s Failed to delete some blobs during snapshot delete", snapshotIds), e); + throw e; + } + })); + } + + private Iterator resolveFilesToDelete(Collection deleteResults) { + final String basePath = basePath().buildAsString(); + final int basePathLen = basePath.length(); + final Map> indexMetaGenerations = originalRepositoryData + .indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds); + return Stream.concat(deleteResults.stream().flatMap(shardResult -> { + final String shardPath = shardPath(shardResult.indexId, shardResult.shardId).buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), indexMetaGenerations.entrySet().stream().flatMap(entry -> { + final String indexContainerPath = indexPath(entry.getKey()).buildAsString(); + return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id)); + })).map(absolutePath -> { + assert absolutePath.startsWith(basePath); + return absolutePath.substring(basePathLen); + }).iterator(); + } } /**