mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Allow overriding blob container path in tests (#126391)
Some `AbstractBlobContainerRetriesTestCase#createBlobContainer` implementations choose a path for the container randomly, but we have a need for a test which re-creates the same container against a different `S3Service` and `BlobStore` and must therefore specify the same path each time. This commit exposes a parameter that lets callers specify a container path.
This commit is contained in:
parent
5dc7ab77b3
commit
fbbbdd7eec
4 changed files with 47 additions and 35 deletions
|
@ -127,7 +127,8 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final @Nullable TimeValue readTimeout,
|
final @Nullable TimeValue readTimeout,
|
||||||
final @Nullable Boolean disableChunkedEncoding,
|
final @Nullable Boolean disableChunkedEncoding,
|
||||||
final @Nullable ByteSizeValue bufferSize,
|
final @Nullable ByteSizeValue bufferSize,
|
||||||
final @Nullable Integer maxBulkDeletes
|
final @Nullable Integer maxBulkDeletes,
|
||||||
|
final @Nullable BlobPath blobContainerPath
|
||||||
) {
|
) {
|
||||||
final Settings.Builder clientSettings = Settings.builder();
|
final Settings.Builder clientSettings = Settings.builder();
|
||||||
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||||
|
@ -207,7 +208,10 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
new GcsRepositoryStatsCollector()
|
new GcsRepositoryStatsCollector()
|
||||||
);
|
);
|
||||||
|
|
||||||
return new GoogleCloudStorageBlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), blobStore);
|
return new GoogleCloudStorageBlobContainer(
|
||||||
|
Objects.requireNonNullElse(blobContainerPath, randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo")),
|
||||||
|
blobStore
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testShouldRetryOnConnectionRefused() {
|
public void testShouldRetryOnConnectionRefused() {
|
||||||
|
@ -224,7 +228,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
|
|
||||||
private void executeListBlobsAndAssertRetries() {
|
private void executeListBlobsAndAssertRetries() {
|
||||||
final int maxRetries = randomIntBetween(3, 5);
|
final int maxRetries = randomIntBetween(3, 5);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose()));
|
expectThrows(StorageException.class, () -> blobContainer.listBlobs(randomPurpose()));
|
||||||
assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get());
|
assertEquals(maxRetries + 1, requestCounters.get("/storage/v1/b/bucket/o").get());
|
||||||
}
|
}
|
||||||
|
@ -233,7 +237,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final int maxRetries = randomIntBetween(2, 10);
|
final int maxRetries = randomIntBetween(2, 10);
|
||||||
final AtomicInteger countDown = new AtomicInteger(maxRetries);
|
final AtomicInteger countDown = new AtomicInteger(maxRetries);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
|
|
||||||
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
|
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
|
||||||
final byte[] bytes = randomBytes(1 << 22);
|
final byte[] bytes = randomBytes(1 << 22);
|
||||||
|
@ -262,7 +266,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final int maxRetries = randomIntBetween(2, 10);
|
final int maxRetries = randomIntBetween(2, 10);
|
||||||
final CountDown countDown = new CountDown(maxRetries);
|
final CountDown countDown = new CountDown(maxRetries);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
||||||
|
@ -304,7 +308,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
public void testWriteBlobWithReadTimeouts() {
|
public void testWriteBlobWithReadTimeouts() {
|
||||||
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
||||||
|
@ -356,7 +360,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
||||||
|
|
||||||
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
||||||
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null, null);
|
||||||
|
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
|
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
|
||||||
|
@ -503,7 +507,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
return Integer.toString(totalDeletesSent++);
|
return Integer.toString(totalDeletesSent++);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null, null);
|
||||||
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
|
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
|
||||||
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
|
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
|
||||||
|
|
||||||
|
@ -539,7 +543,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
|
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
|
||||||
|
|
||||||
final int maxRetries = randomIntBetween(1, 3);
|
final int maxRetries = randomIntBetween(1, 3);
|
||||||
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||||
final String key = randomIdentifier();
|
final String key = randomIdentifier();
|
||||||
|
|
||||||
|
|
|
@ -164,7 +164,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final @Nullable TimeValue readTimeout,
|
final @Nullable TimeValue readTimeout,
|
||||||
final @Nullable Boolean disableChunkedEncoding,
|
final @Nullable Boolean disableChunkedEncoding,
|
||||||
final @Nullable ByteSizeValue bufferSize,
|
final @Nullable ByteSizeValue bufferSize,
|
||||||
final @Nullable Integer maxBulkDeletes
|
final @Nullable Integer maxBulkDeletes,
|
||||||
|
final @Nullable BlobPath blobContainerPath
|
||||||
) {
|
) {
|
||||||
final Settings.Builder clientSettings = Settings.builder();
|
final Settings.Builder clientSettings = Settings.builder();
|
||||||
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||||
|
@ -216,7 +217,10 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)),
|
new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)),
|
||||||
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(1), MAX_NUMBER_SNAPSHOT_DELETE_RETRIES)
|
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(1), MAX_NUMBER_SNAPSHOT_DELETE_RETRIES)
|
||||||
);
|
);
|
||||||
return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) {
|
return new S3BlobContainer(
|
||||||
|
Objects.requireNonNullElse(blobContainerPath, randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo")),
|
||||||
|
s3BlobStore
|
||||||
|
) {
|
||||||
@Override
|
@Override
|
||||||
public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
|
public InputStream readBlob(OperationPurpose purpose, String blobName) throws IOException {
|
||||||
return new AssertingInputStream(new S3RetryingInputStream(purpose, s3BlobStore, buildKey(blobName)) {
|
return new AssertingInputStream(new S3RetryingInputStream(purpose, s3BlobStore, buildKey(blobName)) {
|
||||||
|
@ -261,7 +265,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final int maxRetries = randomInt(5);
|
final int maxRetries = randomInt(5);
|
||||||
final CountDown countDown = new CountDown(maxRetries + 1);
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
|
||||||
|
@ -309,7 +313,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
public void testWriteBlobWithReadTimeouts() {
|
public void testWriteBlobWithReadTimeouts() {
|
||||||
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
|
||||||
|
@ -343,7 +347,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final boolean useTimeout = rarely();
|
final boolean useTimeout = rarely();
|
||||||
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
||||||
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
||||||
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
|
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
|
||||||
|
|
||||||
final int parts = randomIntBetween(1, 5);
|
final int parts = randomIntBetween(1, 5);
|
||||||
final long lastPartSize = randomLongBetween(10, 512);
|
final long lastPartSize = randomLongBetween(10, 512);
|
||||||
|
@ -439,7 +443,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final boolean useTimeout = rarely();
|
final boolean useTimeout = rarely();
|
||||||
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
||||||
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
||||||
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
|
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null, null);
|
||||||
|
|
||||||
final int parts = randomIntBetween(1, 5);
|
final int parts = randomIntBetween(1, 5);
|
||||||
final long lastPartSize = randomLongBetween(10, 512);
|
final long lastPartSize = randomLongBetween(10, 512);
|
||||||
|
@ -548,7 +552,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
|
||||||
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
|
@ -621,7 +625,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
|
|
||||||
|
@ -659,7 +663,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null, null);
|
||||||
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent(512);
|
final byte[] bytes = randomBlobContent(512);
|
||||||
|
@ -752,7 +756,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testDoesNotRetryOnNotFound() {
|
public void testDoesNotRetryOnNotFound() {
|
||||||
final int maxRetries = between(3, 5);
|
final int maxRetries = between(3, 5);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null, null);
|
||||||
|
|
||||||
final AtomicInteger numberOfReads = new AtomicInteger(0);
|
final AtomicInteger numberOfReads = new AtomicInteger(0);
|
||||||
@SuppressForbidden(reason = "use a http server")
|
@SuppressForbidden(reason = "use a http server")
|
||||||
|
@ -784,7 +788,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
|
public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -804,7 +808,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
|
public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -841,7 +845,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testNonSnapshotDeletesAreNotRetried() {
|
public void testNonSnapshotDeletesAreNotRetried() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -870,7 +874,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testNonThrottlingErrorsAreNotRetried() {
|
public void testNonThrottlingErrorsAreNotRetried() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -949,7 +953,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testGetRegisterRetries() {
|
public void testGetRegisterRetries() {
|
||||||
final var maxRetries = between(0, 3);
|
final var maxRetries = between(0, 3);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
|
|
||||||
interface FailingHandlerFactory {
|
interface FailingHandlerFactory {
|
||||||
void addHandler(String blobName, Integer... responseCodes);
|
void addHandler(String blobName, Integer... responseCodes);
|
||||||
|
@ -1019,7 +1023,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
public void testSuppressedDeletionErrorsAreCapped() {
|
public void testSuppressedDeletionErrorsAreCapped() {
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
int maxBulkDeleteSize = randomIntBetween(1, 10);
|
int maxBulkDeleteSize = randomIntBetween(1, 10);
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
|
||||||
httpServer.createContext("/", exchange -> {
|
httpServer.createContext("/", exchange -> {
|
||||||
if (isMultiDeleteRequest(exchange)) {
|
if (isMultiDeleteRequest(exchange)) {
|
||||||
exchange.sendResponseHeaders(
|
exchange.sendResponseHeaders(
|
||||||
|
@ -1051,7 +1055,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() {
|
public void testTrimmedLogAndCappedSuppressedErrorOnMultiObjectDeletionException() {
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
int maxBulkDeleteSize = randomIntBetween(10, 30);
|
int maxBulkDeleteSize = randomIntBetween(10, 30);
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize, null);
|
||||||
|
|
||||||
final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
|
final Pattern pattern = Pattern.compile("<Key>(.+?)</Key>");
|
||||||
httpServer.createContext("/", exchange -> {
|
httpServer.createContext("/", exchange -> {
|
||||||
|
|
|
@ -29,6 +29,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.MalformedURLException;
|
import java.net.MalformedURLException;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.either;
|
import static org.hamcrest.Matchers.either;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
@ -77,7 +78,8 @@ public class URLBlobContainerRetriesTests extends AbstractBlobContainerRetriesTe
|
||||||
TimeValue readTimeout,
|
TimeValue readTimeout,
|
||||||
Boolean disableChunkedEncoding,
|
Boolean disableChunkedEncoding,
|
||||||
ByteSizeValue bufferSize,
|
ByteSizeValue bufferSize,
|
||||||
Integer maxBulkDeletes
|
Integer maxBulkDeletes,
|
||||||
|
BlobPath blobContainerPath
|
||||||
) {
|
) {
|
||||||
Settings.Builder settingsBuilder = Settings.builder();
|
Settings.Builder settingsBuilder = Settings.builder();
|
||||||
|
|
||||||
|
@ -98,7 +100,7 @@ public class URLBlobContainerRetriesTests extends AbstractBlobContainerRetriesTe
|
||||||
factory.create(httpClientSettings),
|
factory.create(httpClientSettings),
|
||||||
httpClientSettings
|
httpClientSettings
|
||||||
);
|
);
|
||||||
return urlBlobStore.blobContainer(BlobPath.EMPTY);
|
return urlBlobStore.blobContainer(Objects.requireNonNullElse(blobContainerPath, BlobPath.EMPTY));
|
||||||
} catch (MalformedURLException e) {
|
} catch (MalformedURLException e) {
|
||||||
throw new RuntimeException("Unable to create URLBlobStore", e);
|
throw new RuntimeException("Unable to create URLBlobStore", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@ import com.sun.net.httpserver.HttpServer;
|
||||||
import org.apache.http.ConnectionClosedException;
|
import org.apache.http.ConnectionClosedException;
|
||||||
import org.apache.http.HttpStatus;
|
import org.apache.http.HttpStatus;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
import org.elasticsearch.common.blobstore.OperationPurpose;
|
import org.elasticsearch.common.blobstore.OperationPurpose;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
|
@ -83,7 +84,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
@Nullable TimeValue readTimeout,
|
@Nullable TimeValue readTimeout,
|
||||||
@Nullable Boolean disableChunkedEncoding,
|
@Nullable Boolean disableChunkedEncoding,
|
||||||
@Nullable ByteSizeValue bufferSize,
|
@Nullable ByteSizeValue bufferSize,
|
||||||
@Nullable Integer maxBulkDeletes
|
@Nullable Integer maxBulkDeletes,
|
||||||
|
@Nullable BlobPath blobContainerPath
|
||||||
);
|
);
|
||||||
|
|
||||||
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
|
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
|
||||||
|
@ -92,7 +94,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
||||||
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null, null);
|
||||||
final long position = randomLongBetween(0, MAX_RANGE_VAL);
|
final long position = randomLongBetween(0, MAX_RANGE_VAL);
|
||||||
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
||||||
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
||||||
|
@ -119,7 +121,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
if (countDown.countDown()) {
|
if (countDown.countDown()) {
|
||||||
|
@ -176,7 +178,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
final CountDown countDown = new CountDown(maxRetries + 1);
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
||||||
|
|
||||||
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
|
@ -248,7 +250,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
public void testReadBlobWithReadTimeouts() {
|
public void testReadBlobWithReadTimeouts() {
|
||||||
final int maxRetries = randomInt(5);
|
final int maxRetries = randomInt(5);
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
|
||||||
|
@ -305,7 +307,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
public void testReadBlobWithNoHttpResponse() {
|
public void testReadBlobWithNoHttpResponse() {
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
||||||
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null, null);
|
||||||
|
|
||||||
// HTTP server closes connection immediately
|
// HTTP server closes connection immediately
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
|
||||||
|
@ -325,7 +327,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
public void testReadBlobWithPrematureConnectionClose() {
|
public void testReadBlobWithPrematureConnectionClose() {
|
||||||
final int maxRetries = randomInt(20);
|
final int maxRetries = randomInt(20);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null, null);
|
||||||
|
|
||||||
final boolean alwaysFlushBody = randomBoolean();
|
final boolean alwaysFlushBody = randomBoolean();
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue