mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 07:37:19 -04:00
Make snapshot deletes not block the repository during data blob deletes (#86514)
Snapshot deletes need not block other operations beyond the updates to the repository metadata at the beginning of the delete operation. All subsequent blob deletes can safely run async and concurrent to other operations. This is the simplest possible implementation of this change that I could find. It's not the most optimal since concurrent deletes are not guarded against trying to delete the same blobs twice. I believe this is not a big issue in practice though. For one, we batch overlapping deletes into single operations, so we will only try to redundantly delete blobs leaked by previous operations that are part of indices still referenced (which will generally by a very limited number of blobs I believe) and indices that went out of scope. Indices that went out of scope are deleted by listing out blobs and deleting them in turn, which means that we likely won't be attempting all that many redundant deletes even if the same index would be touched by concurrent delete operations and even if we did, the additional memory use would be bounded.
This commit is contained in:
parent
b60ccc4c9a
commit
55acdfad62
14 changed files with 203 additions and 83 deletions
5
docs/changelog/86514.yaml
Normal file
5
docs/changelog/86514.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 86514
|
||||
summary: Make snapshot deletes not block the repository during data blob deletes
|
||||
area: Snapshot/Restore
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.repositories.FinalizeSnapshotContext;
|
|||
import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
|
@ -275,12 +276,42 @@ class S3Repository extends MeteredBlobStoreRepository {
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
|
||||
listener = delayedListener(listener);
|
||||
final SnapshotDeleteListener wrappedListener;
|
||||
if (SnapshotsService.useShardGenerations(repositoryMetaVersion)) {
|
||||
wrappedListener = listener;
|
||||
} else {
|
||||
wrappedListener = new SnapshotDeleteListener() {
|
||||
@Override
|
||||
public void onDone() {
|
||||
listener.onDone();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRepositoryDataWritten(RepositoryData repositoryData) {
|
||||
logCooldownInfo();
|
||||
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(threadPool.schedule(() -> {
|
||||
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
|
||||
assert cancellable != null;
|
||||
listener.onRepositoryDataWritten(repositoryData);
|
||||
}, coolDown, ThreadPool.Names.SNAPSHOT));
|
||||
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
logCooldownInfo();
|
||||
final Scheduler.Cancellable existing = finalizationFuture.getAndSet(threadPool.schedule(() -> {
|
||||
final Scheduler.Cancellable cancellable = finalizationFuture.getAndSet(null);
|
||||
assert cancellable != null;
|
||||
listener.onFailure(e);
|
||||
}, coolDown, ThreadPool.Names.SNAPSHOT));
|
||||
assert existing == null : "Already have an ongoing finalization " + finalizationFuture;
|
||||
}
|
||||
};
|
||||
}
|
||||
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
|
||||
super.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, wrappedListener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -20,12 +20,10 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
|||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.repositories.RepositoriesService;
|
||||
import org.elasticsearch.repositories.RepositoryConflictException;
|
||||
import org.elasticsearch.repositories.RepositoryException;
|
||||
import org.elasticsearch.repositories.RepositoryVerificationException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.nio.file.Path;
|
||||
|
@ -270,7 +268,7 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
|||
final String snapshot1 = "test-snap1";
|
||||
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setWaitForCompletion(true).get();
|
||||
String blockedNode = internalCluster().getMasterName();
|
||||
((MockRepository) internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles();
|
||||
blockMasterOnWriteIndexFile(repo);
|
||||
logger.info("--> start deletion of snapshot");
|
||||
ActionFuture<AcknowledgedResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
|
||||
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
|
||||
|
|
|
@ -802,12 +802,28 @@ public class SnapshotStressTestsIT extends AbstractSnapshotIntegTestCase {
|
|||
.cluster()
|
||||
.prepareCleanupRepository(trackedRepository.repositoryName)
|
||||
.execute(mustSucceed(cleanupRepositoryResponse -> {
|
||||
Releasables.close(releaseAll);
|
||||
logger.info("--> completed cleanup of [{}]", trackedRepository.repositoryName);
|
||||
final RepositoryCleanupResult result = cleanupRepositoryResponse.result();
|
||||
assertThat(Strings.toString(result), result.blobs(), equalTo(0L));
|
||||
assertThat(Strings.toString(result), result.bytes(), equalTo(0L));
|
||||
startCleaner();
|
||||
if (result.bytes() > 0L || result.blobs() > 0L) {
|
||||
// we could legitimately run into dangling blobs as the result of a shard snapshot failing half-way
|
||||
// through the snapshot because of a concurrent index-close or -delete. The second round of cleanup on
|
||||
// the same repository however must always fully remove any dangling blobs since we block all concurrent
|
||||
// operations on the repository here
|
||||
client.admin()
|
||||
.cluster()
|
||||
.prepareCleanupRepository(trackedRepository.repositoryName)
|
||||
.execute(mustSucceed(secondCleanupRepositoryResponse -> {
|
||||
final RepositoryCleanupResult secondCleanupResult = secondCleanupRepositoryResponse.result();
|
||||
assertThat(Strings.toString(secondCleanupResult), secondCleanupResult.blobs(), equalTo(0L));
|
||||
assertThat(Strings.toString(secondCleanupResult), secondCleanupResult.bytes(), equalTo(0L));
|
||||
Releasables.close(releaseAll);
|
||||
logger.info("--> completed second cleanup of [{}]", trackedRepository.repositoryName);
|
||||
startCleaner();
|
||||
}));
|
||||
} else {
|
||||
Releasables.close(releaseAll);
|
||||
logger.info("--> completed cleanup of [{}]", trackedRepository.repositoryName);
|
||||
startCleaner();
|
||||
}
|
||||
}));
|
||||
|
||||
startedCleanup = true;
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -77,7 +78,7 @@ public class FilterRepository implements Repository {
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
in.deleteSnapshots(snapshotIds, repositoryStateId, repositoryMetaVersion, listener);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -84,7 +85,7 @@ public class InvalidRepository extends AbstractLifecycleComponent implements Rep
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
listener.onFailure(createCreationException());
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -149,7 +150,7 @@ public interface Repository extends LifecycleComponent {
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
);
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -82,7 +83,7 @@ public class UnknownTypeRepository extends AbstractLifecycleComponent implements
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
listener.onFailure(createUnknownTypeException());
|
||||
}
|
||||
|
|
|
@ -106,6 +106,7 @@ import org.elasticsearch.repositories.ShardGenerations;
|
|||
import org.elasticsearch.repositories.ShardSnapshotResult;
|
||||
import org.elasticsearch.repositories.SnapshotShardContext;
|
||||
import org.elasticsearch.snapshots.AbortedSnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
|
@ -807,7 +808,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
if (isReadOnly()) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
|
||||
|
@ -906,9 +907,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
Map<String, BlobMetadata> rootBlobs,
|
||||
RepositoryData repositoryData,
|
||||
Version repoMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
|
||||
if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
|
||||
// First write the new shard state metadata (with the removed snapshot) and compute deletion targets
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
|
||||
|
@ -937,11 +937,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}, listener::onFailure);
|
||||
// Once we have updated the repository, run the clean-ups
|
||||
writeUpdatedRepoDataStep.whenComplete(updatedRepoData -> {
|
||||
listener.onRepositoryDataWritten(updatedRepoData);
|
||||
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
|
||||
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(() -> listener.onResponse(updatedRepoData)),
|
||||
2
|
||||
);
|
||||
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(listener::onDone), 2);
|
||||
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
|
||||
asyncCleanupUnlinkedShardLevelBlobs(
|
||||
repositoryData,
|
||||
|
@ -955,10 +953,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
|
||||
writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(newRepoData -> {
|
||||
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
|
||||
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(() -> listener.onResponse(newRepoData)),
|
||||
2
|
||||
);
|
||||
final ActionListener<Void> afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> {
|
||||
listener.onRepositoryDataWritten(newRepoData);
|
||||
listener.onDone();
|
||||
}), 2);
|
||||
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
|
||||
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
|
||||
|
@ -2627,8 +2625,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final long startTime = threadPool.absoluteTimeInMillis();
|
||||
try {
|
||||
final ShardGeneration generation = snapshotStatus.generation();
|
||||
logger.debug("[{}] [{}] snapshot to [{}] [{}] ...", shardId, snapshotId, metadata.name(), generation);
|
||||
final BlobContainer shardContainer = shardContainer(context.indexId(), shardId);
|
||||
logger.debug("[{}][{}] snapshot to [{}][{}][{}] ...", shardId, snapshotId, metadata.name(), context.indexId(), generation);
|
||||
final Set<String> blobs;
|
||||
if (generation == null) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
|
||||
public interface SnapshotDeleteListener {
|
||||
|
||||
/**
|
||||
* Invoked once a snapshot has been fully deleted from the repository.
|
||||
*/
|
||||
void onDone();
|
||||
|
||||
/**
|
||||
* Invoked once the updated {@link RepositoryData} has been written to the repository.
|
||||
*
|
||||
* @param repositoryData updated repository data
|
||||
*/
|
||||
void onRepositoryDataWritten(RepositoryData repositoryData);
|
||||
|
||||
/**
|
||||
* Invoked if writing updated {@link RepositoryData} to the repository failed. Once {@link #onRepositoryDataWritten(RepositoryData)} has
|
||||
* been invoked this method will never be invoked.
|
||||
*
|
||||
* @param e exception during metadata steps of snapshot delete
|
||||
*/
|
||||
void onFailure(Exception e);
|
||||
}
|
|
@ -65,6 +65,7 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.Maps;
|
||||
import org.elasticsearch.common.util.concurrent.ListenableFuture;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.core.SuppressForbidden;
|
||||
import org.elasticsearch.core.Tuple;
|
||||
|
@ -2407,7 +2408,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// delete now, we can avoid reaching to the repository and can complete the deletion.
|
||||
// TODO we should complete the deletion and resolve the listeners of SnapshotDeletionsInProgress with no snapshot sooner,
|
||||
// that would save some cluster state updates.
|
||||
removeSnapshotDeletionFromClusterState(deleteEntry, null, repositoryData);
|
||||
removeSnapshotDeletionFromClusterState(
|
||||
deleteEntry,
|
||||
repositoryData,
|
||||
listeners -> completeListenersIgnoringException(listeners, null)
|
||||
);
|
||||
return;
|
||||
}
|
||||
repositoriesService.repository(deleteEntry.repository())
|
||||
|
@ -2415,10 +2420,51 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
snapshotIds,
|
||||
repositoryData.getGenId(),
|
||||
minCompatibleVersion(minNodeVersion, repositoryData, snapshotIds),
|
||||
ActionListener.wrap(updatedRepoData -> {
|
||||
logger.info("snapshots {} deleted", snapshotIds);
|
||||
removeSnapshotDeletionFromClusterState(deleteEntry, null, updatedRepoData);
|
||||
}, ex -> removeSnapshotDeletionFromClusterState(deleteEntry, ex, repositoryData))
|
||||
new SnapshotDeleteListener() {
|
||||
|
||||
private final ListenableFuture<Void> doneFuture = new ListenableFuture<>();
|
||||
|
||||
@Override
|
||||
public void onDone() {
|
||||
logger.info("snapshots {} deleted", snapshotIds);
|
||||
doneFuture.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRepositoryDataWritten(RepositoryData updatedRepoData) {
|
||||
removeSnapshotDeletionFromClusterState(
|
||||
deleteEntry,
|
||||
updatedRepoData,
|
||||
listeners -> doneFuture.addListener(new ActionListener<>() {
|
||||
@Override
|
||||
public void onResponse(Void unused) {
|
||||
completeListenersIgnoringException(listeners, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
// this should never be called, once updated repository metadata has been written to the
|
||||
// repository and the delete been removed from the cluster state, we ignore any further failures
|
||||
// and always complete the delete successfully
|
||||
assert false : e;
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
submitUnbatchedTask(
|
||||
"remove snapshot deletion metadata after failed delete",
|
||||
new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) {
|
||||
@Override
|
||||
protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
|
||||
failListenersIgnoringException(deleteListeners, e);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -2427,54 +2473,41 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
* Removes a {@link SnapshotDeletionsInProgress.Entry} from {@link SnapshotDeletionsInProgress} in the cluster state after it executed
|
||||
* on the repository.
|
||||
*
|
||||
* @param deleteEntry delete entry to remove from the cluster state
|
||||
* @param failure failure encountered while executing the delete on the repository or {@code null} if the delete executed
|
||||
* successfully
|
||||
* @param deleteEntry delete entry to remove from the cluster state
|
||||
* @param repositoryData current {@link RepositoryData} for the repository we just ran the delete on.
|
||||
* @param listenersHandler consumer that gets passed a list of all listeners that had their delete entry successfully removed from the
|
||||
* cluster state
|
||||
*/
|
||||
private void removeSnapshotDeletionFromClusterState(
|
||||
final SnapshotDeletionsInProgress.Entry deleteEntry,
|
||||
@Nullable final Exception failure,
|
||||
final RepositoryData repositoryData
|
||||
final RepositoryData repositoryData,
|
||||
final Consumer<List<ActionListener<Void>>> listenersHandler
|
||||
) {
|
||||
final ClusterStateUpdateTask clusterStateUpdateTask;
|
||||
if (failure == null) {
|
||||
// If we didn't have a failure during the snapshot delete we will remove all snapshot ids that the delete successfully removed
|
||||
// from the repository from enqueued snapshot delete entries during the cluster state update. After the cluster state update we
|
||||
// resolve the delete listeners with the latest repository data from after the delete.
|
||||
clusterStateUpdateTask = new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) {
|
||||
@Override
|
||||
protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgress deletions) {
|
||||
final SnapshotDeletionsInProgress updatedDeletions = deletionsWithoutSnapshots(
|
||||
deletions,
|
||||
deleteEntry.getSnapshots(),
|
||||
deleteEntry.repository()
|
||||
);
|
||||
return updatedDeletions == null ? deletions : updatedDeletions;
|
||||
}
|
||||
// We remove all snapshot ids that the delete successfully removed from the repository from enqueued snapshot delete entries during
|
||||
// the cluster state update. After the cluster state update we pass the list of listeners that had their entry removed from the
|
||||
// cluster state to the given handler
|
||||
submitUnbatchedTask("remove snapshot deletion metadata", new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) {
|
||||
@Override
|
||||
protected SnapshotDeletionsInProgress filterDeletions(SnapshotDeletionsInProgress deletions) {
|
||||
final SnapshotDeletionsInProgress updatedDeletions = deletionsWithoutSnapshots(
|
||||
deletions,
|
||||
deleteEntry.getSnapshots(),
|
||||
deleteEntry.repository()
|
||||
);
|
||||
return updatedDeletions == null ? deletions : updatedDeletions;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
|
||||
assert repositoryData.getSnapshotIds().stream().noneMatch(deleteEntry.getSnapshots()::contains)
|
||||
: "Repository data contained snapshot ids "
|
||||
+ repositoryData.getSnapshotIds()
|
||||
+ " that should should been deleted by ["
|
||||
+ deleteEntry
|
||||
+ "]";
|
||||
completeListenersIgnoringException(deleteListeners, null);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
// The delete failed to execute on the repository. We remove it from the cluster state and then fail all listeners associated
|
||||
// with it.
|
||||
clusterStateUpdateTask = new RemoveSnapshotDeletionAndContinueTask(deleteEntry, repositoryData) {
|
||||
@Override
|
||||
protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
|
||||
failListenersIgnoringException(deleteListeners, failure);
|
||||
}
|
||||
};
|
||||
}
|
||||
submitUnbatchedTask("remove snapshot deletion metadata", clusterStateUpdateTask);
|
||||
@Override
|
||||
protected void handleListeners(List<ActionListener<Void>> deleteListeners) {
|
||||
assert repositoryData.getSnapshotIds().stream().noneMatch(deleteEntry.getSnapshots()::contains)
|
||||
: "Repository data contained snapshot ids "
|
||||
+ repositoryData.getSnapshotIds()
|
||||
+ " that should should been deleted by ["
|
||||
+ deleteEntry
|
||||
+ "]";
|
||||
listenersHandler.accept(deleteListeners);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2558,9 +2591,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public final void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
|
||||
final List<ActionListener<Void>> deleteListeners;
|
||||
repositoryOperations.finishDeletion(deleteEntry.uuid());
|
||||
deleteListeners = snapshotDeletionListeners.remove(deleteEntry.uuid());
|
||||
final List<ActionListener<Void>> deleteListeners = snapshotDeletionListeners.remove(deleteEntry.uuid());
|
||||
handleListeners(deleteListeners);
|
||||
if (newFinalizations.isEmpty()) {
|
||||
if (readyDeletions.isEmpty()) {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.elasticsearch.index.store.Store;
|
|||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.repositories.blobstore.MeteredBlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -350,9 +351,9 @@ public class RepositoriesServiceTests extends ESTestCase {
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
listener.onResponse(null);
|
||||
listener.onFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.repositories.ShardGeneration;
|
|||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.repositories.ShardSnapshotResult;
|
||||
import org.elasticsearch.repositories.SnapshotShardContext;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
|
||||
import java.util.Collection;
|
||||
|
@ -103,9 +104,9 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
listener.onResponse(null);
|
||||
listener.onFailure(new UnsupportedOperationException());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -74,6 +74,7 @@ import org.elasticsearch.repositories.ShardSnapshotResult;
|
|||
import org.elasticsearch.repositories.SnapshotShardContext;
|
||||
import org.elasticsearch.repositories.blobstore.FileRestoreContext;
|
||||
import org.elasticsearch.snapshots.Snapshot;
|
||||
import org.elasticsearch.snapshots.SnapshotDeleteListener;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
|
@ -305,9 +306,9 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
|||
Collection<SnapshotId> snapshotIds,
|
||||
long repositoryStateId,
|
||||
Version repositoryMetaVersion,
|
||||
ActionListener<RepositoryData> listener
|
||||
SnapshotDeleteListener listener
|
||||
) {
|
||||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||
listener.onFailure(new UnsupportedOperationException("Unsupported for repository of type: " + TYPE));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue