Add project-id to SnapshotDeletionsInProgress and RepositoryCleanupInProgress (#129462)

This PR adds project-id to both SnapshotDeletionsInProgress and
RepositoryCleanupInProgress so that they become project aware. Note that
making service code to configure and use the project-id accordingly will
be worked on separately.

Resolves: ES-11380 Relates: #125470
This commit is contained in:
Yang Wang 2025-06-17 14:38:54 +10:00 committed by GitHub
parent adf4d1005f
commit c8233588a1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 369 additions and 33 deletions

View file

@ -160,7 +160,7 @@ org.elasticsearch.cluster.ClusterFeatures#clusterHasFeature(org.elasticsearch.cl
@defaultMessage Do not construct this records outside the source files they are declared in @defaultMessage Do not construct this records outside the source files they are declared in
org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult) org.elasticsearch.cluster.SnapshotsInProgress$ShardSnapshotStatus#<init>(java.lang.String, org.elasticsearch.cluster.SnapshotsInProgress$ShardState, org.elasticsearch.repositories.ShardGeneration, java.lang.String, org.elasticsearch.repositories.ShardSnapshotResult)
org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.String, java.util.List, long, long, org.elasticsearch.cluster.SnapshotDeletionsInProgress$State, java.lang.String) org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(org.elasticsearch.cluster.metadata.ProjectId, java.lang.String, java.util.List, long, long, org.elasticsearch.cluster.SnapshotDeletionsInProgress$State, java.lang.String)
@defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug @defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
java.lang.Thread#<init>(java.lang.Runnable) java.lang.Thread#<init>(java.lang.Runnable)

View file

