Fix deleting index during snapshot finalization (#103817)
Today, if an index is deleted during a very specific order of snapshot finalizations, it's possible that we'll miscalculate the latest shard generations for the shards in that index, causing the deletion of a shard-level `index-UUID` blob, which in turn prevents further snapshots of that shard.

Closes #101029
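To make the failure mode concrete, here is a minimal sketch of the bookkeeping involved (hypothetical names and types, not the real Elasticsearch API): the repository keeps one latest-generation blob per shard, and cleanup during finalization must only treat a generation as obsolete if the finalizing snapshot actually wrote a replacement for that shard.

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the bug (hypothetical types). The repository tracks one
// "latest generation" blob per shard; on finalization it deletes generations
// it considers obsolete.
public class ShardGenCleanupSketch {

    // repo state: shard name -> latest generation blob that must be kept
    static final Map<String, String> latestGen = new HashMap<>();

    public static void main(String[] args) {
        latestGen.put("index-to-delete/0", "gen-7");

        // A snapshot finalizes and reports the generations it wrote. Because
        // the index was deleted mid-finalization, it wrote nothing for this shard.
        Map<String, String> writtenByFinalization = Map.of(); // no entry for index-to-delete/0

        // Buggy cleanup trusted the in-progress bookkeeping and could delete
        // "gen-7" even though no newer generation exists, breaking future
        // snapshots of the shard. Fixed cleanup only supersedes a generation
        // if finalization actually wrote one (cf. ShardGenerations#hasShardGen).
        for (var shard : List.copyOf(latestGen.keySet())) {
            if (writtenByFinalization.containsKey(shard)) {
                latestGen.put(shard, writtenByFinalization.get(shard)); // safe to supersede
            }
            // else: keep the old generation blob; nothing replaced it
        }

        System.out.println(latestGen); // {index-to-delete/0=gen-7} - blob preserved
    }
}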
This commit is contained in:
parent ad28dc9a6c
commit d049273ce1

6 changed files with 169 additions and 13 deletions
docs/changelog/103817.yaml (new file)

@@ -0,0 +1,6 @@
+pr: 103817
+summary: Fix deleting index during snapshot finalization
+area: Snapshot/Restore
+type: bug
+issues:
+ - 101029
ConcurrentSnapshotsIT.java

@@ -17,17 +17,24 @@ import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexClusterStateUpdateRequest;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.action.support.GroupedActionListener;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.master.AcknowledgedResponse;
 import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
 import org.elasticsearch.cluster.SnapshotsInProgress;
+import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
 import org.elasticsearch.core.PathUtils;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.discovery.AbstractDisruptionTestCase;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoryConflictException;

@@ -36,6 +43,7 @@ import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.ShardGenerations;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.transport.MockTransportService;

@@ -48,10 +56,12 @@ import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFileExists;

@@ -2060,6 +2070,106 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertSuccessful(partialFuture);
     }
 
+    public void testDeleteIndexWithOutOfOrderFinalization() {
+
+        final var indexToDelete = "index-to-delete";
+        final var indexNames = List.of(indexToDelete, "index-0", "index-1", "index-2");
+
+        for (final var indexName : indexNames) {
+            assertAcked(prepareCreate(indexName, indexSettingsNoReplicas(1)));
+        }
+
+        final var repoName = "test-repo";
+        createRepository(repoName, "fs");
+
+        // block the update-shard-snapshot-status requests so we can execute them in a specific order
+        final var masterTransportService = MockTransportService.getInstance(internalCluster().getMasterName());
+        final Map<String, SubscribableListener<Void>> otherIndexSnapshotListeners = indexNames.stream()
+            .collect(Collectors.toMap(k -> k, k -> new SubscribableListener<>()));
+        masterTransportService.<UpdateIndexShardSnapshotStatusRequest>addRequestHandlingBehavior(
+            SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME,
+            (handler, request, channel, task) -> {
+                final var indexName = request.shardId().getIndexName();
+                if (indexName.equals(indexToDelete)) {
+                    handler.messageReceived(request, channel, task);
+                } else {
+                    final var listener = otherIndexSnapshotListeners.get(indexName);
+                    assertNotNull(indexName, listener);
+                    listener.addListener(
+                        ActionTestUtils.assertNoFailureListener(ignored -> handler.messageReceived(request, channel, task))
+                    );
+                }
+            }
+        );
+
+        // start the snapshots, each targeting index-to-delete and one other index so we can control their finalization order
+        final var snapshotCompleters = new HashMap<String, Runnable>();
+        for (final var blockingIndex : List.of("index-0", "index-1", "index-2")) {
+            final var snapshotName = "snapshot-with-" + blockingIndex;
+            final var snapshotFuture = clusterAdmin().prepareCreateSnapshot(repoName, snapshotName)
+                .setWaitForCompletion(true)
+                .setPartial(true)
+                .setIndices(indexToDelete, blockingIndex)
+                .execute();
+
+            // ensure each snapshot has really started before moving on to the next one
+            safeAwait(
+                ClusterServiceUtils.addTemporaryStateListener(
+                    internalCluster().getInstance(ClusterService.class),
+                    cs -> SnapshotsInProgress.get(cs)
+                        .forRepo(repoName)
+                        .stream()
+                        .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals(snapshotName))
+                )
+            );
+
+            snapshotCompleters.put(blockingIndex, () -> {
+                assertFalse(snapshotFuture.isDone());
+                otherIndexSnapshotListeners.get(blockingIndex).onResponse(null);
+                assertEquals(SnapshotState.SUCCESS, snapshotFuture.actionGet(10, TimeUnit.SECONDS).getSnapshotInfo().state());
+            });
+        }
+
+        // set up to delete the index at a very specific moment during finalization
+        final var masterDeleteIndexService = internalCluster().getCurrentMasterNodeInstance(MetadataDeleteIndexService.class);
+        final var indexRecreatedListener = ClusterServiceUtils
+            // wait until the snapshot has entered finalization
+            .addTemporaryStateListener(
+                internalCluster().getInstance(ClusterService.class),
+                cs -> SnapshotsInProgress.get(cs)
+                    .forRepo(repoName)
+                    .stream()
+                    .anyMatch(e -> e.snapshot().getSnapshotId().getName().equals("snapshot-with-index-1") && e.state().completed())
+            )
+            // execute the index deletion _directly on the master_ so it happens before the snapshot finalization executes
+            .andThen((l, ignored) -> masterDeleteIndexService.deleteIndices(new DeleteIndexClusterStateUpdateRequest(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })).indices(new Index[] { internalCluster().clusterService().state().metadata().index(indexToDelete).getIndex() })
+                .ackTimeout(TimeValue.timeValueSeconds(10))
+                .masterNodeTimeout(TimeValue.timeValueSeconds(10))))
+            // ultimately create the index again so that taking a full snapshot will pick up any missing shard gen blob, and deleting that
+            // full snapshot will clean up all dangling shard-level blobs
+            .andThen((l, ignored) -> prepareCreate(indexToDelete, indexSettingsNoReplicas(1)).execute(l.map(r -> {
+                assertTrue(r.isAcknowledged());
+                return null;
+            })));
+
+        // release the snapshots to be finalized, in this order
+        for (final var blockingIndex : List.of("index-1", "index-2", "index-0")) {
+            snapshotCompleters.get(blockingIndex).run();
+        }
+
+        safeAwait(indexRecreatedListener);
+        masterTransportService.clearAllRules();
+
+        // create a full snapshot to verify that the repo is still ok
+        createFullSnapshot(repoName, "final-full-snapshot");
+
+        // delete the full snapshot to clean up the leftover shard-level metadata (which trips repo consistency assertions otherwise)
+        startDeleteSnapshot(repoName, "final-full-snapshot").actionGet(10, TimeUnit.SECONDS);
+    }
+
     private static void assertSnapshotStatusCountOnRepo(String otherBlockedRepoName, int count) {
         final SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus(otherBlockedRepoName).get();
         final List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
FinalizeSnapshotContext.java

@@ -100,7 +100,7 @@ public final class FinalizeSnapshotContext extends DelegatingActionListener<Repo
     }
 
     public ClusterState updatedClusterState(ClusterState state) {
-        final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot());
+        final ClusterState updatedState = SnapshotsService.stateWithoutSnapshot(state, snapshotInfo.snapshot(), updatedShardGenerations);
         obsoleteGenerations.set(
             SnapshotsInProgress.get(updatedState).obsoleteGenerations(snapshotInfo.repository(), SnapshotsInProgress.get(state))
         );
ShardGenerations.java

@@ -142,6 +142,11 @@ public final class ShardGenerations {
         return generations.get(shardId);
     }
 
+    public boolean hasShardGen(RepositoryShardId repositoryShardId) {
+        final var indexShardGens = getGens(repositoryShardId.index());
+        return repositoryShardId.shardId() < indexShardGens.size() && indexShardGens.get(repositoryShardId.shardId()) != null;
+    }
+
     public List<ShardGeneration> getGens(IndexId indexId) {
         return shardGenerations.getOrDefault(indexId, Collections.emptyList());
     }
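The semantics of the new helper, shown in isolation: a shard has a usable generation only if its shard id is within range for the index and the recorded generation is non-null. Below is a hypothetical standalone mirror of the method using plain lists instead of the real `IndexId`/`RepositoryShardId` types.

import java.util.Arrays;
import java.util.List;

// Hypothetical standalone mirror of ShardGenerations#hasShardGen (simplified
// types): the per-index generation list may be shorter than the shard count,
// and an entry may be null if no generation was written for that shard.
public class HasShardGenSketch {

    static boolean hasShardGen(List<String> indexShardGens, int shardId) {
        return shardId < indexShardGens.size() && indexShardGens.get(shardId) != null;
    }

    public static void main(String[] args) {
        List<String> gens = Arrays.asList("gen-a", null); // shard 0 written, shard 1 not
        System.out.println(hasShardGen(gens, 0)); // true
        System.out.println(hasShardGen(gens, 1)); // false - no generation was written
        System.out.println(hasShardGen(gens, 2)); // false - shard id out of range
    }
}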
BlobStoreRepository.java

@@ -1741,6 +1741,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             (indexId, gens) -> gens.forEach(
                 (shardId, oldGen) -> toDelete.add(
                     shardPath(indexId, shardId).buildAsString().substring(prefixPathLen) + INDEX_FILE_PREFIX + oldGen
+                        .toBlobNamePart()
                 )
             )
         );
SnapshotsService.java

@@ -450,7 +450,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
             endingSnapshots.add(targetSnapshot);
             initializingClones.remove(targetSnapshot);
             logger.info(() -> "Failed to start snapshot clone [" + cloneEntry + "]", e);
-            removeFailedSnapshotFromClusterState(targetSnapshot, e, null);
+            removeFailedSnapshotFromClusterState(targetSnapshot, e, null, ShardGenerations.EMPTY);
         };
 
         // 1. step, load SnapshotInfo to make sure that source snapshot was successful for the indices we want to clone

@@ -1312,7 +1312,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
         if (entry.isClone() && entry.state() == State.FAILED) {
             logger.debug("Removing failed snapshot clone [{}] from cluster state", entry);
             if (newFinalization) {
-                removeFailedSnapshotFromClusterState(snapshot, new SnapshotException(snapshot, entry.failure()), null);
+                removeFailedSnapshotFromClusterState(
+                    snapshot,
+                    new SnapshotException(snapshot, entry.failure()),
+                    null,
+                    ShardGenerations.EMPTY
+                );
             }
             return;
         }

@@ -1496,7 +1501,15 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                 // a fatal like e.g. this node stopped being the master node
                 snapshotListeners.onResponse(endAndGetListenersToResolve(snapshot));
                 runNextQueuedOperation(updatedRepositoryData, repository, true);
-            }, e -> handleFinalizationFailure(e, snapshot, repositoryData)),
+            },
+                e -> handleFinalizationFailure(
+                    e,
+                    snapshot,
+                    repositoryData,
+                    // we might have written the new root blob before failing here, so we must use the updated shardGenerations
+                    shardGenerations
+                )
+            ),
             snInfo -> snapshotListeners.addListener(new ActionListener<>() {
                 @Override
                 public void onResponse(List<ActionListener<SnapshotInfo>> actionListeners) {

@@ -1512,11 +1525,20 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     })
                 )
             );
-        }, e -> handleFinalizationFailure(e, snapshot, repositoryData)));
+        },
+            e -> handleFinalizationFailure(
+                e,
+                snapshot,
+                repositoryData,
+                // a failure here means the root blob was not updated, but the updated shard generation blobs are all in place so we can
+                // use the updated shardGenerations for all pending shard snapshots
+                shardGenerations
+            )
+        ));
         } catch (Exception e) {
             logger.error(Strings.format("unexpected failure finalizing %s", snapshot), e);
             assert false : new AssertionError("unexpected failure finalizing " + snapshot, e);
-            handleFinalizationFailure(e, snapshot, repositoryData);
+            handleFinalizationFailure(e, snapshot, repositoryData, ShardGenerations.EMPTY);
         }
     }

@@ -1568,7 +1590,12 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param snapshot snapshot that failed to finalize
      * @param repositoryData current repository data for the snapshot's repository
      */
-    private void handleFinalizationFailure(Exception e, Snapshot snapshot, RepositoryData repositoryData) {
+    private void handleFinalizationFailure(
+        Exception e,
+        Snapshot snapshot,
+        RepositoryData repositoryData,
+        ShardGenerations shardGenerations
+    ) {
         if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) != null) {
             // Failure due to not being master any more, don't try to remove snapshot from cluster state the next master
             // will try ending this snapshot again

@@ -1581,7 +1608,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
             failAllListenersOnMasterFailOver(e);
         } else {
             logger.warn(() -> "[" + snapshot + "] failed to finalize snapshot", e);
-            removeFailedSnapshotFromClusterState(snapshot, e, repositoryData);
+            removeFailedSnapshotFromClusterState(snapshot, e, repositoryData, shardGenerations);
         }
     }

@@ -1701,7 +1728,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param snapshot snapshot for which to remove the snapshot operation
      * @return updated cluster state
      */
-    public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
+    public static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot, ShardGenerations shardGenerations) {
         final SnapshotsInProgress snapshots = SnapshotsInProgress.get(state);
         ClusterState result = state;
         int indexOfEntry = -1;

@@ -1762,7 +1789,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     final RepositoryShardId repositoryShardId = finishedShardEntry.getKey();
                     if (shardState.state() != ShardState.SUCCESS
-                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false) {
+                        || previousEntry.shardsByRepoShardId().containsKey(repositoryShardId) == false
+                        || shardGenerations.hasShardGen(finishedShardEntry.getKey()) == false) {
                         continue;
                     }
                     updatedShardAssignments = maybeAddUpdatedAssignment(

@@ -1779,7 +1807,8 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
                     .entrySet()) {
                     final ShardSnapshotStatus shardState = finishedShardEntry.getValue();
                     if (shardState.state() == ShardState.SUCCESS
-                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())) {
+                        && previousEntry.shardsByRepoShardId().containsKey(finishedShardEntry.getKey())
+                        && shardGenerations.hasShardGen(finishedShardEntry.getKey())) {
                         updatedShardAssignments = maybeAddUpdatedAssignment(
                             updatedShardAssignments,
                             shardState,

@@ -1862,13 +1891,18 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
      * @param repositoryData repository data if the next finalization operation on the repository should be attempted or {@code null} if
      *                       no further actions should be executed
      */
-    private void removeFailedSnapshotFromClusterState(Snapshot snapshot, Exception failure, @Nullable RepositoryData repositoryData) {
+    private void removeFailedSnapshotFromClusterState(
+        Snapshot snapshot,
+        Exception failure,
+        @Nullable RepositoryData repositoryData,
+        ShardGenerations shardGenerations
+    ) {
         assert failure != null : "Failure must be supplied";
         submitUnbatchedTask(REMOVE_SNAPSHOT_METADATA_TASK_SOURCE, new ClusterStateUpdateTask() {
 
             @Override
             public ClusterState execute(ClusterState currentState) {
-                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot);
+                final ClusterState updatedState = stateWithoutSnapshot(currentState, snapshot, shardGenerations);
                 assert updatedState == currentState || endingSnapshots.contains(snapshot)
                     : "did not track [" + snapshot + "] in ending snapshots while removing it from the cluster state";
                 // now check if there are any delete operations that refer to the just failed snapshot and remove the snapshot from them
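Taken together, `stateWithoutSnapshot` now only promotes a finished shard's state into the next queued snapshot entry when the finalized snapshot actually produced a generation for that shard. A simplified sketch of that guard follows (hypothetical types, condensing the two guarded loops above into one predicate).

import java.util.List;
import java.util.Map;

// Simplified sketch (hypothetical types) of the guarded promotion added to
// stateWithoutSnapshot: a finished shard is only carried into the next queued
// snapshot if (a) it succeeded, (b) the next snapshot also covers it, and
// (c) the finalization really wrote a generation for it.
public class PromotionGuardSketch {

    record FinishedShard(String shard, boolean success) {}

    static boolean promote(FinishedShard s, List<String> nextSnapshotShards, Map<String, String> writtenGens) {
        return s.success()
            && nextSnapshotShards.contains(s.shard())  // previousEntry.shardsByRepoShardId().containsKey(...)
            && writtenGens.containsKey(s.shard());     // shardGenerations.hasShardGen(...) - the new check
    }

    public static void main(String[] args) {
        var nextShards = List.of("index-0/0", "index-to-delete/0");
        var written = Map.of("index-0/0", "gen-b"); // nothing written for the deleted index

        System.out.println(promote(new FinishedShard("index-0/0", true), nextShards, written));         // true
        System.out.println(promote(new FinishedShard("index-to-delete/0", true), nextShards, written)); // false
    }
}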