Mirror of https://github.com/elastic/elasticsearch.git, synced 2025-06-28 01:22:26 -04:00
Encapsulate snapshots deletion process (#100617)
Introduces the `SnapshotsDeletion` class, which encapsulates the process of deleting some collection of snapshots. In particular, this class gives us somewhere to store various deletion-wide data, which significantly reduces the length of some argument lists. Relates #100568
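A simplified sketch of the refactoring idea (illustrative only, not code from this commit; the signatures are abbreviated):

// Before: each helper threads the same deletion-wide state through its parameters.
private void doDeleteShardSnapshots(
    Collection<SnapshotId> snapshotIds,
    long repositoryDataGeneration,
    Map<String, BlobContainer> indexContainers,
    Map<String, BlobMetadata> rootBlobs,
    RepositoryData repositoryData,
    SnapshotDeleteListener listener
) { /* ... */ }

// After: the state lives in fields of one per-deletion object, so the helpers
// can read it directly instead of declaring it in their argument lists.
class SnapshotsDeletion {
    private final Collection<SnapshotId> snapshotIds;
    private final long originalRepositoryDataGeneration;
    private final RepositoryData originalRepositoryData;
    // ... further deletion-wide fields ...

    void runDelete(SnapshotDeleteListener listener) { /* ... */ }
}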
This commit is contained in:
parent
f35c3b49b5
commit
b5843e4b98
3 changed files with 443 additions and 409 deletions
@@ -167,7 +167,7 @@ teardown:
   - match: {count: 3}

   - do:
-      catch: /cannot delete snapshot from a readonly repository/
+      catch: /repository is readonly/
       snapshot.delete:
         repository: repository-url
         snapshot: snapshot-two

@@ -229,7 +229,7 @@ teardown:
   - match: {count: 3}

   - do:
-      catch: /cannot delete snapshot from a readonly repository/
+      catch: /repository is readonly/
       snapshot.delete:
         repository: repository-file
         snapshot: snapshot-one
@@ -1050,7 +1050,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCase
         assertRequestBuilderThrows(
             client.admin().cluster().prepareDeleteSnapshot("readonly-repo", "test-snap"),
             RepositoryException.class,
-            "cannot delete snapshot from a readonly repository"
+            "repository is readonly"
         );

         logger.info("--> try making another snapshot");
@@ -836,431 +836,465 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository
IndexVersion repositoryFormatIndexVersion,
SnapshotDeleteListener listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final Map<String, BlobMetadata> rootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT);
final RepositoryData repositoryData = safeRepositoryData(repositoryDataGeneration, rootBlobs);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath())
.children(OperationPurpose.SNAPSHOT);
doDeleteShardSnapshots(
snapshotIds,
repositoryDataGeneration,
foundIndices,
rootBlobs,
repositoryData,
repositoryFormatIndexVersion,
listener
);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshots " + snapshotIds, e));
}
});
}
}

/**
* The result of removing a snapshot from a shard folder in the repository.
*
* @param indexId Index that the snapshot was removed from
* @param shardId Shard id that the snapshot was removed from
* @param newGeneration Id of the new index-${uuid} blob that does not include the snapshot any more
* @param blobsToDelete Blob names in the shard directory that have become unreferenced in the new shard generation
*/
private record ShardSnapshotMetaDeleteResult(
IndexId indexId,
int shardId,
ShardGeneration newGeneration,
Collection<String> blobsToDelete
) {}

// ---------------------------------------------------------------------------------------------------------------------------------
// The overall flow of execution

/**
* After updating the {@link RepositoryData}, each of the shard directories is individually first moved to the next shard generation
* and then has all now-unreferenced blobs in it deleted.
*
* @param snapshotIds SnapshotIds to delete
* @param originalRepositoryDataGeneration {@link RepositoryData} generation at the start of the process.
* @param originalIndexContainers All index containers at the start of the operation, obtained by listing the repository
* contents.
* @param originalRootBlobs All blobs found at the root of the repository at the start of the operation, obtained by
* listing the repository contents.
* @param originalRepositoryData {@link RepositoryData} at the start of the operation.
* @param repositoryFormatIndexVersion The minimum {@link IndexVersion} of the nodes in the cluster and the snapshots remaining in
* the repository.
* @param listener Listener to invoke once finished
*/
private void doDeleteShardSnapshots(
Collection<SnapshotId> snapshotIds,
long originalRepositoryDataGeneration,
Map<String, BlobContainer> originalIndexContainers,
Map<String, BlobMetadata> originalRootBlobs,
RepositoryData originalRepositoryData,
IndexVersion repositoryFormatIndexVersion,
SnapshotDeleteListener listener
) {
if (SnapshotsService.useShardGenerations(repositoryFormatIndexVersion)) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep =
new ListenableFuture<>();
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, originalRepositoryData, true, writeShardMetaDataAndComputeDeletesStep);
// Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
// 1. Remove the snapshots from the list of existing snapshots
// 2. Update the index shard generations of all updated shard folders
//
// Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
// index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
// written if all shard paths have been successfully updated.
final ListenableFuture<RepositoryData> writeUpdatedRepoDataStep = new ListenableFuture<>();
writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(shardDeleteResults -> {
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData newRepositoryData = originalRepositoryData.removeSnapshots(snapshotIds, builder.build());
writeIndexGen(
newRepositoryData,
originalRepositoryDataGeneration,
repositoryFormatIndexVersion,
Function.identity(),
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure));
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.addListener(ActionListener.wrap(newRepositoryData -> {
listener.onRepositoryDataWritten(newRepositoryData);
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
try (var refs = new RefCountingRunnable(listener::onDone)) {
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
originalIndexContainers,
originalRootBlobs,
newRepositoryData,
refs.acquireListener()
);
cleanupUnlinkedShardLevelBlobs(
originalRepositoryData,
snapshotIds,
writeShardMetaDataAndComputeDeletesStep.result(),
refs.acquireListener()
);
}
}, listener::onFailure));
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
writeIndexGen(
originalRepositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY),
originalRepositoryDataGeneration,
repositoryFormatIndexVersion,
Function.identity(),
ActionListener.wrap(newRepositoryData -> {
try (var refs = new RefCountingRunnable(() -> {
listener.onRepositoryDataWritten(newRepositoryData);
listener.onDone();
})) {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
cleanupUnlinkedRootAndIndicesBlobs(
snapshotIds,
originalIndexContainers,
originalRootBlobs,
newRepositoryData,
refs.acquireListener()
);

// writeIndexGen finishes on master-service thread so must fork here.
threadPool.executor(ThreadPool.Names.SNAPSHOT)
.execute(
ActionRunnable.wrap(
refs.acquireListener(),
l0 -> writeUpdatedShardMetaDataAndComputeDeletes(
snapshotIds,
originalRepositoryData,
false,
l0.delegateFailure(
(l, deleteResults) -> cleanupUnlinkedShardLevelBlobs(
originalRepositoryData,
snapshotIds,
deleteResults,
l
)
)
)
)
);
}
}, listener::onFailure)
);
}
}
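The shard-generations branch above chains three phases: write the new per-shard metadata, then write the new RepositoryData via writeIndexGen, then run the root/indices cleanup and the shard-level cleanup in parallel. Here is a minimal, self-contained analogy of that chaining (an illustrative sketch only: the real code uses Elasticsearch's ListenableFuture, ActionListener and RefCountingRunnable, not the JDK types below):

import java.util.List;
import java.util.concurrent.CompletableFuture;

public class DeletionFlowSketch {
    // Hypothetical stand-ins for the three phases of doDeleteShardSnapshots.
    static CompletableFuture<List<String>> writeShardMetadataAndComputeDeletes() {
        return CompletableFuture.supplyAsync(() -> List.of("indices/idx/0/old-blob"));
    }

    static CompletableFuture<String> writeUpdatedRepositoryData(List<String> shardResults) {
        // Only runs once every shard update succeeded, mirroring writeIndexGen's position in the chain.
        return CompletableFuture.supplyAsync(() -> "repository-data-generation-N+1");
    }

    static CompletableFuture<Void> cleanupRootAndIndicesBlobs(String newRepositoryData) {
        return CompletableFuture.runAsync(() -> System.out.println("root cleanup against " + newRepositoryData));
    }

    static CompletableFuture<Void> cleanupShardLevelBlobs(List<String> blobsToDelete) {
        return CompletableFuture.runAsync(() -> System.out.println("deleting " + blobsToDelete));
    }

    public static void main(String[] args) {
        writeShardMetadataAndComputeDeletes()
            .thenCompose(shardResults -> writeUpdatedRepositoryData(shardResults)
                // Both cleanups run in parallel once the new RepositoryData is written,
                // like the RefCountingRunnable block in the listener above.
                .thenCompose(newRepoData -> CompletableFuture.allOf(
                    cleanupRootAndIndicesBlobs(newRepoData),
                    cleanupShardLevelBlobs(shardResults)
                )))
            .join();
    }
}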
// ---------------------------------------------------------------------------------------------------------------------------------
// Updating the shard-level metadata and accumulating results

// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
private void writeUpdatedShardMetaDataAndComputeDeletes(
Collection<SnapshotId> snapshotIds,
RepositoryData originalRepositoryData,
boolean useShardGenerations,
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
) {

final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
final List<IndexId> indices = originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

if (indices.isEmpty()) {
onAllShardsCompleted.onResponse(Collections.emptyList());
return;
}

// Listener that flattens out the delete results for each index
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetadataListener = new GroupedActionListener<>(
indices.size(),
onAllShardsCompleted.map(res -> res.stream().flatMap(Collection::stream).toList())
);

for (IndexId indexId : indices) {
final Set<SnapshotId> snapshotsWithIndex = Set.copyOf(originalRepositoryData.getSnapshots(indexId));
final Set<SnapshotId> survivingSnapshots = snapshotsWithIndex.stream()
.filter(id -> snapshotIds.contains(id) == false)
.collect(Collectors.toSet());
final ListenableFuture<Collection<Integer>> shardCountListener = new ListenableFuture<>();
final Collection<String> indexMetaGenerations = snapshotIds.stream()
.filter(snapshotsWithIndex::contains)
.map(id -> originalRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId))
.collect(Collectors.toSet());
final ActionListener<Integer> allShardCountsListener = new GroupedActionListener<>(
indexMetaGenerations.size(),
shardCountListener
);
final BlobContainer indexContainer = indexContainer(indexId);
for (String indexMetaGeneration : indexMetaGenerations) {
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards();
} catch (Exception ex) {
logger.warn(
() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()),
ex
);
// Just invoke the listener without any shard generations to count it down, this index will be cleaned up
// by the stale data cleanup in the end.
// TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just
// ignoring it and letting the cleanup deal with it.
return null;
}
}));
createSnapshotsDeletion(snapshotIds, repositoryDataGeneration, repositoryFormatIndexVersion, new ActionListener<>() {
@Override
public void onResponse(SnapshotsDeletion snapshotsDeletion) {
snapshotsDeletion.runDelete(listener);
}

// -----------------------------------------------------------------------------------------------------------------------------
// Determining the shard count
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}

shardCountListener.addListener(deleteIndexMetadataListener.delegateFailureAndWrap((delegate, counts) -> {
final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0);
if (shardCount == 0) {
delegate.onResponse(null);
return;
}
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(shardCount, delegate);
for (int i = 0; i < shardCount; i++) {
final int shardId = i;
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final Set<String> originalShardBlobs = shardContainer.listBlobs(OperationPurpose.SNAPSHOT).keySet();
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
final long newGen;
if (useShardGenerations) {
newGen = -1L;
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
originalShardBlobs,
shardContainer,
originalRepositoryData.shardGenerations().getShardGen(indexId, shardId)
).v1();
} else {
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(
originalShardBlobs,
shardContainer
);
newGen = tuple.v2() + 1;
blobStoreIndexShardSnapshots = tuple.v1();
}
allShardsListener.onResponse(
deleteFromShardSnapshotMeta(
survivingSnapshots,
indexId,
shardId,
snapshotIds,
shardContainer,
originalShardBlobs,
blobStoreIndexShardSnapshots,
newGen
)
);
}

@Override
public void onFailure(Exception ex) {
logger.warn(
() -> format("%s failed to delete shard data for shard [%s][%s]", snapshotIds, indexId.getName(), shardId),
ex
);
// Just passing null here to count down the listener instead of failing it, the stale data left behind
// here will be retried in the next delete or repository cleanup
allShardsListener.onResponse(null);
}
});
}
private void createSnapshotsDeletion(
Collection<SnapshotId> snapshotIds,
long repositoryDataGeneration,
IndexVersion repositoryFormatIndexVersion,
ActionListener<SnapshotsDeletion> listener
) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "repository is readonly"));
} else {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
final var originalRootBlobs = blobContainer().listBlobs(OperationPurpose.SNAPSHOT);
return new SnapshotsDeletion(
snapshotIds,
repositoryDataGeneration,
repositoryFormatIndexVersion,
originalRootBlobs,
blobStore().blobContainer(indicesPath()).children(OperationPurpose.SNAPSHOT),
safeRepositoryData(repositoryDataGeneration, originalRootBlobs)
);
}));
}
}

// -----------------------------------------------------------------------------------------------------------------------------
// Updating each shard

/**
* Delete snapshot from shard level metadata.
*
* @param indexGeneration generation to write the new shard level metadata to. If negative, a UUID shard generation should be
* used
* <p>
* Represents the process of deleting some collection of snapshots within this repository which since 7.6.0 looks like this:
* </p>
* <ul>
* <li>Write a new {@link BlobStoreIndexShardSnapshots} for each affected shard, and compute the blobs to delete.</li>
* <li>Update the {@link RepositoryData} to remove references to deleted snapshots/indices and point to the new
* {@link BlobStoreIndexShardSnapshots} files.</li>
* <li>Clean up any now-unreferenced blobs.</li>
* </ul>
* <p>
* Until the {@link RepositoryData} is updated there should be no other activities in the repository, and in particular the root
* blob must not change until it is updated by this deletion and {@link SnapshotDeleteListener#onRepositoryDataWritten} is called.
* </p>
*/
private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
Set<SnapshotId> survivingSnapshots,
IndexId indexId,
int shardId,
Collection<SnapshotId> snapshotIds,
BlobContainer shardContainer,
Set<String> originalShardBlobs,
BlobStoreIndexShardSnapshots snapshots,
long indexGeneration
) {
// Build a list of snapshots that should be preserved
final BlobStoreIndexShardSnapshots updatedSnapshots = snapshots.withRetainedSnapshots(survivingSnapshots);
ShardGeneration writtenGeneration = null;
try {
if (updatedSnapshots.snapshots().isEmpty()) {
return new ShardSnapshotMetaDeleteResult(indexId, shardId, ShardGenerations.DELETED_SHARD_GEN, originalShardBlobs);
class SnapshotsDeletion {

/**
* The IDs of the snapshots to delete.
*/
private final Collection<SnapshotId> snapshotIds;

/**
* The {@link RepositoryData} generation at the start of the process, to ensure that the {@link RepositoryData} does not change
* while the new {@link BlobStoreIndexShardSnapshots} are being written.
*/
private final long originalRepositoryDataGeneration;

/**
* The minimum {@link IndexVersion} of the nodes in the cluster and the snapshots remaining in the repository. The repository must
* remain readable by all node versions which support this {@link IndexVersion}.
*/
private final IndexVersion repositoryFormatIndexVersion;

/**
* Whether the {@link #repositoryFormatIndexVersion} is new enough to support naming {@link BlobStoreIndexShardSnapshots} blobs with
* UUIDs (i.e. does not need to remain compatible with versions before v7.6.0). Older repositories use (unsafe) numeric indices for
* these blobs instead.
*/
private final boolean useShardGenerations;

/**
* All blobs in the repository root at the start of the operation, obtained by listing the repository contents. Note that this may
* include some blobs which are no longer referenced by the current {@link RepositoryData}, but which have not yet been removed by
* the cleanup that follows an earlier deletion. This cleanup may still be ongoing (we do not wait for it to complete before
* starting the next repository operation) or it may have failed before completion (it could have been running on a different node,
* which crashed for unrelated reasons) so we track all the blobs here and clean them up again at the end.
*/
private final Map<String, BlobMetadata> originalRootBlobs;

/**
* All index containers at the start of the operation, obtained by listing the repository contents. Note that this may include some
* containers which are no longer referenced by the current {@link RepositoryData}, but which have not yet been removed by
* the cleanup that follows an earlier deletion. This cleanup may or may not still be ongoing (it could have been running on a
* different node, which died before completing it) so we track all the blobs here and clean them up again at the end.
*/
private final Map<String, BlobContainer> originalIndexContainers;

/**
* The {@link RepositoryData} at the start of the operation, obtained after verifying that {@link #originalRootBlobs} contains no
* {@link RepositoryData} blob newer than the one identified by {@link #originalRepositoryDataGeneration}.
*/
private final RepositoryData originalRepositoryData;

/**
* Executor to use for all repository interactions.
*/
private final Executor snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

SnapshotsDeletion(
Collection<SnapshotId> snapshotIds,
long originalRepositoryDataGeneration,
IndexVersion repositoryFormatIndexVersion,
Map<String, BlobMetadata> originalRootBlobs,
Map<String, BlobContainer> originalIndexContainers,
RepositoryData originalRepositoryData
) {
this.snapshotIds = snapshotIds;
this.originalRepositoryDataGeneration = originalRepositoryDataGeneration;
this.repositoryFormatIndexVersion = repositoryFormatIndexVersion;
this.useShardGenerations = SnapshotsService.useShardGenerations(repositoryFormatIndexVersion);
this.originalRootBlobs = originalRootBlobs;
this.originalIndexContainers = originalIndexContainers;
this.originalRepositoryData = originalRepositoryData;
}

/**
* The result of removing a snapshot from a shard folder in the repository.
*
* @param indexId Index that the snapshot was removed from
* @param shardId Shard id that the snapshot was removed from
* @param newGeneration Id of the new index-${uuid} blob that does not include the snapshot any more
* @param blobsToDelete Blob names in the shard directory that have become unreferenced in the new shard generation
*/
private record ShardSnapshotMetaDeleteResult(
IndexId indexId,
int shardId,
ShardGeneration newGeneration,
Collection<String> blobsToDelete
) {}

// ---------------------------------------------------------------------------------------------------------------------------------
// The overall flow of execution

private void runDelete(SnapshotDeleteListener listener) {
if (useShardGenerations) {
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
final ListenableFuture<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep =
new ListenableFuture<>();
writeUpdatedShardMetaDataAndComputeDeletes(writeShardMetaDataAndComputeDeletesStep);
// Once we have put the new shard-level metadata into place, we can update the repository metadata as follows:
// 1. Remove the snapshots from the list of existing snapshots
// 2. Update the index shard generations of all updated shard folders
//
// Note: If we fail updating any of the individual shard paths, none of them are changed since the newly created
// index-${gen_uuid} will not be referenced by the existing RepositoryData and new RepositoryData is only
// written if all shard paths have been successfully updated.
final ListenableFuture<RepositoryData> writeUpdatedRepoDataStep = new ListenableFuture<>();
writeShardMetaDataAndComputeDeletesStep.addListener(ActionListener.wrap(shardDeleteResults -> {
final ShardGenerations.Builder builder = ShardGenerations.builder();
for (ShardSnapshotMetaDeleteResult newGen : shardDeleteResults) {
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData newRepositoryData = originalRepositoryData.removeSnapshots(snapshotIds, builder.build());
writeIndexGen(
newRepositoryData,
originalRepositoryDataGeneration,
repositoryFormatIndexVersion,
Function.identity(),
ActionListener.wrap(writeUpdatedRepoDataStep::onResponse, listener::onFailure)
);
}, listener::onFailure));
// Once we have updated the repository, run the clean-ups
writeUpdatedRepoDataStep.addListener(ActionListener.wrap(newRepositoryData -> {
listener.onRepositoryDataWritten(newRepositoryData);
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
try (var refs = new RefCountingRunnable(listener::onDone)) {
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());
cleanupUnlinkedShardLevelBlobs(writeShardMetaDataAndComputeDeletesStep.result(), refs.acquireListener());
}
}, listener::onFailure));
} else {
if (indexGeneration < 0L) {
writtenGeneration = ShardGeneration.newGeneration();
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration.toBlobNamePart(), compress);
} else {
writtenGeneration = new ShardGeneration(indexGeneration);
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots, Collections.emptyMap());
}
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
return new ShardSnapshotMetaDeleteResult(
indexId,
shardId,
writtenGeneration,
unusedBlobs(originalShardBlobs, survivingSnapshotUUIDs, updatedSnapshots)
// Write the new repository data first (with the removed snapshot), using no shard generations
writeIndexGen(
originalRepositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY),
originalRepositoryDataGeneration,
repositoryFormatIndexVersion,
Function.identity(),
ActionListener.wrap(newRepositoryData -> {
try (var refs = new RefCountingRunnable(() -> {
listener.onRepositoryDataWritten(newRepositoryData);
listener.onDone();
})) {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
cleanupUnlinkedRootAndIndicesBlobs(newRepositoryData, refs.acquireListener());

// writeIndexGen finishes on master-service thread so must fork here.
snapshotExecutor.execute(
ActionRunnable.wrap(
refs.acquireListener(),
l0 -> writeUpdatedShardMetaDataAndComputeDeletes(
l0.delegateFailure((l, shardDeleteResults) -> cleanupUnlinkedShardLevelBlobs(shardDeleteResults, l))
)
)
);
}
}, listener::onFailure)
);
}
} catch (IOException e) {
throw new RepositoryException(
metadata.name(),
"Failed to finalize snapshot deletion "
+ snapshotIds
+ " with shard index ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(writtenGeneration.toBlobNamePart())
+ "]",
e
);
}
}

// Unused blobs are all previous index-, data- and meta-blobs that are not referenced by the new index-N blob, as well as all
// temporary blobs
private static List<String> unusedBlobs(
Set<String> originalShardBlobs,
Set<String> survivingSnapshotUUIDs,
BlobStoreIndexShardSnapshots updatedSnapshots
) {
return originalShardBlobs.stream()
.filter(
blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| (blob.startsWith(SNAPSHOT_PREFIX)
&& blob.endsWith(".dat")
&& survivingSnapshotUUIDs.contains(
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
) == false)
|| (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|| FsBlobContainer.isTempBlobName(blob)
)
.toList();
}
// ---------------------------------------------------------------------------------------------------------------------------------
// Updating the shard-level metadata and accumulating results

// ---------------------------------------------------------------------------------------------------------------------------------
// Cleaning up dangling blobs
// updates the shard state metadata for shards of a snapshot that is to be deleted. Also computes the files to be cleaned up.
private void writeUpdatedShardMetaDataAndComputeDeletes(
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> onAllShardsCompleted
) {

/**
* Delete any dangling blobs in the repository root (i.e. {@link RepositoryData}, {@link SnapshotInfo} and {@link Metadata} blobs)
* as well as any containers for indices that are now completely unreferenced.
*/
private void cleanupUnlinkedRootAndIndicesBlobs(
Collection<SnapshotId> snapshotIds,
Map<String, BlobContainer> originalIndexContainers,
Map<String, BlobMetadata> originalRootBlobs,
RepositoryData newRepositoryData,
ActionListener<Void> listener
) {
cleanupStaleBlobs(snapshotIds, originalIndexContainers, originalRootBlobs, newRepositoryData, listener.map(ignored -> null));
}
final List<IndexId> indices = originalRepositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotIds);

private void cleanupUnlinkedShardLevelBlobs(
RepositoryData originalRepositoryData,
Collection<SnapshotId> snapshotIds,
Collection<ShardSnapshotMetaDeleteResult> shardDeleteResults,
ActionListener<Void> listener
) {
final Iterator<String> filesToDelete = resolveFilesToDelete(originalRepositoryData, snapshotIds, shardDeleteResults);
if (filesToDelete.hasNext() == false) {
listener.onResponse(null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> format("%s Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;
if (indices.isEmpty()) {
onAllShardsCompleted.onResponse(Collections.emptyList());
return;
}
}));
}

private Iterator<String> resolveFilesToDelete(
RepositoryData oldRepositoryData,
Collection<SnapshotId> snapshotIds,
Collection<ShardSnapshotMetaDeleteResult> deleteResults
) {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
final Map<IndexId, Collection<String>> indexMetaGenerations = oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(
snapshotIds
);
return Stream.concat(deleteResults.stream().flatMap(shardResult -> {
final String shardPath = shardPath(shardResult.indexId, shardResult.shardId).buildAsString();
return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
}), indexMetaGenerations.entrySet().stream().flatMap(entry -> {
final String indexContainerPath = indexPath(entry.getKey()).buildAsString();
return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id));
})).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
return absolutePath.substring(basePathLen);
}).iterator();
// Listener that flattens out the delete results for each index
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetadataListener = new GroupedActionListener<>(
indices.size(),
onAllShardsCompleted.map(res -> res.stream().flatMap(Collection::stream).toList())
);

for (IndexId indexId : indices) {
final Set<SnapshotId> snapshotsWithIndex = Set.copyOf(originalRepositoryData.getSnapshots(indexId));
final Set<SnapshotId> survivingSnapshots = snapshotsWithIndex.stream()
.filter(id -> snapshotIds.contains(id) == false)
.collect(Collectors.toSet());
final ListenableFuture<Collection<Integer>> shardCountListener = new ListenableFuture<>();
final Collection<String> indexMetaGenerations = snapshotIds.stream()
.filter(snapshotsWithIndex::contains)
.map(id -> originalRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId))
.collect(Collectors.toSet());
final ActionListener<Integer> allShardCountsListener = new GroupedActionListener<>(
indexMetaGenerations.size(),
shardCountListener
);
final BlobContainer indexContainer = indexContainer(indexId);
for (String indexMetaGeneration : indexMetaGenerations) {
snapshotExecutor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return INDEX_METADATA_FORMAT.read(metadata.name(), indexContainer, indexMetaGeneration, namedXContentRegistry)
.getNumberOfShards();
} catch (Exception ex) {
logger.warn(
() -> format("[%s] [%s] failed to read metadata for index", indexMetaGeneration, indexId.getName()),
ex
);
// Just invoke the listener without any shard generations to count it down, this index will be cleaned up
// by the stale data cleanup in the end.
// TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just
// ignoring it and letting the cleanup deal with it.
return null;
}
}));
}

// -------------------------------------------------------------------------------------------------------------------------
// Determining the shard count

shardCountListener.addListener(deleteIndexMetadataListener.delegateFailureAndWrap((delegate, counts) -> {
final int shardCount = counts.stream().mapToInt(i -> i).max().orElse(0);
if (shardCount == 0) {
delegate.onResponse(null);
return;
}
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener = new GroupedActionListener<>(
shardCount,
delegate
);
for (int i = 0; i < shardCount; i++) {
final int shardId = i;
snapshotExecutor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final Set<String> originalShardBlobs = shardContainer.listBlobs(OperationPurpose.SNAPSHOT).keySet();
final BlobStoreIndexShardSnapshots blobStoreIndexShardSnapshots;
final long newGen;
if (useShardGenerations) {
newGen = -1L;
blobStoreIndexShardSnapshots = buildBlobStoreIndexShardSnapshots(
originalShardBlobs,
shardContainer,
originalRepositoryData.shardGenerations().getShardGen(indexId, shardId)
).v1();
} else {
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(
originalShardBlobs,
shardContainer
);
newGen = tuple.v2() + 1;
blobStoreIndexShardSnapshots = tuple.v1();
}
allShardsListener.onResponse(
deleteFromShardSnapshotMeta(
survivingSnapshots,
indexId,
shardId,
snapshotIds,
shardContainer,
originalShardBlobs,
blobStoreIndexShardSnapshots,
newGen
)
);
}

@Override
public void onFailure(Exception ex) {
logger.warn(
() -> format(
"%s failed to delete shard data for shard [%s][%s]",
snapshotIds,
indexId.getName(),
shardId
),
ex
);
// Just passing null here to count down the listener instead of failing it, the stale data left behind
// here will be retried in the next delete or repository cleanup
allShardsListener.onResponse(null);
}
});
}
}));
}
}
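The method above fans out one task per index-metadata blob (to determine the shard count) and then one task per shard, aggregating the results with GroupedActionListener; a per-shard failure is converted into a null response so that one bad shard cannot fail the whole group. A minimal, self-contained analogy of that fan-out (an illustrative assumption: CompletableFuture stands in for the ActionListener machinery):

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;

public class GroupedFanOutSketch {
    // Hypothetical per-shard task. Failures are mapped to null rather than propagated,
    // mirroring the onFailure branch above that leaves stale data for a later cleanup.
    static CompletableFuture<String> deleteFromShard(int shardId) {
        return CompletableFuture.supplyAsync(() -> "shard-" + shardId + "-new-generation")
            .exceptionally(e -> null);
    }

    public static void main(String[] args) {
        int shardCount = 3; // in the real code: the maximum shard count over all index-metadata blobs
        List<CompletableFuture<String>> perShard = IntStream.range(0, shardCount)
            .mapToObj(GroupedFanOutSketch::deleteFromShard)
            .toList();
        // The equivalent of GroupedActionListener: complete only once all shards have reported.
        CompletableFuture.allOf(perShard.toArray(CompletableFuture[]::new))
            .thenRun(() -> perShard.forEach(f -> System.out.println(f.join())))
            .join();
    }
}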
// -----------------------------------------------------------------------------------------------------------------------------
// Updating each shard

/**
* Delete snapshot from shard level metadata.
*
* @param indexGeneration generation to write the new shard level metadata to. If negative, a UUID shard generation should
* be used
*/
private ShardSnapshotMetaDeleteResult deleteFromShardSnapshotMeta(
Set<SnapshotId> survivingSnapshots,
IndexId indexId,
int shardId,
Collection<SnapshotId> snapshotIds,
BlobContainer shardContainer,
Set<String> originalShardBlobs,
BlobStoreIndexShardSnapshots snapshots,
long indexGeneration
) {
// Build a list of snapshots that should be preserved
final BlobStoreIndexShardSnapshots updatedSnapshots = snapshots.withRetainedSnapshots(survivingSnapshots);
ShardGeneration writtenGeneration = null;
try {
if (updatedSnapshots.snapshots().isEmpty()) {
return new ShardSnapshotMetaDeleteResult(indexId, shardId, ShardGenerations.DELETED_SHARD_GEN, originalShardBlobs);
} else {
if (indexGeneration < 0L) {
writtenGeneration = ShardGeneration.newGeneration();
INDEX_SHARD_SNAPSHOTS_FORMAT.write(updatedSnapshots, shardContainer, writtenGeneration.toBlobNamePart(), compress);
} else {
writtenGeneration = new ShardGeneration(indexGeneration);
writeShardIndexBlobAtomic(shardContainer, indexGeneration, updatedSnapshots, Collections.emptyMap());
}
final Set<String> survivingSnapshotUUIDs = survivingSnapshots.stream()
.map(SnapshotId::getUUID)
.collect(Collectors.toSet());
return new ShardSnapshotMetaDeleteResult(
indexId,
shardId,
writtenGeneration,
unusedBlobs(originalShardBlobs, survivingSnapshotUUIDs, updatedSnapshots)
);
}
} catch (IOException e) {
throw new RepositoryException(
metadata.name(),
"Failed to finalize snapshot deletion "
+ snapshotIds
+ " with shard index ["
+ INDEX_SHARD_SNAPSHOTS_FORMAT.blobName(writtenGeneration.toBlobNamePart())
+ "]",
e
);
}
}

// Unused blobs are all previous index-, data- and meta-blobs that are not referenced by the new index-N blob, as well as all
// temporary blobs
private static List<String> unusedBlobs(
Set<String> originalShardBlobs,
Set<String> survivingSnapshotUUIDs,
BlobStoreIndexShardSnapshots updatedSnapshots
) {
return originalShardBlobs.stream()
.filter(
blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| (blob.startsWith(SNAPSHOT_PREFIX)
&& blob.endsWith(".dat")
&& survivingSnapshotUUIDs.contains(
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())
) == false)
|| (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|| FsBlobContainer.isTempBlobName(blob)
)
.toList();
}
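To make the filter above concrete, here is a self-contained sketch of the same decision, with assumed prefix constants mirroring BlobStoreRepository's and a plain set standing in for updatedSnapshots.findNameFile (the sample blob names are invented for illustration):

import java.util.List;
import java.util.Set;

public class UnusedBlobsSketch {
    // Assumed values mirroring BlobStoreRepository's constants.
    static final String SNAPSHOT_INDEX_PREFIX = "index-";
    static final String SNAPSHOT_PREFIX = "snap-";
    static final String UPLOADED_DATA_BLOB_PREFIX = "__";

    static List<String> unusedBlobs(Set<String> shardBlobs, Set<String> survivingSnapshotUUIDs, Set<String> referencedDataBlobs) {
        return shardBlobs.stream()
            .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX) // every previous shard-level index-N blob
                || (blob.startsWith(SNAPSHOT_PREFIX)
                    && blob.endsWith(".dat")
                    && survivingSnapshotUUIDs.contains(
                        blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
                || (blob.startsWith(UPLOADED_DATA_BLOB_PREFIX) && referencedDataBlobs.contains(blob) == false)
                || blob.endsWith(".tmp")) // stand-in for FsBlobContainer.isTempBlobName
            .toList();
    }

    public static void main(String[] args) {
        System.out.println(unusedBlobs(
            Set.of("index-42", "snap-deleted.dat", "snap-kept.dat", "__data0", "__data1"),
            Set.of("kept"),    // UUIDs of the snapshots that survive the deletion
            Set.of("__data1")  // data blobs still referenced by the new shard generation
        ));
        // Prints, in some order: [index-42, snap-deleted.dat, __data0]
    }
}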
// ---------------------------------------------------------------------------------------------------------------------------------
// Cleaning up dangling blobs

/**
* Delete any dangling blobs in the repository root (i.e. {@link RepositoryData}, {@link SnapshotInfo} and {@link Metadata} blobs)
* as well as any containers for indices that are now completely unreferenced.
*/
private void cleanupUnlinkedRootAndIndicesBlobs(RepositoryData newRepositoryData, ActionListener<Void> listener) {
cleanupStaleBlobs(snapshotIds, originalIndexContainers, originalRootBlobs, newRepositoryData, listener.map(ignored -> null));
}

private void cleanupUnlinkedShardLevelBlobs(
Collection<ShardSnapshotMetaDeleteResult> shardDeleteResults,
ActionListener<Void> listener
) {
final Iterator<String> filesToDelete = resolveFilesToDelete(shardDeleteResults);
if (filesToDelete.hasNext() == false) {
listener.onResponse(null);
return;
}
snapshotExecutor.execute(ActionRunnable.wrap(listener, l -> {
try {
deleteFromContainer(blobContainer(), filesToDelete);
l.onResponse(null);
} catch (Exception e) {
logger.warn(() -> format("%s Failed to delete some blobs during snapshot delete", snapshotIds), e);
throw e;
}
}));
}

private Iterator<String> resolveFilesToDelete(Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
final String basePath = basePath().buildAsString();
final int basePathLen = basePath.length();
final Map<IndexId, Collection<String>> indexMetaGenerations = originalRepositoryData
.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds);
return Stream.concat(deleteResults.stream().flatMap(shardResult -> {
final String shardPath = shardPath(shardResult.indexId, shardResult.shardId).buildAsString();
return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
}), indexMetaGenerations.entrySet().stream().flatMap(entry -> {
final String indexContainerPath = indexPath(entry.getKey()).buildAsString();
return entry.getValue().stream().map(id -> indexContainerPath + INDEX_METADATA_FORMAT.blobName(id));
})).map(absolutePath -> {
assert absolutePath.startsWith(basePath);
return absolutePath.substring(basePathLen);
}).iterator();
}
}
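A recurring theme in the class above (see the originalRootBlobs and originalIndexContainers documentation) is that cleanup is list-based and therefore safe to repeat: everything present at the start of the operation and unreferenced at the end is deleted again, even if an earlier, possibly crashed, cleanup already tried. A minimal sketch of that idea, with invented names:

import java.util.HashSet;
import java.util.Set;

public class IdempotentCleanupSketch {
    public static void main(String[] args) {
        // Listed once at the start of the deletion, like originalRootBlobs.
        Set<String> originalRootBlobs = Set.of("index-5", "index-6", "snap-a.dat", "meta-b.dat");
        // Whatever the new RepositoryData still references after the deletion.
        Set<String> referencedByNewRepositoryData = Set.of("index-6");

        // Deleting the difference is safe to repeat: a blob removed by a previous,
        // possibly failed, cleanup simply no longer appears in the next listing.
        Set<String> staleBlobs = new HashSet<>(originalRootBlobs);
        staleBlobs.removeAll(referencedByNewRepositoryData);
        System.out.println("would delete: " + staleBlobs);
    }
}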