mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Limit number of suppressed S3 deletion errors (#123630)
We've seen this being an issue on 7.x although can happen on all versions (I'm pretty sure this PR doesn't cleanly back-port to 7.x though). Closes https://github.com/elastic/elasticsearch/issues/123354
This commit is contained in:
parent
a320809843
commit
113f0c17cc
6 changed files with 104 additions and 49 deletions
6
docs/changelog/123630.yaml
Normal file
6
docs/changelog/123630.yaml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 123630
|
||||||
|
summary: Limit number of suppressed S3 deletion errors
|
||||||
|
area: Snapshot/Restore
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 123354
|
|
@ -115,7 +115,8 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final @Nullable Integer maxRetries,
|
final @Nullable Integer maxRetries,
|
||||||
final @Nullable TimeValue readTimeout,
|
final @Nullable TimeValue readTimeout,
|
||||||
final @Nullable Boolean disableChunkedEncoding,
|
final @Nullable Boolean disableChunkedEncoding,
|
||||||
final @Nullable ByteSizeValue bufferSize
|
final @Nullable ByteSizeValue bufferSize,
|
||||||
|
final @Nullable Integer maxBulkDeletes
|
||||||
) {
|
) {
|
||||||
final Settings.Builder clientSettings = Settings.builder();
|
final Settings.Builder clientSettings = Settings.builder();
|
||||||
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
final String client = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||||
|
@ -176,7 +177,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final int maxRetries = randomIntBetween(2, 10);
|
final int maxRetries = randomIntBetween(2, 10);
|
||||||
final AtomicInteger countDown = new AtomicInteger(maxRetries);
|
final AtomicInteger countDown = new AtomicInteger(maxRetries);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
||||||
|
|
||||||
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
|
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
|
||||||
final byte[] bytes = randomBytes(1 << 22);
|
final byte[] bytes = randomBytes(1 << 22);
|
||||||
|
@ -205,7 +206,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
final int maxRetries = randomIntBetween(2, 10);
|
final int maxRetries = randomIntBetween(2, 10);
|
||||||
final CountDown countDown = new CountDown(maxRetries);
|
final CountDown countDown = new CountDown(maxRetries);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
assertThat(exchange.getRequestURI().getQuery(), containsString("uploadType=multipart"));
|
||||||
|
@ -247,7 +248,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
public void testWriteBlobWithReadTimeouts() {
|
public void testWriteBlobWithReadTimeouts() {
|
||||||
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", exchange -> {
|
||||||
|
@ -300,7 +301,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
logger.debug("starting with resumable upload id [{}]", sessionUploadId.get());
|
||||||
|
|
||||||
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
final TimeValue readTimeout = allowReadTimeout.get() ? TimeValue.timeValueSeconds(3) : null;
|
||||||
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(nbErrors + 1, readTimeout, null, null, null);
|
||||||
|
|
||||||
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
httpServer.createContext("/upload/storage/v1/b/bucket/o", safeHandler(exchange -> {
|
||||||
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
|
final BytesReference requestBody = Streams.readFully(exchange.getRequestBody());
|
||||||
|
@ -440,7 +441,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
return Integer.toString(totalDeletesSent++);
|
return Integer.toString(totalDeletesSent++);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(1, null, null, null, null);
|
||||||
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
|
httpServer.createContext("/batch/storage/v1", safeHandler(exchange -> {
|
||||||
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
|
assert pendingDeletes.get() <= MAX_DELETES_PER_BATCH;
|
||||||
|
|
||||||
|
@ -476,7 +477,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
|
||||||
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
|
httpServer.createContext("/", new ResponseInjectingHttpHandler(requestHandlers, new GoogleCloudStorageHttpHandler("bucket")));
|
||||||
|
|
||||||
final int maxRetries = randomIntBetween(1, 3);
|
final int maxRetries = randomIntBetween(1, 3);
|
||||||
final BlobContainer container = createBlobContainer(maxRetries, null, null, null);
|
final BlobContainer container = createBlobContainer(maxRetries, null, null, null, null);
|
||||||
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
final byte[] data = randomBytes(randomIntBetween(1, BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||||
final String key = randomIdentifier();
|
final String key = randomIdentifier();
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,6 @@ import java.util.Optional;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.Executor;
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -69,6 +68,8 @@ class S3BlobStore implements BlobStore {
|
||||||
*/
|
*/
|
||||||
static final int MAX_BULK_DELETES = 1000;
|
static final int MAX_BULK_DELETES = 1000;
|
||||||
|
|
||||||
|
static final int MAX_DELETE_EXCEPTIONS = 10;
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
|
private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
|
||||||
|
|
||||||
private final S3Service service;
|
private final S3Service service;
|
||||||
|
@ -340,6 +341,18 @@ class S3BlobStore implements BlobStore {
|
||||||
return new S3BlobContainer(path, this);
|
return new S3BlobContainer(path, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class DeletionExceptions {
|
||||||
|
Exception exception = null;
|
||||||
|
private int count = 0;
|
||||||
|
|
||||||
|
void useOrMaybeSuppress(Exception e) {
|
||||||
|
if (count < MAX_DELETE_EXCEPTIONS) {
|
||||||
|
exception = ExceptionsHelper.useOrSuppress(exception, e);
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
|
void deleteBlobs(OperationPurpose purpose, Iterator<String> blobNames) throws IOException {
|
||||||
if (blobNames.hasNext() == false) {
|
if (blobNames.hasNext() == false) {
|
||||||
return;
|
return;
|
||||||
|
@ -348,19 +361,19 @@ class S3BlobStore implements BlobStore {
|
||||||
final List<String> partition = new ArrayList<>();
|
final List<String> partition = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
|
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
|
||||||
final AtomicReference<Exception> aex = new AtomicReference<>();
|
final var deletionExceptions = new DeletionExceptions();
|
||||||
blobNames.forEachRemaining(key -> {
|
blobNames.forEachRemaining(key -> {
|
||||||
partition.add(key);
|
partition.add(key);
|
||||||
if (partition.size() == bulkDeletionBatchSize) {
|
if (partition.size() == bulkDeletionBatchSize) {
|
||||||
deletePartition(purpose, partition, aex);
|
deletePartition(purpose, partition, deletionExceptions);
|
||||||
partition.clear();
|
partition.clear();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (partition.isEmpty() == false) {
|
if (partition.isEmpty() == false) {
|
||||||
deletePartition(purpose, partition, aex);
|
deletePartition(purpose, partition, deletionExceptions);
|
||||||
}
|
}
|
||||||
if (aex.get() != null) {
|
if (deletionExceptions.exception != null) {
|
||||||
throw aex.get();
|
throw deletionExceptions.exception;
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
|
throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
|
||||||
|
@ -372,9 +385,9 @@ class S3BlobStore implements BlobStore {
|
||||||
*
|
*
|
||||||
* @param purpose The {@link OperationPurpose} of the deletion
|
* @param purpose The {@link OperationPurpose} of the deletion
|
||||||
* @param partition The list of blobs to delete
|
* @param partition The list of blobs to delete
|
||||||
* @param aex A holder for any exception(s) thrown during the deletion
|
* @param deletionExceptions A holder for any exception(s) thrown during the deletion
|
||||||
*/
|
*/
|
||||||
private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
|
private void deletePartition(OperationPurpose purpose, List<String> partition, DeletionExceptions deletionExceptions) {
|
||||||
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
|
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
|
||||||
int retryCounter = 0;
|
int retryCounter = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -395,7 +408,7 @@ class S3BlobStore implements BlobStore {
|
||||||
),
|
),
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
|
deletionExceptions.useOrMaybeSuppress(e);
|
||||||
return;
|
return;
|
||||||
} catch (AmazonClientException e) {
|
} catch (AmazonClientException e) {
|
||||||
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
|
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
|
||||||
|
@ -404,13 +417,13 @@ class S3BlobStore implements BlobStore {
|
||||||
retryCounter++;
|
retryCounter++;
|
||||||
} else {
|
} else {
|
||||||
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
|
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
|
||||||
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
|
deletionExceptions.useOrMaybeSuppress(e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
|
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
|
||||||
// remove any keys from the outstanding deletes set.
|
// remove any keys from the outstanding deletes set.
|
||||||
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
|
deletionExceptions.useOrMaybeSuppress(e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@ import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.telemetry.InstrumentType;
|
import org.elasticsearch.telemetry.InstrumentType;
|
||||||
import org.elasticsearch.telemetry.Measurement;
|
import org.elasticsearch.telemetry.Measurement;
|
||||||
import org.elasticsearch.telemetry.RecordingMeterRegistry;
|
import org.elasticsearch.telemetry.RecordingMeterRegistry;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.watcher.ResourceWatcherService;
|
import org.elasticsearch.watcher.ResourceWatcherService;
|
||||||
import org.hamcrest.Matcher;
|
import org.hamcrest.Matcher;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -161,7 +162,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final @Nullable Integer maxRetries,
|
final @Nullable Integer maxRetries,
|
||||||
final @Nullable TimeValue readTimeout,
|
final @Nullable TimeValue readTimeout,
|
||||||
final @Nullable Boolean disableChunkedEncoding,
|
final @Nullable Boolean disableChunkedEncoding,
|
||||||
final @Nullable ByteSizeValue bufferSize
|
final @Nullable ByteSizeValue bufferSize,
|
||||||
|
final @Nullable Integer maxBulkDeletes
|
||||||
) {
|
) {
|
||||||
final Settings.Builder clientSettings = Settings.builder();
|
final Settings.Builder clientSettings = Settings.builder();
|
||||||
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||||
|
@ -192,14 +194,13 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
clientSettings.setSecureSettings(secureSettings);
|
clientSettings.setSecureSettings(secureSettings);
|
||||||
service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));
|
service.refreshAndClearCache(S3ClientSettings.load(clientSettings.build()));
|
||||||
|
|
||||||
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata(
|
final var repositorySettings = Settings.builder()
|
||||||
"repository",
|
.put(S3Repository.CLIENT_NAME.getKey(), clientName)
|
||||||
S3Repository.TYPE,
|
.put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO);
|
||||||
Settings.builder()
|
if (maxBulkDeletes != null) {
|
||||||
.put(S3Repository.CLIENT_NAME.getKey(), clientName)
|
repositorySettings.put(S3Repository.DELETION_BATCH_SIZE_SETTING.getKey(), maxBulkDeletes);
|
||||||
.put(S3Repository.GET_REGISTER_RETRY_DELAY.getKey(), TimeValue.ZERO)
|
}
|
||||||
.build()
|
final RepositoryMetadata repositoryMetadata = new RepositoryMetadata("repository", S3Repository.TYPE, repositorySettings.build());
|
||||||
);
|
|
||||||
|
|
||||||
final S3BlobStore s3BlobStore = new S3BlobStore(
|
final S3BlobStore s3BlobStore = new S3BlobStore(
|
||||||
service,
|
service,
|
||||||
|
@ -255,7 +256,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final int maxRetries = randomInt(5);
|
final int maxRetries = randomInt(5);
|
||||||
final CountDown countDown = new CountDown(maxRetries + 1);
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
||||||
|
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_max_retries"), exchange -> {
|
||||||
|
@ -305,7 +306,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
public void testWriteBlobWithReadTimeouts() {
|
public void testWriteBlobWithReadTimeouts() {
|
||||||
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "write_blob_timeout"), exchange -> {
|
||||||
|
@ -343,7 +344,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
var maxRetries = randomInt(3);
|
var maxRetries = randomInt(3);
|
||||||
var blobLength = randomIntBetween(1, 4096 * 3);
|
var blobLength = randomIntBetween(1, 4096 * 3);
|
||||||
var blobName = getTestName().toLowerCase(Locale.ROOT);
|
var blobName = getTestName().toLowerCase(Locale.ROOT);
|
||||||
var blobContainer = createBlobContainer(maxRetries, null, true, null);
|
var blobContainer = createBlobContainer(maxRetries, null, true, null, null);
|
||||||
|
|
||||||
var uploadedBytes = new AtomicReference<BytesReference>();
|
var uploadedBytes = new AtomicReference<BytesReference>();
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, blobName), exchange -> {
|
||||||
|
@ -390,7 +391,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final boolean useTimeout = rarely();
|
final boolean useTimeout = rarely();
|
||||||
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
||||||
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
||||||
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
|
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
|
||||||
|
|
||||||
final int parts = randomIntBetween(1, 5);
|
final int parts = randomIntBetween(1, 5);
|
||||||
final long lastPartSize = randomLongBetween(10, 512);
|
final long lastPartSize = randomLongBetween(10, 512);
|
||||||
|
@ -491,7 +492,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
final boolean useTimeout = rarely();
|
final boolean useTimeout = rarely();
|
||||||
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
final TimeValue readTimeout = useTimeout ? TimeValue.timeValueMillis(randomIntBetween(100, 500)) : null;
|
||||||
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
final ByteSizeValue bufferSize = ByteSizeValue.of(5, ByteSizeUnit.MB);
|
||||||
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
|
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize, null);
|
||||||
|
|
||||||
final int parts = randomIntBetween(1, 5);
|
final int parts = randomIntBetween(1, 5);
|
||||||
final long lastPartSize = randomLongBetween(10, 512);
|
final long lastPartSize = randomLongBetween(10, 512);
|
||||||
|
@ -605,7 +606,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
||||||
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
|
@ -678,7 +679,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
|
|
||||||
|
@ -716,7 +717,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
0,
|
0,
|
||||||
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
randomFrom(1000, Math.toIntExact(S3Repository.BUFFER_SIZE_SETTING.get(Settings.EMPTY).getBytes()))
|
||||||
);
|
);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes));
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, ByteSizeValue.ofBytes(bufferSizeBytes), null);
|
||||||
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
final int meaningfulProgressBytes = Math.max(1, bufferSizeBytes / 100);
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent(512);
|
final byte[] bytes = randomBlobContent(512);
|
||||||
|
@ -809,7 +810,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testDoesNotRetryOnNotFound() {
|
public void testDoesNotRetryOnNotFound() {
|
||||||
final int maxRetries = between(3, 5);
|
final int maxRetries = between(3, 5);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null, null);
|
||||||
|
|
||||||
final AtomicInteger numberOfReads = new AtomicInteger(0);
|
final AtomicInteger numberOfReads = new AtomicInteger(0);
|
||||||
@SuppressForbidden(reason = "use a http server")
|
@SuppressForbidden(reason = "use a http server")
|
||||||
|
@ -841,7 +842,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
|
public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -861,7 +862,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
|
public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -898,7 +899,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testNonSnapshotDeletesAreNotRetried() {
|
public void testNonSnapshotDeletesAreNotRetried() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -927,7 +928,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testNonThrottlingErrorsAreNotRetried() {
|
public void testNonThrottlingErrorsAreNotRetried() {
|
||||||
// disable AWS-client retries
|
// disable AWS-client retries
|
||||||
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
|
final BlobContainer blobContainer = createBlobContainer(0, null, true, null, null);
|
||||||
|
|
||||||
int numBlobsToDelete = randomIntBetween(500, 3000);
|
int numBlobsToDelete = randomIntBetween(500, 3000);
|
||||||
List<String> blobsToDelete = new ArrayList<>();
|
List<String> blobsToDelete = new ArrayList<>();
|
||||||
|
@ -1006,7 +1007,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
|
|
||||||
public void testGetRegisterRetries() {
|
public void testGetRegisterRetries() {
|
||||||
final var maxRetries = between(0, 3);
|
final var maxRetries = between(0, 3);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
||||||
|
|
||||||
interface FailingHandlerFactory {
|
interface FailingHandlerFactory {
|
||||||
void addHandler(String blobName, Integer... responseCodes);
|
void addHandler(String blobName, Integer... responseCodes);
|
||||||
|
@ -1073,6 +1074,38 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSuppressedDeletionErrorsAreCapped() {
|
||||||
|
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
|
||||||
|
int maxBulkDeleteSize = randomIntBetween(1, 10);
|
||||||
|
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null, maxBulkDeleteSize);
|
||||||
|
httpServer.createContext("/", exchange -> {
|
||||||
|
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
|
||||||
|
exchange.sendResponseHeaders(
|
||||||
|
randomFrom(
|
||||||
|
HttpStatus.SC_INTERNAL_SERVER_ERROR,
|
||||||
|
HttpStatus.SC_BAD_GATEWAY,
|
||||||
|
HttpStatus.SC_SERVICE_UNAVAILABLE,
|
||||||
|
HttpStatus.SC_GATEWAY_TIMEOUT,
|
||||||
|
HttpStatus.SC_NOT_FOUND,
|
||||||
|
HttpStatus.SC_UNAUTHORIZED
|
||||||
|
),
|
||||||
|
-1
|
||||||
|
);
|
||||||
|
exchange.close();
|
||||||
|
} else {
|
||||||
|
fail("expected only deletions");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
var maxNoOfDeletions = 2 * S3BlobStore.MAX_DELETE_EXCEPTIONS;
|
||||||
|
var blobs = randomList(1, maxNoOfDeletions * maxBulkDeleteSize, ESTestCase::randomIdentifier);
|
||||||
|
var exception = expectThrows(
|
||||||
|
IOException.class,
|
||||||
|
"deletion should not succeed",
|
||||||
|
() -> blobContainer.deleteBlobsIgnoringIfNotExists(randomPurpose(), blobs.iterator())
|
||||||
|
);
|
||||||
|
assertThat(exception.getCause().getSuppressed().length, lessThan(S3BlobStore.MAX_DELETE_EXCEPTIONS));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
|
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
|
||||||
// some attempts make meaningful progress and do not count towards the max retry limit
|
// some attempts make meaningful progress and do not count towards the max retry limit
|
||||||
|
|
|
@ -76,7 +76,8 @@ public class URLBlobContainerRetriesTests extends AbstractBlobContainerRetriesTe
|
||||||
Integer maxRetries,
|
Integer maxRetries,
|
||||||
TimeValue readTimeout,
|
TimeValue readTimeout,
|
||||||
Boolean disableChunkedEncoding,
|
Boolean disableChunkedEncoding,
|
||||||
ByteSizeValue bufferSize
|
ByteSizeValue bufferSize,
|
||||||
|
Integer maxBulkDeletes
|
||||||
) {
|
) {
|
||||||
Settings.Builder settingsBuilder = Settings.builder();
|
Settings.Builder settingsBuilder = Settings.builder();
|
||||||
|
|
||||||
|
|
|
@ -82,7 +82,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
@Nullable Integer maxRetries,
|
@Nullable Integer maxRetries,
|
||||||
@Nullable TimeValue readTimeout,
|
@Nullable TimeValue readTimeout,
|
||||||
@Nullable Boolean disableChunkedEncoding,
|
@Nullable Boolean disableChunkedEncoding,
|
||||||
@Nullable ByteSizeValue bufferSize
|
@Nullable ByteSizeValue bufferSize,
|
||||||
|
@Nullable Integer maxBulkDeletes
|
||||||
);
|
);
|
||||||
|
|
||||||
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
|
protected org.hamcrest.Matcher<Object> readTimeoutExceptionMatcher() {
|
||||||
|
@ -91,7 +92,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
public void testReadNonexistentBlobThrowsNoSuchFileException() {
|
||||||
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(between(1, 5), null, null, null, null);
|
||||||
final long position = randomLongBetween(0, MAX_RANGE_VAL);
|
final long position = randomLongBetween(0, MAX_RANGE_VAL);
|
||||||
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
final int length = randomIntBetween(1, Math.toIntExact(Math.min(Integer.MAX_VALUE, MAX_RANGE_VAL - position)));
|
||||||
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
final Exception exception = expectThrows(NoSuchFileException.class, () -> {
|
||||||
|
@ -118,7 +119,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(1, 3));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_max_retries"), exchange -> {
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
if (countDown.countDown()) {
|
if (countDown.countDown()) {
|
||||||
|
@ -175,7 +176,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
final CountDown countDown = new CountDown(maxRetries + 1);
|
final CountDown countDown = new CountDown(maxRetries + 1);
|
||||||
|
|
||||||
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
final TimeValue readTimeout = TimeValue.timeValueSeconds(between(5, 10));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
||||||
final byte[] bytes = randomBlobContent();
|
final byte[] bytes = randomBlobContent();
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_range_blob_max_retries"), exchange -> {
|
||||||
Streams.readFully(exchange.getRequestBody());
|
Streams.readFully(exchange.getRequestBody());
|
||||||
|
@ -247,7 +248,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
public void testReadBlobWithReadTimeouts() {
|
public void testReadBlobWithReadTimeouts() {
|
||||||
final int maxRetries = randomInt(5);
|
final int maxRetries = randomInt(5);
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null, null);
|
||||||
|
|
||||||
// HTTP server does not send a response
|
// HTTP server does not send a response
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_unresponsive"), exchange -> {});
|
||||||
|
@ -304,7 +305,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
public void testReadBlobWithNoHttpResponse() {
|
public void testReadBlobWithNoHttpResponse() {
|
||||||
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
|
||||||
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null);
|
final BlobContainer blobContainer = createBlobContainer(randomInt(5), readTimeout, null, null, null);
|
||||||
|
|
||||||
// HTTP server closes connection immediately
|
// HTTP server closes connection immediately
|
||||||
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
|
httpServer.createContext(downloadStorageEndpoint(blobContainer, "read_blob_no_response"), HttpExchange::close);
|
||||||
|
@ -324,7 +325,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
|
||||||
|
|
||||||
public void testReadBlobWithPrematureConnectionClose() {
|
public void testReadBlobWithPrematureConnectionClose() {
|
||||||
final int maxRetries = randomInt(20);
|
final int maxRetries = randomInt(20);
|
||||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
|
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null, null);
|
||||||
|
|
||||||
final boolean alwaysFlushBody = randomBoolean();
|
final boolean alwaysFlushBody = randomBoolean();
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue