Retry throttled snapshot deletions (#113237)

Closes ES-8562
Nick Tindall 2024-10-16 09:08:49 +11:00 committed by GitHub
parent 837c0e8d0e
commit 16864e985b
9 changed files with 542 additions and 43 deletions


@ -0,0 +1,5 @@
pr: 113237
summary: Retry throttled snapshot deletions
area: Snapshot/Restore
type: bug
issues: []


@ -329,6 +329,20 @@ include::repository-shared-settings.asciidoc[]
`1000` which is the maximum number supported by the https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up dangling multipart uploads.
`throttled_delete_retry.delay_increment`::
(<<time-units,time value>>) The delay before the first retry, and the amount by which the delay is incremented on each subsequent retry. Defaults to `50ms`, minimum is `0ms`.
`throttled_delete_retry.maximum_delay`::
(<<time-units,time value>>) The upper bound on how long the delay between retries can grow. Defaults to `5s`, minimum is `0ms`.
`throttled_delete_retry.maximum_number_of_retries`::
(integer) Sets the number of times to retry a throttled snapshot deletion. Defaults to `10`; the minimum value is `0`, which
disables these retries altogether. Note that if retries are also enabled in the AWS client, each of these retries will
itself include that many client-level retries.
NOTE: The option of defining client settings in the repository settings as
documented below is considered deprecated, and will be removed in a future
version.
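For illustration only, a minimal Java sketch (not part of this change) of how the settings documented above could be supplied when creating an S3 repository, mirroring the way the integration tests below build their repository settings. The class name, bucket, and values are placeholders.

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

// Hypothetical example: repository settings tuning the throttled-delete retry behaviour.
public class ThrottledDeleteRetrySettingsExample {
    public static Settings repositorySettings() {
        return Settings.builder()
            .put("bucket", "my-bucket") // placeholder bucket name
            .put("throttled_delete_retry.delay_increment", TimeValue.timeValueMillis(50))
            .put("throttled_delete_retry.maximum_delay", TimeValue.timeValueSeconds(5))
            .put("throttled_delete_retry.maximum_number_of_retries", 10)
            .build();
    }
}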


@ -20,6 +20,7 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -31,6 +32,8 @@ import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -38,6 +41,7 @@ import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import static org.elasticsearch.repositories.RepositoriesMetrics.HTTP_REQUEST_TIME_IN_MILLIS_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_EXCEPTIONS_HISTOGRAM;
@ -48,9 +52,11 @@ import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_REQUESTS
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_HISTOGRAM;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_TOTAL;
import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_UNSUCCESSFUL_OPERATIONS_TOTAL;
import static org.elasticsearch.repositories.s3.S3RepositoriesMetrics.METRIC_DELETE_RETRIES_HISTOGRAM;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -61,14 +67,22 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
private final Queue<RestStatus> errorStatusQueue = new LinkedBlockingQueue<>();
private static final S3ErrorResponse S3_SLOW_DOWN_RESPONSE = new S3ErrorResponse(SERVICE_UNAVAILABLE, """
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>SlowDown</Code>
<Message>This is a throttling message</Message>
<Resource>/bucket/</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""");
private final Queue<S3ErrorResponse> errorResponseQueue = new LinkedBlockingQueue<>();
// Always create erroneous handler
@Override
protected Map<String, HttpHandler> createHttpHandlers() {
return Collections.singletonMap(
"/bucket",
new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorStatusQueue))
new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorResponseQueue))
);
}
@ -244,8 +258,74 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTest
}
}
public void testRetrySnapshotDeleteMetricsOnEventualSuccess() throws IOException {
final int maxRetries = 5;
final String repositoryName = randomRepositoryName();
// Disable retries in the client for this repo
createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
.put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
.put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
.build(),
false
);
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
final int numberOfDeletes = randomIntBetween(1, 3);
final List<Long> numberOfRetriesPerAttempt = new ArrayList<>();
for (int i = 0; i < numberOfDeletes; i++) {
int numFailures = randomIntBetween(1, maxRetries);
numberOfRetriesPerAttempt.add((long) numFailures);
IntStream.range(0, numFailures).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
blobContainer.deleteBlobsIgnoringIfNotExists(
randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
List.of(randomIdentifier()).iterator()
);
}
List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
assertThat(longHistogramMeasurement.stream().map(Measurement::getLong).toList(), equalTo(numberOfRetriesPerAttempt));
}
public void testRetrySnapshotDeleteMetricsWhenRetriesExhausted() {
final String repositoryName = randomRepositoryName();
// Disable retries in the client for this repo
int maxRetries = 3;
createRepository(
repositoryName,
Settings.builder()
.put(repositorySettings(repositoryName))
.put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
.put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
.put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
.build(),
false
);
final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
// Keep throttling past the max number of retries
IntStream.range(0, maxRetries + 1).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
assertThrows(
IOException.class,
() -> blobContainer.deleteBlobsIgnoringIfNotExists(
randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
List.of(randomIdentifier()).iterator()
)
);
List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
assertThat(longHistogramMeasurement.get(0).getLong(), equalTo(3L));
}
private void addErrorStatus(RestStatus... statuses) {
errorStatusQueue.addAll(Arrays.asList(statuses));
errorResponseQueue.addAll(Arrays.stream(statuses).map(S3ErrorResponse::new).toList());
}
private void addErrorStatus(S3ErrorResponse... responses) {
errorResponseQueue.addAll(Arrays.asList(responses));
}
private long getLongCounterValue(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
@ -275,25 +355,25 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTest
private static class S3MetricErroneousHttpHandler implements DelegatingHttpHandler {
private final HttpHandler delegate;
private final Queue<RestStatus> errorStatusQueue;
private final Queue<S3ErrorResponse> errorResponseQueue;
S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<RestStatus> errorStatusQueue) {
S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<S3ErrorResponse> errorResponseQueue) {
this.delegate = delegate;
this.errorStatusQueue = errorStatusQueue;
this.errorResponseQueue = errorResponseQueue;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
final RestStatus status = errorStatusQueue.poll();
if (status == null) {
final S3ErrorResponse errorResponse = errorResponseQueue.poll();
if (errorResponse == null) {
delegate.handle(exchange);
} else if (status == INTERNAL_SERVER_ERROR) {
} else if (errorResponse.status == INTERNAL_SERVER_ERROR) {
// Simulate a retryable exception
throw new IOException("ouch");
} else {
try (exchange) {
drainInputStream(exchange.getRequestBody());
exchange.sendResponseHeaders(status.getStatus(), -1);
errorResponse.writeResponse(exchange);
}
}
}
@ -302,4 +382,22 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTest
return delegate;
}
}
record S3ErrorResponse(RestStatus status, String responseBody) {
S3ErrorResponse(RestStatus status) {
this(status, null);
}
@SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
public void writeResponse(HttpExchange exchange) throws IOException {
if (responseBody != null) {
byte[] responseBytes = responseBody.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
exchange.getResponseBody().write(responseBytes);
} else {
exchange.sendResponseHeaders(status.getStatus(), -1);
}
}
}
}


@ -14,6 +14,7 @@ import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.metrics.RequestMetricCollector;
import com.amazonaws.retry.RetryUtils;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
@ -25,6 +26,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
@ -91,6 +93,7 @@ class S3BlobStore implements BlobStore {
private final StatsCollectors statsCollectors = new StatsCollectors();
private final int bulkDeletionBatchSize;
private final BackoffPolicy retryThrottledDeleteBackoffPolicy;
S3BlobStore(
S3Service service,
@ -102,7 +105,8 @@ class S3BlobStore implements BlobStore {
RepositoryMetadata repositoryMetadata,
BigArrays bigArrays,
ThreadPool threadPool,
S3RepositoriesMetrics s3RepositoriesMetrics
S3RepositoriesMetrics s3RepositoriesMetrics,
BackoffPolicy retryThrottledDeleteBackoffPolicy
) {
this.service = service;
this.bigArrays = bigArrays;
@ -116,7 +120,7 @@ class S3BlobStore implements BlobStore {
this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
this.s3RepositoriesMetrics = s3RepositoriesMetrics;
this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
}
RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
@ -255,7 +259,8 @@ class S3BlobStore implements BlobStore {
private static long getCountForMetric(TimingInfo info, AWSRequestMetrics.Field field) {
var count = info.getCounter(field.name());
if (count == null) {
if (field == AWSRequestMetrics.Field.RequestCount) {
// This can be null if the thread was interrupted
if (field == AWSRequestMetrics.Field.RequestCount && Thread.currentThread().isInterrupted() == false) {
final String message = "Expected request count to be tracked but found no count.";
assert false : message;
logger.warn(message);
@ -331,18 +336,18 @@ class S3BlobStore implements BlobStore {
}
final List<String> partition = new ArrayList<>();
try (AmazonS3Reference clientReference = clientReference()) {
try {
// S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
final AtomicReference<Exception> aex = new AtomicReference<>();
blobNames.forEachRemaining(key -> {
partition.add(key);
if (partition.size() == bulkDeletionBatchSize) {
deletePartition(purpose, clientReference, partition, aex);
deletePartition(purpose, partition, aex);
partition.clear();
}
});
if (partition.isEmpty() == false) {
deletePartition(purpose, clientReference, partition, aex);
deletePartition(purpose, partition, aex);
}
if (aex.get() != null) {
throw aex.get();
@ -352,32 +357,86 @@ class S3BlobStore implements BlobStore {
}
}
private void deletePartition(
OperationPurpose purpose,
AmazonS3Reference clientReference,
List<String> partition,
AtomicReference<Exception> aex
) {
try {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)));
} catch (MultiObjectDeleteException e) {
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
// first remove all keys that were sent in the request and then add back those that ran into an exception.
logger.warn(
() -> format(
"Failed to delete some blobs %s",
e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
),
e
);
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
} catch (AmazonClientException e) {
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
/**
* Delete one partition of a batch of blobs
*
* @param purpose The {@link OperationPurpose} of the deletion
* @param partition The list of blobs to delete
* @param aex A holder for any exception(s) thrown during the deletion
*/
private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
int retryCounter = 0;
while (true) {
try (AmazonS3Reference clientReference = clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)));
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
return;
} catch (MultiObjectDeleteException e) {
// We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
// first remove all keys that were sent in the request and then add back those that ran into an exception.
logger.warn(
() -> format(
"Failed to delete some blobs %s",
e.getErrors()
.stream()
.map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]")
.toList()
),
e
);
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
return;
} catch (AmazonClientException e) {
if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
// S3 is asking us to slow down. Pause for a bit and retry
if (maybeDelayAndRetryDelete(retries)) {
retryCounter++;
} else {
s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
return;
}
} else {
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
return;
}
}
}
}
/**
* If there are remaining retries, pause for the configured interval then return true
*
* @param retries The retries iterator
* @return true to try the deletion again, false otherwise
*/
private boolean maybeDelayAndRetryDelete(Iterator<TimeValue> retries) {
if (retries.hasNext()) {
try {
Thread.sleep(retries.next().millis());
return true;
} catch (InterruptedException iex) {
Thread.currentThread().interrupt();
// If we're interrupted, record the exception and abort retries
logger.warn("Aborting tenacious snapshot delete retries due to interrupt");
}
} else {
logger.warn(
"Exceeded maximum tenacious snapshot delete retries, aborting. Using back-off policy "
+ retryThrottledDeleteBackoffPolicy
+ ", see the throttled_delete_retry.* S3 repository properties to configure the back-off parameters"
);
}
return false;
}
private boolean shouldRetryDelete(OperationPurpose operationPurpose) {
return operationPurpose == OperationPurpose.SNAPSHOT_DATA || operationPurpose == OperationPurpose.SNAPSHOT_METADATA;
}
private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobStore blobStore, List<String> blobs) {
final DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(blobStore.bucket()).withKeys(
blobs.toArray(Strings.EMPTY_ARRAY)


@ -17,7 +17,8 @@ public record S3RepositoriesMetrics(
RepositoriesMetrics common,
LongCounter retryStartedCounter,
LongCounter retryCompletedCounter,
LongHistogram retryHistogram
LongHistogram retryHistogram,
LongHistogram retryDeletesHistogram
) {
public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP);
@ -25,6 +26,7 @@ public record S3RepositoriesMetrics(
public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total";
public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total";
public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram";
public static final String METRIC_DELETE_RETRIES_HISTOGRAM = "es.repositories.s3.delete.retry.attempts.histogram";
public S3RepositoriesMetrics(RepositoriesMetrics common) {
this(
@ -32,7 +34,8 @@ public record S3RepositoriesMetrics(
common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"),
common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"),
common.meterRegistry()
.registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit")
.registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit"),
common.meterRegistry().registerLongHistogram(METRIC_DELETE_RETRIES_HISTOGRAM, "s3 delete retry attempts histogram", "unit")
);
}
}


@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.ReferenceDocs;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
@ -202,6 +203,26 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.Dynamic
);
/**
* We will retry deletes that fail due to throttling. We use a {@link BackoffPolicy#linearBackoff(TimeValue, int, TimeValue)}
* with the following parameters:
*/
static final Setting<TimeValue> RETRY_THROTTLED_DELETE_DELAY_INCREMENT = Setting.timeSetting(
"throttled_delete_retry.delay_increment",
TimeValue.timeValueMillis(50),
TimeValue.ZERO
);
static final Setting<TimeValue> RETRY_THROTTLED_DELETE_MAXIMUM_DELAY = Setting.timeSetting(
"throttled_delete_retry.maximum_delay",
TimeValue.timeValueSeconds(5),
TimeValue.ZERO
);
static final Setting<Integer> RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES = Setting.intSetting(
"throttled_delete_retry.maximum_number_of_retries",
10,
0
);
private final S3Service service;
private final String bucket;
@ -424,7 +445,12 @@ class S3Repository extends MeteredBlobStoreRepository {
metadata,
bigArrays,
threadPool,
s3RepositoriesMetrics
s3RepositoriesMetrics,
BackoffPolicy.linearBackoff(
RETRY_THROTTLED_DELETE_DELAY_INCREMENT.get(metadata.settings()),
RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.get(metadata.settings()),
RETRY_THROTTLED_DELETE_MAXIMUM_DELAY.get(metadata.settings())
)
);
}


@ -10,10 +10,12 @@ package org.elasticsearch.repositories.s3;
import fixture.s3.S3HttpHandler;
import com.amazonaws.AbortedException;
import com.amazonaws.DnsResolver;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
@ -21,6 +23,8 @@ import com.sun.net.httpserver.HttpHandler;
import org.apache.http.HttpStatus;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.OperationPurpose;
@ -62,15 +66,18 @@ import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@ -98,6 +105,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
@SuppressForbidden(reason = "use a http server")
public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
private static final int MAX_NUMBER_SNAPSHOT_DELETE_RETRIES = 10;
private S3Service service;
private AtomicBoolean shouldErrorOnDns;
private RecordingMeterRegistry recordingMeterRegistry;
@ -196,7 +204,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
repositoryMetadata,
BigArrays.NON_RECYCLING_INSTANCE,
new DeterministicTaskQueue().getThreadPool(),
new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry))
new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)),
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(1), MAX_NUMBER_SNAPSHOT_DELETE_RETRIES)
);
return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) {
@Override
@ -771,6 +780,171 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
assertThat(getRetryHistogramMeasurements(), empty());
}
public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < numBlobsToDelete; i++) {
blobsToDelete.add(randomIdentifier());
}
int throttleTimesBeforeSuccess = randomIntBetween(1, MAX_NUMBER_SNAPSHOT_DELETE_RETRIES);
logger.info("--> Throttling {} times before success", throttleTimesBeforeSuccess);
ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(throttleTimesBeforeSuccess, attempt -> {});
httpServer.createContext("/", handler);
blobContainer.deleteBlobsIgnoringIfNotExists(randomFrom(operationPurposesThatRetryOnDelete()), blobsToDelete.iterator());
int expectedNumberOfBatches = expectedNumberOfBatches(numBlobsToDelete);
assertThat(handler.numberOfDeleteAttempts.get(), equalTo(throttleTimesBeforeSuccess + expectedNumberOfBatches));
assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(expectedNumberOfBatches));
}
public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < numBlobsToDelete; i++) {
blobsToDelete.add(randomIdentifier());
}
final Thread clientThread = Thread.currentThread();
int interruptBeforeAttempt = randomIntBetween(0, randomIntBetween(1, 10));
logger.info("--> Deleting {} blobs, interrupting before attempt {}", numBlobsToDelete, interruptBeforeAttempt);
ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {
if (attempt == interruptBeforeAttempt) {
clientThread.interrupt();
}
});
httpServer.createContext("/", handler);
try {
IOException exception = assertThrows(
IOException.class,
() -> blobContainer.deleteBlobsIgnoringIfNotExists(
randomFrom(operationPurposesThatRetryOnDelete()),
blobsToDelete.iterator()
)
);
assertThat(exception.getCause(), instanceOf(AbortedException.class));
assertThat(handler.numberOfDeleteAttempts.get(), equalTo(interruptBeforeAttempt + 1));
assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
} finally {
// interrupt should be preserved, clear it to prevent it leaking between tests
assertTrue(Thread.interrupted());
}
}
public void testNonSnapshotDeletesAreNotRetried() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < numBlobsToDelete; i++) {
blobsToDelete.add(randomIdentifier());
}
ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {});
httpServer.createContext("/", handler);
IOException exception = assertThrows(
IOException.class,
() -> blobContainer.deleteBlobsIgnoringIfNotExists(
randomValueOtherThanMany(
op -> operationPurposesThatRetryOnDelete().contains(op),
() -> randomFrom(OperationPurpose.values())
),
blobsToDelete.iterator()
)
);
assertEquals(
ThrottlingDeleteHandler.THROTTLING_ERROR_CODE,
asInstanceOf(AmazonS3Exception.class, exception.getCause()).getErrorCode()
);
assertThat(handler.numberOfDeleteAttempts.get(), equalTo(expectedNumberOfBatches(numBlobsToDelete)));
assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
}
public void testNonThrottlingErrorsAreNotRetried() {
// disable AWS-client retries
final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
int numBlobsToDelete = randomIntBetween(500, 3000);
List<String> blobsToDelete = new ArrayList<>();
for (int i = 0; i < numBlobsToDelete; i++) {
blobsToDelete.add(randomIdentifier());
}
ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {}, "NotThrottling");
httpServer.createContext("/", handler);
assertThrows(
IOException.class,
() -> blobContainer.deleteBlobsIgnoringIfNotExists(randomFrom(operationPurposesThatRetryOnDelete()), blobsToDelete.iterator())
);
assertThat(handler.numberOfDeleteAttempts.get(), equalTo(expectedNumberOfBatches(numBlobsToDelete)));
assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
}
private int expectedNumberOfBatches(int blobsToDelete) {
return (blobsToDelete / 1_000) + (blobsToDelete % 1_000 == 0 ? 0 : 1);
}
@SuppressForbidden(reason = "use a http server")
private class ThrottlingDeleteHandler extends S3HttpHandler {
private static final String THROTTLING_ERROR_CODE = "SlowDown";
private final AtomicInteger throttleTimesBeforeSuccess;
private final AtomicInteger numberOfDeleteAttempts;
private final AtomicInteger numberOfSuccessfulDeletes;
private final IntConsumer onAttemptCallback;
private final String errorCode;
ThrottlingDeleteHandler(int throttleTimesBeforeSuccess, IntConsumer onAttemptCallback) {
this(throttleTimesBeforeSuccess, onAttemptCallback, THROTTLING_ERROR_CODE);
}
ThrottlingDeleteHandler(int throttleTimesBeforeSuccess, IntConsumer onAttemptCallback, String errorCode) {
super("bucket");
this.numberOfDeleteAttempts = new AtomicInteger();
this.numberOfSuccessfulDeletes = new AtomicInteger();
this.throttleTimesBeforeSuccess = new AtomicInteger(throttleTimesBeforeSuccess);
this.onAttemptCallback = onAttemptCallback;
this.errorCode = errorCode;
}
@Override
public void handle(HttpExchange exchange) throws IOException {
if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
onAttemptCallback.accept(numberOfDeleteAttempts.get());
numberOfDeleteAttempts.incrementAndGet();
if (throttleTimesBeforeSuccess.getAndDecrement() > 0) {
final byte[] responseBytes = Strings.format("""
<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>%s</Code>
<Message>This is a throttling message</Message>
<Resource>/bucket/</Resource>
<RequestId>4442587FB7D0A2F9</RequestId>
</Error>""", errorCode).getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HttpStatus.SC_SERVICE_UNAVAILABLE, responseBytes.length);
exchange.getResponseBody().write(responseBytes);
exchange.close();
} else {
numberOfSuccessfulDeletes.incrementAndGet();
super.handle(exchange);
}
} else {
super.handle(exchange);
}
}
}
private Set<OperationPurpose> operationPurposesThatRetryOnDelete() {
return Set.of(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA);
}
@Override
protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
// some attempts make meaningful progress and do not count towards the max retry limit


@ -8,6 +8,7 @@
*/
package org.elasticsearch.common;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import java.util.Collections;
@ -81,6 +82,18 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
}
/**
* Creates a new linear backoff policy with the provided configuration
*
* @param delayIncrement The amount by which to increment the delay on each retry
* @param maxNumberOfRetries The maximum number of retries
* @param maximumDelay The maximum delay
* @return A backoff policy with linear increase in wait time for retries.
*/
public static BackoffPolicy linearBackoff(TimeValue delayIncrement, int maxNumberOfRetries, TimeValue maximumDelay) {
return new LinearBackoff(delayIncrement, maxNumberOfRetries, maximumDelay);
}
/**
* Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
*/
@ -100,6 +113,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
public Iterator<TimeValue> iterator() {
return Collections.emptyIterator();
}
@Override
public String toString() {
return "NoBackoff";
}
}
private static class ExponentialBackoff extends BackoffPolicy {
@ -118,6 +136,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
public Iterator<TimeValue> iterator() {
return new ExponentialBackoffIterator(start, numberOfElements);
}
@Override
public String toString() {
return "ExponentialBackoff{start=" + start + ", numberOfElements=" + numberOfElements + '}';
}
}
private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
@ -163,6 +186,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
public Iterator<TimeValue> iterator() {
return new ConstantBackoffIterator(delay, numberOfElements);
}
@Override
public String toString() {
return "ConstantBackoff{delay=" + delay + ", numberOfElements=" + numberOfElements + '}';
}
}
private static final class ConstantBackoffIterator implements Iterator<TimeValue> {
@ -203,6 +231,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
public Iterator<TimeValue> iterator() {
return new WrappedBackoffIterator(delegate.iterator(), onBackoff);
}
@Override
public String toString() {
return "WrappedBackoffPolicy{delegate=" + delegate + ", onBackoff=" + onBackoff + '}';
}
}
private static final class WrappedBackoffIterator implements Iterator<TimeValue> {
@ -228,4 +261,60 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
return delegate.next();
}
}
private static final class LinearBackoff extends BackoffPolicy {
private final TimeValue delayIncrement;
private final int maxNumberOfRetries;
private final TimeValue maximumDelay;
private LinearBackoff(TimeValue delayIncrement, int maxNumberOfRetries, @Nullable TimeValue maximumDelay) {
this.delayIncrement = delayIncrement;
this.maxNumberOfRetries = maxNumberOfRetries;
this.maximumDelay = maximumDelay;
}
@Override
public Iterator<TimeValue> iterator() {
return new LinearBackoffIterator(delayIncrement, maxNumberOfRetries, maximumDelay);
}
@Override
public String toString() {
return "LinearBackoff{"
+ "delayIncrement="
+ delayIncrement
+ ", maxNumberOfRetries="
+ maxNumberOfRetries
+ ", maximumDelay="
+ maximumDelay
+ '}';
}
}
private static final class LinearBackoffIterator implements Iterator<TimeValue> {
private final TimeValue delayIncrement;
private final int maxNumberOfRetries;
private final TimeValue maximumDelay;
private int curr;
private LinearBackoffIterator(TimeValue delayIncrement, int maxNumberOfRetries, @Nullable TimeValue maximumDelay) {
this.delayIncrement = delayIncrement;
this.maxNumberOfRetries = maxNumberOfRetries;
this.maximumDelay = maximumDelay;
}
@Override
public boolean hasNext() {
return curr < maxNumberOfRetries;
}
@Override
public TimeValue next() {
curr++;
TimeValue timeValue = TimeValue.timeValueMillis(curr * delayIncrement.millis());
return maximumDelay == null ? timeValue : timeValue.compareTo(maximumDelay) < 0 ? timeValue : maximumDelay;
}
}
}
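As a usage sketch (not part of the change), the following iterates the delays produced by the new linear policy with the S3 repository defaults of a 50ms increment, 10 retries and a 5s cap; the class name is illustrative. The delays grow as 50ms, 100ms, ..., 500ms, so the cap only takes effect for larger increments or retry counts.

import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.core.TimeValue;

public class LinearBackoffExample {
    public static void main(String[] args) {
        // Build the policy with the same values S3Repository uses by default.
        BackoffPolicy policy = BackoffPolicy.linearBackoff(
            TimeValue.timeValueMillis(50),  // delay increment
            10,                             // maximum number of retries
            TimeValue.timeValueSeconds(5)   // maximum delay (null means no cap)
        );
        for (TimeValue delay : policy) {
            System.out.println(delay);      // prints 50ms, 100ms, ..., 500ms
        }
    }
}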


@ -77,6 +77,37 @@ public class BackoffPolicyTests extends ESTestCase {
}
}
public void testLinearBackoffWithLimit() {
long incrementMillis = randomIntBetween(10, 500);
long limitMillis = randomIntBetween(1000, 5000);
int maxNumberOfRetries = randomIntBetween(0, 30);
BackoffPolicy timeValues = BackoffPolicy.linearBackoff(
timeValueMillis(incrementMillis),
maxNumberOfRetries,
timeValueMillis(limitMillis)
);
int counter = 0;
for (TimeValue timeValue : timeValues) {
counter++;
long unlimitedValue = counter * incrementMillis;
long expectedValue = Math.min(unlimitedValue, limitMillis);
assertEquals(timeValueMillis(expectedValue), timeValue);
}
assertEquals(counter, maxNumberOfRetries);
}
public void testLinearBackoffWithoutLimit() {
long incrementMillis = randomIntBetween(10, 500);
int maxNumberOfRetries = randomIntBetween(0, 30);
BackoffPolicy timeValues = BackoffPolicy.linearBackoff(timeValueMillis(incrementMillis), maxNumberOfRetries, null);
int counter = 0;
for (TimeValue timeValue : timeValues) {
counter++;
assertEquals(timeValueMillis(counter * incrementMillis), timeValue);
}
assertEquals(counter, maxNumberOfRetries);
}
public void testNoBackoff() {
BackoffPolicy noBackoff = BackoffPolicy.noBackoff();
int numberOfBackoffsToPerform = randomIntBetween(1, 3);