mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Passing in project-id when creating s3 client (#129301)
Enables creating different s3 clients for different projects Relates: #127631
This commit is contained in:
parent
a0b8737380
commit
92b32b535b
8 changed files with 24 additions and 37 deletions
|
@ -18,6 +18,7 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
|
||||||
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
|
||||||
|
|
||||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
import org.elasticsearch.common.blobstore.OptionalBytesReference;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
@ -147,7 +148,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
||||||
// construct our own repo instance so we can inject a threadpool that allows to control the passage of time
|
// construct our own repo instance so we can inject a threadpool that allows to control the passage of time
|
||||||
try (
|
try (
|
||||||
var repository = new S3Repository(
|
var repository = new S3Repository(
|
||||||
randomProjectIdOrDefault(),
|
ProjectId.DEFAULT,
|
||||||
node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(),
|
node().injector().getInstance(RepositoriesService.class).repository(TEST_REPO_NAME).getMetadata(),
|
||||||
xContentRegistry(),
|
xContentRegistry(),
|
||||||
node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).findFirst().get().getService(),
|
node().injector().getInstance(PluginsService.class).filterPlugins(S3RepositoryPlugin.class).findFirst().get().getService(),
|
||||||
|
|
|
@ -140,7 +140,7 @@ class S3BlobStore implements BlobStore {
|
||||||
this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
|
this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
|
||||||
this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
|
this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
|
||||||
this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
|
this.getRegisterRetryDelay = S3Repository.GET_REGISTER_RETRY_DELAY.get(repositoryMetadata.settings());
|
||||||
this.addPurposeCustomQueryParameter = service.settings(repositoryMetadata).addPurposeCustomQueryParameter;
|
this.addPurposeCustomQueryParameter = service.settings(projectId, repositoryMetadata).addPurposeCustomQueryParameter;
|
||||||
}
|
}
|
||||||
|
|
||||||
MetricPublisher getMetricPublisher(Operation operation, OperationPurpose purpose) {
|
MetricPublisher getMetricPublisher(Operation operation, OperationPurpose purpose) {
|
||||||
|
@ -263,12 +263,11 @@ class S3BlobStore implements BlobStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
public AmazonS3Reference clientReference() {
|
public AmazonS3Reference clientReference() {
|
||||||
// TODO: use service.client(ProjectId, RepositoryMetadata), see https://github.com/elastic/elasticsearch/pull/127631
|
return service.client(projectId, repositoryMetadata);
|
||||||
return service.client(repositoryMetadata);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final int getMaxRetries() {
|
final int getMaxRetries() {
|
||||||
return service.settings(repositoryMetadata).maxRetries;
|
return service.settings(projectId, repositoryMetadata).maxRetries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String bucket() {
|
public String bucket() {
|
||||||
|
@ -441,7 +440,7 @@ class S3BlobStore implements BlobStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
service.onBlobStoreClose();
|
service.onBlobStoreClose(projectId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -49,7 +49,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||||
import org.elasticsearch.core.FixForMultiProject;
|
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.core.Releasable;
|
import org.elasticsearch.core.Releasable;
|
||||||
import org.elasticsearch.core.Releasables;
|
import org.elasticsearch.core.Releasables;
|
||||||
|
@ -157,15 +156,6 @@ class S3Service extends AbstractLifecycleComponent {
|
||||||
s3ClientsManager.refreshAndClearCacheForClusterClients(clientsSettings);
|
s3ClientsManager.refreshAndClearCacheForClusterClients(clientsSettings);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Attempts to retrieve a client by its repository metadata and settings from the cache.
|
|
||||||
* If the client does not exist it will be created.
|
|
||||||
*/
|
|
||||||
@FixForMultiProject(description = "can be removed once blobstore is project aware")
|
|
||||||
public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
|
|
||||||
return client(ProjectId.DEFAULT, repositoryMetadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Attempts to retrieve either a cluster or project client from the client manager. Throws if project-id or
|
* Attempts to retrieve either a cluster or project client from the client manager. Throws if project-id or
|
||||||
* the client name does not exist. The client maybe initialized lazily.
|
* the client name does not exist. The client maybe initialized lazily.
|
||||||
|
@ -196,11 +186,6 @@ class S3Service extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@FixForMultiProject(description = "can be removed once blobstore is project aware")
|
|
||||||
S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
|
|
||||||
return settings(ProjectId.DEFAULT, repositoryMetadata);
|
|
||||||
}
|
|
||||||
|
|
||||||
S3ClientSettings settings(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
|
S3ClientSettings settings(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
|
||||||
return s3ClientsManager.settingsForClient(effectiveProjectId(projectId), repositoryMetadata);
|
return s3ClientsManager.settingsForClient(effectiveProjectId(projectId), repositoryMetadata);
|
||||||
}
|
}
|
||||||
|
@ -420,11 +405,6 @@ class S3Service extends AbstractLifecycleComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@FixForMultiProject(description = "can be removed once blobstore is project aware")
|
|
||||||
public void onBlobStoreClose() {
|
|
||||||
onBlobStoreClose(ProjectId.DEFAULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Release clients for the specified project.
|
* Release clients for the specified project.
|
||||||
* @param projectId The project associated with the client, or null if the client is cluster level
|
* @param projectId The project associated with the client, or null if the client is cluster level
|
||||||
|
|
|
@ -21,6 +21,7 @@ import software.amazon.awssdk.regions.Region;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||||
import org.elasticsearch.common.settings.MockSecureSettings;
|
import org.elasticsearch.common.settings.MockSecureSettings;
|
||||||
|
@ -246,7 +247,10 @@ public class AwsS3ServiceImplTests extends ESTestCase {
|
||||||
s3Service.start();
|
s3Service.start();
|
||||||
final String endpointOverride = "http://first";
|
final String endpointOverride = "http://first";
|
||||||
final Settings settings = Settings.builder().put("endpoint", endpointOverride).build();
|
final Settings settings = Settings.builder().put("endpoint", endpointOverride).build();
|
||||||
final AmazonS3Reference reference = s3Service.client(new RepositoryMetadata("first", "s3", settings));
|
final AmazonS3Reference reference = s3Service.client(
|
||||||
|
randomFrom(ProjectId.DEFAULT, null),
|
||||||
|
new RepositoryMetadata("first", "s3", settings)
|
||||||
|
);
|
||||||
|
|
||||||
assertEquals(endpointOverride, reference.client().serviceClientConfiguration().endpointOverride().get().toString());
|
assertEquals(endpointOverride, reference.client().serviceClientConfiguration().endpointOverride().get().toString());
|
||||||
assertEquals("es-test-region", reference.client().serviceClientConfiguration().region().toString());
|
assertEquals("es-test-region", reference.client().serviceClientConfiguration().region().toString());
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.apache.http.conn.ConnectionPoolTimeoutException;
|
||||||
import org.apache.http.conn.DnsResolver;
|
import org.apache.http.conn.DnsResolver;
|
||||||
import org.apache.logging.log4j.Level;
|
import org.apache.logging.log4j.Level;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||||
import org.elasticsearch.common.BackoffPolicy;
|
import org.elasticsearch.common.BackoffPolicy;
|
||||||
|
@ -239,7 +240,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
|
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
|
||||||
|
|
||||||
final S3BlobStore s3BlobStore = new S3BlobStore(
|
final S3BlobStore s3BlobStore = new S3BlobStore(
|
||||||
randomProjectIdOrDefault(),
|
ProjectId.DEFAULT,
|
||||||
service,
|
service,
|
||||||
"bucket",
|
"bucket",
|
||||||
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
|
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.MockBigArrays;
|
import org.elasticsearch.common.util.MockBigArrays;
|
||||||
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
|
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
|
||||||
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
|
@ -71,7 +72,7 @@ public class S3RepositoryTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AmazonS3Reference client(RepositoryMetadata repositoryMetadata) {
|
public AmazonS3Reference client(@Nullable ProjectId projectId, RepositoryMetadata repositoryMetadata) {
|
||||||
return new AmazonS3Reference(new DummyS3Client(), mock(SdkHttpClient.class));
|
return new AmazonS3Reference(new DummyS3Client(), mock(SdkHttpClient.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import software.amazon.awssdk.services.s3.endpoints.internal.DefaultS3EndpointPr
|
||||||
import software.amazon.awssdk.services.s3.model.S3Exception;
|
import software.amazon.awssdk.services.s3.model.S3Exception;
|
||||||
|
|
||||||
import org.apache.logging.log4j.Level;
|
import org.apache.logging.log4j.Level;
|
||||||
|
import org.elasticsearch.cluster.metadata.ProjectId;
|
||||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||||
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
import org.elasticsearch.cluster.project.TestProjectResolvers;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
@ -53,17 +54,17 @@ public class S3ServiceTests extends ESTestCase {
|
||||||
final Settings settings = Settings.builder().put("endpoint", "http://first").build();
|
final Settings settings = Settings.builder().put("endpoint", "http://first").build();
|
||||||
final RepositoryMetadata metadata1 = new RepositoryMetadata("first", "s3", settings);
|
final RepositoryMetadata metadata1 = new RepositoryMetadata("first", "s3", settings);
|
||||||
final RepositoryMetadata metadata2 = new RepositoryMetadata("second", "s3", settings);
|
final RepositoryMetadata metadata2 = new RepositoryMetadata("second", "s3", settings);
|
||||||
final S3ClientSettings clientSettings = s3Service.settings(metadata2);
|
final S3ClientSettings clientSettings = s3Service.settings(ProjectId.DEFAULT, metadata2);
|
||||||
final S3ClientSettings otherClientSettings = s3Service.settings(metadata2);
|
final S3ClientSettings otherClientSettings = s3Service.settings(ProjectId.DEFAULT, metadata2);
|
||||||
assertSame(clientSettings, otherClientSettings);
|
assertSame(clientSettings, otherClientSettings);
|
||||||
final AmazonS3Reference reference = s3Service.client(metadata1);
|
final AmazonS3Reference reference = s3Service.client(randomFrom(ProjectId.DEFAULT, null), metadata1);
|
||||||
reference.close();
|
reference.close();
|
||||||
s3Service.onBlobStoreClose();
|
s3Service.onBlobStoreClose(ProjectId.DEFAULT);
|
||||||
final AmazonS3Reference referenceReloaded = s3Service.client(metadata1);
|
final AmazonS3Reference referenceReloaded = s3Service.client(randomFrom(ProjectId.DEFAULT, null), metadata1);
|
||||||
assertNotSame(referenceReloaded, reference);
|
assertNotSame(referenceReloaded, reference);
|
||||||
referenceReloaded.close();
|
referenceReloaded.close();
|
||||||
s3Service.onBlobStoreClose();
|
s3Service.onBlobStoreClose(ProjectId.DEFAULT);
|
||||||
final S3ClientSettings clientSettingsReloaded = s3Service.settings(metadata1);
|
final S3ClientSettings clientSettingsReloaded = s3Service.settings(ProjectId.DEFAULT, metadata1);
|
||||||
assertNotSame(clientSettings, clientSettingsReloaded);
|
assertNotSame(clientSettings, clientSettingsReloaded);
|
||||||
s3Service.close();
|
s3Service.close();
|
||||||
}
|
}
|
||||||
|
|
|
@ -2743,7 +2743,7 @@ public abstract class ESTestCase extends LuceneTestCase {
|
||||||
future.run();
|
future.run();
|
||||||
} else {
|
} else {
|
||||||
threads[i] = new Thread(future);
|
threads[i] = new Thread(future);
|
||||||
threads[i].setName("runInParallel-T#" + i);
|
threads[i].setName("TEST-runInParallel-T#" + i);
|
||||||
threads[i].start();
|
threads[i].start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue