Make RepositoriesService project-aware (#129821)

This PR makes RepositoriesService project aware so that the basic Put,
Get, Delete and Verify repository actions are now project scoped. 

It intentionally leaves the following aspects out of scope for the
current changes: * Repository stats reporting * Repository clean-up,
analysis and integrity verification * Repository usages for searchable
snapshots and CCR

They will be worked on separately. One main reason for leaving them out
is that they are not needed by OBS which is currently blocked by
repository/snapshot changes. They may also have their own complexities,
e.g. stats reporting.

Resolves: ES-10478
This commit is contained in:
Yang Wang 2025-06-25 10:34:34 +10:00 committed by GitHub
parent 4fe787a1df
commit e1c930f8c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
34 changed files with 725 additions and 327 deletions

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
@ -65,7 +66,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase {
assertThat(originalRepositoryMetadata.type(), equalTo(FsRepository.TYPE));
final Repository originalRepository = repositoriesService.repository(repositoryName);
final Repository originalRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName);
assertThat(originalRepository, instanceOf(FsRepository.class));
final boolean updated = randomBoolean();
@ -89,7 +90,7 @@ public class RepositoriesServiceIT extends ESIntegTestCase {
assertThat(updatedRepositoryMetadata.type(), equalTo(updatedRepositoryType));
final Repository updatedRepository = repositoriesService.repository(repositoryName);
final Repository updatedRepository = repositoriesService.repository(ProjectId.DEFAULT, repositoryName);
assertThat(updatedRepository, updated ? not(sameInstance(originalRepository)) : sameInstance(originalRepository));
// check that a noop update does not verify. Since the new data node does not share the same `path.repo`, verification will fail if

View file

@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -36,6 +37,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/repository/delete");
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
@Inject
public TransportDeleteRepositoryAction(
@ -43,7 +45,8 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster
ClusterService clusterService,
RepositoriesService repositoriesService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -55,11 +58,12 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(DeleteRepositoryRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE);
}
@Override
@ -69,7 +73,7 @@ public class TransportDeleteRepositoryAction extends AcknowledgedTransportMaster
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
repositoriesService.unregisterRepository(request, listener);
repositoriesService.unregisterRepository(projectResolver.getProjectId(), request, listener);
}
@Override

View file

@ -11,11 +11,12 @@ package org.elasticsearch.action.admin.cluster.repositories.get;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -28,14 +29,15 @@ import org.elasticsearch.transport.TransportService;
/**
* Transport action for get repositories operation
*/
public class TransportGetRepositoriesAction extends TransportMasterNodeReadAction<GetRepositoriesRequest, GetRepositoriesResponse> {
public class TransportGetRepositoriesAction extends TransportMasterNodeReadProjectAction<GetRepositoriesRequest, GetRepositoriesResponse> {
@Inject
public TransportGetRepositoriesAction(
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
GetRepositoriesAction.NAME,
@ -44,24 +46,25 @@ public class TransportGetRepositoriesAction extends TransportMasterNodeReadActio
threadPool,
actionFilters,
GetRepositoriesRequest::new,
projectResolver,
GetRepositoriesResponse::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
@Override
protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
protected ClusterBlockException checkBlock(GetRepositoriesRequest request, ProjectState state) {
return state.blocks().globalBlockedException(state.projectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
protected void masterOperation(
Task task,
final GetRepositoriesRequest request,
ClusterState state,
ProjectState state,
final ActionListener<GetRepositoriesResponse> listener
) {
final var result = ResolvedRepositories.resolve(state, request.repositories());
final var result = ResolvedRepositories.resolve(state.metadata(), request.repositories());
if (result.hasMissingRepositories()) {
listener.onFailure(new RepositoryMissingException(String.join(", ", result.missing())));
} else {

View file

@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAc
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -36,6 +37,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod
public static final ActionType<AcknowledgedResponse> TYPE = new ActionType<>("cluster:admin/repository/put");
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
@Inject
public TransportPutRepositoryAction(
@ -43,7 +45,8 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod
ClusterService clusterService,
RepositoriesService repositoriesService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
TYPE.name(),
@ -55,11 +58,12 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(PutRepositoryRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_WRITE);
}
@Override
@ -69,7 +73,7 @@ public class TransportPutRepositoryAction extends AcknowledgedTransportMasterNod
ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
repositoriesService.registerRepository(request, listener);
repositoriesService.registerRepository(projectResolver.getProjectId(), request, listener);
}
@Override

View file

@ -11,6 +11,8 @@ package org.elasticsearch.action.admin.cluster.repositories.reservedstate;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
import org.elasticsearch.reservedstate.TransformState;
@ -60,7 +62,9 @@ public class ReservedRepositoryAction implements ReservedClusterStateHandler<Lis
for (var repositoryRequest : repositories) {
validate(repositoryRequest);
RepositoriesService.validateRepositoryName(repositoryRequest.name());
repositoriesService.validateRepositoryCanBeCreated(repositoryRequest);
@FixForMultiProject(description = "resolve the actual projectId, ES-10479")
final var projectId = ProjectId.DEFAULT;
repositoriesService.validateRepositoryCanBeCreated(projectId, repositoryRequest);
}
return repositories;
@ -72,8 +76,14 @@ public class ReservedRepositoryAction implements ReservedClusterStateHandler<Lis
ClusterState state = prevState.state();
@FixForMultiProject(description = "resolve the actual projectId, ES-10479")
final var projectId = ProjectId.DEFAULT;
for (var request : requests) {
RepositoriesService.RegisterRepositoryTask task = new RepositoriesService.RegisterRepositoryTask(repositoriesService, request);
RepositoriesService.RegisterRepositoryTask task = new RepositoriesService.RegisterRepositoryTask(
repositoriesService,
projectId,
request
);
state = task.execute(state);
}
@ -83,7 +93,11 @@ public class ReservedRepositoryAction implements ReservedClusterStateHandler<Lis
toDelete.removeAll(entities);
for (var repositoryToDelete : toDelete) {
var task = new RepositoriesService.UnregisterRepositoryTask(RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT, repositoryToDelete);
var task = new RepositoriesService.UnregisterRepositoryTask(
RESERVED_CLUSTER_STATE_HANDLER_IGNORED_TIMEOUT,
projectId,
repositoryToDelete
);
state = task.execute(state);
}

View file

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.injection.guice.Inject;
@ -30,6 +31,7 @@ import org.elasticsearch.transport.TransportService;
public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<VerifyRepositoryRequest, VerifyRepositoryResponse> {
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
@Inject
public TransportVerifyRepositoryAction(
@ -37,7 +39,8 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
ClusterService clusterService,
RepositoriesService repositoriesService,
ThreadPool threadPool,
ActionFilters actionFilters
ActionFilters actionFilters,
ProjectResolver projectResolver
) {
super(
VerifyRepositoryAction.NAME,
@ -50,11 +53,12 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
}
@Override
protected ClusterBlockException checkBlock(VerifyRepositoryRequest request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
return state.blocks().globalBlockedException(projectResolver.getProjectId(), ClusterBlockLevel.METADATA_READ);
}
@Override
@ -65,6 +69,7 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
final ActionListener<VerifyRepositoryResponse> listener
) {
repositoriesService.verifyRepository(
projectResolver.getProjectId(),
request.name(),
listener.map(verifyResponse -> new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))
);

View file

@ -144,7 +144,7 @@ public class TransportGetSnapshotsAction extends TransportMasterNodeAction<GetSn
) {
assert task instanceof CancellableTask : task + " not cancellable";
final var resolvedRepositories = ResolvedRepositories.resolve(state, request.repositories());
final var resolvedRepositories = ResolvedRepositories.resolve(state.metadata().getProject(), request.repositories());
if (resolvedRepositories.hasMissingRepositories()) {
throw new RepositoryMissingException(String.join(", ", resolvedRepositories.missing()));
}

View file

@ -24,11 +24,13 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.injection.guice.Inject;
@ -252,7 +254,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
// BWC behavior, load the stats directly from the repository.
shardStatus = new SnapshotIndexShardStatus(
shardId,
repositoriesService.repository(entry.repository())
repositoriesService.repository(entry.projectId(), entry.repository())
.getShardSnapshotStatus(
entry.snapshot().getSnapshotId(),
entry.indices().get(shardId.getIndexName()),
@ -297,7 +299,9 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
final ListenableFuture<RepositoryData> repositoryDataListener = new ListenableFuture<>();
repositoriesService.getRepositoryData(repositoryName, repositoryDataListener);
@FixForMultiProject(description = "resolve the actual projectId, ES-10166")
final var projectId = ProjectId.DEFAULT;
repositoriesService.getRepositoryData(projectId, repositoryName, repositoryDataListener);
final Collection<SnapshotId> snapshotIdsToLoad = new ArrayList<>();
repositoryDataListener.addListener(listener.delegateFailureAndWrap((delegate, repositoryData) -> {
task.ensureNotCancelled();

View file

@ -42,7 +42,7 @@ public class RepositoriesHealthTracker extends HealthTracker<RepositoriesHealthI
var unknown = new ArrayList<String>();
var invalid = new ArrayList<String>();
repositories.values().forEach(repository -> {
repositories.forEach(repository -> {
if (repository instanceof UnknownTypeRepository) {
unknown.add(repository.getMetadata().name());
} else if (repository instanceof InvalidRepository) {

View file

@ -39,6 +39,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
@ -154,6 +155,7 @@ import org.elasticsearch.repositories.Repository;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.internal.FieldUsageTrackingDirectoryReader;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transports;
@ -3527,12 +3529,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
case SNAPSHOT -> {
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
final Snapshot snapshot = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot();
final ProjectId projectId = snapshot.getProjectId();
final String repo = snapshot.getRepository();
executeRecovery(
"from snapshot",
recoveryState,
recoveryListener,
l -> restoreFromRepository(repositoriesService.repository(repo), l)
l -> restoreFromRepository(repositoriesService.repository(projectId, repo), l)
);
}
case LOCAL_SHARDS -> {

View file

@ -13,8 +13,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
@ -28,7 +30,6 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class IndexSnapshotsService {
@ -136,8 +137,9 @@ public class IndexSnapshotsService {
}
private Repository getRepository(String repositoryName) {
final Map<String, Repository> repositories = repositoriesService.getRepositories();
return repositories.get(repositoryName);
@FixForMultiProject(description = "resolve the actual projectId, ES-12176")
final var projectId = ProjectId.DEFAULT;
return repositoriesService.repositoryOrNull(projectId, repositoryName);
}
private static class FetchShardSnapshotContext {

View file

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
@ -44,6 +45,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
@ -66,6 +69,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@ -73,6 +77,7 @@ import java.util.stream.Stream;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.core.Strings.format;
import static org.elasticsearch.repositories.RepositoryOperation.projectRepoString;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_NAME_SETTING_KEY;
import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOTS_REPOSITORY_UUID_SETTING_KEY;
@ -84,8 +89,9 @@ import static org.elasticsearch.snapshots.SearchableSnapshotsSettings.SEARCHABLE
* factory information to create new repositories, and provides access to and maintains the lifecycle of repositories. New nodes can easily
* find all the repositories via the cluster state after joining a cluster.
*
* {@link #repository(String)} can be used to fetch a repository. {@link #createRepository(RepositoryMetadata)} does the heavy lifting of
* creation. {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates.
* {@link #repository(ProjectId, String)} can be used to fetch a repository.
* {@link #createRepository(ProjectId, RepositoryMetadata)} does the heavy lifting of creation.
* {@link #applyClusterState(ClusterChangedEvent)} handles adding and removing repositories per cluster state updates.
*/
public class RepositoriesService extends AbstractLifecycleComponent implements ClusterStateApplier {
@ -112,8 +118,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
private final ThreadPool threadPool;
private final NodeClient client;
private final Map<String, Repository> internalRepositories = ConcurrentCollections.newConcurrentMap();
private volatile Map<String, Repository> repositories = Collections.emptyMap();
private final Map<ProjectId, Map<String, Repository>> internalRepositories = ConcurrentCollections.newConcurrentMap();
private final Map<ProjectId, Map<String, Repository>> repositories = ConcurrentCollections.newConcurrentMap();
private final RepositoriesStatsArchive repositoriesStatsArchive;
private final List<BiConsumer<Snapshot, IndexVersion>> preRestoreChecks;
@ -154,10 +160,15 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* This method can be only called on the master node.
* It tries to create a new repository on the master, and if it was successful, it adds a new repository to cluster metadata.
*
* @param projectId the project ID to which the repository belongs.
* @param request register repository request
* @param responseListener register repository listener
*/
public void registerRepository(final PutRepositoryRequest request, final ActionListener<AcknowledgedResponse> responseListener) {
public void registerRepository(
final ProjectId projectId,
final PutRepositoryRequest request,
final ActionListener<AcknowledgedResponse> responseListener
) {
assert lifecycle.started() : "Trying to register new repository but service is in state [" + lifecycle.state() + "]";
validateRepositoryName(request.name());
@ -167,7 +178,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
SubscribableListener
// Trying to create the new repository on master to make sure it works
.<Void>newForked(validationStep -> validatePutRepositoryRequest(request, validationStep))
.<Void>newForked(validationStep -> validatePutRepositoryRequest(projectId, request, validationStep))
// When publication has completed (and all acks received or timed out) then verify the repository.
// (if acks timed out then acknowledgementStep completes before the master processes this cluster state, hence why we have
@ -176,11 +187,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
final ListenableFuture<AcknowledgedResponse> acknowledgementStep = new ListenableFuture<>();
final ListenableFuture<Boolean> publicationStep = new ListenableFuture<>(); // Boolean==changed.
submitUnbatchedTask(
"put_repository [" + request.name() + "]",
new RegisterRepositoryTask(this, request, acknowledgementStep) {
"put_repository " + projectRepoString(projectId, request.name()),
new RegisterRepositoryTask(this, projectId, request, acknowledgementStep) {
@Override
public void onFailure(Exception e) {
logger.warn(() -> "failed to create repository [" + request.name() + "]", e);
logger.warn(() -> "failed to create repository " + projectRepoString(projectId, request.name()), e);
publicationStep.onFailure(e);
super.onFailure(e);
}
@ -195,9 +206,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
if (changed) {
if (found) {
logger.info("updated repository [{}]", request.name());
logger.info("updated repository {}", projectRepoString(projectId, request.name()));
} else {
logger.info("put repository [{}]", request.name());
logger.info("put repository {}", projectRepoString(projectId, request.name()));
}
}
publicationStep.onResponse(oldState != newState);
@ -220,7 +231,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
.<List<DiscoveryNode>>newForked(verifyRepositoryStep -> {
if (taskResult.ackResponse.isAcknowledged() && taskResult.changed) {
verifyRepository(request.name(), verifyRepositoryStep);
final ThreadContext threadContext = threadPool.getThreadContext();
verifyRepository(projectId, request.name(), verifyRepositoryStep);
} else {
verifyRepositoryStep.onResponse(null);
}
@ -231,7 +243,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
.execute(
ActionRunnable.wrap(
getRepositoryDataStep,
ll -> repository(request.name()).getRepositoryData(
ll -> repository(projectId, request.name()).getRepositoryData(
// TODO contemplate threading, do we need to fork, see #101445?
EsExecutors.DIRECT_EXECUTOR_SERVICE,
ll
@ -243,6 +255,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
.<Void>andThen(
(updateRepoUuidStep, repositoryData) -> updateRepositoryUuidInMetadata(
clusterService,
projectId,
request.name(),
repositoryData,
updateRepoUuidStep
@ -263,16 +276,19 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
protected boolean found = false;
protected boolean changed = false;
private final ProjectId projectId;
private final PutRepositoryRequest request;
private final RepositoriesService repositoriesService;
RegisterRepositoryTask(
final RepositoriesService repositoriesService,
final ProjectId projectId,
final PutRepositoryRequest request,
final ListenableFuture<AcknowledgedResponse> acknowledgementStep
) {
super(request, acknowledgementStep);
this.repositoriesService = repositoriesService;
this.projectId = projectId;
this.request = request;
}
@ -281,14 +297,18 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* @param repositoriesService
* @param request
*/
public RegisterRepositoryTask(final RepositoriesService repositoriesService, final PutRepositoryRequest request) {
this(repositoriesService, request, null);
public RegisterRepositoryTask(
final RepositoriesService repositoriesService,
final ProjectId projectId,
final PutRepositoryRequest request
) {
this(repositoriesService, projectId, request, null);
}
@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getDefaultProject();
RepositoriesMetadata repositories = RepositoriesMetadata.get(project);
final var projectState = currentState.projectState(projectId);
RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata());
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size() + 1);
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (repositoryMetadata.name().equals(request.name())) {
@ -304,10 +324,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
request.type(),
request.settings()
);
Repository existing = repositoriesService.repositories.get(request.name());
if (existing == null) {
existing = repositoriesService.internalRepositories.get(request.name());
}
Repository existing = repositoriesService.repositoryOrNull(projectId, request.name());
assert existing != null : "repository [" + newRepositoryMetadata.name() + "] must exist";
assert existing.getMetadata() == repositoryMetadata;
final RepositoryMetadata updatedMetadata;
@ -316,7 +333,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
if (repositoryMetadata.generation() == RepositoryData.CORRUPTED_REPO_GEN) {
// If recreating a corrupted repository with the same settings, reset the corrupt flag.
// Setting the safe generation to unknown, so that a consistent generation is found.
ensureRepositoryNotInUse(currentState, request.name());
ensureRepositoryNotInUse(projectState, request.name());
logger.info(
"repository [{}/{}] is marked as corrupted, resetting the corruption marker",
repositoryMetadata.name(),
@ -334,7 +351,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
// we're updating in place so the updated metadata must point at the same uuid and generations
updatedMetadata = repositoryMetadata.withSettings(newRepositoryMetadata.settings());
} else {
ensureRepositoryNotInUse(currentState, request.name());
ensureRepositoryNotInUse(projectState, request.name());
updatedMetadata = newRepositoryMetadata;
}
found = true;
@ -349,7 +366,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
repositories = new RepositoriesMetadata(repositoriesMetadata);
changed = true;
return ClusterState.builder(currentState)
.putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories))
.putProjectMetadata(ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories))
.build();
}
}
@ -361,17 +378,21 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
*
* @param request
*/
public void validateRepositoryCanBeCreated(final PutRepositoryRequest request) {
public void validateRepositoryCanBeCreated(final ProjectId projectId, final PutRepositoryRequest request) {
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
// Trying to create the new repository on master to make sure it works
closeRepository(createRepository(newRepositoryMetadata));
closeRepository(createRepository(projectId, newRepositoryMetadata));
}
private void validatePutRepositoryRequest(final PutRepositoryRequest request, ActionListener<Void> resultListener) {
private void validatePutRepositoryRequest(
final ProjectId projectId,
final PutRepositoryRequest request,
ActionListener<Void> resultListener
) {
final RepositoryMetadata newRepositoryMetadata = new RepositoryMetadata(request.name(), request.type(), request.settings());
try {
final var repository = createRepository(newRepositoryMetadata);
final var repository = createRepository(projectId, newRepositoryMetadata);
if (request.verify()) {
// verify repository on local node only, different from verifyRepository method that runs on other cluster nodes
threadPool.executor(ThreadPool.Names.SNAPSHOT)
@ -413,6 +434,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
*/
public static void updateRepositoryUuidInMetadata(
ClusterService clusterService,
final ProjectId projectId,
final String repositoryName,
RepositoryData repositoryData,
ActionListener<Void> listener
@ -424,7 +446,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
return;
}
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state()).repository(repositoryName);
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(clusterService.state().metadata().getProject(projectId))
.repository(repositoryName);
if (repositoryMetadata == null || repositoryMetadata.uuid().equals(repositoryUuid)) {
listener.onResponse(null);
return;
@ -441,11 +464,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
submitUnbatchedTask(
clusterService,
"update repository UUID [" + repositoryName + "] to [" + repositoryUuid + "]",
"update repository UUID " + projectRepoString(projectId, repositoryName) + " to [" + repositoryUuid + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getDefaultProject();
final var project = currentState.metadata().getProject(projectId);
final RepositoriesMetadata currentReposMetadata = RepositoriesMetadata.get(project);
final RepositoryMetadata repositoryMetadata = currentReposMetadata.repository(repositoryName);
@ -477,15 +500,22 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* <p>
* This method can be only called on the master node. It removes repository information from cluster metadata.
*
* @param projectId project to look for the repository
* @param request unregister repository request
* @param listener unregister repository listener
*/
public void unregisterRepository(final DeleteRepositoryRequest request, final ActionListener<AcknowledgedResponse> listener) {
submitUnbatchedTask("delete_repository [" + request.name() + "]", new UnregisterRepositoryTask(request, listener) {
public void unregisterRepository(
final ProjectId projectId,
final DeleteRepositoryRequest request,
final ActionListener<AcknowledgedResponse> listener
) {
submitUnbatchedTask(
"delete_repository " + projectRepoString(projectId, request.name()),
new UnregisterRepositoryTask(projectId, request, listener) {
@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
if (deletedRepositories.isEmpty() == false) {
logger.info("deleted repositories [{}]", deletedRepositories);
logger.info("deleted repositories [{}] for project [{}]", deletedRepositories, projectId);
}
}
@ -494,7 +524,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
// repository was created on both master and data nodes
return discoveryNode.isMasterNode() || discoveryNode.canContainData();
}
});
}
);
}
/**
@ -503,10 +534,16 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
*/
public static class UnregisterRepositoryTask extends AckedClusterStateUpdateTask {
protected final List<String> deletedRepositories = new ArrayList<>();
private final ProjectId projectId;
private final DeleteRepositoryRequest request;
UnregisterRepositoryTask(final DeleteRepositoryRequest request, final ActionListener<AcknowledgedResponse> listener) {
UnregisterRepositoryTask(
final ProjectId projectId,
final DeleteRepositoryRequest request,
final ActionListener<AcknowledgedResponse> listener
) {
super(request, listener);
this.projectId = projectId;
this.request = request;
}
@ -514,20 +551,20 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* Constructor used by {@link org.elasticsearch.action.admin.cluster.repositories.reservedstate.ReservedRepositoryAction}
* @param name the repository name
*/
public UnregisterRepositoryTask(TimeValue dummyTimeout, String name) {
this(new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null);
public UnregisterRepositoryTask(TimeValue dummyTimeout, ProjectId projectId, String name) {
this(projectId, new DeleteRepositoryRequest(dummyTimeout, dummyTimeout, name), null);
}
@Override
public ClusterState execute(ClusterState currentState) {
final var project = currentState.metadata().getDefaultProject();
RepositoriesMetadata repositories = RepositoriesMetadata.get(project);
final var projectState = currentState.projectState(projectId);
RepositoriesMetadata repositories = RepositoriesMetadata.get(projectState.metadata());
if (repositories.repositories().size() > 0) {
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>(repositories.repositories().size());
boolean changed = false;
for (RepositoryMetadata repositoryMetadata : repositories.repositories()) {
if (Regex.simpleMatch(request.name(), repositoryMetadata.name())) {
ensureRepositoryNotInUse(currentState, repositoryMetadata.name());
ensureRepositoryNotInUse(projectState, repositoryMetadata.name());
ensureNoSearchableSnapshotsIndicesInUse(currentState, repositoryMetadata);
deletedRepositories.add(repositoryMetadata.name());
changed = true;
@ -538,7 +575,9 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
if (changed) {
repositories = new RepositoriesMetadata(repositoriesMetadata);
return ClusterState.builder(currentState)
.putProjectMetadata(ProjectMetadata.builder(project).putCustom(RepositoriesMetadata.TYPE, repositories))
.putProjectMetadata(
ProjectMetadata.builder(projectState.metadata()).putCustom(RepositoriesMetadata.TYPE, repositories)
)
.build();
}
}
@ -549,8 +588,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
public void verifyRepository(final String repositoryName, final ActionListener<List<DiscoveryNode>> listener) {
final Repository repository = repository(repositoryName);
public void verifyRepository(
final ProjectId projectId,
final String repositoryName,
final ActionListener<List<DiscoveryNode>> listener
) {
final Repository repository = repository(projectId, repositoryName);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
@ -566,7 +609,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", e);
logger.warn(
() -> projectRepoString(projectId, repositoryName)
+ " failed to finish repository verification",
e
);
delegatedListener.onFailure(e);
return;
}
@ -580,7 +627,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> "[" + repositoryName + "] failed to finish repository verification", inner);
logger.warn(
() -> projectRepoString(projectId, repositoryName) + " failed to finish repository verification",
inner
);
}
listener.onFailure(e);
});
@ -608,32 +658,79 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
public void applyClusterState(ClusterChangedEvent event) {
try {
final ClusterState state = event.state();
assert assertReadonlyRepositoriesNotInUseForWrites(state);
final RepositoriesMetadata oldMetadata = RepositoriesMetadata.get(event.previousState());
final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state);
final ClusterState previousState = event.previousState();
for (var projectId : event.projectDelta().removed()) { // removed projects
applyProjectStateForRemovedProject(state.version(), previousState.projectState(projectId));
}
for (var projectId : event.projectDelta().added()) { // added projects
applyProjectStateForAddedOrExistingProject(state.version(), state.projectState(projectId), null);
}
// existing projects
final var common = event.projectDelta().added().isEmpty()
? state.metadata().projects().keySet()
: Sets.difference(state.metadata().projects().keySet(), event.projectDelta().added());
for (var projectId : common) {
applyProjectStateForAddedOrExistingProject(
state.version(),
state.projectState(projectId),
previousState.projectState(projectId)
);
}
} catch (Exception ex) {
assert false : new AssertionError(ex);
logger.warn("failure updating cluster state ", ex);
}
}
/**
* Apply changes for one removed project.
*
* @param version The cluster state version of the change.
* @param previousState The previous project state for the removed project.
*/
private void applyProjectStateForRemovedProject(long version, ProjectState previousState) {
final var projectId = previousState.projectId();
assert ProjectId.DEFAULT.equals(projectId) == false : "default project cannot be removed";
final var survivors = closeRemovedRepositories(version, projectId, getProjectRepositories(projectId), RepositoriesMetadata.EMPTY);
assert survivors.isEmpty() : "expect no repositories for removed project [" + projectId + "], but got " + survivors.keySet();
repositories.remove(projectId);
}
/**
* Apply changes for one project. The project can be either newly added or an existing one.
*
* @param version The cluster state version of the change.
* @param state The current project state
* @param previousState The previous project state, or {@code null} if the project was newly added.
*/
private void applyProjectStateForAddedOrExistingProject(long version, ProjectState state, @Nullable ProjectState previousState) {
assert assertReadonlyRepositoriesNotInUseForWrites(state);
final var projectId = state.projectId();
assert ProjectId.DEFAULT.equals(projectId) == false || previousState != null : "default project cannot be added";
assert previousState == null || projectId.equals(previousState.projectId())
: "current and previous states must refer to the same project, but got " + projectId + " != " + previousState.projectId();
final RepositoriesMetadata newMetadata = RepositoriesMetadata.get(state.metadata());
final RepositoriesMetadata oldMetadata = previousState == null
? RepositoriesMetadata.EMPTY
: RepositoriesMetadata.get(previousState.metadata());
final Map<String, Repository> projectRepositories = getProjectRepositories(projectId);
// Check if repositories got changed
if (oldMetadata.equalsIgnoreGenerations(newMetadata)) {
for (Repository repo : repositories.values()) {
repo.updateState(state);
for (Repository repo : projectRepositories.values()) {
repo.updateState(state.cluster());
}
return;
}
logger.trace("processing new index repositories for state version [{}]", event.state().version());
logger.trace("processing new index repositories for project [{}] and state version [{}]", projectId, version);
Map<String, Repository> survivors = new HashMap<>();
// First, remove repositories that are no longer there
for (Map.Entry<String, Repository> entry : repositories.entrySet()) {
if (newMetadata.repository(entry.getKey()) == null) {
logger.debug("unregistering repository [{}]", entry.getKey());
Repository repository = entry.getValue();
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
} else {
survivors.put(entry.getKey(), entry.getValue());
}
}
final var survivors = closeRemovedRepositories(version, projectId, projectRepositories, newMetadata);
Map<String, Repository> builder = new HashMap<>();
@ -644,12 +741,13 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
// Found previous version of this repository
if (canUpdateInPlace(repositoryMetadata, repository) == false) {
// Previous version is different from the version in settings
logger.debug("updating repository [{}]", repositoryMetadata.name());
logger.debug("updating repository {}", projectRepoString(projectId, repositoryMetadata.name()));
closeRepository(repository);
archiveRepositoryStats(repository, state.version());
archiveRepositoryStats(repository, version);
repository = null;
try {
repository = createRepository(
projectId,
repositoryMetadata,
typesRegistry,
RepositoriesService::createUnknownTypeRepository
@ -657,32 +755,57 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
} catch (RepositoryException ex) {
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(() -> "failed to change repository [" + repositoryMetadata.name() + "]", ex);
repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex);
logger.warn(() -> "failed to change repository " + projectRepoString(projectId, repositoryMetadata.name()), ex);
repository = new InvalidRepository(projectId, repositoryMetadata, ex);
}
}
} else {
try {
repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository);
repository = createRepository(
projectId,
repositoryMetadata,
typesRegistry,
RepositoriesService::createUnknownTypeRepository
);
} catch (RepositoryException ex) {
logger.warn(() -> "failed to create repository [" + repositoryMetadata.name() + "]", ex);
repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex);
logger.warn(() -> "failed to create repository " + projectRepoString(projectId, repositoryMetadata.name()), ex);
repository = new InvalidRepository(projectId, repositoryMetadata, ex);
}
}
assert repository != null : "repository should not be null here";
logger.debug("registering repository [{}]", repositoryMetadata.name());
logger.debug("registering repository [{}]", projectRepoString(projectId, repositoryMetadata.name()));
builder.put(repositoryMetadata.name(), repository);
}
for (Repository repo : builder.values()) {
repo.updateState(state);
repo.updateState(state.cluster());
}
repositories = unmodifiableMap(builder);
} catch (Exception ex) {
assert false : new AssertionError(ex);
logger.warn("failure updating cluster state ", ex);
if (builder.isEmpty() == false) {
repositories.put(projectId, unmodifiableMap(builder));
} else {
repositories.remove(projectId);
}
}
private Map<String, Repository> closeRemovedRepositories(
long version,
ProjectId projectId,
Map<String, Repository> projectRepositories,
RepositoriesMetadata newMetadata
) {
Map<String, Repository> survivors = new HashMap<>();
for (Map.Entry<String, Repository> entry : projectRepositories.entrySet()) {
if (newMetadata.repository(entry.getKey()) == null) {
logger.debug("unregistering repository {}", projectRepoString(projectId, entry.getKey()));
Repository repository = entry.getValue();
closeRepository(repository);
archiveRepositoryStats(repository, version);
} else {
survivors.put(entry.getKey(), entry.getValue());
}
}
return survivors;
}
private static boolean canUpdateInPlace(RepositoryMetadata updatedMetadata, Repository repository) {
assert updatedMetadata.name().equals(repository.getMetadata().name());
return repository.getMetadata().type().equals(updatedMetadata.type())
@ -692,12 +815,13 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
/**
* Gets the {@link RepositoryData} for the given repository.
*
* @param projectId project to look for the repository
* @param repositoryName repository name
* @param listener listener to pass {@link RepositoryData} to
*/
public void getRepositoryData(final String repositoryName, final ActionListener<RepositoryData> listener) {
public void getRepositoryData(final ProjectId projectId, final String repositoryName, final ActionListener<RepositoryData> listener) {
try {
Repository repository = repository(repositoryName);
Repository repository = repository(projectId, repositoryName);
assert repository != null; // should only be called once we've validated the repository exists
repository.getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445?
@ -709,18 +833,28 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
/**
* Returns registered repository
* Returns registered repository, either internal or external
*
* @param repositoryName repository name
* @return registered repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
@FixForMultiProject
@Deprecated(forRemoval = true)
public Repository repository(String repositoryName) {
Repository repository = repositories.get(repositoryName);
if (repository != null) {
return repository;
return repository(ProjectId.DEFAULT, repositoryName);
}
repository = internalRepositories.get(repositoryName);
/**
* Returns registered repository, either internal or external
*
* @param projectId the project to look for the repository
* @param repositoryName repository name
* @return registered repository
* @throws RepositoryMissingException if repository with such name isn't registered
*/
public Repository repository(ProjectId projectId, String repositoryName) {
Repository repository = repositoryOrNull(projectId, repositoryName);
if (repository != null) {
return repository;
}
@ -728,10 +862,33 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
/**
* @return the current collection of registered repositories, keyed by name.
* Similar to {@link #repository(ProjectId, String)}, but returns {@code null} instead of throw if the repository is not found.
*/
public Map<String, Repository> getRepositories() {
return unmodifiableMap(repositories);
public Repository repositoryOrNull(ProjectId projectId, String repositoryName) {
Repository repository = repositories.getOrDefault(projectId, Map.of()).get(repositoryName);
if (repository != null) {
return repository;
}
return internalRepositories.getOrDefault(projectId, Map.of()).get(repositoryName);
}
/**
* @return the current collection of registered repositories from all projects.
*/
public List<Repository> getRepositories() {
return repositories.values().stream().map(Map::values).flatMap(Collection::stream).toList();
}
/**
* @return the current collection of registered repositories for the given project, keyed by name.
*/
public Map<String, Repository> getProjectRepositories(ProjectId projectId) {
return repositories.getOrDefault(projectId, Map.of());
}
// Package private for testing
boolean hasRepositoryTrackingForProject(ProjectId projectId) {
return repositories.containsKey(projectId) || internalRepositories.containsKey(projectId);
}
public List<RepositoryStatsSnapshot> repositoriesStats() {
@ -745,8 +902,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
public RepositoriesStats getRepositoriesThrottlingStats() {
return new RepositoriesStats(
repositories.values()
.stream()
getRepositories().stream()
.collect(
Collectors.toMap(
r -> r.getMetadata().name(),
@ -757,7 +913,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
private List<RepositoryStatsSnapshot> getRepositoryStatsForActiveRepositories() {
return Stream.concat(repositories.values().stream(), internalRepositories.values().stream())
return Stream.concat(
repositories.values().stream().map(Map::values).flatMap(Collection::stream),
internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream)
)
.filter(r -> r instanceof MeteredBlobStoreRepository)
.map(r -> (MeteredBlobStoreRepository) r)
.map(MeteredBlobStoreRepository::statsSnapshot)
@ -768,12 +927,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
return repositoriesStatsArchive.clear(maxVersionToClear);
}
public void registerInternalRepository(String name, String type) {
public void registerInternalRepository(ProjectId projectId, String name, String type) {
RepositoryMetadata metadata = new RepositoryMetadata(name, type, Settings.EMPTY);
Repository repository = internalRepositories.computeIfAbsent(name, (n) -> {
logger.debug("put internal repository [{}][{}]", name, type);
return createRepository(metadata, internalTypesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists);
});
Repository repository = internalRepositories.compute(projectId, (ignored, existingRepos) -> {
if (existingRepos == null) {
existingRepos = Map.of();
}
if (existingRepos.containsKey(name)) {
return existingRepos;
}
logger.debug("put internal repository [{}][{}]", projectRepoString(projectId, name), type);
final var repo = createRepository(
projectId,
metadata,
internalTypesRegistry,
RepositoriesService::throwRepositoryTypeDoesNotExists
);
final var newRepos = new HashMap<>(existingRepos);
newRepos.put(name, repo);
return unmodifiableMap(newRepos);
}).get(name);
if (type.equals(repository.getMetadata().type()) == false) {
logger.warn(
() -> format(
@ -785,7 +958,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
type
)
);
} else if (repositories.containsKey(name)) {
} else if (getProjectRepositories(projectId).containsKey(name)) {
logger.warn(
() -> format(
"non-internal repository [%s] already registered. this repository will block the "
@ -798,11 +971,24 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
public void unregisterInternalRepository(String name) {
Repository repository = internalRepositories.remove(name);
public void unregisterInternalRepository(ProjectId projectId, String name) {
final var repositoryRef = new AtomicReference<Repository>();
internalRepositories.computeIfPresent(projectId, (ignored, existingRepos) -> {
if (existingRepos.containsKey(name) == false) {
return existingRepos;
}
final var newRepos = new HashMap<>(existingRepos);
repositoryRef.set(newRepos.remove(name));
if (newRepos.isEmpty()) {
return null;
} else {
return unmodifiableMap(newRepos);
}
});
Repository repository = repositoryRef.get();
if (repository != null) {
RepositoryMetadata metadata = repository.getMetadata();
logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), name));
logger.debug(() -> format("delete internal repository [%s][%s].", metadata.type(), projectRepoString(projectId, name)));
closeRepository(repository);
}
}
@ -811,7 +997,11 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* Closes the given repository.
*/
private static void closeRepository(Repository repository) {
logger.debug("closing repository [{}][{}]", repository.getMetadata().type(), repository.getMetadata().name());
logger.debug(
"closing repository [{}]{}",
repository.getMetadata().type(),
projectRepoString(repository.getProjectId(), repository.getMetadata().name())
);
repository.close();
}
@ -824,19 +1014,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
/**
* Creates repository holder. This method starts the repository
*/
@FixForMultiProject(description = "resolve the actual ProjectId")
@Deprecated(forRemoval = true)
private static Repository createRepository(
RepositoryMetadata repositoryMetadata,
Map<String, Repository.Factory> factories,
BiFunction<ProjectId, RepositoryMetadata, Repository> defaultFactory
) {
return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory);
}
/**
* Creates repository holder. This method starts the repository
*/
@ -863,27 +1040,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
/**
* Creates a repository holder.
*
* <p>WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:</p>
*
* <ul>
* <li>This method does not register the repository (e.g., in the cluster state).</li>
* <li>This method starts the repository. The repository should be closed after use.</li>
* <li>The repository metadata should be associated to an already registered non-internal repository type and factory pair.</li>
* </ul>
*
* @param repositoryMetadata the repository metadata
* @return the started repository
* @throws RepositoryException if repository type is not registered
*/
@FixForMultiProject(description = "resolve the actual ProjectId")
@Deprecated(forRemoval = true)
public Repository createRepository(RepositoryMetadata repositoryMetadata) {
return createRepository(ProjectId.DEFAULT, repositoryMetadata);
}
/**
* Creates a repository holder.
*
@ -947,25 +1103,26 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
}
}
private static void ensureRepositoryNotInUseForWrites(ClusterState clusterState, String repository) {
if (SnapshotsInProgress.get(clusterState).forRepo(repository).isEmpty() == false) {
private static void ensureRepositoryNotInUseForWrites(ProjectState projectState, String repository) {
final ProjectId projectId = projectState.projectId();
if (SnapshotsInProgress.get(projectState.cluster()).forRepo(projectId, repository).isEmpty() == false) {
throw newRepositoryConflictException(repository, "snapshot is in progress");
}
for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(clusterState).getEntries()) {
if (entry.repository().equals(repository)) {
for (SnapshotDeletionsInProgress.Entry entry : SnapshotDeletionsInProgress.get(projectState.cluster()).getEntries()) {
if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) {
throw newRepositoryConflictException(repository, "snapshot deletion is in progress");
}
}
for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(clusterState).entries()) {
if (entry.repository().equals(repository)) {
for (RepositoryCleanupInProgress.Entry entry : RepositoryCleanupInProgress.get(projectState.cluster()).entries()) {
if (entry.projectId().equals(projectId) && entry.repository().equals(repository)) {
throw newRepositoryConflictException(repository, "repository clean up is in progress");
}
}
}
private static void ensureRepositoryNotInUse(ClusterState clusterState, String repository) {
ensureRepositoryNotInUseForWrites(clusterState, repository);
for (RestoreInProgress.Entry entry : RestoreInProgress.get(clusterState)) {
private static void ensureRepositoryNotInUse(ProjectState projectState, String repository) {
ensureRepositoryNotInUseForWrites(projectState, repository);
for (RestoreInProgress.Entry entry : RestoreInProgress.get(projectState.cluster())) {
if (repository.equals(entry.snapshot().getRepository())) {
throw newRepositoryConflictException(repository, "snapshot restore is in progress");
}
@ -979,11 +1136,12 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
/**
* Test-only check for the invariant that read-only repositories never have any write activities.
*/
private static boolean assertReadonlyRepositoriesNotInUseForWrites(ClusterState clusterState) {
for (final var repositoryMetadata : RepositoriesMetadata.get(clusterState).repositories()) {
private static boolean assertReadonlyRepositoriesNotInUseForWrites(ProjectState projectState) {
assert projectState != null;
for (final var repositoryMetadata : RepositoriesMetadata.get(projectState.metadata()).repositories()) {
if (isReadOnly(repositoryMetadata.settings())) {
try {
ensureRepositoryNotInUseForWrites(clusterState, repositoryMetadata.name());
ensureRepositoryNotInUseForWrites(projectState, repositoryMetadata.name());
} catch (Exception e) {
throw new AssertionError("repository [" + repositoryMetadata + "] is readonly but still in use", e);
}
@ -1071,7 +1229,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
return RepositoryUsageStats.EMPTY;
}
final var statsByType = new HashMap<String, Map<String, Long>>();
for (final var repository : repositories.values()) {
for (final var repository : getRepositories()) {
final var repositoryType = repository.getMetadata().type();
final var typeStats = statsByType.computeIfAbsent(repositoryType, ignored -> new HashMap<>());
typeStats.compute(COUNT_USAGE_STATS_NAME, (k, count) -> (count == null ? 0L : count) + 1);
@ -1096,8 +1254,8 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
protected void doClose() throws IOException {
clusterService.removeApplier(this);
final Collection<Repository> repos = new ArrayList<>();
repos.addAll(internalRepositories.values());
repos.addAll(repositories.values());
repos.addAll(internalRepositories.values().stream().map(Map::values).flatMap(Collection::stream).toList());
repos.addAll(getRepositories());
IOUtils.close(repos);
for (Repository repo : repos) {
repo.awaitIdle();

View file

@ -52,6 +52,19 @@ public interface RepositoryOperation {
projectId.writeTo(out);
out.writeString(name);
}
@Override
public String toString() {
return projectRepoString(projectId, name);
}
}
static ProjectRepo projectRepo(ProjectId projectId, String repositoryName) {
return new ProjectRepo(projectId, repositoryName);
}
static String projectRepoString(ProjectId projectId, String repositoryName) {
return "[" + projectId + "][" + repositoryName + "]";
}
DiffableUtils.KeySerializer<ProjectRepo> PROJECT_REPO_SERIALIZER = new DiffableUtils.KeySerializer<>() {

View file

@ -9,7 +9,7 @@
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
@ -21,7 +21,7 @@ import java.util.List;
import java.util.Set;
/**
* The result of calling {@link #resolve(ClusterState, String[])} to resolve a description of some snapshot repositories (from a path
* The result of calling {@link #resolve(ProjectMetadata, String[])} to resolve a description of some snapshot repositories (from a path
* component of a request to the get-repositories or get-snapshots APIs) against the known repositories in the cluster state: the
* {@link RepositoryMetadata} for the extant repositories that match the description, together with a list of the parts of the description
* that failed to match any known repository.
@ -38,8 +38,8 @@ public record ResolvedRepositories(List<RepositoryMetadata> repositoryMetadata,
|| (patterns.length == 1 && (ALL_PATTERN.equalsIgnoreCase(patterns[0]) || Regex.isMatchAllPattern(patterns[0])));
}
public static ResolvedRepositories resolve(ClusterState state, String[] patterns) {
final var repositories = RepositoriesMetadata.get(state);
public static ResolvedRepositories resolve(ProjectMetadata projectMetadata, String[] patterns) {
final var repositories = RepositoriesMetadata.get(projectMetadata);
if (isMatchAll(patterns)) {
return new ResolvedRepositories(repositories.repositories(), List.of());
}

View file

@ -17,6 +17,7 @@ import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -39,6 +40,7 @@ public class VerifyNodeRepositoryAction {
private final ClusterService clusterService;
private final RepositoriesService repositoriesService;
private final ProjectResolver projectResolver;
@Inject
public TransportAction(
@ -46,18 +48,20 @@ public class VerifyNodeRepositoryAction {
ActionFilters actionFilters,
ThreadPool threadPool,
ClusterService clusterService,
RepositoriesService repositoriesService
RepositoriesService repositoriesService,
ProjectResolver projectResolver
) {
super(ACTION_NAME, transportService, actionFilters, Request::new, threadPool.executor(ThreadPool.Names.SNAPSHOT));
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
this.projectResolver = projectResolver;
}
@Override
protected void doExecute(Task task, Request request, ActionListener<ActionResponse.Empty> listener) {
DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
try {
Repository repository = repositoriesService.repository(request.repository);
Repository repository = repositoriesService.repository(projectResolver.getProjectId(), request.repository);
repository.verify(request.verificationToken, localNode);
listener.onResponse(ActionResponse.Empty.INSTANCE);
} catch (Exception e) {

View file

@ -2407,7 +2407,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (repoMetadata.generation() != RepositoryData.UNKNOWN_REPO_GEN) {
throw new RepositoryException(repoMetadata.name(), "Found unexpected initialized repo metadata [" + repoMetadata + "]");
}
final var project = currentState.metadata().getDefaultProject();
final var project = currentState.metadata().getProject(getProjectId());
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(project)
@ -2476,6 +2476,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// someone switched the repo contents out from under us
RepositoriesService.updateRepositoryUuidInMetadata(
clusterService,
getProjectId(),
metadata.name(),
loaded,
new ThreadedActionListener<>(threadPool.generic(), listener.map(v -> loaded))
@ -3115,7 +3116,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
private RepositoryMetadata getRepoMetadata(ClusterState state) {
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state).repository(metadata.name());
final RepositoryMetadata repositoryMetadata = RepositoriesMetadata.get(state.getMetadata().getProject(getProjectId()))
.repository(metadata.name());
assert repositoryMetadata != null || lifecycle.stoppedOrClosed()
: "did not find metadata for repo [" + metadata.name() + "] in state [" + lifecycleState() + "]";
return repositoryMetadata;

View file

@ -602,7 +602,9 @@ public final class RestoreService implements ClusterStateApplier {
return;
}
for (Repository repository : repositoriesService.getRepositories().values()) {
@FixForMultiProject(description = "resolve the actual projectId, ES-10228")
final var projectId = ProjectId.DEFAULT;
for (Repository repository : repositoriesService.getProjectRepositories(projectId).values()) {
// We only care about BlobStoreRepositories because they're the only ones that can contain a searchable snapshot, and we
// only care about ones with missing UUIDs. It's possible to have the UUID change from under us if, e.g., the repository was
// wiped by an external force, but in this case any searchable snapshots are lost anyway so it doesn't really matter.

View file

@ -605,7 +605,7 @@ public final class SnapshotShardsService extends AbstractLifecycleComponent impl
throw new IndexShardSnapshotFailedException(shardId, "shard didn't fully recover yet");
}
final Repository repository = repositoriesService.repository(snapshot.getRepository());
final Repository repository = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository());
SnapshotIndexCommit snapshotIndexCommit = null;
try {
snapshotStatus.updateStatusDescription("acquiring commit reference from IndexShard: triggers a shard flush");

View file

@ -1377,7 +1377,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
final String repoName = snapshot.getRepository();
if (tryEnterRepoLoop(repoName)) {
if (repositoryData == null) {
repositoriesService.repository(repoName)
repositoriesService.repository(snapshot.getProjectId(), repoName)
.getRepositoryData(
EsExecutors.DIRECT_EXECUTOR_SERVICE, // TODO contemplate threading here, do we need to fork, see #101445?
new ActionListener<>() {
@ -1486,7 +1486,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
}
final String repository = snapshot.getRepository();
final ListenableFuture<Metadata> metadataListener = new ListenableFuture<>();
final Repository repo = repositoriesService.repository(snapshot.getRepository());
final Repository repo = repositoriesService.repository(snapshot.getProjectId(), snapshot.getRepository());
if (entry.isClone()) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(metadataListener, () -> {
final Metadata existingMetadata = repo.getSnapshotGlobalMetadata(entry.source());
@ -2441,7 +2441,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
*/
private void deleteSnapshotsFromRepository(SnapshotDeletionsInProgress.Entry deleteEntry, IndexVersion minNodeVersion) {
final long expectedRepoGen = deleteEntry.repositoryStateId();
repositoriesService.getRepositoryData(deleteEntry.repository(), new ActionListener<>() {
repositoriesService.getRepositoryData(deleteEntry.projectId(), deleteEntry.repository(), new ActionListener<>() {
@Override
public void onResponse(RepositoryData repositoryData) {
assert repositoryData.getGenId() == expectedRepoGen
@ -2564,7 +2564,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
return;
}
final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
repositoriesService.repository(deleteEntry.repository())
repositoriesService.repository(deleteEntry.projectId(), deleteEntry.repository())
.deleteSnapshots(snapshotIds, repositoryData.getGenId(), minNodeVersion, new ActionListener<>() {
@Override
public void onResponse(RepositoryData updatedRepoData) {
@ -3788,7 +3788,7 @@ public final class SnapshotsService extends AbstractLifecycleComponent implement
entry.source(),
clone.getValue(),
clone.getKey(),
repositoriesService.repository(entry.repository())
repositoriesService.repository(entry.projectId(), entry.repository())
);
}
}

View file

@ -34,6 +34,7 @@ import java.util.Set;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -147,12 +148,12 @@ public class ReservedRepositoryActionTests extends ESTestCase {
);
doAnswer(invocation -> {
var request = (PutRepositoryRequest) invocation.getArguments()[0];
var request = (PutRepositoryRequest) invocation.getArguments()[1];
if (request.type().equals("inter_planetary")) {
throw new RepositoryException(request.name(), "repository type [" + request.type() + "] does not exist");
}
return null;
}).when(repositoriesService).validateRepositoryCanBeCreated(any());
}).when(repositoriesService).validateRepositoryCanBeCreated(eq(ProjectId.DEFAULT), any());
return repositoriesService;
}

View file

@ -22,7 +22,6 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@ -44,7 +43,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
}
public void testGetHealthNoRepos() {
when(repositoriesService.getRepositories()).thenReturn(Map.of());
when(repositoriesService.getRepositories()).thenReturn(List.of());
var health = repositoriesHealthTracker.determineCurrentHealth();
@ -58,7 +57,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
when(metadata.generation()).thenReturn(randomNonNegativeLong());
var repo = mock(Repository.class);
when(repo.getMetadata()).thenReturn(metadata);
when(repositoriesService.getRepositories()).thenReturn(Map.of(randomAlphaOfLength(10), repo));
when(repositoriesService.getRepositories()).thenReturn(List.of(repo));
var health = repositoriesHealthTracker.determineCurrentHealth();
@ -68,9 +67,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
public void testGetHealthUnknownType() {
var repo = createRepositoryMetadata();
when(repositoriesService.getRepositories()).thenReturn(
Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(randomProjectIdOrDefault(), repo))
);
when(repositoriesService.getRepositories()).thenReturn(List.of(new UnknownTypeRepository(randomProjectIdOrDefault(), repo)));
var health = repositoriesHealthTracker.determineCurrentHealth();
@ -82,7 +79,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
public void testGetHealthInvalid() {
var repo = createRepositoryMetadata();
when(repositoriesService.getRepositories()).thenReturn(
Map.of(repo.name(), new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test")))
List.of(new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test")))
);
var health = repositoriesHealthTracker.determineCurrentHealth();

View file

@ -23,12 +23,14 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.routing.GlobalRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
@ -68,8 +70,16 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
public class RepositoriesServiceTests extends ESTestCase {
@ -77,6 +87,7 @@ public class RepositoriesServiceTests extends ESTestCase {
private ClusterService clusterService;
private RepositoriesService repositoriesService;
private ThreadPool threadPool;
private ProjectId projectId;
@Override
public void setUp() throws Exception {
@ -94,6 +105,13 @@ public class RepositoriesServiceTests extends ESTestCase {
Collections.emptySet()
);
clusterService = ClusterServiceUtils.createClusterService(threadPool);
projectId = randomProjectIdOrDefault();
if (ProjectId.DEFAULT.equals(projectId) == false) {
ClusterServiceUtils.setState(
clusterService,
ClusterState.builder(clusterService.state()).putProjectMetadata(ProjectMetadata.builder(projectId)).build()
);
}
DiscoveryNode localNode = DiscoveryNodeUtils.builder("local").name("local").roles(Set.of(DiscoveryNodeRole.MASTER_ROLE)).build();
NodeClient client = new NodeClient(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
@ -157,9 +175,9 @@ public class RepositoriesServiceTests extends ESTestCase {
public void testRegisterInternalRepository() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(repoName);
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName));
repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(projectId, repoName);
assertEquals(repoName, repository.getMetadata().name());
assertEquals(TestRepository.TYPE, repository.getMetadata().type());
assertEquals(Settings.EMPTY, repository.getMetadata().settings());
@ -168,24 +186,24 @@ public class RepositoriesServiceTests extends ESTestCase {
public void testUnregisterInternalRepository() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(repoName);
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName));
repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(projectId, repoName);
assertFalse(((TestRepository) repository).isClosed);
repositoriesService.unregisterInternalRepository(repoName);
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
repositoriesService.unregisterInternalRepository(projectId, repoName);
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName));
assertTrue(((TestRepository) repository).isClosed);
}
public void testRegisterWillNotUpdateIfInternalRepositoryWithNameExists() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(repoName);
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName));
repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE);
Repository repository = repositoriesService.repository(projectId, repoName);
assertFalse(((TestRepository) repository).isClosed);
repositoriesService.registerInternalRepository(repoName, TestRepository.TYPE);
repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE);
assertFalse(((TestRepository) repository).isClosed);
Repository repository2 = repositoriesService.repository(repoName);
Repository repository2 = repositoriesService.repository(projectId, repoName);
assertSame(repository, repository2);
}
@ -203,11 +221,11 @@ public class RepositoriesServiceTests extends ESTestCase {
.type(VerificationFailRepository.TYPE)
.verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
repositoriesService.registerRepository(projectId, request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
// also make sure that cluster state does not include failed repo
assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(repoName); });
assertThrows(RepositoryMissingException.class, () -> { repositoriesService.repository(projectId, repoName); });
}
public void testPutRepositoryVerificationFailsOnExisting() {
@ -216,7 +234,7 @@ public class RepositoriesServiceTests extends ESTestCase {
.type(TestRepository.TYPE)
.verify(true);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
repositoriesService.registerRepository(projectId, request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
@ -225,10 +243,10 @@ public class RepositoriesServiceTests extends ESTestCase {
.type(VerificationFailRepository.TYPE)
.verify(true);
resultListener = new SubscribableListener<>();
repositoriesService.registerRepository(request, resultListener);
repositoriesService.registerRepository(projectId, request, resultListener);
var failure = safeAwaitFailure(resultListener);
assertThat(failure, isA(RepositoryVerificationException.class));
var repository = repositoriesService.repository(repoName);
var repository = repositoriesService.repository(projectId, repoName);
assertEquals(repository.getMetadata().type(), TestRepository.TYPE);
}
@ -238,14 +256,14 @@ public class RepositoriesServiceTests extends ESTestCase {
.type(VerificationFailRepository.TYPE)
.verify(false);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
repositoriesService.registerRepository(projectId, request, resultListener);
var ackResponse = safeAwait(resultListener);
assertTrue(ackResponse.isAcknowledged());
}
public void testRepositoriesStatsCanHaveTheSameNameAndDifferentTypeOverTime() {
String repoName = "name";
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName));
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName));
ClusterState clusterStateWithRepoTypeA = createClusterStateWithRepo(repoName, MeteredRepositoryTypeA.TYPE);
@ -276,7 +294,7 @@ public class RepositoriesServiceTests extends ESTestCase {
var clusterState = createClusterStateWithRepo(repoName, "unknown");
repositoriesService.applyClusterState(new ClusterChangedEvent("starting", clusterState, emptyState()));
var repo = repositoriesService.repository(repoName);
var repo = repositoriesService.repository(projectId, repoName);
assertThat(repo, isA(UnknownTypeRepository.class));
}
@ -288,7 +306,7 @@ public class RepositoriesServiceTests extends ESTestCase {
repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState));
assertThat(
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(),
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(),
equalTo("[" + repoName + "] missing")
);
}
@ -297,7 +315,7 @@ public class RepositoriesServiceTests extends ESTestCase {
var repoName = randomAlphaOfLengthBetween(10, 25);
var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type("unknown");
repositoriesService.registerRepository(request, new ActionListener<>() {
repositoriesService.registerRepository(projectId, request, new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail("Should not register unknown repository type");
@ -306,7 +324,10 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override
public void onFailure(Exception e) {
assertThat(e, isA(RepositoryException.class));
assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist for project [default]"));
assertThat(
e.getMessage(),
equalTo("[" + repoName + "] repository type [unknown] does not exist for project [" + projectId + "]")
);
}
});
}
@ -318,7 +339,7 @@ public class RepositoriesServiceTests extends ESTestCase {
var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState()));
var repo = repositoriesService.repository(repoName);
var repo = repositoriesService.repository(projectId, repoName);
assertThat(repo, isA(InvalidRepository.class));
}
@ -329,12 +350,12 @@ public class RepositoriesServiceTests extends ESTestCase {
var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState()));
var repo = repositoriesService.repository(repoName);
var repo = repositoriesService.repository(projectId, repoName);
assertThat(repo, isA(InvalidRepository.class));
clusterState = createClusterStateWithRepo(repoName, TestRepository.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("put test repository", clusterState, emptyState()));
repo = repositoriesService.repository(repoName);
repo = repositoriesService.repository(projectId, repoName);
assertThat(repo, isA(TestRepository.class));
}
@ -346,7 +367,7 @@ public class RepositoriesServiceTests extends ESTestCase {
repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState()));
repositoriesService.applyClusterState(new ClusterChangedEvent("removing repo", emptyState(), clusterState));
assertThat(
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(repoName)).getMessage(),
expectThrows(RepositoryMissingException.class, () -> repositoriesService.repository(projectId, repoName)).getMessage(),
equalTo("[" + repoName + "] missing")
);
}
@ -370,17 +391,17 @@ public class RepositoriesServiceTests extends ESTestCase {
var clusterState = createClusterStateWithRepo(repoName, UnstableRepository.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("put unstable repository", clusterState, emptyState()));
var repo = repositoriesService.repository(repoName);
var repo = repositoriesService.repository(projectId, repoName);
assertThat(repo, isA(InvalidRepository.class));
// 2. repository creation successfully when current node become master node and repository is put again
var request = new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).name(repoName).type(TestRepository.TYPE);
var resultListener = new SubscribableListener<AcknowledgedResponse>();
repositoriesService.registerRepository(request, resultListener);
repositoriesService.registerRepository(projectId, request, resultListener);
var response = safeAwait(resultListener);
assertTrue(response.isAcknowledged());
assertThat(repositoriesService.repository(repoName), isA(TestRepository.class));
assertThat(repositoriesService.repository(projectId, repoName), isA(TestRepository.class));
}
public void testCannotSetRepositoryReadonlyFlagDuringGenerationChange() {
@ -393,19 +414,21 @@ public class RepositoriesServiceTests extends ESTestCase {
.newForked(
l -> repositoriesService.registerRepository(
projectId,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE),
l.map(ignored -> null)
)
)
.andThen(l -> updateGenerations(repoName, originalGeneration, newGeneration, l))
.andThenAccept(ignored -> {
final var metadata = repositoriesService.repository(repoName).getMetadata();
final var metadata = repositoriesService.repository(projectId, repoName).getMetadata();
assertEquals(originalGeneration, metadata.generation());
assertEquals(newGeneration, metadata.pendingGeneration());
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
})
.andThen(
l -> repositoriesService.registerRepository(
projectId,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE)
.settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)),
ActionTestUtils.assertNoSuccessListener(e -> {
@ -425,20 +448,21 @@ public class RepositoriesServiceTests extends ESTestCase {
)
)
.andThenAccept(ignored -> {
final var metadata = repositoriesService.repository(repoName).getMetadata();
final var metadata = repositoriesService.repository(projectId, repoName).getMetadata();
assertEquals(originalGeneration, metadata.generation());
assertEquals(newGeneration, metadata.pendingGeneration());
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
})
.andThen(l -> updateGenerations(repoName, newGeneration, newGeneration, l))
.andThenAccept(ignored -> {
final var metadata = repositoriesService.repository(repoName).getMetadata();
final var metadata = repositoriesService.repository(projectId, repoName).getMetadata();
assertEquals(newGeneration, metadata.generation());
assertEquals(newGeneration, metadata.pendingGeneration());
assertNull(metadata.settings().getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null));
})
.andThen(
l -> repositoriesService.registerRepository(
projectId,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName).type(TestRepository.TYPE)
.settings(Settings.builder().put(BlobStoreRepository.READONLY_SETTING_KEY, true)),
l.map(ignored -> null)
@ -446,7 +470,7 @@ public class RepositoriesServiceTests extends ESTestCase {
)
.andThenAccept(
ignored -> assertTrue(
repositoriesService.repository(repoName)
repositoriesService.repository(projectId, repoName)
.getMetadata()
.settings()
.getAsBoolean(BlobStoreRepository.READONLY_SETTING_KEY, null)
@ -455,17 +479,120 @@ public class RepositoriesServiceTests extends ESTestCase {
);
}
public void testRepositoryUpdatesForMultipleProjects() {
assertThat(repositoriesService.getRepositories(), empty());
// 1. Initial project
final var repoName = "repo";
final var state0 = createClusterStateWithRepo(repoName, TestRepository.TYPE);
repositoriesService.applyClusterState(new ClusterChangedEvent("test", state0, emptyState()));
assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(1));
final var initialProjectRepo = (TestRepository) repositoriesService.getProjectRepositories(projectId).values().iterator().next();
assertThat(repositoriesService.getRepositories(), contains(initialProjectRepo));
if (ProjectId.DEFAULT.equals(projectId) == false) {
assertFalse(repositoriesService.hasRepositoryTrackingForProject(ProjectId.DEFAULT));
}
// 2. Add a new project
final var anotherProjectId = randomUniqueProjectId();
final var anotherRepoName = "another-repo";
final var state1 = ClusterState.builder(state0)
.putProjectMetadata(
ProjectMetadata.builder(anotherProjectId)
.putCustom(
RepositoriesMetadata.TYPE,
new RepositoriesMetadata(
List.of(
new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.EMPTY),
new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY)
)
)
)
)
.build();
repositoriesService.applyClusterState(new ClusterChangedEvent("test", state1, state0));
assertThat(repositoriesService.getProjectRepositories(anotherProjectId), aMapWithSize(2));
assertThat(repositoriesService.getRepositories(), hasSize(3));
assertThat(repositoriesService.getRepositories(), hasItem(initialProjectRepo));
final Collection<Repository> anotherProjectRepos = repositoriesService.getProjectRepositories(anotherProjectId).values();
assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new)));
// 3. Update existing project
assertFalse(initialProjectRepo.isClosed);
final var state2 = ClusterState.builder(state1)
.putProjectMetadata(
ProjectMetadata.builder(projectId)
.putCustom(
RepositoriesMetadata.TYPE,
new RepositoriesMetadata(
List.of(
new RepositoryMetadata(repoName, TestRepository.TYPE, Settings.builder().put("foo", "bar").build()),
new RepositoryMetadata(anotherRepoName, TestRepository.TYPE, Settings.EMPTY)
)
)
)
)
.build();
repositoriesService.applyClusterState(new ClusterChangedEvent("test", state2, state1));
assertTrue(initialProjectRepo.isClosed);
assertThat(repositoriesService.getProjectRepositories(projectId), aMapWithSize(2));
assertThat(repositoriesService.getRepositories(), hasSize(4));
assertThat(
repositoriesService.getRepositories(),
hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new))
);
assertThat(repositoriesService.getRepositories(), hasItems(anotherProjectRepos.toArray(Repository[]::new)));
// 4. Remove another project
anotherProjectRepos.forEach(repo -> assertFalse(((TestRepository) repo).isClosed));
final var state3 = ClusterState.builder(state2)
.metadata(Metadata.builder(state2.metadata()).removeProject(anotherProjectId))
.routingTable(GlobalRoutingTable.builder(state2.globalRoutingTable()).removeProject(anotherProjectId).build())
.build();
repositoriesService.applyClusterState(new ClusterChangedEvent("test", state3, state2));
anotherProjectRepos.forEach(repo -> assertTrue(((TestRepository) repo).isClosed));
assertFalse(repositoriesService.hasRepositoryTrackingForProject(anotherProjectId));
assertThat(repositoriesService.getRepositories(), hasSize(2));
assertThat(
repositoriesService.getRepositories(),
hasItems(repositoriesService.getProjectRepositories(projectId).values().toArray(Repository[]::new))
);
}
public void testInternalRepositoryForMultiProjects() {
assertThat(repositoriesService.getRepositories(), empty());
String repoName = "name";
repositoriesService.registerInternalRepository(projectId, repoName, TestRepository.TYPE);
final TestRepository initialProjectRepo = (TestRepository) repositoriesService.repository(projectId, repoName);
// Repo of the same name but different project is a different repository instance
final var anotherProjectId = randomUniqueProjectId();
repositoriesService.registerInternalRepository(anotherProjectId, repoName, TestRepository.TYPE);
final TestRepository anotherProjectRepo = (TestRepository) repositoriesService.repository(anotherProjectId, repoName);
assertThat(initialProjectRepo, not(sameInstance(anotherProjectRepo)));
// Remove the project repository, the repo should be closed and the project is removed from tracking
repositoriesService.unregisterInternalRepository(projectId, repoName);
assertFalse(repositoriesService.hasRepositoryTrackingForProject(projectId));
assertTrue(initialProjectRepo.isClosed);
assertThat(repositoriesService.repository(anotherProjectId, repoName), sameInstance(anotherProjectRepo));
assertTrue(anotherProjectRepo.isStarted);
}
private void updateGenerations(String repositoryName, long safeGeneration, long pendingGeneration, ActionListener<?> listener) {
clusterService.submitUnbatchedStateUpdateTask("update repo generations", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return new ClusterState.Builder(currentState).metadata(
Metadata.builder(currentState.metadata())
final ProjectMetadata projectMetadata = currentState.getMetadata().getProject(projectId);
return ClusterState.builder(currentState)
.putProjectMetadata(
ProjectMetadata.builder(projectMetadata)
.putCustom(
RepositoriesMetadata.TYPE,
RepositoriesMetadata.get(currentState).withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration)
RepositoriesMetadata.get(projectMetadata)
.withUpdatedGeneration(repositoryName, safeGeneration, pendingGeneration)
)
).build();
)
.build();
}
@Override
@ -482,12 +609,13 @@ public class RepositoriesServiceTests extends ESTestCase {
private ClusterState createClusterStateWithRepo(String repoName, String repoType) {
ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
Metadata.Builder mdBuilder = Metadata.builder();
mdBuilder.putCustom(
state.putProjectMetadata(
ProjectMetadata.builder(projectId)
.putCustom(
RepositoriesMetadata.TYPE,
new RepositoriesMetadata(Collections.singletonList(new RepositoryMetadata(repoName, repoType, Settings.EMPTY)))
)
);
state.metadata(mdBuilder);
return state.build();
}
@ -500,6 +628,7 @@ public class RepositoriesServiceTests extends ESTestCase {
expectThrows(
RepositoryException.class,
() -> repositoriesService.registerRepository(
projectId,
new PutRepositoryRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT, repoName),
null
)
@ -625,7 +754,7 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override
public void updateState(final ClusterState state) {
metadata = RepositoriesMetadata.get(state).repository(metadata.name());
metadata = RepositoriesMetadata.get(state.metadata().getProject(getProjectId())).repository(metadata.name());
}
@Override

View file

@ -10,6 +10,8 @@
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
@ -23,25 +25,36 @@ import java.util.List;
public class ResolvedRepositoriesTests extends ESTestCase {
private ProjectId projectId;
@Override
public void setUp() throws Exception {
super.setUp();
projectId = randomProjectIdOrDefault();
}
public void testAll() {
runMatchAllTest();
runMatchAllTest("*");
runMatchAllTest("_all");
}
private static void runMatchAllTest(String... patterns) {
private void runMatchAllTest(String... patterns) {
final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new));
final var result = getRepositories(state, patterns);
assertEquals(RepositoriesMetadata.get(state).repositories(), result.repositoryMetadata());
assertEquals(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories(), result.repositoryMetadata());
assertThat(result.missing(), Matchers.empty());
assertFalse(result.hasMissingRepositories());
}
public void testMatchingName() {
final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new));
final var name = randomFrom(RepositoriesMetadata.get(state).repositories()).name();
final var name = randomFrom(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories()).name();
final var result = getRepositories(state, name);
assertEquals(List.of(RepositoriesMetadata.get(state).repository(name)), result.repositoryMetadata());
assertEquals(
List.of(RepositoriesMetadata.get(state.metadata().getProject(projectId)).repository(name)),
result.repositoryMetadata()
);
assertThat(result.missing(), Matchers.empty());
assertFalse(result.hasMissingRepositories());
}
@ -49,7 +62,7 @@ public class ResolvedRepositoriesTests extends ESTestCase {
public void testMismatchingName() {
final var state = clusterStateWithRepositories(randomList(1, 4, ESTestCase::randomIdentifier).toArray(String[]::new));
final var notAName = randomValueOtherThanMany(
n -> RepositoriesMetadata.get(state).repositories().stream().anyMatch(m -> n.equals(m.name())),
n -> RepositoriesMetadata.get(state.metadata().getProject(projectId)).repositories().stream().anyMatch(m -> n.equals(m.name())),
ESTestCase::randomIdentifier
);
final var result = getRepositories(state, notAName);
@ -70,25 +83,29 @@ public class ResolvedRepositoriesTests extends ESTestCase {
runWildcardTest(state, List.of("other-repo", "test-match-1", "test-match-2"), "other-repo", "test-*", "-*-exclude");
}
private static void runWildcardTest(ClusterState clusterState, List<String> expectedNames, String... patterns) {
private void runWildcardTest(ClusterState clusterState, List<String> expectedNames, String... patterns) {
final var result = getRepositories(clusterState, patterns);
final var description = Strings.format("%s should yield %s", Arrays.toString(patterns), expectedNames);
assertFalse(description, result.hasMissingRepositories());
assertEquals(description, expectedNames, result.repositoryMetadata().stream().map(RepositoryMetadata::name).toList());
}
private static ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) {
return ResolvedRepositories.resolve(clusterState, patterns);
private ResolvedRepositories getRepositories(ClusterState clusterState, String... patterns) {
return ResolvedRepositories.resolve(clusterState.metadata().getProject(projectId), patterns);
}
private static ClusterState clusterStateWithRepositories(String... repoNames) {
private ClusterState clusterStateWithRepositories(String... repoNames) {
final var repositories = new ArrayList<RepositoryMetadata>(repoNames.length);
for (final var repoName : repoNames) {
repositories.add(new RepositoryMetadata(repoName, "test", Settings.EMPTY));
}
return ClusterState.EMPTY_STATE.copyAndUpdateMetadata(
b -> b.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories))
);
return ClusterState.EMPTY_STATE.copyAndUpdateMetadata(b -> {
ProjectMetadata.Builder projectBuilder = b.getProject(projectId);
if (projectBuilder == null) {
projectBuilder = ProjectMetadata.builder(projectId);
}
b.put(projectBuilder.putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositories)));
});
}
}

View file

@ -12,6 +12,7 @@ package org.elasticsearch.repositories.blobstore;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.util.TestUtil;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -21,6 +22,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
@ -219,11 +221,13 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
/** Create a {@link Repository} with a random name **/
private Repository createRepository() {
@FixForMultiProject(description = "randomize when snapshot and restore support multiple projects, see also ES-10225, ES-10228")
final ProjectId projectId = ProjectId.DEFAULT;
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata);
final FsRepository repository = new FsRepository(
randomProjectIdOrDefault(),
projectId,
repositoryMetadata,
createEnvironment(),
xContentRegistry(),

View file

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
@ -520,11 +521,12 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
}
public void testEnsureUploadListenerIsResolvedWhenAFileSnapshotTaskFails() throws Exception {
final ProjectId projectId = randomProjectIdOrDefault();
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata);
final FsRepository repository = new FsRepository(
randomProjectIdOrDefault(),
projectId,
repositoryMetadata,
createEnvironment(),
xContentRegistry(),

View file

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
@ -259,7 +260,7 @@ public class RestoreServiceTests extends ESTestCase {
}
final RepositoriesService repositoriesService = mock(RepositoriesService.class);
when(repositoriesService.getRepositories()).thenReturn(repositories);
when(repositoriesService.getProjectRepositories(eq(ProjectId.DEFAULT))).thenReturn(repositories);
final AtomicBoolean completed = new AtomicBoolean();
RestoreService.refreshRepositoryUuids(
true,

View file

@ -2522,7 +2522,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
actionFilters,
threadPool,
clusterService,
repositoriesService
repositoriesService,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
@ -2769,7 +2770,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
);
actions.put(
TransportPutRepositoryAction.TYPE,
new TransportPutRepositoryAction(transportService, clusterService, repositoriesService, threadPool, actionFilters)
new TransportPutRepositoryAction(
transportService,
clusterService,
repositoriesService,
threadPool,
actionFilters,
TestProjectResolvers.DEFAULT_PROJECT_ONLY
)
);
actions.put(
TransportCleanupRepositoryAction.TYPE,

View file

@ -398,14 +398,14 @@ public final class BlobStoreTestUtil {
* @param repositoryMetadata RepositoryMetadata to initialize the cluster state with
* @return Mock ClusterService
*/
public static ClusterService mockClusterService(RepositoryMetadata repositoryMetadata) {
public static ClusterService mockClusterService(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
return mockClusterService(
ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(
Metadata.builder()
Metadata.builder(ClusterState.EMPTY_STATE.metadata())
.clusterUUID(UUIDs.randomBase64UUID(random()))
.put(
ProjectMetadata.builder(ProjectId.DEFAULT)
ProjectMetadata.builder(projectId)
.putCustom(
RepositoriesMetadata.TYPE,
new RepositoriesMetadata(Collections.singletonList(repositoryMetadata))

View file

@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.tasks.Task;
@ -45,7 +47,9 @@ public class DeleteInternalCcrRepositoryAction extends ActionType<ActionResponse
@Override
protected void doExecute(Task task, DeleteInternalCcrRepositoryRequest request, ActionListener<ActionResponse.Empty> listener) {
repositoriesService.unregisterInternalRepository(request.getName());
@FixForMultiProject(description = "resolve the actual projectId, ES-12139")
final var projectId = ProjectId.DEFAULT;
repositoriesService.unregisterInternalRepository(projectId, request.getName());
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}

View file

@ -12,7 +12,9 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.tasks.Task;
@ -45,7 +47,9 @@ public class PutInternalCcrRepositoryAction extends ActionType<ActionResponse.Em
@Override
protected void doExecute(Task task, PutInternalCcrRepositoryRequest request, ActionListener<ActionResponse.Empty> listener) {
repositoriesService.registerInternalRepository(request.getName(), request.getType());
@FixForMultiProject(description = "resolve the actual projectId, ES-12139")
final var projectId = ProjectId.DEFAULT;
repositoriesService.registerInternalRepository(projectId, request.getName(), request.getType());
listener.onResponse(ActionResponse.Empty.INSTANCE);
}
}

View file

@ -44,6 +44,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
@ -95,6 +96,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.hamcrest.Matchers.equalTo;
@FixForMultiProject(description = "Randomizing projectId once snapshot and restore support multiple projects, ES-10225, ES-10228")
public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
public void testSourceIncomplete() throws IOException {
@ -123,7 +125,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
final var projectId = randomProjectIdOrDefault();
final var projectId = ProjectId.DEFAULT;
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
@ -174,7 +176,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
recoverShardFromStore(shard);
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
final var projectId = randomProjectIdOrDefault();
final var projectId = ProjectId.DEFAULT;
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
@ -215,7 +217,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
final var projectId = randomProjectIdOrDefault();
final var projectId = ProjectId.DEFAULT;
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
@ -344,7 +346,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
final var projectId = randomProjectIdOrDefault();
final var projectId = ProjectId.DEFAULT;
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
@ -571,7 +573,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
private Repository createRepository(ProjectId projectId) {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata);
final Repository repository = new FsRepository(
projectId,
repositoryMetadata,

View file

@ -9,6 +9,8 @@ package org.elasticsearch.xpack.searchablesnapshots.store;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@ -46,14 +48,16 @@ public class RepositorySupplier implements Supplier<BlobStoreRepository> {
}
private Repository getRepository() {
@FixForMultiProject(description = "resolve the actual projectId, ES-12138")
final var projectId = ProjectId.DEFAULT;
if (repositoryUuid == null) {
// repository containing pre-7.12 snapshots has no UUID so we assume it matches by name
final Repository repository = repositoriesService.repository(repositoryName);
final Repository repository = repositoriesService.repository(projectId, repositoryName);
assert repository.getMetadata().name().equals(repositoryName) : repository.getMetadata().name() + " vs " + repositoryName;
return repository;
}
final Map<String, Repository> repositoriesByName = repositoriesService.getRepositories();
final Map<String, Repository> repositoriesByName = repositoriesService.getProjectRepositories(projectId);
final String currentRepositoryNameHint = repositoryNameHint;
final Repository repositoryByLastKnownName = repositoriesByName.get(currentRepositoryNameHint);

View file

@ -41,6 +41,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.blobcache.shared.SharedBlobCacheService;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.UUIDs;
@ -592,8 +593,9 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
repositorySettings.build()
);
final ProjectId projectId = randomProjectIdOrDefault();
final BlobStoreRepository repository = new FsRepository(
randomProjectIdOrDefault(),
projectId,
repositoryMetadata,
new Environment(
Settings.builder()
@ -604,7 +606,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
null
),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(repositoryMetadata),
BlobStoreTestUtil.mockClusterService(projectId, repositoryMetadata),
MockBigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
);

View file

@ -47,8 +47,10 @@ tasks.named("yamlRestTest").configure {
ArrayList<String> blacklist = [
/* These tests don't work on multi-project yet - we need to go through each of them and make them work */
'^cat.recovery/*/*',
'^cat.repositories/*/*',
'^cat.snapshots/*/*',
'^cluster.desired_balance/10_basic/*',
'^cluster.stats/10_basic/snapshot stats reported in get cluster stats',
'^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API
'^health/10_basic/*',
'^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security
@ -57,11 +59,12 @@ tasks.named("yamlRestTest").configure {
'^indices.resolve_cluster/*/*/*',
'^indices.shard_stores/*/*',
'^migration/*/*',
'^nodes.stats/70_repository_throttling_stats/Repository throttling stats (some repositories exist)',
'^snapshot.clone/*/*',
'^snapshot.create/*/*',
'^snapshot.delete/*/*',
'^snapshot.get/*/*',
'^snapshot.get_repository/20_repository_uuid/*',
'^snapshot.get_repository/*/*',
'^snapshot.restore/*/*',
'^snapshot.status/*/*',
'^synonyms/*/*',