Make repositories project aware (#128285)

Pass project-id explicitly to repository factory and make it part of the
repository interface.

Relates: ES-11839
This commit is contained in:
Yang Wang 2025-05-28 17:29:39 +10:00 committed by GitHub
parent 790be1ea28
commit 6bc1452b43
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
68 changed files with 642 additions and 118 deletions

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories.azure;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -110,6 +111,7 @@ public class AzureRepository extends MeteredBlobStoreRepository {
private final RepositoriesMetrics repositoriesMetrics;
public AzureRepository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final AzureStorageService storageService,
@ -119,6 +121,7 @@ public class AzureRepository extends MeteredBlobStoreRepository {
final RepositoriesMetrics repositoriesMetrics
) {
super(
projectId,
metadata,
namedXContentRegistry,
clusterService,

View file

@ -62,10 +62,11 @@ public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, R
RecoverySettings recoverySettings,
RepositoriesMetrics repositoriesMetrics
) {
return Collections.singletonMap(AzureRepository.TYPE, metadata -> {
return Collections.singletonMap(AzureRepository.TYPE, (projectId, metadata) -> {
AzureStorageService storageService = azureStoreService.get();
assert storageService != null;
return new AzureRepository(
projectId,
metadata,
namedXContentRegistry,
storageService,

View file

@ -9,6 +9,7 @@
package org.elasticsearch.repositories.azure;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -23,6 +24,7 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
@ -35,7 +37,9 @@ public class AzureRepositorySettingsTests extends ESTestCase {
.putList(Environment.PATH_DATA_SETTING.getKey(), tmpPaths())
.put(settings)
.build();
final ProjectId projectId = randomProjectIdOrDefault();
final AzureRepository azureRepository = new AzureRepository(
projectId,
new RepositoryMetadata("foo", "azure", internalSettings),
NamedXContentRegistry.EMPTY,
mock(AzureStorageService.class),
@ -44,6 +48,7 @@ public class AzureRepositorySettingsTests extends ESTestCase {
new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
RepositoriesMetrics.NOOP
);
assertThat(azureRepository.getProjectId(), equalTo(projectId));
assertThat(azureRepository.getBlobStore(), is(nullValue()));
return azureRepository;
}

View file

@ -276,7 +276,8 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
) {
return Collections.singletonMap(
GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(
(projectId, metadata) -> new GoogleCloudStorageRepository(
projectId,
metadata,
registry,
this.storageService,

View file

@ -57,7 +57,8 @@ public class GoogleCloudStoragePlugin extends Plugin implements RepositoryPlugin
) {
return Collections.singletonMap(
GoogleCloudStorageRepository.TYPE,
metadata -> new GoogleCloudStorageRepository(
(projectId, metadata) -> new GoogleCloudStorageRepository(
projectId,
metadata,
namedXContentRegistry,
this.storageService,

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories.gcs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
@ -88,6 +89,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
private final GcsRepositoryStatsCollector statsCollector;
GoogleCloudStorageRepository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final GoogleCloudStorageService storageService,
@ -97,6 +99,7 @@ class GoogleCloudStorageRepository extends MeteredBlobStoreRepository {
final GcsRepositoryStatsCollector statsCollector
) {
super(
projectId,
metadata,
namedXContentRegistry,
clusterService,

View file

@ -9,13 +9,22 @@
package org.elasticsearch.repositories.gcs;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.junit.Assert;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
public class GoogleCloudStoragePluginTests extends ESTestCase {
public void testExposedSettings() {
@ -37,4 +46,26 @@ public class GoogleCloudStoragePluginTests extends ESTestCase {
settings.stream().map(Setting::getKey).toList()
);
}
public void testRepositoryProjectId() {
final var projectId = randomProjectIdOrDefault();
final var repository = new GoogleCloudStorageRepository(
projectId,
new RepositoryMetadata(
randomIdentifier(),
GoogleCloudStorageRepository.TYPE,
Settings.builder()
.put(GoogleCloudStorageRepository.BUCKET.getKey(), randomIdentifier())
.put(GoogleCloudStorageRepository.BASE_PATH.getKey(), randomIdentifier())
.build()
),
NamedXContentRegistry.EMPTY,
mock(GoogleCloudStorageService.class),
BlobStoreTestUtil.mockClusterService(),
MockBigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
mock(GcsRepositoryStatsCollector.class)
);
assertThat(repository.getProjectId(), equalTo(projectId));
}
}

View file

@ -147,6 +147,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
// construct our own repo instance so we can inject a threadpool that allows to control the passage of time
try (
var repository = new S3Repository(
randomProjectIdOrDefault(),
node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(),
xContentRegistry(),
node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).findFirst().get().getService(),

View file

@ -22,6 +22,7 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -598,6 +599,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
@Override
protected S3Repository createRepository(
ProjectId projectId,
RepositoryMetadata metadata,
NamedXContentRegistry registry,
ClusterService clusterService,
@ -605,7 +607,16 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
RecoverySettings recoverySettings,
S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics) {
return new S3Repository(
projectId,
metadata,
registry,
getService(),
clusterService,
bigArrays,
recoverySettings,
s3RepositoriesMetrics
) {
@Override
public BlobStore blobStore() {

View file

@ -27,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
@ -38,6 +39,7 @@ import org.elasticsearch.common.blobstore.BlobStoreException;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.rest.RestStatus;
@ -74,6 +76,8 @@ class S3BlobStore implements BlobStore {
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
@Nullable // if the blobstore is at the cluster level
private final ProjectId projectId;
private final S3Service service;
private final BigArrays bigArrays;
@ -106,6 +110,7 @@ class S3BlobStore implements BlobStore {
private final boolean addPurposeCustomQueryParameter;
S3BlobStore(
@Nullable ProjectId projectId,
S3Service service,
String bucket,
boolean serverSideEncryption,
@ -119,6 +124,7 @@ class S3BlobStore implements BlobStore {
S3RepositoriesMetrics s3RepositoriesMetrics,
BackoffPolicy retryThrottledDeleteBackoffPolicy
) {
this.projectId = projectId;
this.service = service;
this.bigArrays = bigArrays;
this.bucket = bucket;
@ -257,6 +263,7 @@ class S3BlobStore implements BlobStore {
}
public AmazonS3Reference clientReference() {
// TODO: use service.client(ProjectId, RepositoryMetadata), see https://github.com/elastic/elasticsearch/pull/127631
return service.client(repositoryMetadata);
}

View file

@ -14,6 +14,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
@ -275,6 +276,7 @@ class S3Repository extends MeteredBlobStoreRepository {
* Constructs an s3 backed repository
*/
S3Repository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final S3Service service,
@ -284,6 +286,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
super(
projectId,
metadata,
namedXContentRegistry,
clusterService,
@ -468,6 +471,7 @@ class S3Repository extends MeteredBlobStoreRepository {
@Override
protected S3BlobStore createBlobStore() {
return new S3BlobStore(
getProjectId(),
service,
bucket,
serverSideEncryption,

View file

@ -13,6 +13,7 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
@ -57,6 +58,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
// proxy method for testing
protected S3Repository createRepository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final NamedXContentRegistry registry,
final ClusterService clusterService,
@ -64,7 +66,16 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
final RecoverySettings recoverySettings,
final S3RepositoriesMetrics s3RepositoriesMetrics
) {
return new S3Repository(metadata, registry, service.get(), clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics);
return new S3Repository(
projectId,
metadata,
registry,
service.get(),
clusterService,
bigArrays,
recoverySettings,
s3RepositoriesMetrics
);
}
@Override
@ -99,7 +110,15 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
final S3RepositoriesMetrics s3RepositoriesMetrics = new S3RepositoriesMetrics(repositoriesMetrics);
return Collections.singletonMap(
S3Repository.TYPE,
metadata -> createRepository(metadata, registry, clusterService, bigArrays, recoverySettings, s3RepositoriesMetrics)
(projectId, metadata) -> createRepository(
projectId,
metadata,
registry,
clusterService,
bigArrays,
recoverySettings,
s3RepositoriesMetrics
)
);
}

View file

@ -231,6 +231,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
final S3BlobStore s3BlobStore = new S3BlobStore(
randomProjectIdOrDefault(),
service,
"bucket",
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),

View file

@ -12,6 +12,7 @@ package org.elasticsearch.repositories.s3;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.services.s3.S3Client;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.settings.ClusterSettings;
@ -32,6 +33,7 @@ import java.util.Map;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
@ -152,7 +154,9 @@ public class S3RepositoryTests extends ESTestCase {
}
private S3Repository createS3Repo(RepositoryMetadata metadata) {
return new S3Repository(
final ProjectId projectId = randomProjectIdOrDefault();
final S3Repository s3Repository = new S3Repository(
projectId,
metadata,
NamedXContentRegistry.EMPTY,
new DummyS3Service(mock(Environment.class), mock(ResourceWatcherService.class)),
@ -161,6 +165,8 @@ public class S3RepositoryTests extends ESTestCase {
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
S3RepositoriesMetrics.NOOP
);
assertThat(s3Repository.getProjectId(), equalTo(projectId));
return s3Repository;
}
public void testAnalysisFailureDetail() {

View file

@ -53,7 +53,8 @@ public class URLRepositoryPlugin extends Plugin implements RepositoryPlugin {
) {
return Collections.singletonMap(
URLRepository.TYPE,
metadata -> new URLRepository(
(projectId, metadata) -> new URLRepository(
projectId,
metadata,
env,
namedXContentRegistry,

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories.url;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -92,6 +93,7 @@ public class URLRepository extends BlobStoreRepository {
* Constructs a read-only URL-based repository
*/
public URLRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -100,7 +102,7 @@ public class URLRepository extends BlobStoreRepository {
RecoverySettings recoverySettings,
URLHttpClient.Factory httpClientFactory
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
if (URL_SETTING.exists(metadata.settings()) == false && REPOSITORIES_URL_SETTING.exists(environment.settings()) == false) {
throw new RepositoryException(metadata.name(), "missing url");

View file

@ -9,6 +9,7 @@
package org.elasticsearch.repositories.url;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.blobstore.url.http.URLHttpClient;
import org.elasticsearch.common.settings.ClusterSettings;
@ -26,6 +27,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
@ -34,7 +36,9 @@ import static org.mockito.Mockito.mock;
public class URLRepositoryTests extends ESTestCase {
private URLRepository createRepository(Settings baseSettings, RepositoryMetadata repositoryMetadata) {
return new URLRepository(
final ProjectId projectId = randomProjectIdOrDefault();
final URLRepository repository = new URLRepository(
projectId,
repositoryMetadata,
TestEnvironment.newEnvironment(baseSettings),
new NamedXContentRegistry(Collections.emptyList()),
@ -43,6 +47,8 @@ public class URLRepositoryTests extends ESTestCase {
new RecoverySettings(baseSettings, new ClusterSettings(baseSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
mock(URLHttpClient.Factory.class)
);
assertThat(repository.getProjectId(), equalTo(projectId));
return repository;
}
public void testWhiteListingRepoURL() throws IOException {

View file

@ -71,7 +71,15 @@ public final class HdfsPlugin extends Plugin implements RepositoryPlugin {
) {
return Collections.singletonMap(
"hdfs",
(metadata) -> new HdfsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new HdfsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}

View file

@ -17,6 +17,7 @@ import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -56,6 +57,7 @@ public final class HdfsRepository extends BlobStoreRepository {
private final String pathSetting;
public HdfsRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -63,7 +65,7 @@ public final class HdfsRepository extends BlobStoreRepository {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
this.environment = environment;
this.chunkSize = metadata.settings().getAsBytesSize("chunk_size", null);

View file

@ -19,13 +19,21 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.env.Environment;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.fixtures.hdfs.HdfsClientThreadLeakFilter;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.hamcrest.CoreMatchers;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
@ -49,6 +57,8 @@ import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomP
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.readBlobFully;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.writeBlob;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
@ThreadLeakFilters(filters = { HdfsClientThreadLeakFilter.class })
public class HdfsBlobStoreContainerTests extends ESTestCase {
@ -107,6 +117,20 @@ public class HdfsBlobStoreContainerTests extends ESTestCase {
});
}
public void testRepositoryProjectId() {
final var projectId = randomProjectIdOrDefault();
final var repository = new HdfsRepository(
projectId,
new RepositoryMetadata(randomIdentifier(), "hdfs", Settings.builder().put("uri", "hdfs:///").put("path", "foo").build()),
mock(Environment.class),
NamedXContentRegistry.EMPTY,
BlobStoreTestUtil.mockClusterService(),
MockBigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
);
assertThat(repository.getProjectId(), equalTo(projectId));
}
public void testReadOnly() throws Exception {
FileContext fileContext = createTestContext();
// Constructor will not create dir if read only

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -79,7 +80,15 @@ public class ShardSnapshotsServiceIT extends ESIntegTestCase {
) {
return Collections.singletonMap(
TYPE,
metadata -> new FailingRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FailingRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}
@ -94,6 +103,7 @@ public class ShardSnapshotsServiceIT extends ESIntegTestCase {
private final boolean failLoadShardSnapshots;
public FailingRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -101,7 +111,7 @@ public class ShardSnapshotsServiceIT extends ESIntegTestCase {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.failGetRepositoryData = metadata.settings().getAsBoolean(FAIL_GET_REPOSITORY_DATA_SETTING_KEY, false);
this.failLoadShardSnapshot = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOT_SETTING_KEY, false);
this.failLoadShardSnapshots = metadata.settings().getAsBoolean(FAIL_LOAD_SHARD_SNAPSHOTS_SETTING_KEY, false);

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyRepositoryResponse;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
@ -50,6 +51,7 @@ public class InvalidRepositoryIT extends ESIntegTestCase {
);
public UnstableRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -57,7 +59,7 @@ public class InvalidRepositoryIT extends ESIntegTestCase {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
List<String> unstableNodes = UNSTABLE_NODES.get(metadata.settings());
if (unstableNodes.contains(clusterService.getNodeName())) {
throw new RepositoryException(metadata.name(), "Failed to create repository: current node is not stable");
@ -76,7 +78,15 @@ public class InvalidRepositoryIT extends ESIntegTestCase {
) {
return Collections.singletonMap(
TYPE,
(metadata) -> new UnstableRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new UnstableRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}

View file

@ -9,6 +9,7 @@
package org.elasticsearch.repositories.blobstore;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -100,13 +101,22 @@ public class BlobStoreRepositoryOperationPurposeIT extends AbstractSnapshotInteg
) {
return Map.of(
ASSERTING_REPO_TYPE,
metadata -> new AssertingRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new AssertingRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}
private static class AssertingRepository extends FsRepository {
AssertingRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -114,7 +124,7 @@ public class BlobStoreRepositoryOperationPurposeIT extends AbstractSnapshotInteg
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
@Override

View file

@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.BigArrays;
@ -176,6 +177,7 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
final Map<String, AtomicInteger> indicesMetadata = new ConcurrentHashMap<>();
public CountingMockRepository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final Environment environment,
final NamedXContentRegistry namedXContentRegistry,
@ -183,7 +185,7 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
@Override
@ -216,7 +218,15 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
) {
return Collections.singletonMap(
TYPE,
metadata -> new CountingMockRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new CountingMockRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}

View file

@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -212,7 +213,15 @@ public class SnapshotsServiceDoubleFinalizationIT extends AbstractSnapshotIntegT
) {
return Map.of(
REPO_TYPE,
metadata -> new TestRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new TestRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}
@ -223,6 +232,7 @@ public class SnapshotsServiceDoubleFinalizationIT extends AbstractSnapshotIntegT
private final AtomicReference<CyclicBarrier> barrierRef = new AtomicReference<>();
public TestRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -230,7 +240,7 @@ public class SnapshotsServiceDoubleFinalizationIT extends AbstractSnapshotIntegT
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
public CyclicBarrier blockOnceForListBlobs() {

View file

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.Lifecycle;
@ -44,6 +45,11 @@ public class FilterRepository implements Repository {
return in;
}
@Override
public ProjectId getProjectId() {
return in.getProjectId();
}
@Override
public RepositoryMetadata getMetadata() {
return in.getMetadata();

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -35,10 +36,12 @@ import java.util.function.BooleanSupplier;
*/
public class InvalidRepository extends AbstractLifecycleComponent implements Repository {
private final ProjectId projectId;
private final RepositoryMetadata repositoryMetadata;
private final RepositoryException creationException;
public InvalidRepository(RepositoryMetadata repositoryMetadata, RepositoryException creationException) {
public InvalidRepository(ProjectId projectId, RepositoryMetadata repositoryMetadata, RepositoryException creationException) {
this.projectId = projectId;
this.repositoryMetadata = repositoryMetadata;
this.creationException = creationException;
}
@ -51,6 +54,11 @@ public class InvalidRepository extends AbstractLifecycleComponent implements Rep
);
}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
public RepositoryMetadata getMetadata() {
return repositoryMetadata;

View file

@ -54,7 +54,15 @@ public final class RepositoriesModule {
Map<String, Repository.Factory> factories = new HashMap<>();
factories.put(
FsRepository.TYPE,
metadata -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
for (RepositoryPlugin repoPlugin : repoPlugins) {

View file

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@ -44,7 +45,9 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
@ -65,7 +68,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -653,7 +656,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
// TODO: this catch is bogus, it means the old repo is already closed,
// but we have nothing to replace it
logger.warn(() -> "failed to change repository [" + repositoryMetadata.name() + "]", ex);
repository = new InvalidRepository(repositoryMetadata, ex);
repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex);
}
}
} else {
@ -661,7 +664,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
repository = createRepository(repositoryMetadata, typesRegistry, RepositoriesService::createUnknownTypeRepository);
} catch (RepositoryException ex) {
logger.warn(() -> "failed to create repository [" + repositoryMetadata.name() + "]", ex);
repository = new InvalidRepository(repositoryMetadata, ex);
repository = new InvalidRepository(state.metadata().getProject().id(), repositoryMetadata, ex);
}
}
assert repository != null : "repository should not be null here";
@ -822,19 +825,33 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
/**
* Creates repository holder. This method starts the repository
*/
@FixForMultiProject(description = "resolve the actual ProjectId")
@Deprecated(forRemoval = true)
private static Repository createRepository(
RepositoryMetadata repositoryMetadata,
Map<String, Repository.Factory> factories,
Function<RepositoryMetadata, Repository> defaultFactory
BiFunction<ProjectId, RepositoryMetadata, Repository> defaultFactory
) {
return createRepository(ProjectId.DEFAULT, repositoryMetadata, factories, defaultFactory);
}
/**
* Creates repository holder. This method starts the repository
*/
private static Repository createRepository(
@Nullable ProjectId projectId,
RepositoryMetadata repositoryMetadata,
Map<String, Repository.Factory> factories,
BiFunction<ProjectId, RepositoryMetadata, Repository> defaultFactory
) {
logger.debug("creating repository [{}][{}]", repositoryMetadata.type(), repositoryMetadata.name());
Repository.Factory factory = factories.get(repositoryMetadata.type());
if (factory == null) {
return defaultFactory.apply(repositoryMetadata);
return defaultFactory.apply(projectId, repositoryMetadata);
}
Repository repository = null;
try {
repository = factory.create(repositoryMetadata, factories::get);
repository = factory.create(projectId, repositoryMetadata, factories::get);
repository.start();
return repository;
} catch (Exception e) {
@ -859,21 +876,61 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* @return the started repository
* @throws RepositoryException if repository type is not registered
*/
@FixForMultiProject(description = "resolve the actual ProjectId")
@Deprecated(forRemoval = true)
public Repository createRepository(RepositoryMetadata repositoryMetadata) {
return createRepository(repositoryMetadata, typesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists);
return createRepository(ProjectId.DEFAULT, repositoryMetadata);
}
private static Repository throwRepositoryTypeDoesNotExists(RepositoryMetadata repositoryMetadata) {
throw new RepositoryException(repositoryMetadata.name(), "repository type [" + repositoryMetadata.type() + "] does not exist");
/**
* Creates a repository holder.
*
* <p>WARNING: This method is intended for expert only usage mainly in plugins/modules. Please take note of the following:</p>
*
* <ul>
* <li>This method does not register the repository (e.g., in the cluster state).</li>
* <li>This method starts the repository. The repository should be closed after use.</li>
* <li>The repository metadata should be associated to an already registered non-internal repository type and factory pair.</li>
* </ul>
*
* @param projectId the project that the repository is associated with
* @param repositoryMetadata the repository metadata
* @return the started repository
* @throws RepositoryException if repository type is not registered
*/
public Repository createRepository(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
return createRepository(
Objects.requireNonNull(projectId),
repositoryMetadata,
typesRegistry,
RepositoriesService::throwRepositoryTypeDoesNotExists
);
}
private static Repository createUnknownTypeRepository(RepositoryMetadata repositoryMetadata) {
/**
* Similar to {@link #createRepository(ProjectId, RepositoryMetadata)}, but repository is not associated with a project, i.e. the
* repository is at the cluster level.
*/
public Repository createNonProjectRepository(RepositoryMetadata repositoryMetadata) {
assert DiscoveryNode.isStateless(clusterService.getSettings())
: "outside stateless only project level repositories are allowed: " + repositoryMetadata;
return createRepository(null, repositoryMetadata, typesRegistry, RepositoriesService::throwRepositoryTypeDoesNotExists);
}
private static Repository throwRepositoryTypeDoesNotExists(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
throw new RepositoryException(
repositoryMetadata.name(),
"repository type [" + repositoryMetadata.type() + "] does not exist for project [" + projectId + "]"
);
}
private static Repository createUnknownTypeRepository(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
logger.warn(
"[{}] repository type [{}] is unknown; ensure that all required plugins are installed on this node",
repositoryMetadata.name(),
repositoryMetadata.type()
);
return new UnknownTypeRepository(repositoryMetadata);
return new UnknownTypeRepository(projectId, repositoryMetadata);
}
public static void validateRepositoryName(final String repositoryName) {

View file

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.LifecycleComponent;
@ -57,15 +58,35 @@ public interface Repository extends LifecycleComponent {
interface Factory {
/**
* Constructs a repository.
* @param metadata metadata for the repository including name and settings
*
* @param projectId the project-id for the repository or {@code null} if the repository is at the cluster level.
* @param metadata metadata for the repository including name and settings
*/
Repository create(RepositoryMetadata metadata) throws Exception;
Repository create(@Nullable ProjectId projectId, RepositoryMetadata metadata) throws Exception;
default Repository create(RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup) throws Exception {
return create(metadata);
/**
* Constructs a repository.
* @param projectId the project-id for the repository or {@code null} if the repository is at the cluster level.
* @param metadata metadata for the repository including name and settings
* @param typeLookup a function that returns the repository factory for the given repository type.
*/
default Repository create(
@Nullable ProjectId projectId,
RepositoryMetadata metadata,
Function<String, Repository.Factory> typeLookup
) throws Exception {
return create(projectId, metadata);
}
}
/**
* Get the project-id for the repository.
*
* @return the project-id, or {@code null} if the repository is at the cluster level.
*/
@Nullable
ProjectId getProjectId();
/**
* Returns metadata about this repository.
*/

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -36,9 +37,11 @@ import java.util.function.BooleanSupplier;
*/
public class UnknownTypeRepository extends AbstractLifecycleComponent implements Repository {
private final ProjectId projectId;
private final RepositoryMetadata repositoryMetadata;
public UnknownTypeRepository(RepositoryMetadata repositoryMetadata) {
public UnknownTypeRepository(ProjectId projectId, RepositoryMetadata repositoryMetadata) {
this.projectId = projectId;
this.repositoryMetadata = repositoryMetadata;
}
@ -49,6 +52,11 @@ public class UnknownTypeRepository extends AbstractLifecycleComponent implements
);
}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
public RepositoryMetadata getMetadata() {
return repositoryMetadata;

View file

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -195,6 +196,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final Logger shutdownLogger = LogManager.getLogger(ShutdownLogger.class);
}
private final ProjectId projectId;
protected volatile RepositoryMetadata metadata;
protected final ThreadPool threadPool;
@ -485,6 +487,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
@SuppressWarnings("this-escape")
protected BlobStoreRepository(
final ProjectId projectId,
final RepositoryMetadata metadata,
final NamedXContentRegistry namedXContentRegistry,
final ClusterService clusterService,
@ -492,6 +495,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final RecoverySettings recoverySettings,
final BlobPath basePath
) {
this.projectId = projectId;
this.metadata = metadata;
this.threadPool = clusterService.getClusterApplierService().threadPool();
this.clusterService = clusterService;
@ -525,6 +529,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
);
}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
protected void doStart() {
uncleanStart = metadata.pendingGeneration() > RepositoryData.EMPTY_REPO_GEN

View file

@ -9,6 +9,7 @@
package org.elasticsearch.repositories.blobstore;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
@ -26,6 +27,7 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository {
private final RepositoryInfo repositoryInfo;
public MeteredBlobStoreRepository(
ProjectId projectId,
RepositoryMetadata metadata,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
@ -34,7 +36,7 @@ public abstract class MeteredBlobStoreRepository extends BlobStoreRepository {
BlobPath basePath,
Map<String, String> location
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
ThreadPool threadPool = clusterService.getClusterApplierService().threadPool();
this.repositoryInfo = new RepositoryInfo(
UUIDs.randomBase64UUID(),

View file

@ -11,6 +11,7 @@ package org.elasticsearch.repositories.fs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobPath;
@ -75,6 +76,7 @@ public class FsRepository extends BlobStoreRepository {
* Constructs a shared file system repository.
*/
public FsRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -82,7 +84,7 @@ public class FsRepository extends BlobStoreRepository {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, BlobPath.EMPTY);
this.environment = environment;
String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
if (location.isEmpty()) {

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequ
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -126,7 +127,7 @@ public class ReservedRepositoryActionTests extends ESTestCase {
private RepositoriesService mockRepositoriesService() {
var fsFactory = new Repository.Factory() {
@Override
public Repository create(RepositoryMetadata metadata) {
public Repository create(ProjectId projectId, RepositoryMetadata metadata) {
var repo = mock(Repository.class);
doAnswer(invocation -> metadata).when(repo).getMetadata();
return repo;

View file

@ -68,7 +68,9 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
public void testGetHealthUnknownType() {
var repo = createRepositoryMetadata();
when(repositoriesService.getRepositories()).thenReturn(Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(repo)));
when(repositoriesService.getRepositories()).thenReturn(
Map.of(randomAlphaOfLength(10), new UnknownTypeRepository(randomProjectIdOrDefault(), repo))
);
var health = repositoriesHealthTracker.determineCurrentHealth();
@ -80,7 +82,7 @@ public class RepositoriesHealthTrackerTests extends ESTestCase {
public void testGetHealthInvalid() {
var repo = createRepositoryMetadata();
when(repositoriesService.getRepositories()).thenReturn(
Map.of(repo.name(), new InvalidRepository(repo, new RepositoryException(repo.name(), "Test")))
Map.of(repo.name(), new InvalidRepository(randomProjectIdOrDefault(), repo, new RepositoryException(repo.name(), "Test")))
);
var health = repositoriesHealthTracker.determineCurrentHealth();

View file

@ -2842,7 +2842,7 @@ public class IndexShardTests extends IndexShardTestCase {
DiscoveryNode localNode = DiscoveryNodeUtils.builder("foo").roles(emptySet()).build();
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
target.restoreFromRepository(new RestoreOnlyRepository("test") {
target.restoreFromRepository(new RestoreOnlyRepository(randomProjectIdOrDefault(), "test") {
@Override
public void restoreShard(
Store store,

View file

@ -9,6 +9,7 @@
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.SnapshotId;
@ -19,12 +20,16 @@ import static org.hamcrest.Matchers.isA;
public class InvalidRepositoryTests extends ESTestCase {
private ProjectId projectId = randomProjectIdOrDefault();
private InvalidRepository repository = new InvalidRepository(
projectId,
new RepositoryMetadata("name", "type", Settings.EMPTY),
new RepositoryException("name", "failed to create repository")
);
public void testShouldThrowWhenGettingMetadata() {
assertThat(repository.getProjectId(), equalTo(projectId));
final var expectedException = expectThrows(
RepositoryException.class,
() -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid"))

View file

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -118,15 +119,15 @@ public class RepositoriesServiceTests extends ESTestCase {
Map<String, Repository.Factory> typesRegistry = Map.of(
TestRepository.TYPE,
TestRepository::new,
(projectId, metadata1) -> new TestRepository(projectId, metadata1),
UnstableRepository.TYPE,
UnstableRepository::new,
(projectId, metadata2) -> new UnstableRepository(projectId, metadata2),
VerificationFailRepository.TYPE,
VerificationFailRepository::new,
(projectId, metadata3) -> new VerificationFailRepository(projectId, metadata3),
MeteredRepositoryTypeA.TYPE,
metadata -> new MeteredRepositoryTypeA(metadata, clusterService),
(projectId, metadata) -> new MeteredRepositoryTypeA(projectId, metadata, clusterService),
MeteredRepositoryTypeB.TYPE,
metadata -> new MeteredRepositoryTypeB(metadata, clusterService)
(projectId, metadata) -> new MeteredRepositoryTypeB(projectId, metadata, clusterService)
);
repositoriesService = new RepositoriesService(
Settings.EMPTY,
@ -304,7 +305,7 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override
public void onFailure(Exception e) {
assertThat(e, isA(RepositoryException.class));
assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist"));
assertThat(e.getMessage(), equalTo("[" + repoName + "] repository type [unknown] does not exist for project [default]"));
}
});
}
@ -507,14 +508,21 @@ public class RepositoriesServiceTests extends ESTestCase {
private static class TestRepository implements Repository {
private static final String TYPE = "internal";
private final ProjectId projectId;
private RepositoryMetadata metadata;
private boolean isClosed;
private boolean isStarted;
private TestRepository(RepositoryMetadata metadata) {
private TestRepository(ProjectId projectId, RepositoryMetadata metadata) {
this.projectId = projectId;
this.metadata = metadata;
}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
public RepositoryMetadata getMetadata() {
return metadata;
@ -662,8 +670,8 @@ public class RepositoriesServiceTests extends ESTestCase {
private static class UnstableRepository extends TestRepository {
private static final String TYPE = "unstable";
private UnstableRepository(RepositoryMetadata metadata) {
super(metadata);
private UnstableRepository(ProjectId projectId, RepositoryMetadata metadata) {
super(projectId, metadata);
throw new RepositoryException(TYPE, "failed to create unstable repository");
}
}
@ -671,8 +679,8 @@ public class RepositoriesServiceTests extends ESTestCase {
private static class VerificationFailRepository extends TestRepository {
public static final String TYPE = "verify-fail";
private VerificationFailRepository(RepositoryMetadata metadata) {
super(metadata);
private VerificationFailRepository(ProjectId projectId, RepositoryMetadata metadata) {
super(projectId, metadata);
}
@Override
@ -685,8 +693,9 @@ public class RepositoriesServiceTests extends ESTestCase {
private static final String TYPE = "type-a";
private static final RepositoryStats STATS = new RepositoryStats(Map.of("GET", new BlobStoreActionStats(10, 13)));
private MeteredRepositoryTypeA(RepositoryMetadata metadata, ClusterService clusterService) {
private MeteredRepositoryTypeA(ProjectId projectId, RepositoryMetadata metadata, ClusterService clusterService) {
super(
projectId,
metadata,
mock(NamedXContentRegistry.class),
clusterService,
@ -712,8 +721,9 @@ public class RepositoriesServiceTests extends ESTestCase {
private static final String TYPE = "type-b";
private static final RepositoryStats STATS = new RepositoryStats(Map.of("LIST", new BlobStoreActionStats(20, 25)));
private MeteredRepositoryTypeB(RepositoryMetadata metadata, ClusterService clusterService) {
private MeteredRepositoryTypeB(ProjectId projectId, RepositoryMetadata metadata, ClusterService clusterService) {
super(
projectId,
metadata,
mock(NamedXContentRegistry.class),
clusterService,

View file

@ -9,16 +9,21 @@
package org.elasticsearch.repositories;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
public class UnknownTypeRepositoryTests extends ESTestCase {
private UnknownTypeRepository repository = new UnknownTypeRepository(new RepositoryMetadata("name", "type", Settings.EMPTY));
private ProjectId projectId = randomProjectIdOrDefault();
private UnknownTypeRepository repository = new UnknownTypeRepository(projectId, new RepositoryMetadata("name", "type", Settings.EMPTY));
public void testShouldThrowWhenGettingMetadata() {
assertThat(repository.getProjectId(), equalTo(projectId));
expectThrows(RepositoryException.class, () -> repository.getSnapshotGlobalMetadata(new SnapshotId("name", "uuid")));
}

View file

@ -75,7 +75,15 @@ public class BlobStoreRepositoryDeleteThrottlingTests extends ESSingleNodeTestCa
) {
return Collections.singletonMap(
TEST_REPO_TYPE,
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {
(projectId, metadata) -> new FsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
) {
@Override
protected BlobStore createBlobStore() throws Exception {
return new ConcurrencyLimitingBlobStore(super.createBlobStore());

View file

@ -219,6 +219,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final FsRepository repository = new FsRepository(
randomProjectIdOrDefault(),
repositoryMetadata,
createEnvironment(),
xContentRegistry(),

View file

@ -523,6 +523,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final FsRepository repository = new FsRepository(
randomProjectIdOrDefault(),
repositoryMetadata,
createEnvironment(),
xContentRegistry(),

View file

@ -26,6 +26,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOSupplier;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@ -101,7 +102,9 @@ public class FsRepositoryTests extends ESTestCase {
int numDocs = indexDocs(directory);
RepositoryMetadata metadata = new RepositoryMetadata("test", "fs", settings);
final ProjectId projectId = randomProjectIdOrDefault();
FsRepository repository = new FsRepository(
projectId,
metadata,
new Environment(settings, null),
NamedXContentRegistry.EMPTY,
@ -109,6 +112,7 @@ public class FsRepositoryTests extends ESTestCase {
MockBigArrays.NON_RECYCLING_INSTANCE,
new RecoverySettings(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
);
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
final Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_INDEX_UUID, "myindexUUID").build();
IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("myindex", indexSettings);
@ -227,7 +231,9 @@ public class FsRepositoryTests extends ESTestCase {
final AtomicBoolean canErrorForWriteBlob = new AtomicBoolean();
final AtomicBoolean shouldErrorForWriteMetadataBlob = new AtomicBoolean();
final AtomicBoolean writeBlobErrored = new AtomicBoolean(false);
final ProjectId projectId = randomProjectIdOrDefault();
final var repository = new FsRepository(
projectId,
metadata,
new Environment(settings, null),
NamedXContentRegistry.EMPTY,
@ -284,6 +290,7 @@ public class FsRepositoryTests extends ESTestCase {
};
}
};
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
final IndexSettings idxSettings = IndexSettingsModule.newIndexSettings(

View file

@ -2169,7 +2169,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
clusterService,
Collections.singletonMap(
FsRepository.TYPE,
metadata -> new FsRepository(metadata, environment, xContentRegistry(), clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FsRepository(
projectId,
metadata,
environment,
xContentRegistry(),
clusterService,
bigArrays,
recoverySettings
)
),
emptyMap(),
threadPool,

View file

@ -65,7 +65,8 @@ public class LatencySimulatingBlobStoreRepositoryTests extends AbstractSnapshotI
) {
return Map.of(
REPO_TYPE,
metadata -> new LatencySimulatingBlobStoreRepository(
(projectId, metadata) -> new LatencySimulatingBlobStoreRepository(
projectId,
metadata,
env,
namedXContentRegistry,

View file

@ -9,6 +9,7 @@
package org.elasticsearch.test.simulatedlatencyrepo;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -30,6 +31,7 @@ class LatencySimulatingBlobStoreRepository extends FsRepository {
private final Runnable simulator;
protected LatencySimulatingBlobStoreRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment env,
NamedXContentRegistry namedXContentRegistry,
@ -38,7 +40,7 @@ class LatencySimulatingBlobStoreRepository extends FsRepository {
RecoverySettings recoverySettings,
Runnable simulator
) {
super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.simulator = simulator;
}

View file

@ -41,7 +41,8 @@ public class LatencySimulatingRepositoryPlugin extends Plugin implements Reposit
) {
return Map.of(
TYPE,
metadata -> new LatencySimulatingBlobStoreRepository(
(projectId, metadata) -> new LatencySimulatingBlobStoreRepository(
projectId,
metadata,
env,
namedXContentRegistry,

View file

@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -42,9 +43,11 @@ import static org.elasticsearch.repositories.RepositoryData.MISSING_UUID;
/** A dummy repository for testing which just needs restore overridden */
public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent implements Repository {
private final ProjectId projectId;
private final String indexName;
public RestoreOnlyRepository(String indexName) {
public RestoreOnlyRepository(ProjectId projectId, String indexName) {
this.projectId = projectId;
this.indexName = indexName;
}
@ -57,6 +60,11 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
protected void doClose() {}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
public RepositoryMetadata getMetadata() {
return null;

View file

@ -15,6 +15,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.blobstore.BlobContainer;
@ -89,7 +90,15 @@ public class MockRepository extends FsRepository {
) {
return Collections.singletonMap(
"mock",
(metadata) -> new MockRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new MockRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
@ -176,6 +185,7 @@ public class MockRepository extends FsRepository {
private volatile boolean failOnDeleteContainer = false;
public MockRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -183,7 +193,15 @@ public class MockRepository extends FsRepository {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(overrideSettings(metadata, environment), environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(
projectId,
overrideSettings(metadata, environment),
environment,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
);
randomControlIOExceptionRate = metadata.settings().getAsDouble("random_control_io_exception_rate", 0.0);
randomDataFileIOExceptionRate = metadata.settings().getAsDouble("random_data_file_io_exception_rate", 0.0);
randomIOExceptionPattern = Pattern.compile(metadata.settings().get("random_io_exception_pattern", ".*")).asMatchPredicate();

View file

@ -375,7 +375,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
ClusterService clusterService,
RecoverySettings recoverySettings
) {
Repository.Factory repositoryFactory = (metadata) -> new CcrRepository(
Repository.Factory repositoryFactory = (projectId, metadata) -> new CcrRepository(
projectId,
metadata,
client,
settings,

View file

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
@ -138,6 +139,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private static final SnapshotId SNAPSHOT_ID = new SnapshotId(LATEST, LATEST);
private static final String IN_SYNC_ALLOCATION_ID = "ccr_restore";
private final ProjectId projectId;
private final RepositoryMetadata metadata;
private final CcrSettings ccrSettings;
private final String localClusterName;
@ -151,7 +153,15 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
private final SingleResultDeduplicator<ClusterState> csDeduplicator;
public CcrRepository(RepositoryMetadata metadata, Client client, Settings settings, CcrSettings ccrSettings, ThreadPool threadPool) {
public CcrRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Client client,
Settings settings,
CcrSettings ccrSettings,
ThreadPool threadPool
) {
this.projectId = projectId;
this.metadata = metadata;
this.ccrSettings = ccrSettings;
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings).value();
@ -180,6 +190,11 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
protected void doClose() {}
@Override
public ProjectId getProjectId() {
return projectId;
}
@Override
public RepositoryMetadata getMetadata() {
return metadata;

View file

@ -529,7 +529,7 @@ public class ShardFollowTaskReplicationTests extends ESIndexLevelReplicationTest
);
primaryShard.markAsRecovering("remote recovery from leader", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
primaryShard.restoreFromRepository(new RestoreOnlyRepository(index.getName()) {
primaryShard.restoreFromRepository(new RestoreOnlyRepository(randomProjectIdOrDefault(), index.getName()) {
@Override
public void restoreShard(
Store store,

View file

@ -142,7 +142,7 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
DiscoveryNode localNode = DiscoveryNodeUtils.builder("foo").roles(emptySet()).build();
target.markAsRecovering("store", new RecoveryState(routing, localNode, null));
final PlainActionFuture<Boolean> future = new PlainActionFuture<>();
target.restoreFromRepository(new RestoreOnlyRepository("test") {
target.restoreFromRepository(new RestoreOnlyRepository(randomProjectIdOrDefault(), "test") {
@Override
public void restoreShard(
Store store,

View file

@ -110,13 +110,17 @@ public class CcrRepositoryRetentionLeaseTests extends ESTestCase {
final ThreadPool threadPool = mock(ThreadPool.class);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
when(threadPool.getThreadContext()).thenReturn(threadContext);
return new CcrRepository(
final var projectId = randomProjectIdOrDefault();
final CcrRepository ccrRepository = new CcrRepository(
projectId,
repositoryMetadata,
mock(Client.class),
Settings.EMPTY,
new CcrSettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, settings)),
threadPool
);
assertThat(ccrRepository.getProjectId(), equalTo(projectId));
return ccrRepository;
}
public void testWhenRetentionLeaseExpiresBeforeWeCanRenewIt() {

View file

@ -20,6 +20,7 @@ import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
@ -253,19 +254,20 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
return new Repository.Factory() {
@Override
public Repository create(RepositoryMetadata metadata) {
public Repository create(ProjectId projectId, RepositoryMetadata metadata) {
throw new UnsupportedOperationException();
}
@Override
public Repository create(RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup) throws Exception {
public Repository create(ProjectId projectId, RepositoryMetadata metadata, Function<String, Repository.Factory> typeLookup)
throws Exception {
String delegateType = DELEGATE_TYPE.get(metadata.settings());
if (Strings.hasLength(delegateType) == false) {
throw new IllegalArgumentException(DELEGATE_TYPE.getKey() + " must be set");
}
Repository.Factory factory = typeLookup.apply(delegateType);
return new SourceOnlySnapshotRepository(
factory.create(new RepositoryMetadata(metadata.name(), delegateType, metadata.settings()), typeLookup)
factory.create(projectId, new RepositoryMetadata(metadata.name(), delegateType, metadata.settings()), typeLookup)
);
}
};

View file

@ -28,6 +28,7 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@ -91,6 +92,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
import static org.hamcrest.Matchers.equalTo;
public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
@ -120,7 +122,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
final var projectId = randomProjectIdOrDefault();
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(new ShardGeneration(-1L));
@ -169,7 +173,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
recoverShardFromStore(shard);
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
final var projectId = randomProjectIdOrDefault();
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(new ShardGeneration(-1L));
@ -208,7 +214,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
final var projectId = randomProjectIdOrDefault();
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
int totalFileCount;
ShardGeneration shardGeneration;
@ -335,7 +343,9 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
SnapshotId snapshotId = new SnapshotId("test", "test");
IndexId indexId = new IndexId(shard.shardId().getIndexName(), shard.shardId().getIndex().getUUID());
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository());
final var projectId = randomProjectIdOrDefault();
SourceOnlySnapshotRepository repository = new SourceOnlySnapshotRepository(createRepository(projectId));
assertThat(repository.getProjectId(), equalTo(projectId));
repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
@ -557,11 +567,12 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
}
/** Create a {@link Repository} with a random name **/
private Repository createRepository() {
private Repository createRepository(ProjectId projectId) {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetadata repositoryMetadata = new RepositoryMetadata(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetadata);
final Repository repository = new FsRepository(
projectId,
repositoryMetadata,
createEnvironment(),
xContentRegistry(),

View file

@ -8,6 +8,7 @@
package org.elasticsearch.xpack.lucene.bwc;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -66,13 +67,22 @@ public abstract class AbstractArchiveTestCase extends AbstractSnapshotIntegTestC
) {
return Map.of(
FAKE_VERSIONS_TYPE,
metadata -> new FakeVersionsRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FakeVersionsRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
// fakes an old index version format to activate license checks
private static class FakeVersionsRepo extends FsRepository {
FakeVersionsRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment env,
NamedXContentRegistry namedXContentRegistry,
@ -80,7 +90,7 @@ public abstract class AbstractArchiveTestCase extends AbstractSnapshotIntegTestC
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
@Override

View file

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.DiskUsageIntegTestCase;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -380,7 +381,15 @@ public class SearchableSnapshotDiskThresholdIntegTests extends DiskUsageIntegTes
) {
return Collections.singletonMap(
TYPE,
metadata -> new CustomMockRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new CustomMockRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}
@ -390,6 +399,7 @@ public class SearchableSnapshotDiskThresholdIntegTests extends DiskUsageIntegTes
private static final CountDownLatch RESTORE_SHARD_LATCH = new CountDownLatch(1);
public CustomMockRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -397,7 +407,7 @@ public class SearchableSnapshotDiskThresholdIntegTests extends DiskUsageIntegTes
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
private void unlockRestore() {

View file

@ -454,7 +454,15 @@ public class SearchableSnapshotsPrewarmingIntegTests extends ESSingleNodeTestCas
) {
return Collections.singletonMap(
"tracking",
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {
(projectId, metadata) -> new FsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
) {
@Override
protected BlobStore createBlobStore() throws Exception {

View file

@ -248,7 +248,15 @@ public class SearchableSnapshotRecoveryStateIntegrationTests extends BaseSearcha
) {
return Collections.singletonMap(
"test-fs",
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}

View file

@ -593,6 +593,7 @@ public class SearchableSnapshotDirectoryTests extends AbstractSearchableSnapshot
);
final BlobStoreRepository repository = new FsRepository(
randomProjectIdOrDefault(),
repositoryMetadata,
new Environment(
Settings.builder()

View file

@ -12,6 +12,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRe
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -119,15 +120,24 @@ public class SLMHealthBlockedSnapshotIT extends AbstractSnapshotIntegTestCase {
) {
return Map.of(
TestDelayedRepo.TYPE,
metadata -> new TestDelayedRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings, () -> {
if (doDelay.get()) {
try {
assertTrue(delayedRepoLatch.await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
throw new RuntimeException(e);
(projectId, metadata) -> new TestDelayedRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings,
() -> {
if (doDelay.get()) {
try {
assertTrue(delayedRepoLatch.await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
})
)
);
}
}
@ -137,6 +147,7 @@ public class SLMHealthBlockedSnapshotIT extends AbstractSnapshotIntegTestCase {
private final Runnable delayFn;
protected TestDelayedRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment env,
NamedXContentRegistry namedXContentRegistry,
@ -145,7 +156,7 @@ public class SLMHealthBlockedSnapshotIT extends AbstractSnapshotIntegTestCase {
RecoverySettings recoverySettings,
Runnable delayFn
) {
super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.delayFn = delayFn;
}

View file

@ -19,6 +19,7 @@ import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.FollowersChecker;
import org.elasticsearch.cluster.coordination.LagDetector;
import org.elasticsearch.cluster.coordination.LeaderChecker;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
@ -137,16 +138,25 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
) {
return Map.of(
TestDelayedRepo.TYPE,
metadata -> new TestDelayedRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings, () -> {
// Only delay the first request
if (doDelay.getAndSet(false)) {
try {
assertTrue(delayedRepoLatch.await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
throw new RuntimeException(e);
(projectId, metadata) -> new TestDelayedRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings,
() -> {
// Only delay the first request
if (doDelay.getAndSet(false)) {
try {
assertTrue(delayedRepoLatch.await(1, TimeUnit.MINUTES));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
})
)
);
}
}
@ -156,6 +166,7 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
private final Runnable delayFn;
protected TestDelayedRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment env,
NamedXContentRegistry namedXContentRegistry,
@ -164,7 +175,7 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
RecoverySettings recoverySettings,
Runnable delayFn
) {
super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.delayFn = delayFn;
}
@ -199,7 +210,8 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
) {
return Map.of(
TestRestartBeforeListenersRepo.TYPE,
metadata -> new TestRestartBeforeListenersRepo(
(projectId, metadata) -> new TestRestartBeforeListenersRepo(
projectId,
metadata,
env,
namedXContentRegistry,
@ -223,6 +235,7 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
private final Runnable beforeResponseRunnable;
protected TestRestartBeforeListenersRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment env,
NamedXContentRegistry namedXContentRegistry,
@ -231,7 +244,7 @@ public class SLMStatDisruptionIT extends AbstractSnapshotIntegTestCase {
RecoverySettings recoverySettings,
Runnable beforeResponseRunnable
) {
super(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.beforeResponseRunnable = beforeResponseRunnable;
}

View file

@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -151,11 +152,35 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
) {
return Map.of(
FAULTY_TYPE,
metadata -> new FaultyRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings),
(projectId, metadata) -> new FaultyRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
),
INSTRUMENTED_TYPE,
metadata -> new InstrumentedRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings),
(projectId, metadata) -> new InstrumentedRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
),
FILTER_TYPE,
metadata -> new FilterFsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new FilterFsRepository(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
}
@ -164,6 +189,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
AtomicLong totalBytesRead = new AtomicLong();
public InstrumentedRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -171,7 +197,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
@Override
@ -206,6 +232,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
public static class FaultyRepository extends FsRepository {
public FaultyRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -213,7 +240,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
@Override
@ -261,6 +288,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
static final AtomicReference<BiFunction<String, InputStream, InputStream>> delegateSupplierRef = new AtomicReference<>(IDENTITY);
public FilterFsRepository(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -268,7 +296,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
}
static void wrapReadBlobMethod(BiFunction<String, InputStream, InputStream> delegate) {

View file

@ -9,6 +9,7 @@ package org.elasticsearch.repositories.blobstore.testkit.analyze;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ReferenceDocs;
@ -545,7 +546,8 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
) {
return Map.of(
DISRUPTABLE_REPO_TYPE,
metadata -> new DisruptableRepository(
(projectId, metadata) -> new DisruptableRepository(
projectId,
metadata,
namedXContentRegistry,
clusterService,
@ -562,6 +564,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
private final AtomicReference<BlobStore> blobStoreRef = new AtomicReference<>();
DisruptableRepository(
ProjectId projectId,
RepositoryMetadata metadata,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
@ -569,7 +572,7 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
RecoverySettings recoverySettings,
BlobPath basePath
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
}
void setBlobStore(BlobStore blobStore) {

View file

@ -8,6 +8,7 @@
package org.elasticsearch.repositories.blobstore.testkit.analyze;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@ -187,7 +188,8 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
) {
return Map.of(
ASSERTING_REPO_TYPE,
metadata -> new AssertingRepository(
(projectId, metadata) -> new AssertingRepository(
projectId,
metadata,
namedXContentRegistry,
clusterService,
@ -217,6 +219,7 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
private final AtomicReference<BlobStore> blobStoreRef = new AtomicReference<>();
AssertingRepository(
ProjectId projectId,
RepositoryMetadata metadata,
NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService,
@ -224,7 +227,7 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
RecoverySettings recoverySettings,
BlobPath basePath
) {
super(metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
super(projectId, metadata, namedXContentRegistry, clusterService, bigArrays, recoverySettings, basePath);
}
void setBlobStore(BlobStore blobStore) {

View file

@ -10,6 +10,7 @@ import org.elasticsearch.action.admin.cluster.repositories.verify.VerifyReposito
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@ -277,7 +278,15 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
) {
return Collections.singletonMap(
"verifyaccess-fs",
(metadata) -> new AccessVerifyingRepo(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings)
(projectId, metadata) -> new AccessVerifyingRepo(
projectId,
metadata,
env,
namedXContentRegistry,
clusterService,
bigArrays,
recoverySettings
)
);
}
@ -286,6 +295,7 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
private final ClusterService clusterService;
private AccessVerifyingRepo(
ProjectId projectId,
RepositoryMetadata metadata,
Environment environment,
NamedXContentRegistry namedXContentRegistry,
@ -293,7 +303,7 @@ public class VotingOnlyNodePluginTests extends ESIntegTestCase {
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
super(metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
super(projectId, metadata, environment, namedXContentRegistry, clusterService, bigArrays, recoverySettings);
this.clusterService = clusterService;
}