@ -300,6 +300,7 @@ public class TransportVersions {
public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00); public static final TransportVersion PROJECT_DELETION_GLOBAL_BLOCK = def(9_098_0_00);
public static final TransportVersion SECURITY_CLOUD_API_KEY_REALM_AND_TYPE = def(9_099_0_00); public static final TransportVersion SECURITY_CLOUD_API_KEY_REALM_AND_TYPE = def(9_099_0_00);
public static final TransportVersion STATE_PARAM_GET_SNAPSHOT = def(9_100_0_00); public static final TransportVersion STATE_PARAM_GET_SNAPSHOT = def(9_100_0_00);
public static final TransportVersion PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP = def(9_101_0_00);
/* /*
* STOP! READ THIS FIRST! No, really, * STOP! READ THIS FIRST! No, really,

View file

@ -21,11 +21,13 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.injection.guice.Inject;
@ -198,11 +200,13 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]" "Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]"
); );
} }
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
return ClusterState.builder(currentState) return ClusterState.builder(currentState)
.putCustom( .putCustom(
RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress( new RepositoryCleanupInProgress(
List.of(RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId)) List.of(RepositoryCleanupInProgress.startedEntry(projectId, repositoryName, repositoryStateId))
) )
) )
.build(); .build();

View file

@ -10,6 +10,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions; import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -21,6 +22,9 @@ import org.elasticsearch.xcontent.ToXContent;
import java.io.IOException; import java.io.IOException;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Objects;
import static org.elasticsearch.TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
/** /**
* A repository cleanup request entry. Part of the cluster state. * A repository cleanup request entry. Part of the cluster state.
@ -49,8 +53,8 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
return readDiffFrom(ClusterState.Custom.class, TYPE, in); return readDiffFrom(ClusterState.Custom.class, TYPE, in);
} }
public static Entry startedEntry(String repository, long repositoryStateId) { public static Entry startedEntry(ProjectId projectId, String repository, long repositoryStateId) {
return new Entry(repository, repositoryStateId); return new Entry(projectId, repository, repositoryStateId);
} }
public boolean hasCleanupInProgress() { public boolean hasCleanupInProgress() {
@ -86,6 +90,18 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
); );
} }
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
RepositoryCleanupInProgress that = (RepositoryCleanupInProgress) o;
return Objects.equals(entries, that.entries);
}
@Override
public int hashCode() {
return Objects.hashCode(entries);
}
@Override @Override
public String toString() { public String toString() {
return Strings.toString(this); return Strings.toString(this);
@ -96,10 +112,13 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
return TransportVersions.ZERO; return TransportVersions.ZERO;
} }
public record Entry(String repository, long repositoryStateId) implements Writeable, RepositoryOperation { public record Entry(ProjectId projectId, String repository, long repositoryStateId) implements Writeable, RepositoryOperation {
public static Entry readFrom(StreamInput in) throws IOException { public static Entry readFrom(StreamInput in) throws IOException {
return new Entry(in.readString(), in.readLong()); final ProjectId projectId = in.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)
? ProjectId.readFrom(in)
: ProjectId.DEFAULT;
return new Entry(projectId, in.readString(), in.readLong());
} }
@Override @Override
@ -107,6 +126,11 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
return repositoryStateId; return repositoryStateId;
} }
@Override
public ProjectId projectId() {
return projectId;
}
@Override @Override
public String repository() { public String repository() {
return repository; return repository;
@ -114,6 +138,18 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<Clu
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)) {
projectId.writeTo(out);
} else {
if (ProjectId.DEFAULT.equals(projectId) == false) {
final var message = "Cannot write repository cleanup entry with non-default project id "
+ projectId
+ " to version before "
+ PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
assert false : message;
throw new IllegalStateException(message);
}
}
out.writeString(repository); out.writeString(repository);
out.writeLong(repositoryStateId); out.writeLong(repositoryStateId);
} }

View file

@ -12,6 +12,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions; import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -32,6 +33,8 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import static org.elasticsearch.TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
/** /**
* Represents the in-progress snapshot deletions in the cluster state. * Represents the in-progress snapshot deletions in the cluster state.
*/ */
@ -174,6 +177,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
Iterators.map(entries.iterator(), entry -> (builder, params) -> { Iterators.map(entries.iterator(), entry -> (builder, params) -> {
builder.startObject(); builder.startObject();
{ {
builder.field("project_id", entry.projectId);
builder.field("repository", entry.repository()); builder.field("repository", entry.repository());
builder.startArray("snapshots"); builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) { for (SnapshotId snapshot : entry.snapshots) {
@ -206,14 +210,26 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
/** /**
* A class representing a snapshot deletion request entry in the cluster state. * A class representing a snapshot deletion request entry in the cluster state.
*/ */
public record Entry(String repoName, List<SnapshotId> snapshots, long startTime, long repositoryStateId, State state, String uuid) public record Entry(
implements ProjectId projectId,
Writeable, String repoName,
RepositoryOperation { List<SnapshotId> snapshots,
long startTime,
long repositoryStateId,
State state,
String uuid
) implements Writeable, RepositoryOperation {
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
public Entry(String repoName, List<SnapshotId> snapshots, long startTime, long repositoryStateId, State state) { public Entry(
this(repoName, snapshots, startTime, repositoryStateId, state, UUIDs.randomBase64UUID()); ProjectId projectId,
String repoName,
List<SnapshotId> snapshots,
long startTime,
long repositoryStateId,
State state
) {
this(projectId, repoName, snapshots, startTime, repositoryStateId, state, UUIDs.randomBase64UUID());
} }
public Entry { public Entry {
@ -222,7 +238,11 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
public static Entry readFrom(StreamInput in) throws IOException { public static Entry readFrom(StreamInput in) throws IOException {
final ProjectId projectId = in.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)
? ProjectId.readFrom(in)
: ProjectId.DEFAULT;
return new Entry( return new Entry(
projectId,
in.readString(), in.readString(),
in.readCollectionAsImmutableList(SnapshotId::new), in.readCollectionAsImmutableList(SnapshotId::new),
in.readVLong(), in.readVLong(),
@ -235,7 +255,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
public Entry started() { public Entry started() {
assert state == State.WAITING; assert state == State.WAITING;
return new Entry(repository(), snapshots, startTime, repositoryStateId, State.STARTED, uuid); return new Entry(projectId(), repository(), snapshots, startTime, repositoryStateId, State.STARTED, uuid);
} }
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
@ -245,21 +265,33 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
if (updatedSnapshots.addAll(newSnapshots) == false) { if (updatedSnapshots.addAll(newSnapshots) == false) {
return this; return this;
} }
return new Entry(repository(), List.copyOf(updatedSnapshots), startTime, repositoryStateId, State.WAITING, uuid); return new Entry(projectId(), repository(), List.copyOf(updatedSnapshots), startTime, repositoryStateId, State.WAITING, uuid);
} }
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
public Entry withSnapshots(Collection<SnapshotId> snapshots) { public Entry withSnapshots(Collection<SnapshotId> snapshots) {
return new Entry(repository(), List.copyOf(snapshots), startTime, repositoryStateId, state, uuid); return new Entry(projectId(), repository(), List.copyOf(snapshots), startTime, repositoryStateId, state, uuid);
} }
@SuppressForbidden(reason = "using a private constructor within the same file") @SuppressForbidden(reason = "using a private constructor within the same file")
public Entry withRepoGen(long repoGen) { public Entry withRepoGen(long repoGen) {
return new Entry(repository(), snapshots, startTime, repoGen, state, uuid); return new Entry(projectId(), repository(), snapshots, startTime, repoGen, state, uuid);
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)) {
projectId.writeTo(out);
} else {
if (ProjectId.DEFAULT.equals(projectId) == false) {
final var message = "Cannot write snapshot deletion entry with non-default project id "
+ projectId
+ " to version before "
+ PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP;
assert false : message;
throw new IllegalStateException(message);
}
}
out.writeString(repoName); out.writeString(repoName);
out.writeCollection(snapshots); out.writeCollection(snapshots);
out.writeVLong(startTime); out.writeVLong(startTime);
@ -268,6 +300,11 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
out.writeString(uuid); out.writeString(uuid);
} }
@Override
public ProjectId projectId() {
return projectId;
}
@Override @Override
public String repository() { public String repository() {
return repoName; return repoName;

View file

@ -9,12 +9,10 @@
package org.elasticsearch.repositories; package org.elasticsearch.repositories;
import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.DiffableUtils;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.FixForMultiProject;
import java.io.IOException; import java.io.IOException;
@ -26,10 +24,7 @@ public interface RepositoryOperation {
/** /**
* Project for which repository belongs to. * Project for which repository belongs to.
*/ */
@FixForMultiProject(description = "default implementation is temporary") ProjectId projectId();
default ProjectId projectId() {
return Metadata.DEFAULT_PROJECT_ID;
}
/** /**
* Name of the repository affected. * Name of the repository affected.

View file

@ -47,6 +47,7 @@ import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@ -2255,7 +2256,10 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
reusedExistingDelete = true; reusedExistingDelete = true;
return currentState; return currentState;
} }
@FixForMultiProject
final var projectId = ProjectId.DEFAULT;
newDelete = new SnapshotDeletionsInProgress.Entry( newDelete = new SnapshotDeletionsInProgress.Entry(
projectId,
repositoryName, repositoryName,
List.copyOf(snapshotIdsRequiringCleanup), List.copyOf(snapshotIdsRequiringCleanup),
threadPool.absoluteTimeInMillis(), threadPool.absoluteTimeInMillis(),

View file

@ -10,6 +10,7 @@
package org.elasticsearch.cluster; package org.elasticsearch.cluster;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
@ -434,6 +435,7 @@ public class ClusterSnapshotStatsTests extends AbstractWireSerializingTestCase<C
SnapshotDeletionsInProgress.of( SnapshotDeletionsInProgress.of(
List.of( List.of(
new SnapshotDeletionsInProgress.Entry( new SnapshotDeletionsInProgress.Entry(
ProjectId.DEFAULT,
"test-repo", "test-repo",
List.of(new SnapshotId("deleting", "uuid")), List.of(new SnapshotId("deleting", "uuid")),
startTimes[2], startTimes[2],
@ -446,7 +448,7 @@ public class ClusterSnapshotStatsTests extends AbstractWireSerializingTestCase<C
.putCustom( .putCustom(
RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress( new RepositoryCleanupInProgress(
List.of(new RepositoryCleanupInProgress.Entry("test-repo", randomNonNegativeLong())) List.of(new RepositoryCleanupInProgress.Entry(ProjectId.DEFAULT, "test-repo", randomNonNegativeLong()))
) )
) )
.build(), .build(),

View file

@ -9,14 +9,132 @@
package org.elasticsearch.cluster; package org.elasticsearch.cluster;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo;
public class RepositoryCleanupInProgressTests extends SimpleDiffableWireSerializationTestCase<ClusterState.Custom> {
private Supplier<ProjectId> projectIdSupplier;
@Before
public void setUp() throws Exception {
super.setUp();
projectIdSupplier = ESTestCase::randomProjectIdOrDefault;
}
@Override
protected ClusterState.Custom createTestInstance() {
return new RepositoryCleanupInProgress(randomList(0, 3, this::randomCleanupEntry));
}
@Override
protected Writeable.Reader<ClusterState.Custom> instanceReader() {
return RepositoryCleanupInProgress::new;
}
@Override
protected Writeable.Reader<Diff<ClusterState.Custom>> diffReader() {
return RepositoryCleanupInProgress::readDiffFrom;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}
@Override
protected ClusterState.Custom mutateInstance(ClusterState.Custom instance) throws IOException {
return makeTestChanges(instance);
}
@Override
protected ClusterState.Custom makeTestChanges(ClusterState.Custom testInstance) {
RepositoryCleanupInProgress original = (RepositoryCleanupInProgress) testInstance;
final List<RepositoryCleanupInProgress.Entry> entries = original.entries();
if (entries.isEmpty()) {
return new RepositoryCleanupInProgress(randomList(1, 3, this::randomCleanupEntry));
}
final int testVariant = between(0, 1);
switch (testVariant) {
case 0: // Add a new entry
return new RepositoryCleanupInProgress(CollectionUtils.appendToCopy(entries, randomCleanupEntry()));
case 1: // Remove an existing entry
return new RepositoryCleanupInProgress(randomSubsetOf(between(0, entries.size() - 1), entries));
default:
throw new AssertionError("Unexpected variant: " + testVariant);
}
}
private RepositoryCleanupInProgress.Entry randomCleanupEntry() {
return RepositoryCleanupInProgress.startedEntry(projectIdSupplier.get(), randomIdentifier(), randomLongBetween(0, 9999));
}
public void testSerializationBwc() throws IOException {
projectIdSupplier = () -> ProjectId.DEFAULT;
final var oldVersion = TransportVersionUtils.getPreviousVersion(
TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(oldVersion);
final ClusterState.Custom original = createTestInstance();
original.writeTo(out);
final var in = out.bytes().streamInput();
in.setTransportVersion(oldVersion);
final RepositoryCleanupInProgress fromStream = new RepositoryCleanupInProgress(in);
assertThat(fromStream, equalTo(original));
}
public void testDiffSerializationBwc() throws IOException {
projectIdSupplier = () -> ProjectId.DEFAULT;
final var oldVersion = TransportVersionUtils.getPreviousVersion(
TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(oldVersion);
final ClusterState.Custom before = createTestInstance();
final ClusterState.Custom after = makeTestChanges(before);
final Diff<ClusterState.Custom> diff = after.diff(before);
diff.writeTo(out);
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry());
in.setTransportVersion(oldVersion);
final NamedDiff<ClusterState.Custom> diffFromStream = RepositoryCleanupInProgress.readDiffFrom(in);
assertThat(diffFromStream.apply(before), equalTo(after));
}
public class RepositoryCleanupInProgressTests extends ESTestCase {
public void testChunking() { public void testChunking() {
AbstractChunkedSerializingTestCase.assertChunkCount( AbstractChunkedSerializingTestCase.assertChunkCount(
new RepositoryCleanupInProgress( new RepositoryCleanupInProgress(
randomList(10, () -> new RepositoryCleanupInProgress.Entry(randomAlphaOfLength(10), randomNonNegativeLong())) randomList(
10,
() -> new RepositoryCleanupInProgress.Entry(
randomProjectIdOrDefault(),
randomAlphaOfLength(10),
randomNonNegativeLong()
)
)
), ),
i -> i.entries().size() + 2 i -> i.entries().size() + 2
); );

View file

@ -9,26 +9,160 @@
package org.elasticsearch.cluster; package org.elasticsearch.cluster;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.SimpleDiffableWireSerializationTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.function.Supplier;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public class SnapshotDeletionsInProgressTests extends ESTestCase { public class SnapshotDeletionsInProgressTests extends SimpleDiffableWireSerializationTestCase<ClusterState.Custom> {
private Supplier<ProjectId> projectIdSupplier;
@Before
public void setUp() throws Exception {
super.setUp();
projectIdSupplier = ESTestCase::randomProjectIdOrDefault;
}
@Override
protected ClusterState.Custom createTestInstance() {
return SnapshotDeletionsInProgress.of(randomList(0, 3, this::randomDeletionEntry));
}
@Override
protected Writeable.Reader<ClusterState.Custom> instanceReader() {
return SnapshotDeletionsInProgress::new;
}
@Override
protected Writeable.Reader<Diff<ClusterState.Custom>> diffReader() {
return SnapshotDeletionsInProgress::readDiffFrom;
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(ClusterModule.getNamedWriteables());
}
@Override
protected ClusterState.Custom mutateInstance(ClusterState.Custom instance) throws IOException {
return makeTestChanges(instance);
}
@Override
protected ClusterState.Custom makeTestChanges(ClusterState.Custom testInstance) {
SnapshotDeletionsInProgress original = (SnapshotDeletionsInProgress) testInstance;
List<SnapshotDeletionsInProgress.Entry> entries = original.getEntries();
if (entries.isEmpty()) {
return SnapshotDeletionsInProgress.of(randomList(1, 3, this::randomDeletionEntry));
}
final SnapshotDeletionsInProgress.Entry entry = randomFrom(entries);
final int testVariant = between(0, 3);
switch (testVariant) {
case 0: // Add a new entry
return original.withAddedEntry(randomDeletionEntry());
case 1: // Remove an existing entry
return original.withRemovedEntry(entry.uuid());
case 2: // With new repo generation or started
if (entry.state() != SnapshotDeletionsInProgress.State.STARTED) {
return original.withRemovedEntry(entry.uuid()).withAddedEntry(entry.started());
} else {
return original.withRemovedEntry(entry.uuid())
.withAddedEntry(entry.withRepoGen(entry.repositoryStateId() + randomLongBetween(1, 1000)));
}
case 3: // with new or removed snapshot
final SnapshotDeletionsInProgress.Entry updatedEntry;
if (entry.snapshots().isEmpty()) {
if (entry.state() != SnapshotDeletionsInProgress.State.STARTED) {
updatedEntry = entry.withAddedSnapshots(randomList(1, 3, () -> new SnapshotId(randomUUID(), randomUUID())));
} else {
updatedEntry = entry.withSnapshots(randomList(1, 3, () -> new SnapshotId(randomUUID(), randomUUID())));
}
} else {
updatedEntry = entry.withSnapshots(randomSubsetOf(between(0, entry.snapshots().size() - 1), entry.snapshots()));
}
return original.withRemovedEntry(entry.uuid()).withAddedEntry(updatedEntry);
default:
throw new AssertionError("Unexpected variant: " + testVariant);
}
}
private SnapshotDeletionsInProgress.Entry randomDeletionEntry() {
final List<SnapshotId> snapshots = randomList(0, 3, () -> new SnapshotId(randomUUID(), randomUUID()));
return new SnapshotDeletionsInProgress.Entry(
projectIdSupplier.get(),
randomIdentifier(),
snapshots,
randomLongBetween(0, 9999),
randomLongBetween(0, 9999),
snapshots.isEmpty() ? SnapshotDeletionsInProgress.State.WAITING : randomFrom(SnapshotDeletionsInProgress.State.values())
);
}
public void testSerializationBwc() throws IOException {
projectIdSupplier = () -> ProjectId.DEFAULT;
final var oldVersion = TransportVersionUtils.getPreviousVersion(
TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(oldVersion);
final ClusterState.Custom original = createTestInstance();
original.writeTo(out);
final var in = out.bytes().streamInput();
in.setTransportVersion(oldVersion);
final SnapshotDeletionsInProgress fromStream = new SnapshotDeletionsInProgress(in);
assertThat(fromStream, equalTo(original));
}
public void testDiffSerializationBwc() throws IOException {
projectIdSupplier = () -> ProjectId.DEFAULT;
final var oldVersion = TransportVersionUtils.getPreviousVersion(
TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP
);
final BytesStreamOutput out = new BytesStreamOutput();
out.setTransportVersion(oldVersion);
final ClusterState.Custom before = createTestInstance();
final ClusterState.Custom after = makeTestChanges(before);
final Diff<ClusterState.Custom> diff = after.diff(before);
diff.writeTo(out);
final var in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), getNamedWriteableRegistry());
in.setTransportVersion(oldVersion);
final NamedDiff<ClusterState.Custom> diffFromStream = SnapshotDeletionsInProgress.readDiffFrom(in);
assertThat(diffFromStream.apply(before), equalTo(after));
}
public void testXContent() throws IOException { public void testXContent() throws IOException {
final var projectId = randomProjectIdOrDefault();
SnapshotDeletionsInProgress sdip = SnapshotDeletionsInProgress.of( SnapshotDeletionsInProgress sdip = SnapshotDeletionsInProgress.of(
List.of( List.of(
new SnapshotDeletionsInProgress.Entry( new SnapshotDeletionsInProgress.Entry(
projectId,
"repo", "repo",
Collections.emptyList(), Collections.emptyList(),
736694267638L, 736694267638L,
@ -44,10 +178,11 @@ public class SnapshotDeletionsInProgressTests extends ESTestCase {
ChunkedToXContent.wrapAsToXContent(sdip).toXContent(builder, ToXContent.EMPTY_PARAMS); ChunkedToXContent.wrapAsToXContent(sdip).toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject(); builder.endObject();
String json = Strings.toString(builder); String json = Strings.toString(builder);
assertThat(json, equalTo(XContentHelper.stripWhitespace(""" assertThat(json, equalTo(XContentHelper.stripWhitespace(Strings.format("""
{ {
"snapshot_deletions": [ "snapshot_deletions": [
{ {
"project_id": "%s",
"repository": "repo", "repository": "repo",
"snapshots": [], "snapshots": [],
"start_time": "1993-05-06T13:17:47.638Z", "start_time": "1993-05-06T13:17:47.638Z",
@ -56,7 +191,7 @@ public class SnapshotDeletionsInProgressTests extends ESTestCase {
"state": "STARTED" "state": "STARTED"
} }
] ]
}"""))); }""", projectId.id()))));
} }
} }
@ -66,6 +201,7 @@ public class SnapshotDeletionsInProgressTests extends ESTestCase {
randomList( randomList(
10, 10,
() -> new SnapshotDeletionsInProgress.Entry( () -> new SnapshotDeletionsInProgress.Entry(
randomProjectIdOrDefault(),
randomAlphaOfLength(10), randomAlphaOfLength(10),
Collections.emptyList(), Collections.emptyList(),
randomNonNegativeLong(), randomNonNegativeLong(),

View file

@ -170,6 +170,11 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
} }
public void testSnapshotDeletionsInProgressSerialization() throws Exception { public void testSnapshotDeletionsInProgressSerialization() throws Exception {
TransportVersion version = TransportVersionUtils.randomVersionBetween(
random(),
TransportVersions.MINIMUM_COMPATIBLE,
TransportVersion.current()
);
boolean includeRestore = randomBoolean(); boolean includeRestore = randomBoolean();
@ -179,6 +184,9 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
SnapshotDeletionsInProgress.of( SnapshotDeletionsInProgress.of(
List.of( List.of(
new SnapshotDeletionsInProgress.Entry( new SnapshotDeletionsInProgress.Entry(
version.onOrAfter(TransportVersions.PROJECT_ID_IN_SNAPSHOTS_DELETIONS_AND_REPO_CLEANUP)
? randomProjectIdOrDefault()
: ProjectId.DEFAULT,
"repo1", "repo1",
Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())), Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())),
randomNonNegativeLong(), randomNonNegativeLong(),
@ -210,11 +218,6 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
// serialize with current version // serialize with current version
BytesStreamOutput outStream = new BytesStreamOutput(); BytesStreamOutput outStream = new BytesStreamOutput();
TransportVersion version = TransportVersionUtils.randomVersionBetween(
random(),
TransportVersions.MINIMUM_COMPATIBLE,
TransportVersion.current()
);
outStream.setTransportVersion(version); outStream.setTransportVersion(version);
diffs.writeTo(outStream); diffs.writeTo(outStream);
StreamInput inStream = outStream.bytes().streamInput(); StreamInput inStream = outStream.bytes().streamInput();