From e1c930f8c18e88528059b4ac979c915a388a51d3 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Wed, 25 Jun 2025 10:34:34 +1000 Subject: [PATCH] Make RepositoriesService project-aware (#129821) This PR makes RepositoriesService project aware so that the basic Put, Get, Delete and Verify repository actions are now project scoped. It intentionally leaves the following aspects out of scope for the current changes: * Repository stats reporting * Repository clean-up, analysis and integrity verification * Repository usages for searchable snapshots and CCR They will be worked on separately. One main reason for leaving them out is that they are not needed by OBS which is currently blocked by repository/snapshot changes. They may also have their own complexities, e.g. stats reporting. Resolves: ES-10478 --- .../repositories/RepositoriesServiceIT.java | 5 +- .../TransportDeleteRepositoryAction.java | 10 +- .../get/TransportGetRepositoriesAction.java | 19 +- .../put/TransportPutRepositoryAction.java | 10 +- .../ReservedRepositoryAction.java | 20 +- .../TransportVerifyRepositoryAction.java | 9 +- .../get/TransportGetSnapshotsAction.java | 2 +- .../TransportSnapshotsStatusAction.java | 8 +- .../tracker/RepositoriesHealthTracker.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 8 +- .../repositories/IndexSnapshotsService.java | 8 +- .../repositories/RepositoriesService.java | 540 +++++++++++------- .../repositories/RepositoryOperation.java | 13 + .../repositories/ResolvedRepositories.java | 8 +- .../VerifyNodeRepositoryAction.java | 8 +- .../blobstore/BlobStoreRepository.java | 6 +- .../snapshots/RestoreService.java | 4 +- .../snapshots/SnapshotShardsService.java | 2 +- .../snapshots/SnapshotsService.java | 10 +- .../ReservedRepositoryActionTests.java | 5 +- .../RepositoriesHealthTrackerTests.java | 11 +- .../RepositoriesServiceTests.java | 225 ++++++-- .../ResolvedRepositoriesTests.java | 41 +- .../BlobStoreRepositoryRestoreTests.java | 8 +- .../blobstore/BlobStoreRepositoryTests.java | 6 +- .../snapshots/RestoreServiceTests.java | 3 +- .../snapshots/SnapshotResiliencyTests.java | 12 +- .../blobstore/BlobStoreTestUtil.java | 6 +- .../DeleteInternalCcrRepositoryAction.java | 6 +- .../PutInternalCcrRepositoryAction.java | 6 +- .../SourceOnlySnapshotShardTests.java | 12 +- .../store/RepositorySupplier.java | 8 +- .../SearchableSnapshotDirectoryTests.java | 6 +- .../build.gradle | 5 +- 34 files changed, 725 insertions(+), 327 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java index ecf910a7ab05..c64e45dd6827 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java @@ -11,6 +11,7 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; @@ -65,7 +66,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase { assertThat(originalRepositoryMetadata.type(), equalTo(FsRepository.TYPE)); - final Repository originalRepository = repositoriesService.repository(repositoryName); + final Repository originalRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(originalRepository, instanceOf(FsRepository.class)); final boolean updated = randomBoolean(); @@ -89,7 +90,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase { assertThat(updatedRepositoryMetadata.type(), equalTo(updatedRepositoryType)); - final Repository updatedRepository = repositoriesService.repository(repositoryName); + final Repository updatedRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository)); // check that a noop update does not verify. Since the new data node does not share the same `path.repo`, verification will fail if diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index fc01c0fecca0..d52e7e91d4f4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/delete"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportDeleteRepositoryAction( @@ -43,7 +45,8 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster ClusterState state, final ActionListener listener ) { - repositoriesService.unregisterRepository(request, listener); + repositoriesService.unregisterRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java index dbb448b8153e..404ebab5edd1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java @@ -11,11 +11,12 @@ package org.elasticsearch.action.admin.cluster.repositories.get; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -28,14 +29,15 @@ import org.elasticsearch.transport.TransportService; /** * Transport action for get repositories operation */ -public class TransportGetRepositoriesAction extends TransportMasterNodeReadAction { +public class TransportGetRepositoriesAction extends TransportMasterNodeReadProjectAction { @Inject public TransportGetRepositoriesAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( GetRepositoriesAction.NAME, @@ -44,24 +46,25 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadActio threadPool, actionFilters, GetRepositoriesRequest::new, + projectResolver, GetRepositoriesResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); } @Override - protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ProjectState state) { + return state.blocks().globalBlockedException(state.projectId(), ClusterBlockLevel.METADATA_READ); } @Override protected void masterOperation( Task task, final GetRepositoriesRequest request, - ClusterState state, + ProjectState state, final ActionListener listener ) { - final var result = ResolvedRepositories.resolve(state, request.repositories()); + final var result = ResolvedRepositories.resolve(state.metadata(), request.repositories()); if (result.hasMissingRepositories()) { listener.onFailure(new RepositoryMissingException(String.join(", ", result.missing()))); } else { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 2456bc6a7c6c..04de5d876efa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/put"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportPutRepositoryAction( @@ -43,7 +45,8 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(PutRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod ClusterState state, final ActionListener listener ) { - repositoriesService.registerRepository(request, listener); + repositoriesService.registerRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java index ee6e7f320f51..35ee66834716 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java @@ -11,6 +11,8 @@ package org.elasticsearch.action.admin.cluster.repositories.reservedstate; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.reservedstate.ReservedClusterStateHandler; import org.elasticsearch.reservedstate.TransformState; @@ -60,7 +62,9 @@ public class ReservedRepositoryAction implements ReservedClusterStateHandler { private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportVerifyRepositoryAction( @@ -37,7 +39,8 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction listener ) { repositoriesService.verifyRepository( + projectResolver.getProjectId(), request.name(), listener.map(verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index eab763165a74..7393ab23d889 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -144,7 +144,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction requestedSnapshotNames = Sets.newHashSet(request.snapshots()); final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + @FixForMultiProject(description = "resolve the actual projectId, ES-10166") + final var projectId = ProjectId.DEFAULT; + repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener); final Collection snapshotIdsToLoad = new ArrayList<>(); repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> { task.ensureNotCancelled(); diff --git a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java index 5fd573a3f665..a85a9bf73bea 100644 --- a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java +++ b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java @@ -42,7 +42,7 @@ public class RepositoriesHealthTracker extends HealthTracker(); var invalid = new ArrayList(); - repositories.values().forEach(repository -> { + repositories.forEach(repository -> { if (repository instanceof UnknownTypeRepository) { unknown.add(repository.getMetadata().name()); } else if (repository instanceof InvalidRepository) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 92dfd74891e0..76ecd8141f79 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -154,6 +155,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; @@ -3527,12 +3529,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } case SNAPSHOT -> { - final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); + final Snapshot snapshot = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot(); + final ProjectId projectId = snapshot.getProjectId(); + final String repo = snapshot.getRepository(); executeRecovery( "from snapshot", recoveryState, recoveryListener, - l -> restoreFromRepository(repositoriesService.repository(repo), l) + l -> restoreFromRepository(repositoriesService.repository(projectId, repo), l) ); } case LOCAL_SHARDS -> { diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java index bd86ab93d30e..c46012e8a418 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; @@ -28,7 +30,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; public class IndexSnapshotsService { @@ -136,8 +137,9 @@ public class IndexSnapshotsService { } private Repository getRepository(String repositoryName) { - final Map repositories = repositoriesService.getRepositories(); - return repositories.get(repositoryName); + @FixForMultiProject(description = "resolve the actual projectId, ES-12176") + final var projectId = ProjectId.DEFAULT; + return repositoriesService.repositoryOrNull(projectId, repositoryName); } private static class FetchShardSnapshotContext { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 6f009ed92cf9..6248e74bee10 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; @@ -44,6 +45,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -66,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -73,6 +77,7 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; @@ -84,8 +89,9 @@ import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE * factory information to create new repositories, and provides access to and maintains the lifecycle of repositories. New nodes can easily * find all the repositories via the cluster state after joining a cluster. * - * {@link #repository(String)} can be used to fetch a repository. {@link #createRepository(RepositoryMetadata)} does the heavy lifting of - * creation. {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. + * {@link #repository(ProjectId, String)} can be used to fetch a repository. + * {@link #createRepository(ProjectId, RepositoryMetadata)} does the heavy lifting of creation. + * {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. */ public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -112,8 +118,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final ThreadPool threadPool; private final NodeClient client; - private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); - private volatile Map repositories = Collections.emptyMap(); + private final Map> internalRepositories = ConcurrentCollections.newConcurrentMap(); + private final Map> repositories = ConcurrentCollections.newConcurrentMap(); private final RepositoriesStatsArchive repositoriesStatsArchive; private final List> preRestoreChecks; @@ -154,10 +160,15 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * This method can be only called on the master node. * It tries to create a new repository on the master, and if it was successful, it adds a new repository to cluster metadata. * + * @param projectId the project ID to which the repository belongs. * @param request register repository request * @param responseListener register repository listener */ - public void registerRepository(final PutRepositoryRequest request, final ActionListener responseListener) { + public void registerRepository( + final ProjectId projectId, + final PutRepositoryRequest request, + final ActionListener responseListener + ) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; validateRepositoryName(request.name()); @@ -167,7 +178,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C SubscribableListener // Trying to create the new repository on master to make sure it works - .newForked(validationStep -> validatePutRepositoryRequest(request, validationStep)) + .newForked(validationStep -> validatePutRepositoryRequest(projectId, request, validationStep)) // When publication has completed (and all acks received or timed out) then verify the repository. // (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have @@ -176,11 +187,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C final ListenableFuture acknowledgementStep = new ListenableFuture<>(); final ListenableFuture publicationStep = new ListenableFuture<>(); // Boolean==changed. submitUnbatchedTask( - "put_repository [" + request.name() + "]", - new RegisterRepositoryTask(this, request, acknowledgementStep) { + "put_repository " + projectRepoString(projectId, request.name()), + new RegisterRepositoryTask(this, projectId, request, acknowledgementStep) { @Override public void onFailure(Exception e) { - logger.warn(() -> "failed to create repository [" + request.name() + "]", e); + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, request.name()), e); publicationStep.onFailure(e); super.onFailure(e); } @@ -195,9 +206,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { if (changed) { if (found) { - logger.info("updated repository [{}]", request.name()); + logger.info("updated repository {}", projectRepoString(projectId, request.name())); } else { - logger.info("put repository [{}]", request.name()); + logger.info("put repository {}", projectRepoString(projectId, request.name())); } } publicationStep.onResponse(oldState != newState); @@ -220,7 +231,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .>newForked(verifyRepositoryStep -> { if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) { - verifyRepository(request.name(), verifyRepositoryStep); + final ThreadContext threadContext = threadPool.getThreadContext(); + verifyRepository(projectId, request.name(), verifyRepositoryStep); } else { verifyRepositoryStep.onResponse(null); } @@ -231,7 +243,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .execute( ActionRunnable.wrap( getRepositoryDataStep, - ll -> repository(request.name()).getRepositoryData( + ll -> repository(projectId, request.name()).getRepositoryData( // TODO contemplate threading, do we need to fork, see #101445? EsExecutors.DIRECT_EXECUTOR_SERVICE, ll @@ -243,6 +255,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .andThen( (updateRepoUuidStep, repositoryData) -> updateRepositoryUuidInMetadata( clusterService, + projectId, request.name(), repositoryData, updateRepoUuidStep @@ -263,16 +276,19 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C protected boolean found = false; protected boolean changed = false; + private final ProjectId projectId; private final PutRepositoryRequest request; private final RepositoriesService repositoriesService; RegisterRepositoryTask( final RepositoriesService repositoriesService, + final ProjectId projectId, final PutRepositoryRequest request, final ListenableFuture acknowledgementStep ) { super(request, acknowledgementStep); this.repositoriesService = repositoriesService; + this.projectId = projectId; this.request = request; } @@ -281,14 +297,18 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * @param repositoriesService * @param request */ - public RegisterRepositoryTask(final RepositoriesService repositoriesService, final PutRepositoryRequest request) { - this(repositoriesService, request, null); + public RegisterRepositoryTask( + final RepositoriesService repositoriesService, + final ProjectId projectId, + final PutRepositoryRequest request + ) { + this(repositoriesService, projectId, request, null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (repositoryMetadata.name().equals(request.name())) { @@ -304,10 +324,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C request.type(), request.settings() ); - Repository existing = repositoriesService.repositories.get(request.name()); - if (existing == null) { - existing = repositoriesService.internalRepositories.get(request.name()); - } + Repository existing = repositoriesService.repositoryOrNull(projectId, request.name()); assert existing != null : "repository [" + newRepositoryMetadata.name() + "] must exist"; assert existing.getMetadata() == repositoryMetadata; final RepositoryMetadata updatedMetadata; @@ -316,7 +333,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C if (repositoryMetadata.generation() == RepositoryData.CORRUPTED_REPO_GEN) { // If recreating a corrupted repository with the same settings, reset the corrupt flag. // Setting the safe generation to unknown, so that a consistent generation is found. - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); logger.info( "repository [{}/{}] is marked as corrupted, resetting the corruption marker", repositoryMetadata.name(), @@ -334,7 +351,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C // we're updating in place so the updated metadata must point at the same uuid and generations updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings()); } else { - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); updatedMetadata = newRepositoryMetadata; } found = true; @@ -349,7 +366,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C repositories = new RepositoriesMetadata(repositoriesMetadata); changed = true; return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata(ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories)) .build(); } } @@ -361,17 +378,21 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * * @param request */ - public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) { + public void validateRepositoryCanBeCreated(final ProjectId projectId, final PutRepositoryRequest request) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); // Trying to create the new repository on master to make sure it works - closeRepository(createRepository(newRepositoryMetadata)); + closeRepository(createRepository(projectId, newRepositoryMetadata)); } - private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener resultListener) { + private void validatePutRepositoryRequest( + final ProjectId projectId, + final PutRepositoryRequest request, + ActionListener resultListener + ) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); try { - final var repository = createRepository(newRepositoryMetadata); + final var repository = createRepository(projectId, newRepositoryMetadata); if (request.verify()) { // verify repository on local node only, different from verifyRepository method that runs on other cluster nodes threadPool.executor(ThreadPool.Names.SNAPSHOT) @@ -413,6 +434,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C */ public static void updateRepositoryUuidInMetadata( ClusterService clusterService, + final ProjectId projectId, final String repositoryName, RepositoryData repositoryData, ActionListener listener @@ -424,7 +446,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return; } - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state()).repository(repositoryName); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state().metadata().getProject(projectId)) + .repository(repositoryName); if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) { listener.onResponse(null); return; @@ -441,11 +464,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C submitUnbatchedTask( clusterService, - "update repository UUID [" + repositoryName + "] to [" + repositoryUuid + "]", + "update repository UUID " + projectRepoString(projectId, repositoryName) + " to [" + repositoryUuid + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(projectId); final RepositoriesMetadata currentReposMetadata = RepositoriesMetadata.get(project); final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName); @@ -477,24 +500,32 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C *

* This method can be only called on the master node. It removes repository information from cluster metadata. * + * @param projectId project to look for the repository * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { - submitUnbatchedTask("delete_repository [" + request.name() + "]", new UnregisterRepositoryTask(request, listener) { - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - if (deletedRepositories.isEmpty() == false) { - logger.info("deleted repositories [{}]", deletedRepositories); + public void unregisterRepository( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { + submitUnbatchedTask( + "delete_repository " + projectRepoString(projectId, request.name()), + new UnregisterRepositoryTask(projectId, request, listener) { + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + if (deletedRepositories.isEmpty() == false) { + logger.info("deleted repositories [{}] for project [{}]", deletedRepositories, projectId); + } + } + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + // repository was created on both master and data nodes + return discoveryNode.isMasterNode() || discoveryNode.canContainData(); } } - - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - // repository was created on both master and data nodes - return discoveryNode.isMasterNode() || discoveryNode.canContainData(); - } - }); + ); } /** @@ -503,10 +534,16 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C */ public static class UnregisterRepositoryTask extends AckedClusterStateUpdateTask { protected final List deletedRepositories = new ArrayList<>(); + private final ProjectId projectId; private final DeleteRepositoryRequest request; - UnregisterRepositoryTask(final DeleteRepositoryRequest request, final ActionListener listener) { + UnregisterRepositoryTask( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { super(request, listener); + this.projectId = projectId; this.request = request; } @@ -514,20 +551,20 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * Constructor used by {@link org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction} * @param name the repository name */ - public UnregisterRepositoryTask(TimeValue dummyTimeout, String name) { - this(new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); + public UnregisterRepositoryTask(TimeValue dummyTimeout, ProjectId projectId, String name) { + this(projectId, new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); if (repositories.repositories().size() > 0) { List repositoriesMetadata = new ArrayList<>(repositories.repositories().size()); boolean changed = false; for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) { - ensureRepositoryNotInUse(currentState, repositoryMetadata.name()); + ensureRepositoryNotInUse(projectState, repositoryMetadata.name()); ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata); deletedRepositories.add(repositoryMetadata.name()); changed = true; @@ -538,7 +575,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C if (changed) { repositories = new RepositoriesMetadata(repositoriesMetadata); return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata( + ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories) + ) .build(); } } @@ -549,8 +588,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - public void verifyRepository(final String repositoryName, final ActionListener> listener) { - final Repository repository = repository(repositoryName); + public void verifyRepository( + final ProjectId projectId, + final String repositoryName, + final ActionListener> listener + ) { + final Repository repository = repository(projectId, repositoryName); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { @@ -566,7 +609,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C try { repository.endVerification(verificationToken); } catch (Exception e) { - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", e); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + + " failed to finish repository verification", + e + ); delegatedListener.onFailure(e); return; } @@ -580,7 +627,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C repository.endVerification(verificationToken); } catch (Exception inner) { inner.addSuppressed(e); - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", inner); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + " failed to finish repository verification", + inner + ); } listener.onFailure(e); }); @@ -608,81 +658,154 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public void applyClusterState(ClusterChangedEvent event) { try { final ClusterState state = event.state(); - assert assertReadonlyRepositoriesNotInUseForWrites(state); - final RepositoriesMetadata oldMetadata = RepositoriesMetadata.get(event.previousState()); - final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state); + final ClusterState previousState = event.previousState(); - // Check if repositories got changed - if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { - for (Repository repo : repositories.values()) { - repo.updateState(state); - } - return; + for (var projectId : event.projectDelta().removed()) { // removed projects + applyProjectStateForRemovedProject(state.version(), previousState.projectState(projectId)); } - logger.trace("processing new index repositories for state version [{}]", event.state().version()); - - Map survivors = new HashMap<>(); - // First, remove repositories that are no longer there - for (Map.Entry entry : repositories.entrySet()) { - if (newMetadata.repository(entry.getKey()) == null) { - logger.debug("unregistering repository [{}]", entry.getKey()); - Repository repository = entry.getValue(); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - } else { - survivors.put(entry.getKey(), entry.getValue()); - } + for (var projectId : event.projectDelta().added()) { // added projects + applyProjectStateForAddedOrExistingProject(state.version(), state.projectState(projectId), null); } - Map builder = new HashMap<>(); - - // Now go through all repositories and update existing or create missing - for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { - Repository repository = survivors.get(repositoryMetadata.name()); - if (repository != null) { - // Found previous version of this repository - if (canUpdateInPlace(repositoryMetadata, repository) == false) { - // Previous version is different from the version in settings - logger.debug("updating repository [{}]", repositoryMetadata.name()); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - repository = null; - try { - repository = createRepository( - repositoryMetadata, - typesRegistry, - RepositoriesService::createUnknownTypeRepository - ); - } catch (RepositoryException ex) { - // TODO: this catch is bogus, it means the old repo is already closed, - // but we have nothing to replace it - logger.warn(() -> "failed to change repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); - } - } - } else { - try { - repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository); - } catch (RepositoryException ex) { - logger.warn(() -> "failed to create repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); - } - } - assert repository != null : "repository should not be null here"; - logger.debug("registering repository [{}]", repositoryMetadata.name()); - builder.put(repositoryMetadata.name(), repository); + // existing projects + final var common = event.projectDelta().added().isEmpty() + ? state.metadata().projects().keySet() + : Sets.difference(state.metadata().projects().keySet(), event.projectDelta().added()); + for (var projectId : common) { + applyProjectStateForAddedOrExistingProject( + state.version(), + state.projectState(projectId), + previousState.projectState(projectId) + ); } - for (Repository repo : builder.values()) { - repo.updateState(state); - } - repositories = unmodifiableMap(builder); } catch (Exception ex) { assert false : new AssertionError(ex); logger.warn("failure updating cluster state ", ex); } } + /** + * Apply changes for one removed project. + * + * @param version The cluster state version of the change. + * @param previousState The previous project state for the removed project. + */ + private void applyProjectStateForRemovedProject(long version, ProjectState previousState) { + final var projectId = previousState.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false : "default project cannot be removed"; + final var survivors = closeRemovedRepositories(version, projectId, getProjectRepositories(projectId), RepositoriesMetadata.EMPTY); + assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet(); + repositories.remove(projectId); + } + + /** + * Apply changes for one project. The project can be either newly added or an existing one. + * + * @param version The cluster state version of the change. + * @param state The current project state + * @param previousState The previous project state, or {@code null} if the project was newly added. + */ + private void applyProjectStateForAddedOrExistingProject(long version, ProjectState state, @Nullable ProjectState previousState) { + assert assertReadonlyRepositoriesNotInUseForWrites(state); + final var projectId = state.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false || previousState != null : "default project cannot be added"; + assert previousState == null || projectId.equals(previousState.projectId()) + : "current and previous states must refer to the same project, but got " + projectId + " != " + previousState.projectId(); + + final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state.metadata()); + final RepositoriesMetadata oldMetadata = previousState == null + ? RepositoriesMetadata.EMPTY + : RepositoriesMetadata.get(previousState.metadata()); + + final Map projectRepositories = getProjectRepositories(projectId); + // Check if repositories got changed + if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { + for (Repository repo : projectRepositories.values()) { + repo.updateState(state.cluster()); + } + return; + } + + logger.trace("processing new index repositories for project [{}] and state version [{}]", projectId, version); + + // First, remove repositories that are no longer there + final var survivors = closeRemovedRepositories(version, projectId, projectRepositories, newMetadata); + + Map builder = new HashMap<>(); + + // Now go through all repositories and update existing or create missing + for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { + Repository repository = survivors.get(repositoryMetadata.name()); + if (repository != null) { + // Found previous version of this repository + if (canUpdateInPlace(repositoryMetadata, repository) == false) { + // Previous version is different from the version in settings + logger.debug("updating repository {}", projectRepoString(projectId, repositoryMetadata.name())); + closeRepository(repository); + archiveRepositoryStats(repository, version); + repository = null; + try { + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); + } catch (RepositoryException ex) { + // TODO: this catch is bogus, it means the old repo is already closed, + // but we have nothing to replace it + logger.warn(() -> "failed to change repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); + } + } + } else { + try { + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); + } catch (RepositoryException ex) { + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); + } + } + assert repository != null : "repository should not be null here"; + logger.debug("registering repository [{}]", projectRepoString(projectId, repositoryMetadata.name())); + builder.put(repositoryMetadata.name(), repository); + } + for (Repository repo : builder.values()) { + repo.updateState(state.cluster()); + } + if (builder.isEmpty() == false) { + repositories.put(projectId, unmodifiableMap(builder)); + } else { + repositories.remove(projectId); + } + } + + private Map closeRemovedRepositories( + long version, + ProjectId projectId, + Map projectRepositories, + RepositoriesMetadata newMetadata + ) { + Map survivors = new HashMap<>(); + for (Map.Entry entry : projectRepositories.entrySet()) { + if (newMetadata.repository(entry.getKey()) == null) { + logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey())); + Repository repository = entry.getValue(); + closeRepository(repository); + archiveRepositoryStats(repository, version); + } else { + survivors.put(entry.getKey(), entry.getValue()); + } + } + return survivors; + } + private static boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repository repository) { assert updatedMetadata.name().equals(repository.getMetadata().name()); return repository.getMetadata().type().equals(updatedMetadata.type()) @@ -692,12 +815,13 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C /** * Gets the {@link RepositoryData} for the given repository. * + * @param projectId project to look for the repository * @param repositoryName repository name * @param listener listener to pass {@link RepositoryData} to */ - public void getRepositoryData(final String repositoryName, final ActionListener listener) { + public void getRepositoryData(final ProjectId projectId, final String repositoryName, final ActionListener listener) { try { - Repository repository = repository(repositoryName); + Repository repository = repository(projectId, repositoryName); assert repository != null; // should only be called once we've validated the repository exists repository.getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? @@ -709,18 +833,28 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } /** - * Returns registered repository + * Returns registered repository, either internal or external * * @param repositoryName repository name * @return registered repository * @throws RepositoryMissingException if repository with such name isn't registered */ + @FixForMultiProject + @Deprecated(forRemoval = true) public Repository repository(String repositoryName) { - Repository repository = repositories.get(repositoryName); - if (repository != null) { - return repository; - } - repository = internalRepositories.get(repositoryName); + return repository(ProjectId.DEFAULT, repositoryName); + } + + /** + * Returns registered repository, either internal or external + * + * @param projectId the project to look for the repository + * @param repositoryName repository name + * @return registered repository + * @throws RepositoryMissingException if repository with such name isn't registered + */ + public Repository repository(ProjectId projectId, String repositoryName) { + Repository repository = repositoryOrNull(projectId, repositoryName); if (repository != null) { return repository; } @@ -728,10 +862,33 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } /** - * @return the current collection of registered repositories, keyed by name. + * Similar to {@link #repository(ProjectId, String)}, but returns {@code null} instead of throw if the repository is not found. */ - public Map getRepositories() { - return unmodifiableMap(repositories); + public Repository repositoryOrNull(ProjectId projectId, String repositoryName) { + Repository repository = repositories.getOrDefault(projectId, Map.of()).get(repositoryName); + if (repository != null) { + return repository; + } + return internalRepositories.getOrDefault(projectId, Map.of()).get(repositoryName); + } + + /** + * @return the current collection of registered repositories from all projects. + */ + public List getRepositories() { + return repositories.values().stream().map(Map::values).flatMap(Collection::stream).toList(); + } + + /** + * @return the current collection of registered repositories for the given project, keyed by name. + */ + public Map getProjectRepositories(ProjectId projectId) { + return repositories.getOrDefault(projectId, Map.of()); + } + + // Package private for testing + boolean hasRepositoryTrackingForProject(ProjectId projectId) { + return repositories.containsKey(projectId) || internalRepositories.containsKey(projectId); } public List repositoriesStats() { @@ -745,8 +902,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public RepositoriesStats getRepositoriesThrottlingStats() { return new RepositoriesStats( - repositories.values() - .stream() + getRepositories().stream() .collect( Collectors.toMap( r -> r.getMetadata().name(), @@ -757,7 +913,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } private List getRepositoryStatsForActiveRepositories() { - return Stream.concat(repositories.values().stream(), internalRepositories.values().stream()) + return Stream.concat( + repositories.values().stream().map(Map::values).flatMap(Collection::stream), + internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream) + ) .filter(r -> r instanceof MeteredBlobStoreRepository) .map(r -> (MeteredBlobStoreRepository) r) .map(MeteredBlobStoreRepository::statsSnapshot) @@ -768,12 +927,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return repositoriesStatsArchive.clear(maxVersionToClear); } - public void registerInternalRepository(String name, String type) { + public void registerInternalRepository(ProjectId projectId, String name, String type) { RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY); - Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { - logger.debug("put internal repository [{}][{}]", name, type); - return createRepository(metadata, internalTypesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists); - }); + Repository repository = internalRepositories.compute(projectId, (ignored, existingRepos) -> { + if (existingRepos == null) { + existingRepos = Map.of(); + } + if (existingRepos.containsKey(name)) { + return existingRepos; + } + logger.debug("put internal repository [{}][{}]", projectRepoString(projectId, name), type); + final var repo = createRepository( + projectId, + metadata, + internalTypesRegistry, + RepositoriesService::throwRepositoryTypeDoesNotExists + ); + final var newRepos = new HashMap<>(existingRepos); + newRepos.put(name, repo); + return unmodifiableMap(newRepos); + }).get(name); if (type.equals(repository.getMetadata().type()) == false) { logger.warn( () -> format( @@ -785,7 +958,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C type ) ); - } else if (repositories.containsKey(name)) { + } else if (getProjectRepositories(projectId).containsKey(name)) { logger.warn( () -> format( "non-internal repository [%s] already registered. this repository will block the " @@ -798,11 +971,24 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - public void unregisterInternalRepository(String name) { - Repository repository = internalRepositories.remove(name); + public void unregisterInternalRepository(ProjectId projectId, String name) { + final var repositoryRef = new AtomicReference(); + internalRepositories.computeIfPresent(projectId, (ignored, existingRepos) -> { + if (existingRepos.containsKey(name) == false) { + return existingRepos; + } + final var newRepos = new HashMap<>(existingRepos); + repositoryRef.set(newRepos.remove(name)); + if (newRepos.isEmpty()) { + return null; + } else { + return unmodifiableMap(newRepos); + } + }); + Repository repository = repositoryRef.get(); if (repository != null) { RepositoryMetadata metadata = repository.getMetadata(); - logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), name)); + logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), projectRepoString(projectId, name))); closeRepository(repository); } } @@ -811,7 +997,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * Closes the given repository. */ private static void closeRepository(Repository repository) { - logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name()); + logger.debug( + "closing repository [{}]{}", + repository.getMetadata().type(), + projectRepoString(repository.getProjectId(), repository.getMetadata().name()) + ); repository.close(); } @@ -824,19 +1014,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - /** - * Creates repository holder. This method starts the repository - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - private static Repository createRepository( - RepositoryMetadata repositoryMetadata, - Map factories, - BiFunction defaultFactory - ) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory); - } - /** * Creates repository holder. This method starts the repository */ @@ -863,27 +1040,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - /** - * Creates a repository holder. - * - *

WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:

- * - *
    - *
  • This method does not register the repository (e.g., in the cluster state).
  • - *
  • This method starts the repository. The repository should be closed after use.
  • - *
  • The repository metadata should be associated to an already registered non-internal repository type and factory pair.
  • - *
- * - * @param repositoryMetadata the repository metadata - * @return the started repository - * @throws RepositoryException if repository type is not registered - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - public Repository createRepository(RepositoryMetadata repositoryMetadata) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata); - } - /** * Creates a repository holder. * @@ -947,25 +1103,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - private static void ensureRepositoryNotInUseForWrites(ClusterState clusterState, String repository) { - if (SnapshotsInProgress.get(clusterState).forRepo(repository).isEmpty() == false) { + private static void ensureRepositoryNotInUseForWrites(ProjectState projectState, String repository) { + final ProjectId projectId = projectState.projectId(); + if (SnapshotsInProgress.get(projectState.cluster()).forRepo(projectId, repository).isEmpty() == false) { throw newRepositoryConflictException(repository, "snapshot is in progress"); } - for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(clusterState).getEntries()) { - if (entry.repository().equals(repository)) { + for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(projectState.cluster()).getEntries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "snapshot deletion is in progress"); } } - for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(clusterState).entries()) { - if (entry.repository().equals(repository)) { + for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(projectState.cluster()).entries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "repository clean up is in progress"); } } } - private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { - ensureRepositoryNotInUseForWrites(clusterState, repository); - for (RestoreInProgress.Entry entry : RestoreInProgress.get(clusterState)) { + private static void ensureRepositoryNotInUse(ProjectState projectState, String repository) { + ensureRepositoryNotInUseForWrites(projectState, repository); + for (RestoreInProgress.Entry entry : RestoreInProgress.get(projectState.cluster())) { if (repository.equals(entry.snapshot().getRepository())) { throw newRepositoryConflictException(repository, "snapshot restore is in progress"); } @@ -979,11 +1136,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C /** * Test-only check for the invariant that read-only repositories never have any write activities. */ - private static boolean assertReadonlyRepositoriesNotInUseForWrites(ClusterState clusterState) { - for (final var repositoryMetadata : RepositoriesMetadata.get(clusterState).repositories()) { + private static boolean assertReadonlyRepositoriesNotInUseForWrites(ProjectState projectState) { + assert projectState != null; + for (final var repositoryMetadata : RepositoriesMetadata.get(projectState.metadata()).repositories()) { if (isReadOnly(repositoryMetadata.settings())) { try { - ensureRepositoryNotInUseForWrites(clusterState, repositoryMetadata.name()); + ensureRepositoryNotInUseForWrites(projectState, repositoryMetadata.name()); } catch (Exception e) { throw new AssertionError("repository [" + repositoryMetadata + "] is readonly but still in use", e); } @@ -1071,7 +1229,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return RepositoryUsageStats.EMPTY; } final var statsByType = new HashMap>(); - for (final var repository : repositories.values()) { + for (final var repository : getRepositories()) { final var repositoryType = repository.getMetadata().type(); final var typeStats = statsByType.computeIfAbsent(repositoryType, ignored -> new HashMap<>()); typeStats.compute(COUNT_USAGE_STATS_NAME, (k, count) -> (count == null ? 0L : count) + 1); @@ -1096,8 +1254,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C protected void doClose() throws IOException { clusterService.removeApplier(this); final Collection repos = new ArrayList<>(); - repos.addAll(internalRepositories.values()); - repos.addAll(repositories.values()); + repos.addAll(internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream).toList()); + repos.addAll(getRepositories()); IOUtils.close(repos); for (Repository repo : repos) { repo.awaitIdle(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 5357218b03f4..e6bdf5bc5efc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -52,6 +52,19 @@ public interface RepositoryOperation { projectId.writeTo(out); out.writeString(name); } + + @Override + public String toString() { + return projectRepoString(projectId, name); + } + } + + static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) { + return new ProjectRepo(projectId, repositoryName); + } + + static String projectRepoString(ProjectId projectId, String repositoryName) { + return "[" + projectId + "][" + repositoryName + "]"; } DiffableUtils.KeySerializer PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() { diff --git a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java index ea147f336cb7..91b9e348bf34 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java +++ b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java @@ -9,7 +9,7 @@ package org.elasticsearch.repositories; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -21,7 +21,7 @@ import java.util.List; import java.util.Set; /** - * The result of calling {@link #resolve(ClusterState, String[])} to resolve a description of some snapshot repositories (from a path + * The result of calling {@link #resolve(ProjectMetadata, String[])} to resolve a description of some snapshot repositories (from a path * component of a request to the get-repositories or get-snapshots APIs) against the known repositories in the cluster state: the * {@link RepositoryMetadata} for the extant repositories that match the description, together with a list of the parts of the description * that failed to match any known repository. @@ -38,8 +38,8 @@ public record ResolvedRepositories(List repositoryMetadata, || (patterns.length == 1 && (ALL_PATTERN.equalsIgnoreCase(patterns[0]) || Regex.isMatchAllPattern(patterns[0]))); } - public static ResolvedRepositories resolve(ClusterState state, String[] patterns) { - final var repositories = RepositoriesMetadata.get(state); + public static ResolvedRepositories resolve(ProjectMetadata projectMetadata, String[] patterns) { + final var repositories = RepositoriesMetadata.get(projectMetadata); if (isMatchAll(patterns)) { return new ResolvedRepositories(repositories.repositories(), List.of()); } diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index a2d4f2921555..f84adbe407b7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.LegacyActionRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,6 +40,7 @@ public class VerifyNodeRepositoryAction { private final ClusterService clusterService; private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportAction( @@ -46,18 +48,20 @@ public class VerifyNodeRepositoryAction { ActionFilters actionFilters, ThreadPool threadPool, ClusterService clusterService, - RepositoriesService repositoriesService + RepositoriesService repositoriesService, + ProjectResolver projectResolver ) { super(ACTION_NAME, transportService, actionFilters, Request::new, threadPool.executor(ThreadPool.Names.SNAPSHOT)); this.clusterService = clusterService; this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); try { - Repository repository = repositoriesService.repository(request.repository); + Repository repository = repositoriesService.repository(projectResolver.getProjectId(), request.repository); repository.verify(request.verificationToken, localNode); listener.onResponse(ActionResponse.Empty.INSTANCE); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9611f4243776..996c9cda4dea 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2407,7 +2407,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) { throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]"); } - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(getProjectId()); return ClusterState.builder(currentState) .putProjectMetadata( ProjectMetadata.builder(project) @@ -2476,6 +2476,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // someone switched the repo contents out from under us RepositoriesService.updateRepositoryUuidInMetadata( clusterService, + getProjectId(), metadata.name(), loaded, new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded)) @@ -3115,7 +3116,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } private RepositoryMetadata getRepoMetadata(ClusterState state) { - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state).repository(metadata.name()); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId())) + .repository(metadata.name()); assert repositoryMetadata != null || lifecycle.stoppedOrClosed() : "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]"; return repositoryMetadata; diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index bf6b078571bf..f57db79d4625 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -602,7 +602,9 @@ public final class RestoreService implements ClusterStateApplier { return; } - for (Repository repository : repositoriesService.getRepositories().values()) { + @FixForMultiProject(description = "resolve the actual projectId, ES-10228") + final var projectId = ProjectId.DEFAULT; + for (Repository repository : repositoriesService.getProjectRepositories(projectId).values()) { // We only care about BlobStoreRepositories because they're the only ones that can contain a searchable snapshot, and we // only care about ones with missing UUIDs. It's possible to have the UUID change from under us if, e.g., the repository was // wiped by an external force, but in this case any searchable snapshots are lost anyway so it doesn't really matter. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 68a935599ed6..d49b670451f5 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -605,7 +605,7 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); } - final Repository repository = repositoriesService.repository(snapshot.getRepository()); + final Repository repository = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); SnapshotIndexCommit snapshotIndexCommit = null; try { snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index abbf2ab22343..8a1f1ad79a17 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1377,7 +1377,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement final String repoName = snapshot.getRepository(); if (tryEnterRepoLoop(repoName)) { if (repositoryData == null) { - repositoriesService.repository(repoName) + repositoriesService.repository(snapshot.getProjectId(), repoName) .getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? new ActionListener<>() { @@ -1486,7 +1486,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement } final String repository = snapshot.getRepository(); final ListenableFuture metadataListener = new ListenableFuture<>(); - final Repository repo = repositoriesService.repository(snapshot.getRepository()); + final Repository repo = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); if (entry.isClone()) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source()); @@ -2441,7 +2441,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement */ private void deleteSnapshotsFromRepository(SnapshotDeletionsInProgress.Entry deleteEntry, IndexVersion minNodeVersion) { final long expectedRepoGen = deleteEntry.repositoryStateId(); - repositoriesService.getRepositoryData(deleteEntry.repository(), new ActionListener<>() { + repositoriesService.getRepositoryData(deleteEntry.projectId(), deleteEntry.repository(), new ActionListener<>() { @Override public void onResponse(RepositoryData repositoryData) { assert repositoryData.getGenId() == expectedRepoGen @@ -2564,7 +2564,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement return; } final SubscribableListener doneFuture = new SubscribableListener<>(); - repositoriesService.repository(deleteEntry.repository()) + repositoriesService.repository(deleteEntry.projectId(), deleteEntry.repository()) .deleteSnapshots(snapshotIds, repositoryData.getGenId(), minNodeVersion, new ActionListener<>() { @Override public void onResponse(RepositoryData updatedRepoData) { @@ -3788,7 +3788,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement entry.source(), clone.getValue(), clone.getKey(), - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.projectId(), entry.repository()) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java index d62d99aa1c43..cee2c5fd8d41 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java @@ -34,6 +34,7 @@ import java.util.Set; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -147,12 +148,12 @@ public class ReservedRepositoryActionTests extends ESTestCase { ); doAnswer(invocation -> { - var request = (PutRepositoryRequest) invocation.getArguments()[0]; + var request = (PutRepositoryRequest) invocation.getArguments()[1]; if (request.type().equals("inter_planetary")) { throw new RepositoryException(request.name(), "repository type [" + request.type() + "] does not exist"); } return null; - }).when(repositoriesService).validateRepositoryCanBeCreated(any()); + }).when(repositoriesService).validateRepositoryCanBeCreated(eq(ProjectId.DEFAULT), any()); return repositoriesService; } diff --git a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java index 561c05165eab..e565971cef94 100644 --- a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.List; -import java.util.Map; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -44,7 +43,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { } public void testGetHealthNoRepos() { - when(repositoriesService.getRepositories()).thenReturn(Map.of()); + when(repositoriesService.getRepositories()).thenReturn(List.of()); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -58,7 +57,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { when(metadata.generation()).thenReturn(randomNonNegativeLong()); var repo = mock(Repository.class); when(repo.getMetadata()).thenReturn(metadata); - when(repositoriesService.getRepositories()).thenReturn(Map.of(randomAlphaOfLength(10), repo)); + when(repositoriesService.getRepositories()).thenReturn(List.of(repo)); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -68,9 +67,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { public void testGetHealthUnknownType() { var repo = createRepositoryMetadata(); - when(repositoriesService.getRepositories()).thenReturn( - Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(randomProjectIdOrDefault(), repo)) - ); + when(repositoriesService.getRepositories()).thenReturn(List.of(new UnknownTypeRepository(randomProjectIdOrDefault(), repo))); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -82,7 +79,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { public void testGetHealthInvalid() { var repo = createRepositoryMetadata(); when(repositoriesService.getRepositories()).thenReturn( - Map.of(repo.name(), new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) + List.of(new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) ); var health = repositoriesHealthTracker.determineCurrentHealth(); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 8c4fcd28c988..96d601f9091f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -23,12 +23,14 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.project.TestProjectResolvers; +import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -68,8 +70,16 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BooleanSupplier; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; public class RepositoriesServiceTests extends ESTestCase { @@ -77,6 +87,7 @@ public class RepositoriesServiceTests extends ESTestCase { private ClusterService clusterService; private RepositoriesService repositoriesService; private ThreadPool threadPool; + private ProjectId projectId; @Override public void setUp() throws Exception { @@ -94,6 +105,13 @@ public class RepositoriesServiceTests extends ESTestCase { Collections.emptySet() ); clusterService = ClusterServiceUtils.createClusterService(threadPool); + projectId = randomProjectIdOrDefault(); + if (ProjectId.DEFAULT.equals(projectId) == false) { + ClusterServiceUtils.setState( + clusterService, + ClusterState.builder(clusterService.state()).putProjectMetadata(ProjectMetadata.builder(projectId)).build() + ); + } DiscoveryNode localNode = DiscoveryNodeUtils.builder("local").name("local").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build(); NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()); @@ -157,9 +175,9 @@ public class RepositoriesServiceTests extends ESTestCase { public void testRegisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertEquals(repoName, repository.getMetadata().name()); assertEquals(TestRepository.TYPE, repository.getMetadata().type()); assertEquals(Settings.EMPTY, repository.getMetadata().settings()); @@ -168,24 +186,24 @@ public class RepositoriesServiceTests extends ESTestCase { public void testUnregisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.unregisterInternalRepository(repoName); - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.unregisterInternalRepository(projectId, repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); assertTrue(((TestRepository) repository).isClosed); } public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); assertFalse(((TestRepository) repository).isClosed); - Repository repository2 = repositoriesService.repository(repoName); + Repository repository2 = repositoriesService.repository(projectId, repoName); assertSame(repository, repository2); } @@ -203,11 +221,11 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); // also make sure that cluster state does not include failed repo - assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); }); + assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(projectId, repoName); }); } public void testPutRepositoryVerificationFailsOnExisting() { @@ -216,7 +234,7 @@ public class RepositoriesServiceTests extends ESTestCase { .type(TestRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); @@ -225,10 +243,10 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(true); resultListener = new SubscribableListener<>(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); - var repository = repositoriesService.repository(repoName); + var repository = repositoriesService.repository(projectId, repoName); assertEquals(repository.getMetadata().type(), TestRepository.TYPE); } @@ -238,14 +256,14 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(false); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); } public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE); @@ -276,7 +294,7 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, "unknown"); repositoriesService.applyClusterState(new ClusterChangedEvent("starting", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(UnknownTypeRepository.class)); } @@ -288,7 +306,7 @@ public class RepositoriesServiceTests extends ESTestCase { repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -297,7 +315,7 @@ public class RepositoriesServiceTests extends ESTestCase { var repoName = randomAlphaOfLengthBetween(10, 25); var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type("unknown"); - repositoriesService.registerRepository(request, new ActionListener<>() { + repositoriesService.registerRepository(projectId, request, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { fail("Should not register unknown repository type"); @@ -306,7 +324,10 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void onFailure(Exception e) { assertThat(e, isA(RepositoryException.class)); - assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist for project [default]")); + assertThat( + e.getMessage(), + equalTo("[" + repoName + "] repository type [unknown] does not exist for project [" + projectId + "]") + ); } }); } @@ -318,7 +339,7 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); } @@ -329,12 +350,12 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); clusterState = createClusterStateWithRepo(repoName, TestRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put test repository", clusterState, emptyState())); - repo = repositoriesService.repository(repoName); + repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(TestRepository.class)); } @@ -346,7 +367,7 @@ public class RepositoriesServiceTests extends ESTestCase { repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -370,17 +391,17 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); // 2. repository creation successfully when current node become master node and repository is put again var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type(TestRepository.TYPE); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var response = safeAwait(resultListener); assertTrue(response.isAcknowledged()); - assertThat(repositoriesService.repository(repoName), isA(TestRepository.class)); + assertThat(repositoriesService.repository(projectId, repoName), isA(TestRepository.class)); } public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { @@ -393,19 +414,21 @@ public class RepositoriesServiceTests extends ESTestCase { .newForked( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE), l.map(ignored -> null) ) ) .andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), ActionTestUtils.assertNoSuccessListener(e -> { @@ -425,20 +448,21 @@ public class RepositoriesServiceTests extends ESTestCase { ) ) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(newGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), l.map(ignored -> null) @@ -446,7 +470,7 @@ public class RepositoriesServiceTests extends ESTestCase { ) .andThenAccept( ignored -> assertTrue( - repositoriesService.repository(repoName) + repositoriesService.repository(projectId, repoName) .getMetadata() .settings() .getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null) @@ -455,17 +479,120 @@ public class RepositoriesServiceTests extends ESTestCase { ); } + public void testRepositoryUpdatesForMultipleProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + // 1. Initial project + final var repoName = "repo"; + final var state0 = createClusterStateWithRepo(repoName, TestRepository.TYPE); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state0, emptyState())); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(1)); + final var initialProjectRepo = (TestRepository) repositoriesService.getProjectRepositories(projectId).values().iterator().next(); + assertThat(repositoriesService.getRepositories(), contains(initialProjectRepo)); + if (ProjectId.DEFAULT.equals(projectId) == false) { + assertFalse(repositoriesService.hasRepositoryTrackingForProject(ProjectId.DEFAULT)); + } + + // 2. Add a new project + final var anotherProjectId = randomUniqueProjectId(); + final var anotherRepoName = "another-repo"; + final var state1 = ClusterState.builder(state0) + .putProjectMetadata( + ProjectMetadata.builder(anotherProjectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.EMPTY), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state1, state0)); + assertThat(repositoriesService.getProjectRepositories(anotherProjectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(3)); + assertThat(repositoriesService.getRepositories(), hasItem(initialProjectRepo)); + final Collection anotherProjectRepos = repositoriesService.getProjectRepositories(anotherProjectId).values(); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 3. Update existing project + assertFalse(initialProjectRepo.isClosed); + final var state2 = ClusterState.builder(state1) + .putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.builder().put("foo", "bar").build()), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state2, state1)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(4)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 4. Remove another project + anotherProjectRepos.forEach(repo -> assertFalse(((TestRepository) repo).isClosed)); + final var state3 = ClusterState.builder(state2) + .metadata(Metadata.builder(state2.metadata()).removeProject(anotherProjectId)) + .routingTable(GlobalRoutingTable.builder(state2.globalRoutingTable()).removeProject(anotherProjectId).build()) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state3, state2)); + anotherProjectRepos.forEach(repo -> assertTrue(((TestRepository) repo).isClosed)); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(anotherProjectId)); + assertThat(repositoriesService.getRepositories(), hasSize(2)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + } + + public void testInternalRepositoryForMultiProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + String repoName = "name"; + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + final TestRepository initialProjectRepo = (TestRepository) repositoriesService.repository(projectId, repoName); + + // Repo of the same name but different project is a different repository instance + final var anotherProjectId = randomUniqueProjectId(); + repositoriesService.registerInternalRepository(anotherProjectId, repoName, TestRepository.TYPE); + final TestRepository anotherProjectRepo = (TestRepository) repositoriesService.repository(anotherProjectId, repoName); + assertThat(initialProjectRepo, not(sameInstance(anotherProjectRepo))); + + // Remove the project repository, the repo should be closed and the project is removed from tracking + repositoriesService.unregisterInternalRepository(projectId, repoName); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(projectId)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.repository(anotherProjectId, repoName), sameInstance(anotherProjectRepo)); + assertTrue(anotherProjectRepo.isStarted); + } + private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener listener) { clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - return new ClusterState.Builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .putCustom( - RepositoriesMetadata.TYPE, - RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) - ) - ).build(); + final ProjectMetadata projectMetadata = currentState.getMetadata().getProject(projectId); + return ClusterState.builder(currentState) + .putProjectMetadata( + ProjectMetadata.builder(projectMetadata) + .putCustom( + RepositoriesMetadata.TYPE, + RepositoriesMetadata.get(projectMetadata) + .withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) + ) + ) + .build(); } @Override @@ -482,12 +609,13 @@ public class RepositoriesServiceTests extends ESTestCase { private ClusterState createClusterStateWithRepo(String repoName, String repoType) { ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - Metadata.Builder mdBuilder = Metadata.builder(); - mdBuilder.putCustom( - RepositoriesMetadata.TYPE, - new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + state.putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + ) ); - state.metadata(mdBuilder); return state.build(); } @@ -500,6 +628,7 @@ public class RepositoriesServiceTests extends ESTestCase { expectThrows( RepositoryException.class, () -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName), null ) @@ -625,7 +754,7 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void updateState(final ClusterState state) { - metadata = RepositoriesMetadata.get(state).repository(metadata.name()); + metadata = RepositoriesMetadata.get(state.metadata().getProject(getProjectId())).repository(metadata.name()); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java index 69521529cd03..4df62531230b 100644 --- a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java @@ -10,6 +10,8 @@ package org.elasticsearch.repositories; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -23,25 +25,36 @@ import java.util.List; public class ResolvedRepositoriesTests extends ESTestCase { + private ProjectId projectId; + + @Override + public void setUp() throws Exception { + super.setUp(); + projectId = randomProjectIdOrDefault(); + } + public void testAll() { runMatchAllTest(); runMatchAllTest("*"); runMatchAllTest("_all"); } - private static void runMatchAllTest(String... patterns) { + private void runMatchAllTest(String... patterns) { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var result = getRepositories(state, patterns); - assertEquals(RepositoriesMetadata.get(state).repositories(), result.repositoryMetadata()); + assertEquals(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories(), result.repositoryMetadata()); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } public void testMatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); - final var name = randomFrom(RepositoriesMetadata.get(state).repositories()).name(); + final var name = randomFrom(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories()).name(); final var result = getRepositories(state, name); - assertEquals(List.of(RepositoriesMetadata.get(state).repository(name)), result.repositoryMetadata()); + assertEquals( + List.of(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repository(name)), + result.repositoryMetadata() + ); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } @@ -49,7 +62,7 @@ public class ResolvedRepositoriesTests extends ESTestCase { public void testMismatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var notAName = randomValueOtherThanMany( - n -> RepositoriesMetadata.get(state).repositories().stream().anyMatch(m -> n.equals(m.name())), + n -> RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories().stream().anyMatch(m -> n.equals(m.name())), ESTestCase::randomIdentifier ); final var result = getRepositories(state, notAName); @@ -70,25 +83,29 @@ public class ResolvedRepositoriesTests extends ESTestCase { runWildcardTest(state, List.of("other-repo", "test-match-1", "test-match-2"), "other-repo", "test-*", "-*-exclude"); } - private static void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { + private void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { final var result = getRepositories(clusterState, patterns); final var description = Strings.format("%s should yield %s", Arrays.toString(patterns), expectedNames); assertFalse(description, result.hasMissingRepositories()); assertEquals(description, expectedNames, result.repositoryMetadata().stream().map(RepositoryMetadata::name).toList()); } - private static ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { - return ResolvedRepositories.resolve(clusterState, patterns); + private ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { + return ResolvedRepositories.resolve(clusterState.metadata().getProject(projectId), patterns); } - private static ClusterState clusterStateWithRepositories(String... repoNames) { + private ClusterState clusterStateWithRepositories(String... repoNames) { final var repositories = new ArrayList(repoNames.length); for (final var repoName : repoNames) { repositories.add(new RepositoryMetadata(repoName, "test", Settings.EMPTY)); } - return ClusterState.EMPTY_STATE.copyAndUpdateMetadata( - b -> b.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories)) - ); + return ClusterState.EMPTY_STATE.copyAndUpdateMetadata(b -> { + ProjectMetadata.Builder projectBuilder = b.getProject(projectId); + if (projectBuilder == null) { + projectBuilder = ProjectMetadata.builder(projectId); + } + b.put(projectBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories))); + }); } } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 044b054ad873..6c2083196973 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -12,6 +12,7 @@ package org.elasticsearch.repositories.blobstore; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.util.TestUtil; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -21,6 +22,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -219,11 +221,13 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { /** Create a {@link Repository} with a random name **/ private Repository createRepository() { + @FixForMultiProject(description = "randomize when snapshot and restore support multiple projects, see also ES-10225, ES-10228") + final ProjectId projectId = ProjectId.DEFAULT; Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 076be2cd27e8..eb0b54571298 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -520,11 +521,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { } public void testEnsureUploadListenerIsResolvedWhenAFileSnapshotTaskFails() throws Exception { + final ProjectId projectId = randomProjectIdOrDefault(); Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java index 259fc6fea693..e45dc4e3d8cd 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -259,7 +260,7 @@ public class RestoreServiceTests extends ESTestCase { } final RepositoriesService repositoriesService = mock(RepositoriesService.class); - when(repositoriesService.getRepositories()).thenReturn(repositories); + when(repositoriesService.getProjectRepositories(eq(ProjectId.DEFAULT))).thenReturn(repositories); final AtomicBoolean completed = new AtomicBoolean(); RestoreService.refreshRepositoryUuids( true, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 7477e648e054..c005691b212b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2522,7 +2522,8 @@ public class SnapshotResiliencyTests extends ESTestCase { actionFilters, threadPool, clusterService, - repositoriesService + repositoriesService, + TestProjectResolvers.DEFAULT_PROJECT_ONLY ) ); actions.put( @@ -2769,7 +2770,14 @@ public class SnapshotResiliencyTests extends ESTestCase { ); actions.put( TransportPutRepositoryAction.TYPE, - new TransportPutRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters) + new TransportPutRepositoryAction( + transportService, + clusterService, + repositoriesService, + threadPool, + actionFilters, + TestProjectResolvers.DEFAULT_PROJECT_ONLY + ) ); actions.put( TransportCleanupRepositoryAction.TYPE, diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index e9a50c12eae0..0a16a7f23e38 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -398,14 +398,14 @@ public final class BlobStoreTestUtil { * @param repositoryMetadata RepositoryMetadata to initialize the cluster state with * @return Mock ClusterService */ - public static ClusterService mockClusterService(RepositoryMetadata repositoryMetadata) { + public static ClusterService mockClusterService(ProjectId projectId, RepositoryMetadata repositoryMetadata) { return mockClusterService( ClusterState.builder(ClusterState.EMPTY_STATE) .metadata( - Metadata.builder() + Metadata.builder(ClusterState.EMPTY_STATE.metadata()) .clusterUUID(UUIDs.randomBase64UUID(random())) .put( - ProjectMetadata.builder(ProjectId.DEFAULT) + ProjectMetadata.builder(projectId) .putCustom( RepositoriesMetadata.TYPE, new RepositoriesMetadata(Collections.singletonList(repositoryMetadata)) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index 61922ee0b887..4e101e6b0d24 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public class DeleteInternalCcrRepositoryAction extends ActionType listener) { - repositoriesService.unregisterInternalRepository(request.getName()); + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") + final var projectId = ProjectId.DEFAULT; + repositoriesService.unregisterInternalRepository(projectId, request.getName()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index ee44b4303760..a96771d29b14 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public class PutInternalCcrRepositoryAction extends ActionType listener) { - repositoriesService.registerInternalRepository(request.getName(), request.getType()); + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") + final var projectId = ProjectId.DEFAULT; + repositoriesService.registerInternalRepository(projectId, request.getName(), request.getType()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 207ac4f38aed..fbb7ddfb6024 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -95,6 +96,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.hamcrest.Matchers.equalTo; +@FixForMultiProject(description = "Randomizing projectId once snapshot and restore support multiple projects, ES-10225, ES-10228") public class SourceOnlySnapshotShardTests extends IndexShardTestCase { public void testSourceIncomplete() throws IOException { @@ -123,7 +125,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -174,7 +176,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { recoverShardFromStore(shard); SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -215,7 +217,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -344,7 +346,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -571,7 +573,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { private Repository createRepository(ProjectId projectId) { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final Repository repository = new FsRepository( projectId, repositoryMetadata, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java index 63522ce2309a..9994b90c59ba 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java @@ -9,6 +9,8 @@ package org.elasticsearch.xpack.searchablesnapshots.store; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -46,14 +48,16 @@ public class RepositorySupplier implements Supplier { } private Repository getRepository() { + @FixForMultiProject(description = "resolve the actual projectId, ES-12138") + final var projectId = ProjectId.DEFAULT; if (repositoryUuid == null) { // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name - final Repository repository = repositoriesService.repository(repositoryName); + final Repository repository = repositoriesService.repository(projectId, repositoryName); assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName; return repository; } - final Map repositoriesByName = repositoriesService.getRepositories(); + final Map repositoriesByName = repositoriesService.getProjectRepositories(projectId); final String currentRepositoryNameHint = repositoryNameHint; final Repository repositoryByLastKnownName = repositoriesByName.get(currentRepositoryNameHint); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java index c090c715cafd..35d71ba23e28 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.blobcache.shared.SharedBlobCacheService; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.UUIDs; @@ -592,8 +593,9 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot repositorySettings.build() ); + final ProjectId projectId = randomProjectIdOrDefault(); final BlobStoreRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, new Environment( Settings.builder() @@ -604,7 +606,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot null ), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService(repositoryMetadata), + BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ); diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 39115556d290..f064c97ff296 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -47,8 +47,10 @@ tasks.named("yamlRestTest").configure { ArrayList blacklist = [ /* These tests don't work on multi-project yet - we need to go through each of them and make them work */ '^cat.recovery/*/*', + '^cat.repositories/*/*', '^cat.snapshots/*/*', '^cluster.desired_balance/10_basic/*', + '^cluster.stats/10_basic/snapshot stats reported in get cluster stats', '^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API '^health/10_basic/*', '^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security @@ -57,11 +59,12 @@ tasks.named("yamlRestTest").configure { '^indices.resolve_cluster/*/*/*', '^indices.shard_stores/*/*', '^migration/*/*', + '^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)', '^snapshot.clone/*/*', '^snapshot.create/*/*', '^snapshot.delete/*/*', '^snapshot.get/*/*', - '^snapshot.get_repository/20_repository_uuid/*', + '^snapshot.get_repository/*/*', '^snapshot.restore/*/*', '^snapshot.status/*/*', '^synonyms/*/*',