mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Make SnapshotsInProgress project compatible (#125470)
This PR adds project-id to both SnapshotsInProgress and Snapshot so that they are aware of projects and ready to handle snapshots from multiple projects. Relates: ES-10224
This commit is contained in:
parent
b882e76a9a
commit
0c8daaeca5
11 changed files with 413 additions and 52 deletions
|
@ -0,0 +1,112 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the "Elastic License
|
||||||
|
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||||
|
* Public License v 1"; you may not use this file except in compliance with, at
|
||||||
|
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||||
|
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.upgrades;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||||
|
|
||||||
|
import org.elasticsearch.client.Request;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.test.rest.ObjectPath;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import static org.elasticsearch.upgrades.SnapshotBasedRecoveryIT.indexDocs;
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.Matchers.not;
|
||||||
|
|
||||||
|
public class RunningSnapshotIT extends AbstractRollingUpgradeTestCase {
|
||||||
|
|
||||||
|
public RunningSnapshotIT(@Name("upgradedNodes") int upgradedNodes) {
|
||||||
|
super(upgradedNodes);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testRunningSnapshotCompleteAfterUpgrade() throws Exception {
|
||||||
|
final String indexName = "index";
|
||||||
|
final String repositoryName = "repo";
|
||||||
|
final String snapshotName = "snapshot";
|
||||||
|
final var nodeIds = getNodesInfo(client()).keySet();
|
||||||
|
|
||||||
|
if (isOldCluster()) {
|
||||||
|
registerRepository(repositoryName, "fs", randomBoolean(), Settings.builder().put("location", "backup").build());
|
||||||
|
// create an index to have one shard per node
|
||||||
|
createIndex(indexName, indexSettings(3, 0).put("index.routing.allocation.total_shards_per_node", 1).build());
|
||||||
|
ensureGreen(indexName);
|
||||||
|
if (randomBoolean()) {
|
||||||
|
indexDocs(indexName, between(10, 50));
|
||||||
|
}
|
||||||
|
flush(indexName, true);
|
||||||
|
// Signal shutdown to prevent snapshot from being completed
|
||||||
|
putShutdownMetadata(nodeIds);
|
||||||
|
createSnapshot(repositoryName, snapshotName, false);
|
||||||
|
assertRunningSnapshot(repositoryName, snapshotName);
|
||||||
|
} else {
|
||||||
|
if (isUpgradedCluster()) {
|
||||||
|
deleteShutdownMetadata(nodeIds);
|
||||||
|
assertNoShutdownMetadata(nodeIds);
|
||||||
|
ensureGreen(indexName);
|
||||||
|
assertBusy(() -> assertCompletedSnapshot(repositoryName, snapshotName));
|
||||||
|
} else {
|
||||||
|
assertRunningSnapshot(repositoryName, snapshotName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void putShutdownMetadata(Collection<String> nodeIds) throws IOException {
|
||||||
|
for (String nodeId : nodeIds) {
|
||||||
|
final Request putShutdownRequest = new Request("PUT", "/_nodes/" + nodeId + "/shutdown");
|
||||||
|
putShutdownRequest.setJsonEntity("""
|
||||||
|
{
|
||||||
|
"type": "remove",
|
||||||
|
"reason": "test"
|
||||||
|
}""");
|
||||||
|
client().performRequest(putShutdownRequest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void deleteShutdownMetadata(Collection<String> nodeIds) throws IOException {
|
||||||
|
for (String nodeId : nodeIds) {
|
||||||
|
final Request request = new Request("DELETE", "/_nodes/" + nodeId + "/shutdown");
|
||||||
|
client().performRequest(request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNoShutdownMetadata(Collection<String> nodeIds) throws IOException {
|
||||||
|
final ObjectPath responsePath = assertOKAndCreateObjectPath(
|
||||||
|
client().performRequest(new Request("GET", "/_nodes/" + Strings.collectionToCommaDelimitedString(nodeIds) + "/shutdown"))
|
||||||
|
);
|
||||||
|
assertThat(responsePath.evaluate("nodes"), empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertRunningSnapshot(String repositoryName, String snapshotName) throws IOException {
|
||||||
|
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current");
|
||||||
|
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
|
||||||
|
assertThat(responsePath.evaluate("total"), equalTo(1));
|
||||||
|
assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertCompletedSnapshot(String repositoryName, String snapshotName) throws IOException {
|
||||||
|
{
|
||||||
|
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/_current");
|
||||||
|
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
|
||||||
|
assertThat(responsePath.evaluate("total"), equalTo(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
final Request request = new Request("GET", "/_snapshot/" + repositoryName + "/" + snapshotName);
|
||||||
|
final ObjectPath responsePath = assertOKAndCreateObjectPath(client().performRequest(request));
|
||||||
|
assertThat(responsePath.evaluate("total"), equalTo(1));
|
||||||
|
assertThat(responsePath.evaluate("snapshots.0.snapshot"), equalTo(snapshotName));
|
||||||
|
assertThat(responsePath.evaluate("snapshots.0.state"), not(equalTo("IN_PROGRESS")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -233,7 +233,7 @@ public class SnapshotBasedRecoveryIT extends AbstractRollingUpgradeTestCase {
|
||||||
return responseAsMap;
|
return responseAsMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void indexDocs(String indexName, int numDocs) throws IOException {
|
static void indexDocs(String indexName, int numDocs) throws IOException {
|
||||||
final StringBuilder bulkBody = new StringBuilder();
|
final StringBuilder bulkBody = new StringBuilder();
|
||||||
for (int i = 0; i < numDocs; i++) {
|
for (int i = 0; i < numDocs; i++) {
|
||||||
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
|
bulkBody.append("{\"index\":{\"_id\":\"").append(i).append("\"}}\n");
|
||||||
|
|
|
@ -205,6 +205,7 @@ public class TransportVersions {
|
||||||
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00);
|
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00);
|
||||||
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
|
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
|
||||||
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
|
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
|
||||||
|
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* STOP! READ THIS FIRST! No, really,
|
* STOP! READ THIS FIRST! No, really,
|
||||||
|
|
|
@ -146,6 +146,29 @@ public final class DiffableUtils {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new JDK map backed MapDiff by transforming the keys with the provided keyFunction.
|
||||||
|
* @param diff Original MapDiff to transform
|
||||||
|
* @param keyFunction Function to transform the key
|
||||||
|
* @param keySerializer Serializer for the new key
|
||||||
|
*/
|
||||||
|
public static <K1, K2, T extends Diffable<T>, M1 extends Map<K1, T>> MapDiff<K2, T, Map<K2, T>> jdkMapDiffWithUpdatedKeys(
|
||||||
|
MapDiff<K1, T, M1> diff,
|
||||||
|
Function<K1, K2> keyFunction,
|
||||||
|
KeySerializer<K2> keySerializer
|
||||||
|
) {
|
||||||
|
final List<K2> deletes = diff.getDeletes().stream().map(keyFunction).toList();
|
||||||
|
final List<Map.Entry<K2, Diff<T>>> diffs = diff.getDiffs()
|
||||||
|
.stream()
|
||||||
|
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
|
||||||
|
.toList();
|
||||||
|
final List<Map.Entry<K2, T>> upserts = diff.getUpserts()
|
||||||
|
.stream()
|
||||||
|
.map(entry -> Map.entry(keyFunction.apply(entry.getKey()), entry.getValue()))
|
||||||
|
.toList();
|
||||||
|
return new MapDiff<>(keySerializer, DiffableValueSerializer.getWriteOnlyInstance(), deletes, diffs, upserts, JdkMapBuilder::new);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a MapDiff that applies a single entry diff to a map
|
* Creates a MapDiff that applies a single entry diff to a map
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -12,6 +12,8 @@ package org.elasticsearch.cluster;
|
||||||
import org.elasticsearch.TransportVersion;
|
import org.elasticsearch.TransportVersion;
|
||||||
import org.elasticsearch.TransportVersions;
|
import org.elasticsearch.TransportVersions;
|
||||||
import org.elasticsearch.cluster.ClusterState.Custom;
|
import org.elasticsearch.cluster.ClusterState.Custom;
|
||||||
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
@ -22,6 +24,7 @@ import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.util.Maps;
|
import org.elasticsearch.common.util.Maps;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
|
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
|
||||||
|
import org.elasticsearch.core.FixForMultiProject;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.SuppressForbidden;
|
import org.elasticsearch.core.SuppressForbidden;
|
||||||
import org.elasticsearch.core.Tuple;
|
import org.elasticsearch.core.Tuple;
|
||||||
|
@ -32,6 +35,7 @@ import org.elasticsearch.logging.LogManager;
|
||||||
import org.elasticsearch.logging.Logger;
|
import org.elasticsearch.logging.Logger;
|
||||||
import org.elasticsearch.repositories.IndexId;
|
import org.elasticsearch.repositories.IndexId;
|
||||||
import org.elasticsearch.repositories.RepositoryOperation;
|
import org.elasticsearch.repositories.RepositoryOperation;
|
||||||
|
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
|
||||||
import org.elasticsearch.repositories.RepositoryShardId;
|
import org.elasticsearch.repositories.RepositoryShardId;
|
||||||
import org.elasticsearch.repositories.ShardGeneration;
|
import org.elasticsearch.repositories.ShardGeneration;
|
||||||
import org.elasticsearch.repositories.ShardSnapshotResult;
|
import org.elasticsearch.repositories.ShardSnapshotResult;
|
||||||
|
@ -56,6 +60,8 @@ import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.RepositoryOperation.PROJECT_REPO_SERIALIZER;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Meta data about snapshots that are currently executing
|
* Meta data about snapshots that are currently executing
|
||||||
*/
|
*/
|
||||||
|
@ -70,7 +76,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion";
|
public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion";
|
||||||
|
|
||||||
/** Maps repository name to list of snapshots in that repository */
|
/** Maps repository name to list of snapshots in that repository */
|
||||||
private final Map<String, ByRepo> entries;
|
private final Map<ProjectRepo, ByRepo> entries;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* IDs of nodes which are marked for removal, or which were previously marked for removal and still have running shard snapshots.
|
* IDs of nodes which are marked for removal, or which were previously marked for removal and still have running shard snapshots.
|
||||||
|
@ -104,56 +110,72 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
: Set.of();
|
: Set.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Map<String, ByRepo> collectByRepo(StreamInput in) throws IOException {
|
private static Map<ProjectRepo, ByRepo> collectByRepo(StreamInput in) throws IOException {
|
||||||
final int count = in.readVInt();
|
final int count = in.readVInt();
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
return Map.of();
|
return Map.of();
|
||||||
}
|
}
|
||||||
final Map<String, List<Entry>> entriesByRepo = new HashMap<>();
|
final Map<ProjectRepo, List<Entry>> entriesByRepo = new HashMap<>();
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
final Entry entry = Entry.readFrom(in);
|
final Entry entry = Entry.readFrom(in);
|
||||||
entriesByRepo.computeIfAbsent(entry.repository(), repo -> new ArrayList<>()).add(entry);
|
entriesByRepo.computeIfAbsent(new ProjectRepo(entry.projectId(), entry.repository()), repo -> new ArrayList<>()).add(entry);
|
||||||
}
|
}
|
||||||
final Map<String, ByRepo> res = Maps.newMapWithExpectedSize(entriesByRepo.size());
|
final Map<ProjectRepo, ByRepo> res = Maps.newMapWithExpectedSize(entriesByRepo.size());
|
||||||
for (Map.Entry<String, List<Entry>> entryForRepo : entriesByRepo.entrySet()) {
|
for (Map.Entry<ProjectRepo, List<Entry>> entryForRepo : entriesByRepo.entrySet()) {
|
||||||
res.put(entryForRepo.getKey(), new ByRepo(entryForRepo.getValue()));
|
res.put(entryForRepo.getKey(), new ByRepo(entryForRepo.getValue()));
|
||||||
}
|
}
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
private SnapshotsInProgress(Map<String, ByRepo> entries, Set<String> nodesIdsForRemoval) {
|
private SnapshotsInProgress(Map<ProjectRepo, ByRepo> entries, Set<String> nodesIdsForRemoval) {
|
||||||
this.entries = Map.copyOf(entries);
|
this.entries = Map.copyOf(entries);
|
||||||
this.nodesIdsForRemoval = nodesIdsForRemoval;
|
this.nodesIdsForRemoval = nodesIdsForRemoval;
|
||||||
assert assertConsistentEntries(this.entries);
|
assert assertConsistentEntries(this.entries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@FixForMultiProject
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
|
public SnapshotsInProgress withUpdatedEntriesForRepo(String repository, List<Entry> updatedEntries) {
|
||||||
if (updatedEntries.equals(forRepo(repository))) {
|
return withUpdatedEntriesForRepo(Metadata.DEFAULT_PROJECT_ID, repository, updatedEntries);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SnapshotsInProgress withUpdatedEntriesForRepo(ProjectId projectId, String repository, List<Entry> updatedEntries) {
|
||||||
|
if (updatedEntries.equals(forRepo(projectId, repository))) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
final Map<String, ByRepo> copy = new HashMap<>(this.entries);
|
final Map<ProjectRepo, ByRepo> copy = new HashMap<>(this.entries);
|
||||||
|
final var projectRepo = new ProjectRepo(projectId, repository);
|
||||||
if (updatedEntries.isEmpty()) {
|
if (updatedEntries.isEmpty()) {
|
||||||
copy.remove(repository);
|
copy.remove(projectRepo);
|
||||||
if (copy.isEmpty()) {
|
if (copy.isEmpty()) {
|
||||||
return EMPTY;
|
return EMPTY;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
copy.put(repository, new ByRepo(updatedEntries));
|
copy.put(projectRepo, new ByRepo(updatedEntries));
|
||||||
}
|
}
|
||||||
return new SnapshotsInProgress(copy, nodesIdsForRemoval);
|
return new SnapshotsInProgress(copy, nodesIdsForRemoval);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SnapshotsInProgress withAddedEntry(Entry entry) {
|
public SnapshotsInProgress withAddedEntry(Entry entry) {
|
||||||
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.repository()));
|
final List<Entry> forRepo = new ArrayList<>(forRepo(entry.projectId(), entry.repository()));
|
||||||
forRepo.add(entry);
|
forRepo.add(entry);
|
||||||
return withUpdatedEntriesForRepo(entry.repository(), forRepo);
|
return withUpdatedEntriesForRepo(entry.projectId(), entry.repository(), forRepo);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the list of snapshots in the specified repository.
|
* Returns the list of snapshots in the specified repository.
|
||||||
*/
|
*/
|
||||||
|
@FixForMultiProject
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
public List<Entry> forRepo(String repository) {
|
public List<Entry> forRepo(String repository) {
|
||||||
return entries.getOrDefault(repository, ByRepo.EMPTY).entries;
|
return forRepo(Metadata.DEFAULT_PROJECT_ID, repository);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the list of snapshots in the specified repository.
|
||||||
|
*/
|
||||||
|
public List<Entry> forRepo(ProjectId projectId, String repository) {
|
||||||
|
return entries.getOrDefault(new ProjectRepo(projectId, repository), ByRepo.EMPTY).entries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isEmpty() {
|
public boolean isEmpty() {
|
||||||
|
@ -178,7 +200,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public Entry snapshot(final Snapshot snapshot) {
|
public Entry snapshot(final Snapshot snapshot) {
|
||||||
return findSnapshotInList(snapshot, forRepo(snapshot.getRepository()));
|
return findSnapshotInList(snapshot, forRepo(snapshot.getProjectId(), snapshot.getRepository()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -206,14 +228,34 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
|
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
|
||||||
* for deletion now.
|
* for deletion now.
|
||||||
*/
|
*/
|
||||||
|
@FixForMultiProject
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
|
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
|
||||||
String repository,
|
String repository,
|
||||||
SnapshotsInProgress oldClusterStateSnapshots
|
SnapshotsInProgress oldClusterStateSnapshots
|
||||||
) {
|
) {
|
||||||
final Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations = new HashMap<>();
|
return obsoleteGenerations(Metadata.DEFAULT_PROJECT_ID, repository, oldClusterStateSnapshots);
|
||||||
final List<Entry> latestSnapshots = forRepo(repository);
|
}
|
||||||
|
|
||||||
for (Entry oldEntry : oldClusterStateSnapshots.forRepo(repository)) {
|
/**
|
||||||
|
* Computes a map of repository shard id to set of shard generations, containing all shard generations that became obsolete and may be
|
||||||
|
* deleted from the repository as the cluster state moves from the given old value of {@link SnapshotsInProgress} to this instance.
|
||||||
|
* <p>
|
||||||
|
* An unique shard generation is created for every in-progress shard snapshot. The shard generation file contains information about all
|
||||||
|
* the files needed by pre-existing and any new shard snapshots that were in-progress. When a shard snapshot is finalized, its file list
|
||||||
|
* is promoted to the official shard snapshot list for the index shard. This final list will contain metadata about any other
|
||||||
|
* in-progress shard snapshots that were not yet finalized when it began. All these other in-progress shard snapshot lists are scheduled
|
||||||
|
* for deletion now.
|
||||||
|
*/
|
||||||
|
public Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations(
|
||||||
|
ProjectId projectId,
|
||||||
|
String repository,
|
||||||
|
SnapshotsInProgress oldClusterStateSnapshots
|
||||||
|
) {
|
||||||
|
final Map<RepositoryShardId, Set<ShardGeneration>> obsoleteGenerations = new HashMap<>();
|
||||||
|
final List<Entry> latestSnapshots = forRepo(projectId, repository);
|
||||||
|
|
||||||
|
for (Entry oldEntry : oldClusterStateSnapshots.forRepo(projectId, repository)) {
|
||||||
final Entry matchingLatestEntry = findSnapshotInList(oldEntry.snapshot(), latestSnapshots);
|
final Entry matchingLatestEntry = findSnapshotInList(oldEntry.snapshot(), latestSnapshots);
|
||||||
if (matchingLatestEntry == null || matchingLatestEntry == oldEntry) {
|
if (matchingLatestEntry == null || matchingLatestEntry == oldEntry) {
|
||||||
// The snapshot progress has not changed.
|
// The snapshot progress has not changed.
|
||||||
|
@ -412,15 +454,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean assertConsistentEntries(Map<String, ByRepo> entries) {
|
private static boolean assertConsistentEntries(Map<ProjectRepo, ByRepo> entries) {
|
||||||
for (Map.Entry<String, ByRepo> repoEntries : entries.entrySet()) {
|
for (Map.Entry<ProjectRepo, ByRepo> repoEntries : entries.entrySet()) {
|
||||||
final Set<Tuple<String, Integer>> assignedShards = new HashSet<>();
|
final Set<Tuple<String, Integer>> assignedShards = new HashSet<>();
|
||||||
final Set<Tuple<String, Integer>> queuedShards = new HashSet<>();
|
final Set<Tuple<String, Integer>> queuedShards = new HashSet<>();
|
||||||
final List<Entry> entriesForRepository = repoEntries.getValue().entries;
|
final List<Entry> entriesForRepository = repoEntries.getValue().entries;
|
||||||
final String repository = repoEntries.getKey();
|
final ProjectRepo repository = repoEntries.getKey();
|
||||||
assert entriesForRepository.isEmpty() == false : "found empty list of snapshots for " + repository + " in " + entries;
|
assert entriesForRepository.isEmpty() == false : "found empty list of snapshots for " + repository + " in " + entries;
|
||||||
for (Entry entry : entriesForRepository) {
|
for (Entry entry : entriesForRepository) {
|
||||||
assert entry.repository().equals(repository) : "mismatched repository " + entry + " tracked under " + repository;
|
assert new ProjectRepo(entry.projectId(), entry.repository()).equals(repository)
|
||||||
|
: "mismatched repository " + entry + " tracked under " + repository;
|
||||||
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shard : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
|
for (Map.Entry<RepositoryShardId, ShardSnapshotStatus> shard : entry.shardSnapshotStatusByRepoShardId().entrySet()) {
|
||||||
final RepositoryShardId sid = shard.getKey();
|
final RepositoryShardId sid = shard.getKey();
|
||||||
final ShardSnapshotStatus shardSnapshotStatus = shard.getValue();
|
final ShardSnapshotStatus shardSnapshotStatus = shard.getValue();
|
||||||
|
@ -1241,6 +1284,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProjectId projectId() {
|
||||||
|
return snapshot.getProjectId();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String repository() {
|
public String repository() {
|
||||||
return snapshot.getRepository();
|
return snapshot.getRepository();
|
||||||
|
@ -1391,6 +1439,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
@Override
|
@Override
|
||||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
|
builder.field("project_id", snapshot.getProjectId());
|
||||||
builder.field("repository", snapshot.getRepository());
|
builder.field("repository", snapshot.getRepository());
|
||||||
builder.field("snapshot", snapshot.getSnapshotId().getName());
|
builder.field("snapshot", snapshot.getSnapshotId().getName());
|
||||||
builder.field("uuid", snapshot.getSnapshotId().getUUID());
|
builder.field("uuid", snapshot.getSnapshotId().getUUID());
|
||||||
|
@ -1725,25 +1774,43 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
|
|
||||||
private final SnapshotsInProgress after;
|
private final SnapshotsInProgress after;
|
||||||
|
|
||||||
private final DiffableUtils.MapDiff<String, ByRepo, Map<String, ByRepo>> mapDiff;
|
private final DiffableUtils.MapDiff<ProjectRepo, ByRepo, Map<ProjectRepo, ByRepo>> mapDiff;
|
||||||
private final Set<String> nodeIdsForRemoval;
|
private final Set<String> nodeIdsForRemoval;
|
||||||
|
|
||||||
SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) {
|
SnapshotInProgressDiff(SnapshotsInProgress before, SnapshotsInProgress after) {
|
||||||
this.mapDiff = DiffableUtils.diff(before.entries, after.entries, DiffableUtils.getStringKeySerializer());
|
this.mapDiff = DiffableUtils.diff(before.entries, after.entries, PROJECT_REPO_SERIALIZER);
|
||||||
this.nodeIdsForRemoval = after.nodesIdsForRemoval;
|
this.nodeIdsForRemoval = after.nodesIdsForRemoval;
|
||||||
this.after = after;
|
this.after = after;
|
||||||
}
|
}
|
||||||
|
|
||||||
SnapshotInProgressDiff(StreamInput in) throws IOException {
|
SnapshotInProgressDiff(StreamInput in) throws IOException {
|
||||||
this.mapDiff = DiffableUtils.readJdkMapDiff(
|
if (in.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
|
||||||
in,
|
final var oldMapDiff = DiffableUtils.readJdkMapDiff(
|
||||||
DiffableUtils.getStringKeySerializer(),
|
in,
|
||||||
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
|
DiffableUtils.getStringKeySerializer(),
|
||||||
i -> new ByRepo.ByRepoDiff(
|
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
|
||||||
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
|
i -> new ByRepo.ByRepoDiff(
|
||||||
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
|
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
|
||||||
)
|
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
|
||||||
);
|
)
|
||||||
|
);
|
||||||
|
this.mapDiff = DiffableUtils.jdkMapDiffWithUpdatedKeys(
|
||||||
|
oldMapDiff,
|
||||||
|
repository -> new ProjectRepo(ProjectId.DEFAULT, repository),
|
||||||
|
PROJECT_REPO_SERIALIZER
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
this.mapDiff = DiffableUtils.readJdkMapDiff(
|
||||||
|
in,
|
||||||
|
PROJECT_REPO_SERIALIZER,
|
||||||
|
i -> new ByRepo(i.readCollectionAsImmutableList(Entry::readFrom)),
|
||||||
|
i -> new ByRepo.ByRepoDiff(
|
||||||
|
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), Entry::readFrom, EntryDiff::new),
|
||||||
|
DiffableUtils.readJdkMapDiff(i, DiffableUtils.getStringKeySerializer(), ByRepo.INT_DIFF_VALUE_SERIALIZER)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
this.nodeIdsForRemoval = readNodeIdsForRemoval(in);
|
this.nodeIdsForRemoval = readNodeIdsForRemoval(in);
|
||||||
this.after = null;
|
this.after = null;
|
||||||
}
|
}
|
||||||
|
@ -1768,7 +1835,21 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
assert after != null : "should only write instances that were diffed from this node's state";
|
assert after != null : "should only write instances that were diffed from this node's state";
|
||||||
if (out.getTransportVersion().onOrAfter(DIFFABLE_VERSION)) {
|
if (out.getTransportVersion().onOrAfter(DIFFABLE_VERSION)) {
|
||||||
mapDiff.writeTo(out);
|
if (out.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
|
||||||
|
DiffableUtils.jdkMapDiffWithUpdatedKeys(mapDiff, projectRepo -> {
|
||||||
|
if (ProjectId.DEFAULT.equals(projectRepo.projectId()) == false) {
|
||||||
|
final var message = "Cannot write instance with non-default project id "
|
||||||
|
+ projectRepo.projectId()
|
||||||
|
+ " to version before "
|
||||||
|
+ TransportVersions.PROJECT_ID_IN_SNAPSHOT;
|
||||||
|
assert false : message;
|
||||||
|
throw new IllegalArgumentException(message);
|
||||||
|
}
|
||||||
|
return projectRepo.name();
|
||||||
|
}, DiffableUtils.getStringKeySerializer()).writeTo(out);
|
||||||
|
} else {
|
||||||
|
mapDiff.writeTo(out);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
new SimpleDiffable.CompleteDiff<>(after).writeTo(out);
|
new SimpleDiffable.CompleteDiff<>(after).writeTo(out);
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,11 +8,29 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.repositories;
|
package org.elasticsearch.repositories;
|
||||||
|
|
||||||
|
import org.elasticsearch.cluster.DiffableUtils;
|
||||||
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.core.FixForMultiProject;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
|
* Coordinates of an operation that modifies a repository, assuming that repository at a specific generation.
|
||||||
*/
|
*/
|
||||||
public interface RepositoryOperation {
|
public interface RepositoryOperation {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Project for which repository belongs to.
|
||||||
|
*/
|
||||||
|
@FixForMultiProject(description = "default implementation is temporary")
|
||||||
|
default ProjectId projectId() {
|
||||||
|
return Metadata.DEFAULT_PROJECT_ID;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of the repository affected.
|
* Name of the repository affected.
|
||||||
*/
|
*/
|
||||||
|
@ -22,4 +40,34 @@ public interface RepositoryOperation {
|
||||||
* The repository state id at the time the operation began.
|
* The repository state id at the time the operation began.
|
||||||
*/
|
*/
|
||||||
long repositoryStateId();
|
long repositoryStateId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A project qualified repository
|
||||||
|
* @param projectId The project that the repository belongs to
|
||||||
|
* @param name Name of the repository
|
||||||
|
*/
|
||||||
|
record ProjectRepo(ProjectId projectId, String name) implements Writeable {
|
||||||
|
|
||||||
|
public ProjectRepo(StreamInput in) throws IOException {
|
||||||
|
this(ProjectId.readFrom(in), in.readString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
projectId.writeTo(out);
|
||||||
|
out.writeString(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DiffableUtils.KeySerializer<ProjectRepo> PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {
|
||||||
|
@Override
|
||||||
|
public void writeKey(ProjectRepo key, StreamOutput out) throws IOException {
|
||||||
|
key.writeTo(out);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ProjectRepo readKey(StreamInput in) throws IOException {
|
||||||
|
return new ProjectRepo(in);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,6 +12,7 @@ package org.elasticsearch.snapshots;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.repositories.IndexId;
|
import org.elasticsearch.repositories.IndexId;
|
||||||
|
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
|
||||||
import org.elasticsearch.repositories.RepositoryShardId;
|
import org.elasticsearch.repositories.RepositoryShardId;
|
||||||
import org.elasticsearch.repositories.ShardGeneration;
|
import org.elasticsearch.repositories.ShardGeneration;
|
||||||
import org.elasticsearch.repositories.ShardGenerations;
|
import org.elasticsearch.repositories.ShardGenerations;
|
||||||
|
@ -46,7 +47,7 @@ public final class InFlightShardSnapshotStates {
|
||||||
}
|
}
|
||||||
final Map<String, Map<Integer, ShardGeneration>> generations = new HashMap<>();
|
final Map<String, Map<Integer, ShardGeneration>> generations = new HashMap<>();
|
||||||
final Map<String, Set<Integer>> busyIds = new HashMap<>();
|
final Map<String, Set<Integer>> busyIds = new HashMap<>();
|
||||||
assert snapshots.stream().map(SnapshotsInProgress.Entry::repository).distinct().count() == 1
|
assert snapshots.stream().map(entry -> new ProjectRepo(entry.projectId(), entry.repository())).distinct().count() == 1
|
||||||
: "snapshots must either be an empty list or all belong to the same repository but saw " + snapshots;
|
: "snapshots must either be an empty list or all belong to the same repository but saw " + snapshots;
|
||||||
for (SnapshotsInProgress.Entry runningSnapshot : snapshots) {
|
for (SnapshotsInProgress.Entry runningSnapshot : snapshots) {
|
||||||
for (Map.Entry<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : runningSnapshot
|
for (Map.Entry<RepositoryShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : runningSnapshot
|
||||||
|
|
|
@ -9,9 +9,12 @@
|
||||||
|
|
||||||
package org.elasticsearch.snapshots;
|
package org.elasticsearch.snapshots;
|
||||||
|
|
||||||
|
import org.elasticsearch.TransportVersions;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.core.FixForMultiProject;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -21,6 +24,7 @@ import java.util.Objects;
|
||||||
*/
|
*/
|
||||||
public final class Snapshot implements Writeable {
|
public final class Snapshot implements Writeable {
|
||||||
|
|
||||||
|
private final ProjectId projectId;
|
||||||
private final String repository;
|
private final String repository;
|
||||||
private final SnapshotId snapshotId;
|
private final SnapshotId snapshotId;
|
||||||
private final int hashCode;
|
private final int hashCode;
|
||||||
|
@ -28,7 +32,17 @@ public final class Snapshot implements Writeable {
|
||||||
/**
|
/**
|
||||||
* Constructs a snapshot.
|
* Constructs a snapshot.
|
||||||
*/
|
*/
|
||||||
|
@FixForMultiProject
|
||||||
|
@Deprecated(forRemoval = true)
|
||||||
public Snapshot(final String repository, final SnapshotId snapshotId) {
|
public Snapshot(final String repository, final SnapshotId snapshotId) {
|
||||||
|
this(ProjectId.DEFAULT, repository, snapshotId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructs a snapshot.
|
||||||
|
*/
|
||||||
|
public Snapshot(ProjectId projectId, final String repository, final SnapshotId snapshotId) {
|
||||||
|
this.projectId = projectId;
|
||||||
this.repository = Objects.requireNonNull(repository);
|
this.repository = Objects.requireNonNull(repository);
|
||||||
this.snapshotId = Objects.requireNonNull(snapshotId);
|
this.snapshotId = Objects.requireNonNull(snapshotId);
|
||||||
this.hashCode = computeHashCode();
|
this.hashCode = computeHashCode();
|
||||||
|
@ -38,11 +52,20 @@ public final class Snapshot implements Writeable {
|
||||||
* Constructs a snapshot from the stream input.
|
* Constructs a snapshot from the stream input.
|
||||||
*/
|
*/
|
||||||
public Snapshot(final StreamInput in) throws IOException {
|
public Snapshot(final StreamInput in) throws IOException {
|
||||||
|
if (in.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
|
||||||
|
projectId = ProjectId.DEFAULT;
|
||||||
|
} else {
|
||||||
|
projectId = ProjectId.readFrom(in);
|
||||||
|
}
|
||||||
repository = in.readString();
|
repository = in.readString();
|
||||||
snapshotId = new SnapshotId(in);
|
snapshotId = new SnapshotId(in);
|
||||||
hashCode = computeHashCode();
|
hashCode = computeHashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ProjectId getProjectId() {
|
||||||
|
return projectId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets the repository name for the snapshot.
|
* Gets the repository name for the snapshot.
|
||||||
*/
|
*/
|
||||||
|
@ -59,7 +82,7 @@ public final class Snapshot implements Writeable {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return repository + ":" + snapshotId.toString();
|
return projectId + ":" + repository + ":" + snapshotId.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -71,7 +94,7 @@ public final class Snapshot implements Writeable {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
Snapshot that = (Snapshot) o;
|
Snapshot that = (Snapshot) o;
|
||||||
return repository.equals(that.repository) && snapshotId.equals(that.snapshotId);
|
return projectId.equals(that.projectId) && repository.equals(that.repository) && snapshotId.equals(that.snapshotId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -80,11 +103,23 @@ public final class Snapshot implements Writeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int computeHashCode() {
|
private int computeHashCode() {
|
||||||
return Objects.hash(repository, snapshotId);
|
return Objects.hash(projectId, repository, snapshotId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(final StreamOutput out) throws IOException {
|
public void writeTo(final StreamOutput out) throws IOException {
|
||||||
|
if (out.getTransportVersion().before(TransportVersions.PROJECT_ID_IN_SNAPSHOT)) {
|
||||||
|
if (ProjectId.DEFAULT.equals(projectId) == false) {
|
||||||
|
final var message = "Cannot write instance with non-default project id "
|
||||||
|
+ projectId
|
||||||
|
+ " to version before "
|
||||||
|
+ TransportVersions.PROJECT_ID_IN_SNAPSHOT;
|
||||||
|
assert false : message;
|
||||||
|
throw new IllegalArgumentException(message);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
projectId.writeTo(out);
|
||||||
|
}
|
||||||
out.writeString(repository);
|
out.writeString(repository);
|
||||||
snapshotId.writeTo(out);
|
snapshotId.writeTo(out);
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCas
|
||||||
assertThat(
|
assertThat(
|
||||||
decision.getExplanation(),
|
decision.getExplanation(),
|
||||||
equalTo(
|
equalTo(
|
||||||
"shard has failed to be restored from the snapshot [_repository:_missing/_uuid] - manually close or "
|
"shard has failed to be restored from the snapshot [default:_repository:_missing/_uuid] - manually close or "
|
||||||
+ "delete the index [test] in order to retry to restore the snapshot again or use the reroute API "
|
+ "delete the index [test] in order to retry to restore the snapshot again or use the reroute API "
|
||||||
+ "to force the allocation of an empty primary shard. Details: [restore_source[_repository/_missing]]"
|
+ "to force the allocation of an empty primary shard. Details: [restore_source[_repository/_missing]]"
|
||||||
)
|
)
|
||||||
|
@ -168,7 +168,8 @@ public class RestoreInProgressAllocationDeciderTests extends ESAllocationTestCas
|
||||||
assertThat(
|
assertThat(
|
||||||
decision.getExplanation(),
|
decision.getExplanation(),
|
||||||
startsWith(
|
startsWith(
|
||||||
"shard has failed to be restored from the snapshot [_repository:_existing/_uuid] - manually close or delete the index "
|
"shard has failed to be restored from the snapshot [default:_repository:_existing/_uuid]"
|
||||||
|
+ " - manually close or delete the index "
|
||||||
+ "[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of "
|
+ "[test] in order to retry to restore the snapshot again or use the reroute API to force the allocation of "
|
||||||
+ "an empty primary shard. Details: [restore_source[_repository/_existing], failure "
|
+ "an empty primary shard. Details: [restore_source[_repository/_existing], failure "
|
||||||
+ "java.io.IOException: i/o failure"
|
+ "java.io.IOException: i/o failure"
|
||||||
|
|
|
@ -241,7 +241,7 @@ public class RestoreServiceTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
assertThat(
|
assertThat(
|
||||||
exception.getMessage(),
|
exception.getMessage(),
|
||||||
equalTo("[name:name/uuid] cannot restore global state since the snapshot was created without global state")
|
equalTo("[default:name:name/uuid] cannot restore global state since the snapshot was created without global state")
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,19 +10,23 @@
|
||||||
package org.elasticsearch.snapshots;
|
package org.elasticsearch.snapshots;
|
||||||
|
|
||||||
import org.elasticsearch.TransportVersion;
|
import org.elasticsearch.TransportVersion;
|
||||||
|
import org.elasticsearch.TransportVersions;
|
||||||
import org.elasticsearch.cluster.ClusterModule;
|
import org.elasticsearch.cluster.ClusterModule;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterState.Custom;
|
import org.elasticsearch.cluster.ClusterState.Custom;
|
||||||
import org.elasticsearch.cluster.Diff;
|
import org.elasticsearch.cluster.Diff;
|
||||||
|
import org.elasticsearch.cluster.NamedDiff;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
|
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
|
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
import org.elasticsearch.cluster.SnapshotsInProgress.State;
|
||||||
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
|
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
|
||||||
import org.elasticsearch.cluster.version.CompatibilityVersions;
|
import org.elasticsearch.cluster.version.CompatibilityVersions;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
@ -31,11 +35,13 @@ import org.elasticsearch.index.Index;
|
||||||
import org.elasticsearch.index.IndexVersion;
|
import org.elasticsearch.index.IndexVersion;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.repositories.IndexId;
|
import org.elasticsearch.repositories.IndexId;
|
||||||
|
import org.elasticsearch.repositories.RepositoryOperation.ProjectRepo;
|
||||||
import org.elasticsearch.repositories.ShardGeneration;
|
import org.elasticsearch.repositories.ShardGeneration;
|
||||||
import org.elasticsearch.repositories.ShardSnapshotResult;
|
import org.elasticsearch.repositories.ShardSnapshotResult;
|
||||||
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
|
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
|
import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
|
||||||
|
import org.elasticsearch.test.TransportVersionUtils;
|
||||||
import org.elasticsearch.test.index.IndexVersionUtils;
|
import org.elasticsearch.test.index.IndexVersionUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -46,6 +52,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -60,10 +67,14 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Custom createTestInstance() {
|
protected Custom createTestInstance() {
|
||||||
int numberOfSnapshots = randomInt(10);
|
return createTestInstance(() -> randomSnapshot(randomProjectIdOrDefault()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Custom createTestInstance(Supplier<Entry> randomEntrySupplier) {
|
||||||
|
int numberOfSnapshots = randomInt(20);
|
||||||
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY;
|
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.EMPTY;
|
||||||
for (int i = 0; i < numberOfSnapshots; i++) {
|
for (int i = 0; i < numberOfSnapshots; i++) {
|
||||||
snapshotsInProgress = snapshotsInProgress.withAddedEntry(randomSnapshot());
|
snapshotsInProgress = snapshotsInProgress.withAddedEntry(randomEntrySupplier.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
final var nodeIdsForRemoval = randomList(3, ESTestCase::randomUUID);
|
final var nodeIdsForRemoval = randomList(3, ESTestCase::randomUUID);
|
||||||
|
@ -76,6 +87,36 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
return snapshotsInProgress;
|
return snapshotsInProgress;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSerializationBwc() throws IOException {
|
||||||
|
final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.PROJECT_ID_IN_SNAPSHOT);
|
||||||
|
final BytesStreamOutput out = new BytesStreamOutput();
|
||||||
|
out.setTransportVersion(oldVersion);
|
||||||
|
final Custom original = createTestInstance(() -> randomSnapshot(ProjectId.DEFAULT));
|
||||||
|
original.writeTo(out);
|
||||||
|
|
||||||
|
final var in = out.bytes().streamInput();
|
||||||
|
in.setTransportVersion(oldVersion);
|
||||||
|
final SnapshotsInProgress fromStream = new SnapshotsInProgress(in);
|
||||||
|
assertThat(fromStream, equalTo(original));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDiffSerializationBwc() throws IOException {
|
||||||
|
final var oldVersion = TransportVersionUtils.getPreviousVersion(TransportVersions.PROJECT_ID_IN_SNAPSHOT);
|
||||||
|
final BytesStreamOutput out = new BytesStreamOutput();
|
||||||
|
out.setTransportVersion(oldVersion);
|
||||||
|
|
||||||
|
final Custom before = createTestInstance(() -> randomSnapshot(ProjectId.DEFAULT));
|
||||||
|
final Custom after = makeTestChanges(before, () -> randomSnapshot(ProjectId.DEFAULT));
|
||||||
|
final Diff<Custom> diff = after.diff(before);
|
||||||
|
diff.writeTo(out);
|
||||||
|
|
||||||
|
final var in = out.bytes().streamInput();
|
||||||
|
in.setTransportVersion(oldVersion);
|
||||||
|
final NamedDiff<Custom> diffFromStream = SnapshotsInProgress.readDiffFrom(in);
|
||||||
|
|
||||||
|
assertThat(diffFromStream.apply(before), equalTo(after));
|
||||||
|
}
|
||||||
|
|
||||||
private ClusterState getClusterStateWithNodeShutdownMetadata(List<String> nodeIdsForRemoval) {
|
private ClusterState getClusterStateWithNodeShutdownMetadata(List<String> nodeIdsForRemoval) {
|
||||||
return CLUSTER_STATE_FOR_NODE_SHUTDOWNS.copyAndUpdateMetadata(
|
return CLUSTER_STATE_FOR_NODE_SHUTDOWNS.copyAndUpdateMetadata(
|
||||||
mdb -> mdb.putCustom(
|
mdb -> mdb.putCustom(
|
||||||
|
@ -100,7 +141,15 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
}
|
}
|
||||||
|
|
||||||
private Entry randomSnapshot() {
|
private Entry randomSnapshot() {
|
||||||
Snapshot snapshot = new Snapshot("repo-" + randomInt(5), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10)));
|
return randomSnapshot(randomProjectIdOrDefault());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Entry randomSnapshot(ProjectId projectId) {
|
||||||
|
Snapshot snapshot = new Snapshot(
|
||||||
|
projectId,
|
||||||
|
"repo-" + randomInt(5),
|
||||||
|
new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10))
|
||||||
|
);
|
||||||
boolean includeGlobalState = randomBoolean();
|
boolean includeGlobalState = randomBoolean();
|
||||||
boolean partial = randomBoolean();
|
boolean partial = randomBoolean();
|
||||||
int numberOfIndices = randomIntBetween(0, 10);
|
int numberOfIndices = randomIntBetween(0, 10);
|
||||||
|
@ -158,6 +207,10 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Custom makeTestChanges(Custom testInstance) {
|
protected Custom makeTestChanges(Custom testInstance) {
|
||||||
|
return makeTestChanges(testInstance, () -> randomSnapshot(randomProjectIdOrDefault()));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Custom makeTestChanges(Custom testInstance, Supplier<Entry> randomEntrySupplier) {
|
||||||
final SnapshotsInProgress snapshots = (SnapshotsInProgress) testInstance;
|
final SnapshotsInProgress snapshots = (SnapshotsInProgress) testInstance;
|
||||||
SnapshotsInProgress updatedInstance = SnapshotsInProgress.EMPTY;
|
SnapshotsInProgress updatedInstance = SnapshotsInProgress.EMPTY;
|
||||||
if (randomBoolean() && snapshots.count() > 1) {
|
if (randomBoolean() && snapshots.count() > 1) {
|
||||||
|
@ -178,7 +231,7 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
// add some elements
|
// add some elements
|
||||||
int addElements = randomInt(10);
|
int addElements = randomInt(10);
|
||||||
for (int i = 0; i < addElements; i++) {
|
for (int i = 0; i < addElements; i++) {
|
||||||
updatedInstance = updatedInstance.withAddedEntry(randomSnapshot());
|
updatedInstance = updatedInstance.withAddedEntry(randomEntrySupplier.get());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
|
@ -194,7 +247,8 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
entries = shuffledList(entries);
|
entries = shuffledList(entries);
|
||||||
}
|
}
|
||||||
updatedInstance = updatedInstance.withUpdatedEntriesForRepo(perRepoEntries.get(0).repository(), entries);
|
final Entry firstEntry = perRepoEntries.get(0);
|
||||||
|
updatedInstance = updatedInstance.withUpdatedEntriesForRepo(firstEntry.projectId(), firstEntry.repository(), entries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return updatedInstance;
|
return updatedInstance;
|
||||||
|
@ -219,10 +273,12 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
return snapshotsInProgress.withAddedEntry(randomSnapshot());
|
return snapshotsInProgress.withAddedEntry(randomSnapshot());
|
||||||
} else {
|
} else {
|
||||||
// mutate or remove an entry
|
// mutate or remove an entry
|
||||||
final String repo = randomFrom(
|
final var repo = randomFrom(
|
||||||
snapshotsInProgress.asStream().map(SnapshotsInProgress.Entry::repository).collect(Collectors.toSet())
|
snapshotsInProgress.asStream()
|
||||||
|
.map(entry -> new ProjectRepo(entry.projectId(), entry.repository()))
|
||||||
|
.collect(Collectors.toSet())
|
||||||
);
|
);
|
||||||
final List<Entry> forRepo = snapshotsInProgress.forRepo(repo);
|
final List<Entry> forRepo = snapshotsInProgress.forRepo(repo.projectId(), repo.name());
|
||||||
int index = randomIntBetween(0, forRepo.size() - 1);
|
int index = randomIntBetween(0, forRepo.size() - 1);
|
||||||
Entry entry = forRepo.get(index);
|
Entry entry = forRepo.get(index);
|
||||||
final List<Entry> updatedEntries = new ArrayList<>(forRepo);
|
final List<Entry> updatedEntries = new ArrayList<>(forRepo);
|
||||||
|
@ -231,7 +287,7 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
} else {
|
} else {
|
||||||
updatedEntries.remove(index);
|
updatedEntries.remove(index);
|
||||||
}
|
}
|
||||||
return snapshotsInProgress.withUpdatedEntriesForRepo(repo, updatedEntries);
|
return snapshotsInProgress.withUpdatedEntriesForRepo(repo.projectId(), repo.name(), updatedEntries);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return snapshotsInProgress.withUpdatedNodeIdsForRemoval(
|
return snapshotsInProgress.withUpdatedNodeIdsForRemoval(
|
||||||
|
@ -438,9 +494,10 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
|
|
||||||
public void testXContent() throws IOException {
|
public void testXContent() throws IOException {
|
||||||
final IndexId indexId = new IndexId("index", "uuid");
|
final IndexId indexId = new IndexId("index", "uuid");
|
||||||
|
final ProjectId projectId = ProjectId.fromId("some-project");
|
||||||
SnapshotsInProgress sip = SnapshotsInProgress.EMPTY.withAddedEntry(
|
SnapshotsInProgress sip = SnapshotsInProgress.EMPTY.withAddedEntry(
|
||||||
Entry.snapshot(
|
Entry.snapshot(
|
||||||
new Snapshot("repo", new SnapshotId("name", "uuid")),
|
new Snapshot(projectId, "repo", new SnapshotId("name", "uuid")),
|
||||||
true,
|
true,
|
||||||
true,
|
true,
|
||||||
State.SUCCESS,
|
State.SUCCESS,
|
||||||
|
@ -497,6 +554,7 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
{
|
{
|
||||||
"snapshots": [
|
"snapshots": [
|
||||||
{
|
{
|
||||||
|
"project_id": "some-project",
|
||||||
"repository": "repo",
|
"repository": "repo",
|
||||||
"snapshot": "name",
|
"snapshot": "name",
|
||||||
"uuid": "uuid",
|
"uuid": "uuid",
|
||||||
|
@ -547,6 +605,7 @@ public class SnapshotsInProgressSerializationTests extends SimpleDiffableWireSer
|
||||||
{
|
{
|
||||||
"snapshots": [
|
"snapshots": [
|
||||||
{
|
{
|
||||||
|
"project_id": "some-project",
|
||||||
"repository": "repo",
|
"repository": "repo",
|
||||||
"snapshot": "name",
|
"snapshot": "name",
|
||||||
"uuid": "uuid",
|
"uuid": "uuid",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue