diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java index ecf910a7ab05..c64e45dd6827 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/RepositoriesServiceIT.java @@ -11,6 +11,7 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; @@ -65,7 +66,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase { assertThat(originalRepositoryMetadata.type(), equalTo(FsRepository.TYPE)); - final Repository originalRepository = repositoriesService.repository(repositoryName); + final Repository originalRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(originalRepository, instanceOf(FsRepository.class)); final boolean updated = randomBoolean(); @@ -89,7 +90,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase { assertThat(updatedRepositoryMetadata.type(), equalTo(updatedRepositoryType)); - final Repository updatedRepository = repositoriesService.repository(repositoryName); + final Repository updatedRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName); assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository)); // check that a noop update does not verify. Since the new data node does not share the same `path.repo`, verification will fail if diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java index fc01c0fecca0..d52e7e91d4f4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/delete/TransportDeleteRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/delete"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportDeleteRepositoryAction( @@ -43,7 +45,8 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster ClusterState state, final ActionListener listener ) { - repositoriesService.unregisterRepository(request, listener); + repositoriesService.unregisterRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java index dbb448b8153e..404ebab5edd1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/TransportGetRepositoriesAction.java @@ -11,11 +11,12 @@ package org.elasticsearch.action.admin.cluster.repositories.get; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -28,14 +29,15 @@ import org.elasticsearch.transport.TransportService; /** * Transport action for get repositories operation */ -public class TransportGetRepositoriesAction extends TransportMasterNodeReadAction { +public class TransportGetRepositoriesAction extends TransportMasterNodeReadProjectAction { @Inject public TransportGetRepositoriesAction( TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( GetRepositoriesAction.NAME, @@ -44,24 +46,25 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadActio threadPool, actionFilters, GetRepositoriesRequest::new, + projectResolver, GetRepositoriesResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); } @Override - protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ProjectState state) { + return state.blocks().globalBlockedException(state.projectId(), ClusterBlockLevel.METADATA_READ); } @Override protected void masterOperation( Task task, final GetRepositoriesRequest request, - ClusterState state, + ProjectState state, final ActionListener listener ) { - final var result = ResolvedRepositories.resolve(state, request.repositories()); + final var result = ResolvedRepositories.resolve(state.metadata(), request.repositories()); if (result.hasMissingRepositories()) { listener.onFailure(new RepositoryMissingException(String.join(", ", result.missing()))); } else { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java index 2456bc6a7c6c..04de5d876efa 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/put/TransportPutRepositoryAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; @@ -36,6 +37,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod public static final ActionType TYPE = new ActionType<>("cluster:admin/repository/put"); private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportPutRepositoryAction( @@ -43,7 +45,8 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod ClusterService clusterService, RepositoriesService repositoriesService, ThreadPool threadPool, - ActionFilters actionFilters + ActionFilters actionFilters, + ProjectResolver projectResolver ) { super( TYPE.name(), @@ -55,11 +58,12 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod EsExecutors.DIRECT_EXECUTOR_SERVICE ); this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected ClusterBlockException checkBlock(PutRepositoryRequest request, ClusterState state) { - return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE); } @Override @@ -69,7 +73,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod ClusterState state, final ActionListener listener ) { - repositoriesService.registerRepository(request, listener); + repositoriesService.registerRepository(projectResolver.getProjectId(), request, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java index ee6e7f320f51..35ee66834716 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryAction.java @@ -11,6 +11,8 @@ package org.elasticsearch.action.admin.cluster.repositories.reservedstate; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.reservedstate.ReservedClusterStateHandler; import org.elasticsearch.reservedstate.TransformState; @@ -60,7 +62,9 @@ public class ReservedRepositoryAction implements ReservedClusterStateHandler { private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportVerifyRepositoryAction( @@ -37,7 +39,8 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction listener ) { repositoriesService.verifyRepository( + projectResolver.getProjectId(), request.name(), listener.map(verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0]))) ); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index eab763165a74..7393ab23d889 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -144,7 +144,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction requestedSnapshotNames = Sets.newHashSet(request.snapshots()); final ListenableFuture repositoryDataListener = new ListenableFuture<>(); - repositoriesService.getRepositoryData(repositoryName, repositoryDataListener); + @FixForMultiProject(description = "resolve the actual projectId, ES-10166") + final var projectId = ProjectId.DEFAULT; + repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener); final Collection snapshotIdsToLoad = new ArrayList<>(); repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> { task.ensureNotCancelled(); diff --git a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java index 5fd573a3f665..a85a9bf73bea 100644 --- a/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java +++ b/server/src/main/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTracker.java @@ -42,7 +42,7 @@ public class RepositoriesHealthTracker extends HealthTracker(); var invalid = new ArrayList(); - repositories.values().forEach(repository -> { + repositories.forEach(repository -> { if (repository instanceof UnknownTypeRepository) { unknown.add(repository.getMetadata().name()); } else if (repository instanceof InvalidRepository) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 92dfd74891e0..76ecd8141f79 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -39,6 +39,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -154,6 +155,7 @@ import org.elasticsearch.repositories.Repository; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader; import org.elasticsearch.search.suggest.completion.CompletionStats; +import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transports; @@ -3527,12 +3529,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } case SNAPSHOT -> { - final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository(); + final Snapshot snapshot = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot(); + final ProjectId projectId = snapshot.getProjectId(); + final String repo = snapshot.getRepository(); executeRecovery( "from snapshot", recoveryState, recoveryListener, - l -> restoreFromRepository(repositoriesService.repository(repo), l) + l -> restoreFromRepository(repositoriesService.repository(projectId, repo), l) ); } case LOCAL_SHARDS -> { diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java index bd86ab93d30e..c46012e8a418 100644 --- a/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/repositories/IndexSnapshotsService.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots; @@ -28,7 +30,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; public class IndexSnapshotsService { @@ -136,8 +137,9 @@ public class IndexSnapshotsService { } private Repository getRepository(String repositoryName) { - final Map repositories = repositoriesService.getRepositories(); - return repositories.get(repositoryName); + @FixForMultiProject(description = "resolve the actual projectId, ES-12176") + final var projectId = ProjectId.DEFAULT; + return repositoriesService.repositoryOrNull(projectId, repositoryName); } private static class FetchShardSnapshotContext { diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 6f009ed92cf9..6248e74bee10 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; @@ -44,6 +45,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -66,6 +69,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -73,6 +77,7 @@ import java.util.stream.Stream; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.core.Strings.format; +import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY; import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY; @@ -84,8 +89,9 @@ import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE * factory information to create new repositories, and provides access to and maintains the lifecycle of repositories. New nodes can easily * find all the repositories via the cluster state after joining a cluster. * - * {@link #repository(String)} can be used to fetch a repository. {@link #createRepository(RepositoryMetadata)} does the heavy lifting of - * creation. {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. + * {@link #repository(ProjectId, String)} can be used to fetch a repository. + * {@link #createRepository(ProjectId, RepositoryMetadata)} does the heavy lifting of creation. + * {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates. */ public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -112,8 +118,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final ThreadPool threadPool; private final NodeClient client; - private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); - private volatile Map repositories = Collections.emptyMap(); + private final Map> internalRepositories = ConcurrentCollections.newConcurrentMap(); + private final Map> repositories = ConcurrentCollections.newConcurrentMap(); private final RepositoriesStatsArchive repositoriesStatsArchive; private final List> preRestoreChecks; @@ -154,10 +160,15 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * This method can be only called on the master node. * It tries to create a new repository on the master, and if it was successful, it adds a new repository to cluster metadata. * + * @param projectId the project ID to which the repository belongs. * @param request register repository request * @param responseListener register repository listener */ - public void registerRepository(final PutRepositoryRequest request, final ActionListener responseListener) { + public void registerRepository( + final ProjectId projectId, + final PutRepositoryRequest request, + final ActionListener responseListener + ) { assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]"; validateRepositoryName(request.name()); @@ -167,7 +178,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C SubscribableListener // Trying to create the new repository on master to make sure it works - .newForked(validationStep -> validatePutRepositoryRequest(request, validationStep)) + .newForked(validationStep -> validatePutRepositoryRequest(projectId, request, validationStep)) // When publication has completed (and all acks received or timed out) then verify the repository. // (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have @@ -176,11 +187,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C final ListenableFuture acknowledgementStep = new ListenableFuture<>(); final ListenableFuture publicationStep = new ListenableFuture<>(); // Boolean==changed. submitUnbatchedTask( - "put_repository [" + request.name() + "]", - new RegisterRepositoryTask(this, request, acknowledgementStep) { + "put_repository " + projectRepoString(projectId, request.name()), + new RegisterRepositoryTask(this, projectId, request, acknowledgementStep) { @Override public void onFailure(Exception e) { - logger.warn(() -> "failed to create repository [" + request.name() + "]", e); + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, request.name()), e); publicationStep.onFailure(e); super.onFailure(e); } @@ -195,9 +206,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { if (changed) { if (found) { - logger.info("updated repository [{}]", request.name()); + logger.info("updated repository {}", projectRepoString(projectId, request.name())); } else { - logger.info("put repository [{}]", request.name()); + logger.info("put repository {}", projectRepoString(projectId, request.name())); } } publicationStep.onResponse(oldState != newState); @@ -220,7 +231,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .>newForked(verifyRepositoryStep -> { if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) { - verifyRepository(request.name(), verifyRepositoryStep); + final ThreadContext threadContext = threadPool.getThreadContext(); + verifyRepository(projectId, request.name(), verifyRepositoryStep); } else { verifyRepositoryStep.onResponse(null); } @@ -231,7 +243,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .execute( ActionRunnable.wrap( getRepositoryDataStep, - ll -> repository(request.name()).getRepositoryData( + ll -> repository(projectId, request.name()).getRepositoryData( // TODO contemplate threading, do we need to fork, see #101445? EsExecutors.DIRECT_EXECUTOR_SERVICE, ll @@ -243,6 +255,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C .andThen( (updateRepoUuidStep, repositoryData) -> updateRepositoryUuidInMetadata( clusterService, + projectId, request.name(), repositoryData, updateRepoUuidStep @@ -263,16 +276,19 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C protected boolean found = false; protected boolean changed = false; + private final ProjectId projectId; private final PutRepositoryRequest request; private final RepositoriesService repositoriesService; RegisterRepositoryTask( final RepositoriesService repositoriesService, + final ProjectId projectId, final PutRepositoryRequest request, final ListenableFuture acknowledgementStep ) { super(request, acknowledgementStep); this.repositoriesService = repositoriesService; + this.projectId = projectId; this.request = request; } @@ -281,14 +297,18 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * @param repositoriesService * @param request */ - public RegisterRepositoryTask(final RepositoriesService repositoriesService, final PutRepositoryRequest request) { - this(repositoriesService, request, null); + public RegisterRepositoryTask( + final RepositoriesService repositoriesService, + final ProjectId projectId, + final PutRepositoryRequest request + ) { + this(repositoriesService, projectId, request, null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); List repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1); for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (repositoryMetadata.name().equals(request.name())) { @@ -304,10 +324,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C request.type(), request.settings() ); - Repository existing = repositoriesService.repositories.get(request.name()); - if (existing == null) { - existing = repositoriesService.internalRepositories.get(request.name()); - } + Repository existing = repositoriesService.repositoryOrNull(projectId, request.name()); assert existing != null : "repository [" + newRepositoryMetadata.name() + "] must exist"; assert existing.getMetadata() == repositoryMetadata; final RepositoryMetadata updatedMetadata; @@ -316,7 +333,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C if (repositoryMetadata.generation() == RepositoryData.CORRUPTED_REPO_GEN) { // If recreating a corrupted repository with the same settings, reset the corrupt flag. // Setting the safe generation to unknown, so that a consistent generation is found. - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); logger.info( "repository [{}/{}] is marked as corrupted, resetting the corruption marker", repositoryMetadata.name(), @@ -334,7 +351,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C // we're updating in place so the updated metadata must point at the same uuid and generations updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings()); } else { - ensureRepositoryNotInUse(currentState, request.name()); + ensureRepositoryNotInUse(projectState, request.name()); updatedMetadata = newRepositoryMetadata; } found = true; @@ -349,7 +366,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C repositories = new RepositoriesMetadata(repositoriesMetadata); changed = true; return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata(ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories)) .build(); } } @@ -361,17 +378,21 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * * @param request */ - public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) { + public void validateRepositoryCanBeCreated(final ProjectId projectId, final PutRepositoryRequest request) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); // Trying to create the new repository on master to make sure it works - closeRepository(createRepository(newRepositoryMetadata)); + closeRepository(createRepository(projectId, newRepositoryMetadata)); } - private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener resultListener) { + private void validatePutRepositoryRequest( + final ProjectId projectId, + final PutRepositoryRequest request, + ActionListener resultListener + ) { final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings()); try { - final var repository = createRepository(newRepositoryMetadata); + final var repository = createRepository(projectId, newRepositoryMetadata); if (request.verify()) { // verify repository on local node only, different from verifyRepository method that runs on other cluster nodes threadPool.executor(ThreadPool.Names.SNAPSHOT) @@ -413,6 +434,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C */ public static void updateRepositoryUuidInMetadata( ClusterService clusterService, + final ProjectId projectId, final String repositoryName, RepositoryData repositoryData, ActionListener listener @@ -424,7 +446,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return; } - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state()).repository(repositoryName); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state().metadata().getProject(projectId)) + .repository(repositoryName); if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) { listener.onResponse(null); return; @@ -441,11 +464,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C submitUnbatchedTask( clusterService, - "update repository UUID [" + repositoryName + "] to [" + repositoryUuid + "]", + "update repository UUID " + projectRepoString(projectId, repositoryName) + " to [" + repositoryUuid + "]", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(projectId); final RepositoriesMetadata currentReposMetadata = RepositoriesMetadata.get(project); final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName); @@ -477,24 +500,32 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C *

* This method can be only called on the master node. It removes repository information from cluster metadata. * + * @param projectId project to look for the repository * @param request unregister repository request * @param listener unregister repository listener */ - public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener listener) { - submitUnbatchedTask("delete_repository [" + request.name() + "]", new UnregisterRepositoryTask(request, listener) { - @Override - public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { - if (deletedRepositories.isEmpty() == false) { - logger.info("deleted repositories [{}]", deletedRepositories); + public void unregisterRepository( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { + submitUnbatchedTask( + "delete_repository " + projectRepoString(projectId, request.name()), + new UnregisterRepositoryTask(projectId, request, listener) { + @Override + public void clusterStateProcessed(ClusterState oldState, ClusterState newState) { + if (deletedRepositories.isEmpty() == false) { + logger.info("deleted repositories [{}] for project [{}]", deletedRepositories, projectId); + } + } + + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + // repository was created on both master and data nodes + return discoveryNode.isMasterNode() || discoveryNode.canContainData(); } } - - @Override - public boolean mustAck(DiscoveryNode discoveryNode) { - // repository was created on both master and data nodes - return discoveryNode.isMasterNode() || discoveryNode.canContainData(); - } - }); + ); } /** @@ -503,10 +534,16 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C */ public static class UnregisterRepositoryTask extends AckedClusterStateUpdateTask { protected final List deletedRepositories = new ArrayList<>(); + private final ProjectId projectId; private final DeleteRepositoryRequest request; - UnregisterRepositoryTask(final DeleteRepositoryRequest request, final ActionListener listener) { + UnregisterRepositoryTask( + final ProjectId projectId, + final DeleteRepositoryRequest request, + final ActionListener listener + ) { super(request, listener); + this.projectId = projectId; this.request = request; } @@ -514,20 +551,20 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * Constructor used by {@link org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction} * @param name the repository name */ - public UnregisterRepositoryTask(TimeValue dummyTimeout, String name) { - this(new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); + public UnregisterRepositoryTask(TimeValue dummyTimeout, ProjectId projectId, String name) { + this(projectId, new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null); } @Override public ClusterState execute(ClusterState currentState) { - final var project = currentState.metadata().getDefaultProject(); - RepositoriesMetadata repositories = RepositoriesMetadata.get(project); + final var projectState = currentState.projectState(projectId); + RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata()); if (repositories.repositories().size() > 0) { List repositoriesMetadata = new ArrayList<>(repositories.repositories().size()); boolean changed = false; for (RepositoryMetadata repositoryMetadata : repositories.repositories()) { if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) { - ensureRepositoryNotInUse(currentState, repositoryMetadata.name()); + ensureRepositoryNotInUse(projectState, repositoryMetadata.name()); ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata); deletedRepositories.add(repositoryMetadata.name()); changed = true; @@ -538,7 +575,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C if (changed) { repositories = new RepositoriesMetadata(repositoriesMetadata); return ClusterState.builder(currentState) - .putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories)) + .putProjectMetadata( + ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories) + ) .build(); } } @@ -549,8 +588,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - public void verifyRepository(final String repositoryName, final ActionListener> listener) { - final Repository repository = repository(repositoryName); + public void verifyRepository( + final ProjectId projectId, + final String repositoryName, + final ActionListener> listener + ) { + final Repository repository = repository(projectId, repositoryName); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) { @Override protected void doRun() { @@ -566,7 +609,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C try { repository.endVerification(verificationToken); } catch (Exception e) { - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", e); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + + " failed to finish repository verification", + e + ); delegatedListener.onFailure(e); return; } @@ -580,7 +627,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C repository.endVerification(verificationToken); } catch (Exception inner) { inner.addSuppressed(e); - logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", inner); + logger.warn( + () -> projectRepoString(projectId, repositoryName) + " failed to finish repository verification", + inner + ); } listener.onFailure(e); }); @@ -608,81 +658,154 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public void applyClusterState(ClusterChangedEvent event) { try { final ClusterState state = event.state(); - assert assertReadonlyRepositoriesNotInUseForWrites(state); - final RepositoriesMetadata oldMetadata = RepositoriesMetadata.get(event.previousState()); - final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state); + final ClusterState previousState = event.previousState(); - // Check if repositories got changed - if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { - for (Repository repo : repositories.values()) { - repo.updateState(state); - } - return; + for (var projectId : event.projectDelta().removed()) { // removed projects + applyProjectStateForRemovedProject(state.version(), previousState.projectState(projectId)); } - logger.trace("processing new index repositories for state version [{}]", event.state().version()); - - Map survivors = new HashMap<>(); - // First, remove repositories that are no longer there - for (Map.Entry entry : repositories.entrySet()) { - if (newMetadata.repository(entry.getKey()) == null) { - logger.debug("unregistering repository [{}]", entry.getKey()); - Repository repository = entry.getValue(); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - } else { - survivors.put(entry.getKey(), entry.getValue()); - } + for (var projectId : event.projectDelta().added()) { // added projects + applyProjectStateForAddedOrExistingProject(state.version(), state.projectState(projectId), null); } - Map builder = new HashMap<>(); - - // Now go through all repositories and update existing or create missing - for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { - Repository repository = survivors.get(repositoryMetadata.name()); - if (repository != null) { - // Found previous version of this repository - if (canUpdateInPlace(repositoryMetadata, repository) == false) { - // Previous version is different from the version in settings - logger.debug("updating repository [{}]", repositoryMetadata.name()); - closeRepository(repository); - archiveRepositoryStats(repository, state.version()); - repository = null; - try { - repository = createRepository( - repositoryMetadata, - typesRegistry, - RepositoriesService::createUnknownTypeRepository - ); - } catch (RepositoryException ex) { - // TODO: this catch is bogus, it means the old repo is already closed, - // but we have nothing to replace it - logger.warn(() -> "failed to change repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); - } - } - } else { - try { - repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository); - } catch (RepositoryException ex) { - logger.warn(() -> "failed to create repository [" + repositoryMetadata.name() + "]", ex); - repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex); - } - } - assert repository != null : "repository should not be null here"; - logger.debug("registering repository [{}]", repositoryMetadata.name()); - builder.put(repositoryMetadata.name(), repository); + // existing projects + final var common = event.projectDelta().added().isEmpty() + ? state.metadata().projects().keySet() + : Sets.difference(state.metadata().projects().keySet(), event.projectDelta().added()); + for (var projectId : common) { + applyProjectStateForAddedOrExistingProject( + state.version(), + state.projectState(projectId), + previousState.projectState(projectId) + ); } - for (Repository repo : builder.values()) { - repo.updateState(state); - } - repositories = unmodifiableMap(builder); } catch (Exception ex) { assert false : new AssertionError(ex); logger.warn("failure updating cluster state ", ex); } } + /** + * Apply changes for one removed project. + * + * @param version The cluster state version of the change. + * @param previousState The previous project state for the removed project. + */ + private void applyProjectStateForRemovedProject(long version, ProjectState previousState) { + final var projectId = previousState.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false : "default project cannot be removed"; + final var survivors = closeRemovedRepositories(version, projectId, getProjectRepositories(projectId), RepositoriesMetadata.EMPTY); + assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet(); + repositories.remove(projectId); + } + + /** + * Apply changes for one project. The project can be either newly added or an existing one. + * + * @param version The cluster state version of the change. + * @param state The current project state + * @param previousState The previous project state, or {@code null} if the project was newly added. + */ + private void applyProjectStateForAddedOrExistingProject(long version, ProjectState state, @Nullable ProjectState previousState) { + assert assertReadonlyRepositoriesNotInUseForWrites(state); + final var projectId = state.projectId(); + assert ProjectId.DEFAULT.equals(projectId) == false || previousState != null : "default project cannot be added"; + assert previousState == null || projectId.equals(previousState.projectId()) + : "current and previous states must refer to the same project, but got " + projectId + " != " + previousState.projectId(); + + final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state.metadata()); + final RepositoriesMetadata oldMetadata = previousState == null + ? RepositoriesMetadata.EMPTY + : RepositoriesMetadata.get(previousState.metadata()); + + final Map projectRepositories = getProjectRepositories(projectId); + // Check if repositories got changed + if (oldMetadata.equalsIgnoreGenerations(newMetadata)) { + for (Repository repo : projectRepositories.values()) { + repo.updateState(state.cluster()); + } + return; + } + + logger.trace("processing new index repositories for project [{}] and state version [{}]", projectId, version); + + // First, remove repositories that are no longer there + final var survivors = closeRemovedRepositories(version, projectId, projectRepositories, newMetadata); + + Map builder = new HashMap<>(); + + // Now go through all repositories and update existing or create missing + for (RepositoryMetadata repositoryMetadata : newMetadata.repositories()) { + Repository repository = survivors.get(repositoryMetadata.name()); + if (repository != null) { + // Found previous version of this repository + if (canUpdateInPlace(repositoryMetadata, repository) == false) { + // Previous version is different from the version in settings + logger.debug("updating repository {}", projectRepoString(projectId, repositoryMetadata.name())); + closeRepository(repository); + archiveRepositoryStats(repository, version); + repository = null; + try { + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); + } catch (RepositoryException ex) { + // TODO: this catch is bogus, it means the old repo is already closed, + // but we have nothing to replace it + logger.warn(() -> "failed to change repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); + } + } + } else { + try { + repository = createRepository( + projectId, + repositoryMetadata, + typesRegistry, + RepositoriesService::createUnknownTypeRepository + ); + } catch (RepositoryException ex) { + logger.warn(() -> "failed to create repository " + projectRepoString(projectId, repositoryMetadata.name()), ex); + repository = new InvalidRepository(projectId, repositoryMetadata, ex); + } + } + assert repository != null : "repository should not be null here"; + logger.debug("registering repository [{}]", projectRepoString(projectId, repositoryMetadata.name())); + builder.put(repositoryMetadata.name(), repository); + } + for (Repository repo : builder.values()) { + repo.updateState(state.cluster()); + } + if (builder.isEmpty() == false) { + repositories.put(projectId, unmodifiableMap(builder)); + } else { + repositories.remove(projectId); + } + } + + private Map closeRemovedRepositories( + long version, + ProjectId projectId, + Map projectRepositories, + RepositoriesMetadata newMetadata + ) { + Map survivors = new HashMap<>(); + for (Map.Entry entry : projectRepositories.entrySet()) { + if (newMetadata.repository(entry.getKey()) == null) { + logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey())); + Repository repository = entry.getValue(); + closeRepository(repository); + archiveRepositoryStats(repository, version); + } else { + survivors.put(entry.getKey(), entry.getValue()); + } + } + return survivors; + } + private static boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repository repository) { assert updatedMetadata.name().equals(repository.getMetadata().name()); return repository.getMetadata().type().equals(updatedMetadata.type()) @@ -692,12 +815,13 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C /** * Gets the {@link RepositoryData} for the given repository. * + * @param projectId project to look for the repository * @param repositoryName repository name * @param listener listener to pass {@link RepositoryData} to */ - public void getRepositoryData(final String repositoryName, final ActionListener listener) { + public void getRepositoryData(final ProjectId projectId, final String repositoryName, final ActionListener listener) { try { - Repository repository = repository(repositoryName); + Repository repository = repository(projectId, repositoryName); assert repository != null; // should only be called once we've validated the repository exists repository.getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? @@ -709,18 +833,28 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } /** - * Returns registered repository + * Returns registered repository, either internal or external * * @param repositoryName repository name * @return registered repository * @throws RepositoryMissingException if repository with such name isn't registered */ + @FixForMultiProject + @Deprecated(forRemoval = true) public Repository repository(String repositoryName) { - Repository repository = repositories.get(repositoryName); - if (repository != null) { - return repository; - } - repository = internalRepositories.get(repositoryName); + return repository(ProjectId.DEFAULT, repositoryName); + } + + /** + * Returns registered repository, either internal or external + * + * @param projectId the project to look for the repository + * @param repositoryName repository name + * @return registered repository + * @throws RepositoryMissingException if repository with such name isn't registered + */ + public Repository repository(ProjectId projectId, String repositoryName) { + Repository repository = repositoryOrNull(projectId, repositoryName); if (repository != null) { return repository; } @@ -728,10 +862,33 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } /** - * @return the current collection of registered repositories, keyed by name. + * Similar to {@link #repository(ProjectId, String)}, but returns {@code null} instead of throw if the repository is not found. */ - public Map getRepositories() { - return unmodifiableMap(repositories); + public Repository repositoryOrNull(ProjectId projectId, String repositoryName) { + Repository repository = repositories.getOrDefault(projectId, Map.of()).get(repositoryName); + if (repository != null) { + return repository; + } + return internalRepositories.getOrDefault(projectId, Map.of()).get(repositoryName); + } + + /** + * @return the current collection of registered repositories from all projects. + */ + public List getRepositories() { + return repositories.values().stream().map(Map::values).flatMap(Collection::stream).toList(); + } + + /** + * @return the current collection of registered repositories for the given project, keyed by name. + */ + public Map getProjectRepositories(ProjectId projectId) { + return repositories.getOrDefault(projectId, Map.of()); + } + + // Package private for testing + boolean hasRepositoryTrackingForProject(ProjectId projectId) { + return repositories.containsKey(projectId) || internalRepositories.containsKey(projectId); } public List repositoriesStats() { @@ -745,8 +902,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C public RepositoriesStats getRepositoriesThrottlingStats() { return new RepositoriesStats( - repositories.values() - .stream() + getRepositories().stream() .collect( Collectors.toMap( r -> r.getMetadata().name(), @@ -757,7 +913,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } private List getRepositoryStatsForActiveRepositories() { - return Stream.concat(repositories.values().stream(), internalRepositories.values().stream()) + return Stream.concat( + repositories.values().stream().map(Map::values).flatMap(Collection::stream), + internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream) + ) .filter(r -> r instanceof MeteredBlobStoreRepository) .map(r -> (MeteredBlobStoreRepository) r) .map(MeteredBlobStoreRepository::statsSnapshot) @@ -768,12 +927,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return repositoriesStatsArchive.clear(maxVersionToClear); } - public void registerInternalRepository(String name, String type) { + public void registerInternalRepository(ProjectId projectId, String name, String type) { RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY); - Repository repository = internalRepositories.computeIfAbsent(name, (n) -> { - logger.debug("put internal repository [{}][{}]", name, type); - return createRepository(metadata, internalTypesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists); - }); + Repository repository = internalRepositories.compute(projectId, (ignored, existingRepos) -> { + if (existingRepos == null) { + existingRepos = Map.of(); + } + if (existingRepos.containsKey(name)) { + return existingRepos; + } + logger.debug("put internal repository [{}][{}]", projectRepoString(projectId, name), type); + final var repo = createRepository( + projectId, + metadata, + internalTypesRegistry, + RepositoriesService::throwRepositoryTypeDoesNotExists + ); + final var newRepos = new HashMap<>(existingRepos); + newRepos.put(name, repo); + return unmodifiableMap(newRepos); + }).get(name); if (type.equals(repository.getMetadata().type()) == false) { logger.warn( () -> format( @@ -785,7 +958,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C type ) ); - } else if (repositories.containsKey(name)) { + } else if (getProjectRepositories(projectId).containsKey(name)) { logger.warn( () -> format( "non-internal repository [%s] already registered. this repository will block the " @@ -798,11 +971,24 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - public void unregisterInternalRepository(String name) { - Repository repository = internalRepositories.remove(name); + public void unregisterInternalRepository(ProjectId projectId, String name) { + final var repositoryRef = new AtomicReference(); + internalRepositories.computeIfPresent(projectId, (ignored, existingRepos) -> { + if (existingRepos.containsKey(name) == false) { + return existingRepos; + } + final var newRepos = new HashMap<>(existingRepos); + repositoryRef.set(newRepos.remove(name)); + if (newRepos.isEmpty()) { + return null; + } else { + return unmodifiableMap(newRepos); + } + }); + Repository repository = repositoryRef.get(); if (repository != null) { RepositoryMetadata metadata = repository.getMetadata(); - logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), name)); + logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), projectRepoString(projectId, name))); closeRepository(repository); } } @@ -811,7 +997,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C * Closes the given repository. */ private static void closeRepository(Repository repository) { - logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name()); + logger.debug( + "closing repository [{}]{}", + repository.getMetadata().type(), + projectRepoString(repository.getProjectId(), repository.getMetadata().name()) + ); repository.close(); } @@ -824,19 +1014,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - /** - * Creates repository holder. This method starts the repository - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - private static Repository createRepository( - RepositoryMetadata repositoryMetadata, - Map factories, - BiFunction defaultFactory - ) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory); - } - /** * Creates repository holder. This method starts the repository */ @@ -863,27 +1040,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - /** - * Creates a repository holder. - * - *

WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:

- * - *
    - *
  • This method does not register the repository (e.g., in the cluster state).
  • - *
  • This method starts the repository. The repository should be closed after use.
  • - *
  • The repository metadata should be associated to an already registered non-internal repository type and factory pair.
  • - *
- * - * @param repositoryMetadata the repository metadata - * @return the started repository - * @throws RepositoryException if repository type is not registered - */ - @FixForMultiProject(description = "resolve the actual ProjectId") - @Deprecated(forRemoval = true) - public Repository createRepository(RepositoryMetadata repositoryMetadata) { - return createRepository(ProjectId.DEFAULT, repositoryMetadata); - } - /** * Creates a repository holder. * @@ -947,25 +1103,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C } } - private static void ensureRepositoryNotInUseForWrites(ClusterState clusterState, String repository) { - if (SnapshotsInProgress.get(clusterState).forRepo(repository).isEmpty() == false) { + private static void ensureRepositoryNotInUseForWrites(ProjectState projectState, String repository) { + final ProjectId projectId = projectState.projectId(); + if (SnapshotsInProgress.get(projectState.cluster()).forRepo(projectId, repository).isEmpty() == false) { throw newRepositoryConflictException(repository, "snapshot is in progress"); } - for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(clusterState).getEntries()) { - if (entry.repository().equals(repository)) { + for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(projectState.cluster()).getEntries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "snapshot deletion is in progress"); } } - for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(clusterState).entries()) { - if (entry.repository().equals(repository)) { + for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(projectState.cluster()).entries()) { + if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) { throw newRepositoryConflictException(repository, "repository clean up is in progress"); } } } - private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) { - ensureRepositoryNotInUseForWrites(clusterState, repository); - for (RestoreInProgress.Entry entry : RestoreInProgress.get(clusterState)) { + private static void ensureRepositoryNotInUse(ProjectState projectState, String repository) { + ensureRepositoryNotInUseForWrites(projectState, repository); + for (RestoreInProgress.Entry entry : RestoreInProgress.get(projectState.cluster())) { if (repository.equals(entry.snapshot().getRepository())) { throw newRepositoryConflictException(repository, "snapshot restore is in progress"); } @@ -979,11 +1136,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C /** * Test-only check for the invariant that read-only repositories never have any write activities. */ - private static boolean assertReadonlyRepositoriesNotInUseForWrites(ClusterState clusterState) { - for (final var repositoryMetadata : RepositoriesMetadata.get(clusterState).repositories()) { + private static boolean assertReadonlyRepositoriesNotInUseForWrites(ProjectState projectState) { + assert projectState != null; + for (final var repositoryMetadata : RepositoriesMetadata.get(projectState.metadata()).repositories()) { if (isReadOnly(repositoryMetadata.settings())) { try { - ensureRepositoryNotInUseForWrites(clusterState, repositoryMetadata.name()); + ensureRepositoryNotInUseForWrites(projectState, repositoryMetadata.name()); } catch (Exception e) { throw new AssertionError("repository [" + repositoryMetadata + "] is readonly but still in use", e); } @@ -1071,7 +1229,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C return RepositoryUsageStats.EMPTY; } final var statsByType = new HashMap>(); - for (final var repository : repositories.values()) { + for (final var repository : getRepositories()) { final var repositoryType = repository.getMetadata().type(); final var typeStats = statsByType.computeIfAbsent(repositoryType, ignored -> new HashMap<>()); typeStats.compute(COUNT_USAGE_STATS_NAME, (k, count) -> (count == null ? 0L : count) + 1); @@ -1096,8 +1254,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C protected void doClose() throws IOException { clusterService.removeApplier(this); final Collection repos = new ArrayList<>(); - repos.addAll(internalRepositories.values()); - repos.addAll(repositories.values()); + repos.addAll(internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream).toList()); + repos.addAll(getRepositories()); IOUtils.close(repos); for (Repository repo : repos) { repo.awaitIdle(); diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java index 5357218b03f4..e6bdf5bc5efc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryOperation.java @@ -52,6 +52,19 @@ public interface RepositoryOperation { projectId.writeTo(out); out.writeString(name); } + + @Override + public String toString() { + return projectRepoString(projectId, name); + } + } + + static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) { + return new ProjectRepo(projectId, repositoryName); + } + + static String projectRepoString(ProjectId projectId, String repositoryName) { + return "[" + projectId + "][" + repositoryName + "]"; } DiffableUtils.KeySerializer PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() { diff --git a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java index ea147f336cb7..91b9e348bf34 100644 --- a/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java +++ b/server/src/main/java/org/elasticsearch/repositories/ResolvedRepositories.java @@ -9,7 +9,7 @@ package org.elasticsearch.repositories; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -21,7 +21,7 @@ import java.util.List; import java.util.Set; /** - * The result of calling {@link #resolve(ClusterState, String[])} to resolve a description of some snapshot repositories (from a path + * The result of calling {@link #resolve(ProjectMetadata, String[])} to resolve a description of some snapshot repositories (from a path * component of a request to the get-repositories or get-snapshots APIs) against the known repositories in the cluster state: the * {@link RepositoryMetadata} for the extant repositories that match the description, together with a list of the parts of the description * that failed to match any known repository. @@ -38,8 +38,8 @@ public record ResolvedRepositories(List repositoryMetadata, || (patterns.length == 1 && (ALL_PATTERN.equalsIgnoreCase(patterns[0]) || Regex.isMatchAllPattern(patterns[0]))); } - public static ResolvedRepositories resolve(ClusterState state, String[] patterns) { - final var repositories = RepositoriesMetadata.get(state); + public static ResolvedRepositories resolve(ProjectMetadata projectMetadata, String[] patterns) { + final var repositories = RepositoriesMetadata.get(projectMetadata); if (isMatchAll(patterns)) { return new ResolvedRepositories(repositories.repositories(), List.of()); } diff --git a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java index a2d4f2921555..f84adbe407b7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/repositories/VerifyNodeRepositoryAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.LegacyActionRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -39,6 +40,7 @@ public class VerifyNodeRepositoryAction { private final ClusterService clusterService; private final RepositoriesService repositoriesService; + private final ProjectResolver projectResolver; @Inject public TransportAction( @@ -46,18 +48,20 @@ public class VerifyNodeRepositoryAction { ActionFilters actionFilters, ThreadPool threadPool, ClusterService clusterService, - RepositoriesService repositoriesService + RepositoriesService repositoriesService, + ProjectResolver projectResolver ) { super(ACTION_NAME, transportService, actionFilters, Request::new, threadPool.executor(ThreadPool.Names.SNAPSHOT)); this.clusterService = clusterService; this.repositoriesService = repositoriesService; + this.projectResolver = projectResolver; } @Override protected void doExecute(Task task, Request request, ActionListener listener) { DiscoveryNode localNode = clusterService.state().nodes().getLocalNode(); try { - Repository repository = repositoriesService.repository(request.repository); + Repository repository = repositoriesService.repository(projectResolver.getProjectId(), request.repository); repository.verify(request.verificationToken, localNode); listener.onResponse(ActionResponse.Empty.INSTANCE); } catch (Exception e) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9611f4243776..996c9cda4dea 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -2407,7 +2407,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) { throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]"); } - final var project = currentState.metadata().getDefaultProject(); + final var project = currentState.metadata().getProject(getProjectId()); return ClusterState.builder(currentState) .putProjectMetadata( ProjectMetadata.builder(project) @@ -2476,6 +2476,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // someone switched the repo contents out from under us RepositoriesService.updateRepositoryUuidInMetadata( clusterService, + getProjectId(), metadata.name(), loaded, new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded)) @@ -3115,7 +3116,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } private RepositoryMetadata getRepoMetadata(ClusterState state) { - final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state).repository(metadata.name()); + final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId())) + .repository(metadata.name()); assert repositoryMetadata != null || lifecycle.stoppedOrClosed() : "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]"; return repositoryMetadata; diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java index bf6b078571bf..f57db79d4625 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java @@ -602,7 +602,9 @@ public final class RestoreService implements ClusterStateApplier { return; } - for (Repository repository : repositoriesService.getRepositories().values()) { + @FixForMultiProject(description = "resolve the actual projectId, ES-10228") + final var projectId = ProjectId.DEFAULT; + for (Repository repository : repositoriesService.getProjectRepositories(projectId).values()) { // We only care about BlobStoreRepositories because they're the only ones that can contain a searchable snapshot, and we // only care about ones with missing UUIDs. It's possible to have the UUID change from under us if, e.g., the repository was // wiped by an external force, but in this case any searchable snapshots are lost anyway so it doesn't really matter. diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 68a935599ed6..d49b670451f5 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -605,7 +605,7 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet"); } - final Repository repository = repositoriesService.repository(snapshot.getRepository()); + final Repository repository = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); SnapshotIndexCommit snapshotIndexCommit = null; try { snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush"); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index abbf2ab22343..8a1f1ad79a17 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -1377,7 +1377,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement final String repoName = snapshot.getRepository(); if (tryEnterRepoLoop(repoName)) { if (repositoryData == null) { - repositoriesService.repository(repoName) + repositoriesService.repository(snapshot.getProjectId(), repoName) .getRepositoryData( EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445? new ActionListener<>() { @@ -1486,7 +1486,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement } final String repository = snapshot.getRepository(); final ListenableFuture metadataListener = new ListenableFuture<>(); - final Repository repo = repositoriesService.repository(snapshot.getRepository()); + final Repository repo = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository()); if (entry.isClone()) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> { final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source()); @@ -2441,7 +2441,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement */ private void deleteSnapshotsFromRepository(SnapshotDeletionsInProgress.Entry deleteEntry, IndexVersion minNodeVersion) { final long expectedRepoGen = deleteEntry.repositoryStateId(); - repositoriesService.getRepositoryData(deleteEntry.repository(), new ActionListener<>() { + repositoriesService.getRepositoryData(deleteEntry.projectId(), deleteEntry.repository(), new ActionListener<>() { @Override public void onResponse(RepositoryData repositoryData) { assert repositoryData.getGenId() == expectedRepoGen @@ -2564,7 +2564,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement return; } final SubscribableListener doneFuture = new SubscribableListener<>(); - repositoriesService.repository(deleteEntry.repository()) + repositoriesService.repository(deleteEntry.projectId(), deleteEntry.repository()) .deleteSnapshots(snapshotIds, repositoryData.getGenId(), minNodeVersion, new ActionListener<>() { @Override public void onResponse(RepositoryData updatedRepoData) { @@ -3788,7 +3788,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement entry.source(), clone.getValue(), clone.getKey(), - repositoriesService.repository(entry.repository()) + repositoriesService.repository(entry.projectId(), entry.repository()) ); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java index d62d99aa1c43..cee2c5fd8d41 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/repositories/reservedstate/ReservedRepositoryActionTests.java @@ -34,6 +34,7 @@ import java.util.Set; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -147,12 +148,12 @@ public class ReservedRepositoryActionTests extends ESTestCase { ); doAnswer(invocation -> { - var request = (PutRepositoryRequest) invocation.getArguments()[0]; + var request = (PutRepositoryRequest) invocation.getArguments()[1]; if (request.type().equals("inter_planetary")) { throw new RepositoryException(request.name(), "repository type [" + request.type() + "] does not exist"); } return null; - }).when(repositoriesService).validateRepositoryCanBeCreated(any()); + }).when(repositoriesService).validateRepositoryCanBeCreated(eq(ProjectId.DEFAULT), any()); return repositoriesService; } diff --git a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java index 561c05165eab..e565971cef94 100644 --- a/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/health/node/tracker/RepositoriesHealthTrackerTests.java @@ -22,7 +22,6 @@ import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.List; -import java.util.Map; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -44,7 +43,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { } public void testGetHealthNoRepos() { - when(repositoriesService.getRepositories()).thenReturn(Map.of()); + when(repositoriesService.getRepositories()).thenReturn(List.of()); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -58,7 +57,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { when(metadata.generation()).thenReturn(randomNonNegativeLong()); var repo = mock(Repository.class); when(repo.getMetadata()).thenReturn(metadata); - when(repositoriesService.getRepositories()).thenReturn(Map.of(randomAlphaOfLength(10), repo)); + when(repositoriesService.getRepositories()).thenReturn(List.of(repo)); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -68,9 +67,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { public void testGetHealthUnknownType() { var repo = createRepositoryMetadata(); - when(repositoriesService.getRepositories()).thenReturn( - Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(randomProjectIdOrDefault(), repo)) - ); + when(repositoriesService.getRepositories()).thenReturn(List.of(new UnknownTypeRepository(randomProjectIdOrDefault(), repo))); var health = repositoriesHealthTracker.determineCurrentHealth(); @@ -82,7 +79,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase { public void testGetHealthInvalid() { var repo = createRepositoryMetadata(); when(repositoriesService.getRepositories()).thenReturn( - Map.of(repo.name(), new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) + List.of(new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test"))) ); var health = repositoriesHealthTracker.determineCurrentHealth(); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 8c4fcd28c988..96d601f9091f 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -23,12 +23,14 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.project.TestProjectResolvers; +import org.elasticsearch.cluster.routing.GlobalRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; @@ -68,8 +70,16 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BooleanSupplier; +import static org.hamcrest.Matchers.aMapWithSize; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.sameInstance; import static org.mockito.Mockito.mock; public class RepositoriesServiceTests extends ESTestCase { @@ -77,6 +87,7 @@ public class RepositoriesServiceTests extends ESTestCase { private ClusterService clusterService; private RepositoriesService repositoriesService; private ThreadPool threadPool; + private ProjectId projectId; @Override public void setUp() throws Exception { @@ -94,6 +105,13 @@ public class RepositoriesServiceTests extends ESTestCase { Collections.emptySet() ); clusterService = ClusterServiceUtils.createClusterService(threadPool); + projectId = randomProjectIdOrDefault(); + if (ProjectId.DEFAULT.equals(projectId) == false) { + ClusterServiceUtils.setState( + clusterService, + ClusterState.builder(clusterService.state()).putProjectMetadata(ProjectMetadata.builder(projectId)).build() + ); + } DiscoveryNode localNode = DiscoveryNodeUtils.builder("local").name("local").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build(); NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow()); @@ -157,9 +175,9 @@ public class RepositoriesServiceTests extends ESTestCase { public void testRegisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertEquals(repoName, repository.getMetadata().name()); assertEquals(TestRepository.TYPE, repository.getMetadata().type()); assertEquals(Settings.EMPTY, repository.getMetadata().settings()); @@ -168,24 +186,24 @@ public class RepositoriesServiceTests extends ESTestCase { public void testUnregisterInternalRepository() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.unregisterInternalRepository(repoName); - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + repositoriesService.unregisterInternalRepository(projectId, repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); assertTrue(((TestRepository) repository).isClosed); } public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); - Repository repository = repositoriesService.repository(repoName); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + Repository repository = repositoriesService.repository(projectId, repoName); assertFalse(((TestRepository) repository).isClosed); - repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE); + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); assertFalse(((TestRepository) repository).isClosed); - Repository repository2 = repositoriesService.repository(repoName); + Repository repository2 = repositoriesService.repository(projectId, repoName); assertSame(repository, repository2); } @@ -203,11 +221,11 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); // also make sure that cluster state does not include failed repo - assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); }); + assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(projectId, repoName); }); } public void testPutRepositoryVerificationFailsOnExisting() { @@ -216,7 +234,7 @@ public class RepositoriesServiceTests extends ESTestCase { .type(TestRepository.TYPE) .verify(true); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); @@ -225,10 +243,10 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(true); resultListener = new SubscribableListener<>(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var failure = safeAwaitFailure(resultListener); assertThat(failure, isA(RepositoryVerificationException.class)); - var repository = repositoriesService.repository(repoName); + var repository = repositoriesService.repository(projectId, repoName); assertEquals(repository.getMetadata().type(), TestRepository.TYPE); } @@ -238,14 +256,14 @@ public class RepositoriesServiceTests extends ESTestCase { .type(VerificationFailRepository.TYPE) .verify(false); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var ackResponse = safeAwait(resultListener); assertTrue(ackResponse.isAcknowledged()); } public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() { String repoName = "name"; - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)); + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)); ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE); @@ -276,7 +294,7 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, "unknown"); repositoriesService.applyClusterState(new ClusterChangedEvent("starting", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(UnknownTypeRepository.class)); } @@ -288,7 +306,7 @@ public class RepositoriesServiceTests extends ESTestCase { repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -297,7 +315,7 @@ public class RepositoriesServiceTests extends ESTestCase { var repoName = randomAlphaOfLengthBetween(10, 25); var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type("unknown"); - repositoriesService.registerRepository(request, new ActionListener<>() { + repositoriesService.registerRepository(projectId, request, new ActionListener<>() { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { fail("Should not register unknown repository type"); @@ -306,7 +324,10 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void onFailure(Exception e) { assertThat(e, isA(RepositoryException.class)); - assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist for project [default]")); + assertThat( + e.getMessage(), + equalTo("[" + repoName + "] repository type [unknown] does not exist for project [" + projectId + "]") + ); } }); } @@ -318,7 +339,7 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); } @@ -329,12 +350,12 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); clusterState = createClusterStateWithRepo(repoName, TestRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put test repository", clusterState, emptyState())); - repo = repositoriesService.repository(repoName); + repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(TestRepository.class)); } @@ -346,7 +367,7 @@ public class RepositoriesServiceTests extends ESTestCase { repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState)); assertThat( - expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(), + expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(), equalTo("[" + repoName + "] missing") ); } @@ -370,17 +391,17 @@ public class RepositoriesServiceTests extends ESTestCase { var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE); repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState())); - var repo = repositoriesService.repository(repoName); + var repo = repositoriesService.repository(projectId, repoName); assertThat(repo, isA(InvalidRepository.class)); // 2. repository creation successfully when current node become master node and repository is put again var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type(TestRepository.TYPE); var resultListener = new SubscribableListener(); - repositoriesService.registerRepository(request, resultListener); + repositoriesService.registerRepository(projectId, request, resultListener); var response = safeAwait(resultListener); assertTrue(response.isAcknowledged()); - assertThat(repositoriesService.repository(repoName), isA(TestRepository.class)); + assertThat(repositoriesService.repository(projectId, repoName), isA(TestRepository.class)); } public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() { @@ -393,19 +414,21 @@ public class RepositoriesServiceTests extends ESTestCase { .newForked( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE), l.map(ignored -> null) ) ) .andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), ActionTestUtils.assertNoSuccessListener(e -> { @@ -425,20 +448,21 @@ public class RepositoriesServiceTests extends ESTestCase { ) ) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(originalGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l)) .andThenAccept(ignored -> { - final var metadata = repositoriesService.repository(repoName).getMetadata(); + final var metadata = repositoriesService.repository(projectId, repoName).getMetadata(); assertEquals(newGeneration, metadata.generation()); assertEquals(newGeneration, metadata.pendingGeneration()); assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)); }) .andThen( l -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE) .settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)), l.map(ignored -> null) @@ -446,7 +470,7 @@ public class RepositoriesServiceTests extends ESTestCase { ) .andThenAccept( ignored -> assertTrue( - repositoriesService.repository(repoName) + repositoriesService.repository(projectId, repoName) .getMetadata() .settings() .getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null) @@ -455,17 +479,120 @@ public class RepositoriesServiceTests extends ESTestCase { ); } + public void testRepositoryUpdatesForMultipleProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + // 1. Initial project + final var repoName = "repo"; + final var state0 = createClusterStateWithRepo(repoName, TestRepository.TYPE); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state0, emptyState())); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(1)); + final var initialProjectRepo = (TestRepository) repositoriesService.getProjectRepositories(projectId).values().iterator().next(); + assertThat(repositoriesService.getRepositories(), contains(initialProjectRepo)); + if (ProjectId.DEFAULT.equals(projectId) == false) { + assertFalse(repositoriesService.hasRepositoryTrackingForProject(ProjectId.DEFAULT)); + } + + // 2. Add a new project + final var anotherProjectId = randomUniqueProjectId(); + final var anotherRepoName = "another-repo"; + final var state1 = ClusterState.builder(state0) + .putProjectMetadata( + ProjectMetadata.builder(anotherProjectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.EMPTY), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state1, state0)); + assertThat(repositoriesService.getProjectRepositories(anotherProjectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(3)); + assertThat(repositoriesService.getRepositories(), hasItem(initialProjectRepo)); + final Collection anotherProjectRepos = repositoriesService.getProjectRepositories(anotherProjectId).values(); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 3. Update existing project + assertFalse(initialProjectRepo.isClosed); + final var state2 = ClusterState.builder(state1) + .putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata( + List.of( + new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.builder().put("foo", "bar").build()), + new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY) + ) + ) + ) + ) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state2, state1)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(2)); + assertThat(repositoriesService.getRepositories(), hasSize(4)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new))); + + // 4. Remove another project + anotherProjectRepos.forEach(repo -> assertFalse(((TestRepository) repo).isClosed)); + final var state3 = ClusterState.builder(state2) + .metadata(Metadata.builder(state2.metadata()).removeProject(anotherProjectId)) + .routingTable(GlobalRoutingTable.builder(state2.globalRoutingTable()).removeProject(anotherProjectId).build()) + .build(); + repositoriesService.applyClusterState(new ClusterChangedEvent("test", state3, state2)); + anotherProjectRepos.forEach(repo -> assertTrue(((TestRepository) repo).isClosed)); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(anotherProjectId)); + assertThat(repositoriesService.getRepositories(), hasSize(2)); + assertThat( + repositoriesService.getRepositories(), + hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new)) + ); + } + + public void testInternalRepositoryForMultiProjects() { + assertThat(repositoriesService.getRepositories(), empty()); + String repoName = "name"; + repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE); + final TestRepository initialProjectRepo = (TestRepository) repositoriesService.repository(projectId, repoName); + + // Repo of the same name but different project is a different repository instance + final var anotherProjectId = randomUniqueProjectId(); + repositoriesService.registerInternalRepository(anotherProjectId, repoName, TestRepository.TYPE); + final TestRepository anotherProjectRepo = (TestRepository) repositoriesService.repository(anotherProjectId, repoName); + assertThat(initialProjectRepo, not(sameInstance(anotherProjectRepo))); + + // Remove the project repository, the repo should be closed and the project is removed from tracking + repositoriesService.unregisterInternalRepository(projectId, repoName); + assertFalse(repositoriesService.hasRepositoryTrackingForProject(projectId)); + assertTrue(initialProjectRepo.isClosed); + assertThat(repositoriesService.repository(anotherProjectId, repoName), sameInstance(anotherProjectRepo)); + assertTrue(anotherProjectRepo.isStarted); + } + private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener listener) { clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - return new ClusterState.Builder(currentState).metadata( - Metadata.builder(currentState.metadata()) - .putCustom( - RepositoriesMetadata.TYPE, - RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) - ) - ).build(); + final ProjectMetadata projectMetadata = currentState.getMetadata().getProject(projectId); + return ClusterState.builder(currentState) + .putProjectMetadata( + ProjectMetadata.builder(projectMetadata) + .putCustom( + RepositoriesMetadata.TYPE, + RepositoriesMetadata.get(projectMetadata) + .withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration) + ) + ) + .build(); } @Override @@ -482,12 +609,13 @@ public class RepositoriesServiceTests extends ESTestCase { private ClusterState createClusterStateWithRepo(String repoName, String repoType) { ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); - Metadata.Builder mdBuilder = Metadata.builder(); - mdBuilder.putCustom( - RepositoriesMetadata.TYPE, - new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + state.putProjectMetadata( + ProjectMetadata.builder(projectId) + .putCustom( + RepositoriesMetadata.TYPE, + new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY))) + ) ); - state.metadata(mdBuilder); return state.build(); } @@ -500,6 +628,7 @@ public class RepositoriesServiceTests extends ESTestCase { expectThrows( RepositoryException.class, () -> repositoriesService.registerRepository( + projectId, new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName), null ) @@ -625,7 +754,7 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void updateState(final ClusterState state) { - metadata = RepositoriesMetadata.get(state).repository(metadata.name()); + metadata = RepositoriesMetadata.get(state.metadata().getProject(getProjectId())).repository(metadata.name()); } @Override diff --git a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java index 69521529cd03..4df62531230b 100644 --- a/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/ResolvedRepositoriesTests.java @@ -10,6 +10,8 @@ package org.elasticsearch.repositories; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; @@ -23,25 +25,36 @@ import java.util.List; public class ResolvedRepositoriesTests extends ESTestCase { + private ProjectId projectId; + + @Override + public void setUp() throws Exception { + super.setUp(); + projectId = randomProjectIdOrDefault(); + } + public void testAll() { runMatchAllTest(); runMatchAllTest("*"); runMatchAllTest("_all"); } - private static void runMatchAllTest(String... patterns) { + private void runMatchAllTest(String... patterns) { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var result = getRepositories(state, patterns); - assertEquals(RepositoriesMetadata.get(state).repositories(), result.repositoryMetadata()); + assertEquals(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories(), result.repositoryMetadata()); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } public void testMatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); - final var name = randomFrom(RepositoriesMetadata.get(state).repositories()).name(); + final var name = randomFrom(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories()).name(); final var result = getRepositories(state, name); - assertEquals(List.of(RepositoriesMetadata.get(state).repository(name)), result.repositoryMetadata()); + assertEquals( + List.of(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repository(name)), + result.repositoryMetadata() + ); assertThat(result.missing(), Matchers.empty()); assertFalse(result.hasMissingRepositories()); } @@ -49,7 +62,7 @@ public class ResolvedRepositoriesTests extends ESTestCase { public void testMismatchingName() { final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new)); final var notAName = randomValueOtherThanMany( - n -> RepositoriesMetadata.get(state).repositories().stream().anyMatch(m -> n.equals(m.name())), + n -> RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories().stream().anyMatch(m -> n.equals(m.name())), ESTestCase::randomIdentifier ); final var result = getRepositories(state, notAName); @@ -70,25 +83,29 @@ public class ResolvedRepositoriesTests extends ESTestCase { runWildcardTest(state, List.of("other-repo", "test-match-1", "test-match-2"), "other-repo", "test-*", "-*-exclude"); } - private static void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { + private void runWildcardTest(ClusterState clusterState, List expectedNames, String... patterns) { final var result = getRepositories(clusterState, patterns); final var description = Strings.format("%s should yield %s", Arrays.toString(patterns), expectedNames); assertFalse(description, result.hasMissingRepositories()); assertEquals(description, expectedNames, result.repositoryMetadata().stream().map(RepositoryMetadata::name).toList()); } - private static ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { - return ResolvedRepositories.resolve(clusterState, patterns); + private ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) { + return ResolvedRepositories.resolve(clusterState.metadata().getProject(projectId), patterns); } - private static ClusterState clusterStateWithRepositories(String... repoNames) { + private ClusterState clusterStateWithRepositories(String... repoNames) { final var repositories = new ArrayList(repoNames.length); for (final var repoName : repoNames) { repositories.add(new RepositoryMetadata(repoName, "test", Settings.EMPTY)); } - return ClusterState.EMPTY_STATE.copyAndUpdateMetadata( - b -> b.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories)) - ); + return ClusterState.EMPTY_STATE.copyAndUpdateMetadata(b -> { + ProjectMetadata.Builder projectBuilder = b.getProject(projectId); + if (projectBuilder == null) { + projectBuilder = ProjectMetadata.builder(projectId); + } + b.put(projectBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories))); + }); } } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 044b054ad873..6c2083196973 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -12,6 +12,7 @@ package org.elasticsearch.repositories.blobstore; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.util.TestUtil; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; @@ -21,6 +22,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -219,11 +221,13 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { /** Create a {@link Repository} with a random name **/ private Repository createRepository() { + @FixForMultiProject(description = "randomize when snapshot and restore support multiple projects, see also ES-10225, ES-10228") + final ProjectId projectId = ProjectId.DEFAULT; Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 076be2cd27e8..eb0b54571298 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -520,11 +521,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { } public void testEnsureUploadListenerIsResolvedWhenAFileSnapshotTaskFails() throws Exception { + final ProjectId projectId = randomProjectIdOrDefault(); Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final FsRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, createEnvironment(), xContentRegistry(), diff --git a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java index 259fc6fea693..e45dc4e3d8cd 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RestoreServiceTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; @@ -259,7 +260,7 @@ public class RestoreServiceTests extends ESTestCase { } final RepositoriesService repositoriesService = mock(RepositoriesService.class); - when(repositoriesService.getRepositories()).thenReturn(repositories); + when(repositoriesService.getProjectRepositories(eq(ProjectId.DEFAULT))).thenReturn(repositories); final AtomicBoolean completed = new AtomicBoolean(); RestoreService.refreshRepositoryUuids( true, diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 7477e648e054..c005691b212b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -2522,7 +2522,8 @@ public class SnapshotResiliencyTests extends ESTestCase { actionFilters, threadPool, clusterService, - repositoriesService + repositoriesService, + TestProjectResolvers.DEFAULT_PROJECT_ONLY ) ); actions.put( @@ -2769,7 +2770,14 @@ public class SnapshotResiliencyTests extends ESTestCase { ); actions.put( TransportPutRepositoryAction.TYPE, - new TransportPutRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters) + new TransportPutRepositoryAction( + transportService, + clusterService, + repositoriesService, + threadPool, + actionFilters, + TestProjectResolvers.DEFAULT_PROJECT_ONLY + ) ); actions.put( TransportCleanupRepositoryAction.TYPE, diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index e9a50c12eae0..0a16a7f23e38 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -398,14 +398,14 @@ public final class BlobStoreTestUtil { * @param repositoryMetadata RepositoryMetadata to initialize the cluster state with * @return Mock ClusterService */ - public static ClusterService mockClusterService(RepositoryMetadata repositoryMetadata) { + public static ClusterService mockClusterService(ProjectId projectId, RepositoryMetadata repositoryMetadata) { return mockClusterService( ClusterState.builder(ClusterState.EMPTY_STATE) .metadata( - Metadata.builder() + Metadata.builder(ClusterState.EMPTY_STATE.metadata()) .clusterUUID(UUIDs.randomBase64UUID(random())) .put( - ProjectMetadata.builder(ProjectId.DEFAULT) + ProjectMetadata.builder(projectId) .putCustom( RepositoriesMetadata.TYPE, new RepositoriesMetadata(Collections.singletonList(repositoryMetadata)) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java index 61922ee0b887..4e101e6b0d24 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/DeleteInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public class DeleteInternalCcrRepositoryAction extends ActionType listener) { - repositoriesService.unregisterInternalRepository(request.getName()); + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") + final var projectId = ProjectId.DEFAULT; + repositoriesService.unregisterInternalRepository(projectId, request.getName()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java index ee44b4303760..a96771d29b14 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutInternalCcrRepositoryAction.java @@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.tasks.Task; @@ -45,7 +47,9 @@ public class PutInternalCcrRepositoryAction extends ActionType listener) { - repositoriesService.registerInternalRepository(request.getName(), request.getType()); + @FixForMultiProject(description = "resolve the actual projectId, ES-12139") + final var projectId = ProjectId.DEFAULT; + repositoriesService.registerInternalRepository(projectId, request.getName(), request.getType()); listener.onResponse(ActionResponse.Empty.INSTANCE); } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java index 207ac4f38aed..fbb7ddfb6024 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java @@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.TestEnvironment; @@ -95,6 +96,7 @@ import java.util.stream.Collectors; import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder; import static org.hamcrest.Matchers.equalTo; +@FixForMultiProject(description = "Randomizing projectId once snapshot and restore support multiple projects, ES-10225, ES-10228") public class SourceOnlySnapshotShardTests extends IndexShardTestCase { public void testSourceIncomplete() throws IOException { @@ -123,7 +125,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -174,7 +176,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { recoverShardFromStore(shard); SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -215,7 +217,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -344,7 +346,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { } SnapshotId snapshotId = new SnapshotId("test", "test"); IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID()); - final var projectId = randomProjectIdOrDefault(); + final var projectId = ProjectId.DEFAULT; SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId)); assertThat(repository.getProjectId(), equalTo(projectId)); repository.start(); @@ -571,7 +573,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { private Repository createRepository(ProjectId projectId) { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings); - final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata); final Repository repository = new FsRepository( projectId, repositoryMetadata, diff --git a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java index 63522ce2309a..9994b90c59ba 100644 --- a/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java +++ b/x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/store/RepositorySupplier.java @@ -9,6 +9,8 @@ package org.elasticsearch.xpack.searchablesnapshots.store; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -46,14 +48,16 @@ public class RepositorySupplier implements Supplier { } private Repository getRepository() { + @FixForMultiProject(description = "resolve the actual projectId, ES-12138") + final var projectId = ProjectId.DEFAULT; if (repositoryUuid == null) { // repository containing pre-7.12 snapshots has no UUID so we assume it matches by name - final Repository repository = repositoriesService.repository(repositoryName); + final Repository repository = repositoriesService.repository(projectId, repositoryName); assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName; return repository; } - final Map repositoriesByName = repositoriesService.getRepositories(); + final Map repositoriesByName = repositoriesService.getProjectRepositories(projectId); final String currentRepositoryNameHint = repositoryNameHint; final Repository repositoryByLastKnownName = repositoriesByName.get(currentRepositoryNameHint); diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java index c090c715cafd..35d71ba23e28 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/store/SearchableSnapshotDirectoryTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.blobcache.shared.SharedBlobCacheService; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.CheckedBiConsumer; import org.elasticsearch.common.UUIDs; @@ -592,8 +593,9 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot repositorySettings.build() ); + final ProjectId projectId = randomProjectIdOrDefault(); final BlobStoreRepository repository = new FsRepository( - randomProjectIdOrDefault(), + projectId, repositoryMetadata, new Environment( Settings.builder() @@ -604,7 +606,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot null ), NamedXContentRegistry.EMPTY, - BlobStoreTestUtil.mockClusterService(repositoryMetadata), + BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata), MockBigArrays.NON_RECYCLING_INSTANCE, new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)) ); diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 39115556d290..f064c97ff296 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -47,8 +47,10 @@ tasks.named("yamlRestTest").configure { ArrayList blacklist = [ /* These tests don't work on multi-project yet - we need to go through each of them and make them work */ '^cat.recovery/*/*', + '^cat.repositories/*/*', '^cat.snapshots/*/*', '^cluster.desired_balance/10_basic/*', + '^cluster.stats/10_basic/snapshot stats reported in get cluster stats', '^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API '^health/10_basic/*', '^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security @@ -57,11 +59,12 @@ tasks.named("yamlRestTest").configure { '^indices.resolve_cluster/*/*/*', '^indices.shard_stores/*/*', '^migration/*/*', + '^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)', '^snapshot.clone/*/*', '^snapshot.create/*/*', '^snapshot.delete/*/*', '^snapshot.get/*/*', - '^snapshot.get_repository/20_repository_uuid/*', + '^snapshot.get_repository/*/*', '^snapshot.restore/*/*', '^snapshot.status/*/*', '^synonyms/*/*',