diff --git a/docs/changelog/113237.yaml b/docs/changelog/113237.yaml
new file mode 100644
index 000000000000..45343dbf1711
--- /dev/null
+++ b/docs/changelog/113237.yaml
@@ -0,0 +1,5 @@
+pr: 113237
+summary: Retry throttled snapshot deletions
+area: Snapshot/Restore
+type: bug
+issues: []
diff --git a/docs/reference/snapshot-restore/repository-s3.asciidoc b/docs/reference/snapshot-restore/repository-s3.asciidoc
index 71a9fd8b87c9..b48bb5c4f059 100644
--- a/docs/reference/snapshot-restore/repository-s3.asciidoc
+++ b/docs/reference/snapshot-restore/repository-s3.asciidoc
@@ -329,6 +329,20 @@ include::repository-shared-settings.asciidoc[]
   `1000` which is the maximum number supported by the
   https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS ListMultipartUploads API].
   If set to `0`, {es} will not attempt to clean up dangling multipart uploads.
 
+`throttled_delete_retry.delay_increment`::
+
+    (<<time-units,time value>>) This value is used as the delay before the first retry and as the amount the delay is incremented by on each subsequent retry. Default is 50ms, minimum is 0ms.
+
+`throttled_delete_retry.maximum_delay`::
+
+    (<<time-units,time value>>) This is the upper bound on how long the delays between retries will grow to. Default is 5s, minimum is 0ms.
+
+`throttled_delete_retry.maximum_number_of_retries`::
+
+    (integer) Sets the number of times to retry a throttled snapshot deletion. Defaults to `10`, minimum value is `0` which
+    will disable retries altogether. Note that if retries are enabled in the S3 client, each of these retries
+    comprises that many client-level retries.
+
 NOTE: The option of defining client settings in the repository settings as documented below is considered deprecated, and will be removed in a future version.
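For illustration, the new settings are ordinary repository settings and can be supplied when the repository is registered. A minimal sketch, assuming placeholder repository and bucket names:

    PUT _snapshot/my_s3_repository
    {
      "type": "s3",
      "settings": {
        "bucket": "my-bucket",
        "throttled_delete_retry.delay_increment": "100ms",
        "throttled_delete_retry.maximum_delay": "10s",
        "throttled_delete_retry.maximum_number_of_retries": 5
      }
    }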
diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryMetricsTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryMetricsTests.java
index e55668adea10..21f42bf9eb99 100644
--- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryMetricsTests.java
+++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryMetricsTests.java
@@ -20,6 +20,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.SuppressForbidden;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@@ -31,6 +32,8 @@ import org.elasticsearch.telemetry.TestTelemetryPlugin;
 import org.elasticsearch.test.ESIntegTestCase;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -38,6 +41,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.repositories.RepositoriesMetrics.HTTP_REQUEST_TIME_IN_MILLIS_HISTOGRAM;
 import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_EXCEPTIONS_HISTOGRAM;
@@ -48,9 +52,11 @@ import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_REQUESTS_TOTAL;
 import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_HISTOGRAM;
 import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_THROTTLES_TOTAL;
 import static org.elasticsearch.repositories.RepositoriesMetrics.METRIC_UNSUCCESSFUL_OPERATIONS_TOTAL;
+import static org.elasticsearch.repositories.s3.S3RepositoriesMetrics.METRIC_DELETE_RETRIES_HISTOGRAM;
 import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
 import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
 import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
+import static org.elasticsearch.rest.RestStatus.SERVICE_UNAVAILABLE;
 import static org.elasticsearch.rest.RestStatus.TOO_MANY_REQUESTS;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -61,14 +67,22 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
 
-    private final Queue<RestStatus> errorStatusQueue = new LinkedBlockingQueue<>();
+    private static final S3ErrorResponse S3_SLOW_DOWN_RESPONSE = new S3ErrorResponse(SERVICE_UNAVAILABLE, """
+        <?xml version="1.0" encoding="UTF-8"?>
+        <Error>
+        <Code>SlowDown</Code>
+        <Message>This is a throttling message</Message>
+        <Resource>/bucket/</Resource>
+        <RequestId>4442587FB7D0A2F9</RequestId>
+        </Error>""");
+
+    private final Queue<S3ErrorResponse> errorResponseQueue = new LinkedBlockingQueue<>();
 
     // Always create erroneous handler
     @Override
     protected Map<String, HttpHandler> createHttpHandlers() {
         return Collections.singletonMap(
             "/bucket",
-            new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorStatusQueue))
+            new S3StatsCollectorHttpHandler(new S3MetricErroneousHttpHandler(new S3BlobStoreHttpHandler("bucket"), errorResponseQueue))
         );
     }
 
@@ -244,8 +258,74 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
         }
     }
 
+    public void testRetrySnapshotDeleteMetricsOnEventualSuccess() throws IOException {
+        final int maxRetries = 5;
+        final String repositoryName = randomRepositoryName();
+        // Disable retries in the client for this repo
+        createRepository(
+            repositoryName,
+            Settings.builder()
+                .put(repositorySettings(repositoryName))
+                .put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
+                .put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
+                .put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
+                .build(),
+            false
+        );
+        final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
+        final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
+        final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
+        final int numberOfDeletes = randomIntBetween(1, 3);
+        final List<Long> numberOfRetriesPerAttempt = new ArrayList<>();
+        for (int i = 0; i < numberOfDeletes; i++) {
+            int numFailures = randomIntBetween(1, maxRetries);
+            numberOfRetriesPerAttempt.add((long) numFailures);
+            IntStream.range(0, numFailures).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
+            blobContainer.deleteBlobsIgnoringIfNotExists(
+                randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
+                List.of(randomIdentifier()).iterator()
+            );
+        }
+        List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
+        assertThat(longHistogramMeasurement.stream().map(Measurement::getLong).toList(), equalTo(numberOfRetriesPerAttempt));
+    }
+
+    public void testRetrySnapshotDeleteMetricsWhenRetriesExhausted() {
+        final String repositoryName = randomRepositoryName();
+        // Disable retries in the client for this repo
+        int maxRetries = 3;
+        createRepository(
+            repositoryName,
+            Settings.builder()
+                .put(repositorySettings(repositoryName))
+                .put(S3ClientSettings.MAX_RETRIES_SETTING.getConcreteSettingForNamespace("placeholder").getKey(), 0)
+                .put(S3Repository.RETRY_THROTTLED_DELETE_DELAY_INCREMENT.getKey(), TimeValue.timeValueMillis(10))
+                .put(S3Repository.RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.getKey(), maxRetries)
+                .build(),
+            false
+        );
+        final String dataNodeName = internalCluster().getNodeNameThat(DiscoveryNode::canContainData);
+        final BlobContainer blobContainer = getBlobContainer(dataNodeName, repositoryName);
+        final TestTelemetryPlugin plugin = getPlugin(dataNodeName);
+        // Keep throttling past the max number of retries
+        IntStream.range(0, maxRetries + 1).forEach(ignored -> addErrorStatus(S3_SLOW_DOWN_RESPONSE));
+        assertThrows(
+            IOException.class,
+            () -> blobContainer.deleteBlobsIgnoringIfNotExists(
+                randomFrom(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA),
+                List.of(randomIdentifier()).iterator()
+            )
+        );
+        List<Measurement> longHistogramMeasurement = plugin.getLongHistogramMeasurement(METRIC_DELETE_RETRIES_HISTOGRAM);
+        assertThat(longHistogramMeasurement.get(0).getLong(), equalTo(3L));
+    }
+
     private void addErrorStatus(RestStatus... statuses) {
-        errorStatusQueue.addAll(Arrays.asList(statuses));
+        errorResponseQueue.addAll(Arrays.stream(statuses).map(S3ErrorResponse::new).toList());
+    }
+
+    private void addErrorStatus(S3ErrorResponse... responses) {
+        errorResponseQueue.addAll(Arrays.asList(responses));
     }
 
     private long getLongCounterValue(TestTelemetryPlugin plugin, String instrumentName, Operation operation) {
@@ -275,25 +355,25 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
     private static class S3MetricErroneousHttpHandler implements DelegatingHttpHandler {
 
         private final HttpHandler delegate;
-        private final Queue<RestStatus> errorStatusQueue;
+        private final Queue<S3ErrorResponse> errorResponseQueue;
 
-        S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<RestStatus> errorStatusQueue) {
+        S3MetricErroneousHttpHandler(HttpHandler delegate, Queue<S3ErrorResponse> errorResponseQueue) {
             this.delegate = delegate;
-            this.errorStatusQueue = errorStatusQueue;
+            this.errorResponseQueue = errorResponseQueue;
         }
 
         @Override
         public void handle(HttpExchange exchange) throws IOException {
-            final RestStatus status = errorStatusQueue.poll();
-            if (status == null) {
+            final S3ErrorResponse errorResponse = errorResponseQueue.poll();
+            if (errorResponse == null) {
                 delegate.handle(exchange);
-            } else if (status == INTERNAL_SERVER_ERROR) {
+            } else if (errorResponse.status == INTERNAL_SERVER_ERROR) {
                 // Simulate a retryable exception
                 throw new IOException("ouch");
             } else {
                 try (exchange) {
                     drainInputStream(exchange.getRequestBody());
-                    exchange.sendResponseHeaders(status.getStatus(), -1);
+                    errorResponse.writeResponse(exchange);
                 }
             }
         }
@@ -302,4 +382,22 @@ public class S3BlobStoreRepositoryMetricsTests extends S3BlobStoreRepositoryTests {
             return delegate;
         }
     }
+
+    record S3ErrorResponse(RestStatus status, String responseBody) {
+
+        S3ErrorResponse(RestStatus status) {
+            this(status, null);
+        }
+
+        @SuppressForbidden(reason = "this test uses a HttpServer to emulate an S3 endpoint")
+        public void writeResponse(HttpExchange exchange) throws IOException {
+            if (responseBody != null) {
+                byte[] responseBytes = responseBody.getBytes(StandardCharsets.UTF_8);
+                exchange.sendResponseHeaders(status.getStatus(), responseBytes.length);
+                exchange.getResponseBody().write(responseBytes);
+            } else {
+                exchange.sendResponseHeaders(status.getStatus(), -1);
+            }
+        }
+    }
 }
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
index 3e6b7c356cb1..e2efc926f7e3 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -14,6 +14,7 @@ import com.amazonaws.AmazonWebServiceRequest;
 import com.amazonaws.Request;
 import com.amazonaws.Response;
 import com.amazonaws.metrics.RequestMetricCollector;
+import com.amazonaws.retry.RetryUtils;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
@@ -25,6 +26,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -91,6 +93,7 @@ class S3BlobStore implements BlobStore {
     private final StatsCollectors statsCollectors = new StatsCollectors();
 
     private final int bulkDeletionBatchSize;
+    private final BackoffPolicy retryThrottledDeleteBackoffPolicy;
 
     S3BlobStore(
         S3Service service,
@@ -102,7 +105,8 @@ class S3BlobStore implements BlobStore {
         RepositoryMetadata repositoryMetadata,
         BigArrays bigArrays,
         ThreadPool threadPool,
-        S3RepositoriesMetrics s3RepositoriesMetrics
+        S3RepositoriesMetrics s3RepositoriesMetrics,
+        BackoffPolicy retryThrottledDeleteBackoffPolicy
     ) {
         this.service = service;
         this.bigArrays = bigArrays;
@@ -116,7 +120,7 @@ class S3BlobStore implements BlobStore {
         this.snapshotExecutor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
         this.s3RepositoriesMetrics = s3RepositoriesMetrics;
         this.bulkDeletionBatchSize = S3Repository.DELETION_BATCH_SIZE_SETTING.get(repositoryMetadata.settings());
-
+        this.retryThrottledDeleteBackoffPolicy = retryThrottledDeleteBackoffPolicy;
     }
 
     RequestMetricCollector getMetricCollector(Operation operation, OperationPurpose purpose) {
@@ -255,7 +259,8 @@ class S3BlobStore implements BlobStore {
     private static long getCountForMetric(TimingInfo info, AWSRequestMetrics.Field field) {
         var count = info.getCounter(field.name());
         if (count == null) {
-            if (field == AWSRequestMetrics.Field.RequestCount) {
+            // This can be null if the thread was interrupted
+            if (field == AWSRequestMetrics.Field.RequestCount && Thread.currentThread().isInterrupted() == false) {
                 final String message = "Expected request count to be tracked but found no count.";
                 assert false : message;
                 logger.warn(message);
@@ -331,18 +336,18 @@ class S3BlobStore implements BlobStore {
         }
 
         final List<String> partition = new ArrayList<>();
-        try (AmazonS3Reference clientReference = clientReference()) {
+        try {
             // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
             final AtomicReference<Exception> aex = new AtomicReference<>();
             blobNames.forEachRemaining(key -> {
                 partition.add(key);
                 if (partition.size() == bulkDeletionBatchSize) {
-                    deletePartition(purpose, clientReference, partition, aex);
+                    deletePartition(purpose, partition, aex);
                     partition.clear();
                 }
             });
             if (partition.isEmpty() == false) {
-                deletePartition(purpose, clientReference, partition, aex);
+                deletePartition(purpose, partition, aex);
             }
             if (aex.get() != null) {
                 throw aex.get();
@@ -352,32 +357,86 @@ class S3BlobStore implements BlobStore {
         }
     }
 
-    private void deletePartition(
-        OperationPurpose purpose,
-        AmazonS3Reference clientReference,
-        List<String> partition,
-        AtomicReference<Exception> aex
-    ) {
-        try {
-            SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)));
-        } catch (MultiObjectDeleteException e) {
-            // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
-            // first remove all keys that were sent in the request and then add back those that ran into an exception.
-            logger.warn(
-                () -> format(
-                    "Failed to delete some blobs %s",
-                    e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
-                ),
-                e
-            );
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
-        } catch (AmazonClientException e) {
-            // The AWS client threw an unexpected exception and did not execute the request at all so we do not
-            // remove any keys from the outstanding deletes set.
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+    /**
+     * Delete one partition of a batch of blobs
+     *
+     * @param purpose The {@link OperationPurpose} of the deletion
+     * @param partition The list of blobs to delete
+     * @param aex A holder for any exception(s) thrown during the deletion
+     */
+    private void deletePartition(OperationPurpose purpose, List<String> partition, AtomicReference<Exception> aex) {
+        final Iterator<TimeValue> retries = retryThrottledDeleteBackoffPolicy.iterator();
+        int retryCounter = 0;
+        while (true) {
+            try (AmazonS3Reference clientReference = clientReference()) {
+                SocketAccess.doPrivilegedVoid(() -> clientReference.client().deleteObjects(bulkDelete(purpose, this, partition)));
+                s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
+                return;
+            } catch (MultiObjectDeleteException e) {
+                // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
+                // first remove all keys that were sent in the request and then add back those that ran into an exception.
+                logger.warn(
+                    () -> format(
+                        "Failed to delete some blobs %s",
+                        e.getErrors()
+                            .stream()
+                            .map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]")
+                            .toList()
+                    ),
+                    e
+                );
+                aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                return;
+            } catch (AmazonClientException e) {
+                if (shouldRetryDelete(purpose) && RetryUtils.isThrottlingException(e)) {
+                    // S3 is asking us to slow down. Pause for a bit and retry
+                    if (maybeDelayAndRetryDelete(retries)) {
+                        retryCounter++;
+                    } else {
+                        s3RepositoriesMetrics.retryDeletesHistogram().record(retryCounter);
+                        aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                        return;
+                    }
+                } else {
+                    // The AWS client threw an unexpected exception and did not execute the request at all so we do not
+                    // remove any keys from the outstanding deletes set.
+                    aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+                    return;
+                }
+            }
         }
     }
 
+    /**
+     * If there are remaining retries, pause for the configured interval then return true
+     *
+     * @param retries The retries iterator
+     * @return true to try the deletion again, false otherwise
+     */
+    private boolean maybeDelayAndRetryDelete(Iterator<TimeValue> retries) {
+        if (retries.hasNext()) {
+            try {
+                Thread.sleep(retries.next().millis());
+                return true;
+            } catch (InterruptedException iex) {
+                Thread.currentThread().interrupt();
+                // If we're interrupted, record the exception and abort retries
+                logger.warn("Aborting tenacious snapshot delete retries due to interrupt");
+            }
+        } else {
+            logger.warn(
+                "Exceeded maximum tenacious snapshot delete retries, aborting. Using back-off policy "
                    + retryThrottledDeleteBackoffPolicy
                    + ", see the throttled_delete_retry.* S3 repository properties to configure the back-off parameters"
+            );
+        }
+        return false;
+    }
+
+    private boolean shouldRetryDelete(OperationPurpose operationPurpose) {
+        return operationPurpose == OperationPurpose.SNAPSHOT_DATA || operationPurpose == OperationPurpose.SNAPSHOT_METADATA;
+    }
+
     private static DeleteObjectsRequest bulkDelete(OperationPurpose purpose, S3BlobStore blobStore, List<String> blobs) {
         final DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(blobStore.bucket()).withKeys(
             blobs.toArray(Strings.EMPTY_ARRAY)
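To make the retry budget above concrete: with the default settings (50ms increment, 5s cap, 10 retries) the linear backoff sleeps 50ms, 100ms, ..., 500ms, roughly 2.75s in total, before the throttling exception is surfaced to the caller. A standalone sketch of that arithmetic, for illustration only (the variable names here are not part of the change):

    long incrementMillis = 50;   // throttled_delete_retry.delay_increment default
    long capMillis = 5_000;      // throttled_delete_retry.maximum_delay default
    int maxRetries = 10;         // throttled_delete_retry.maximum_number_of_retries default
    long totalSleepMillis = 0;
    for (int retry = 1; retry <= maxRetries; retry++) {
        // each delay is retry * increment, clamped to the cap
        totalSleepMillis += Math.min(retry * incrementMillis, capMillis); // 50, 100, ..., 500
    }
    // totalSleepMillis == 2_750; the 5s cap only bites with larger increments or retry counts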
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java
index 74682ca190a0..03106c26c9a2 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3RepositoriesMetrics.java
@@ -17,7 +17,8 @@ public record S3RepositoriesMetrics(
     RepositoriesMetrics common,
     LongCounter retryStartedCounter,
     LongCounter retryCompletedCounter,
-    LongHistogram retryHistogram
+    LongHistogram retryHistogram,
+    LongHistogram retryDeletesHistogram
 ) {
 
     public static S3RepositoriesMetrics NOOP = new S3RepositoriesMetrics(RepositoriesMetrics.NOOP);
@@ -25,6 +26,7 @@ public record S3RepositoriesMetrics(
     public static final String METRIC_RETRY_EVENT_TOTAL = "es.repositories.s3.input_stream.retry.event.total";
     public static final String METRIC_RETRY_SUCCESS_TOTAL = "es.repositories.s3.input_stream.retry.success.total";
     public static final String METRIC_RETRY_ATTEMPTS_HISTOGRAM = "es.repositories.s3.input_stream.retry.attempts.histogram";
+    public static final String METRIC_DELETE_RETRIES_HISTOGRAM = "es.repositories.s3.delete.retry.attempts.histogram";
 
     public S3RepositoriesMetrics(RepositoriesMetrics common) {
         this(
@@ -32,7 +34,8 @@ public record S3RepositoriesMetrics(
             common.meterRegistry().registerLongCounter(METRIC_RETRY_EVENT_TOTAL, "s3 input stream retry event count", "unit"),
             common.meterRegistry().registerLongCounter(METRIC_RETRY_SUCCESS_TOTAL, "s3 input stream retry success count", "unit"),
             common.meterRegistry()
-                .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit")
+                .registerLongHistogram(METRIC_RETRY_ATTEMPTS_HISTOGRAM, "s3 input stream retry attempts histogram", "unit"),
+            common.meterRegistry().registerLongHistogram(METRIC_DELETE_RETRIES_HISTOGRAM, "s3 delete retry attempts histogram", "unit")
         );
     }
 }
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
index af385eeac6a5..0750f6ab59d5 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java
@@ -16,6 +16,7 @@ import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
 import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.BackoffPolicy;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -202,6 +203,26 @@ class S3Repository extends MeteredBlobStoreRepository {
         Setting.Property.Dynamic
     );
 
+    /**
+     * We will retry deletes that fail due to throttling. We use a {@link BackoffPolicy#linearBackoff(TimeValue, int, TimeValue)}
+     * with the following parameters.
+     */
+    static final Setting<TimeValue> RETRY_THROTTLED_DELETE_DELAY_INCREMENT = Setting.timeSetting(
+        "throttled_delete_retry.delay_increment",
+        TimeValue.timeValueMillis(50),
+        TimeValue.ZERO
+    );
+    static final Setting<TimeValue> RETRY_THROTTLED_DELETE_MAXIMUM_DELAY = Setting.timeSetting(
+        "throttled_delete_retry.maximum_delay",
+        TimeValue.timeValueSeconds(5),
+        TimeValue.ZERO
+    );
+    static final Setting<Integer> RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES = Setting.intSetting(
+        "throttled_delete_retry.maximum_number_of_retries",
+        10,
+        0
+    );
+
     private final S3Service service;
 
     private final String bucket;
@@ -424,7 +445,12 @@ class S3Repository extends MeteredBlobStoreRepository {
             metadata,
             bigArrays,
             threadPool,
-            s3RepositoriesMetrics
+            s3RepositoriesMetrics,
+            BackoffPolicy.linearBackoff(
+                RETRY_THROTTLED_DELETE_DELAY_INCREMENT.get(metadata.settings()),
+                RETRY_THROTTLED_DELETE_MAX_NUMBER_OF_RETRIES.get(metadata.settings()),
+                RETRY_THROTTLED_DELETE_MAXIMUM_DELAY.get(metadata.settings())
+            )
         );
     }
diff --git a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
index 1443ff704efd..76d980c222a9 100644
--- a/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
+++ b/modules/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java
@@ -10,10 +10,12 @@ package org.elasticsearch.repositories.s3;
 
 import fixture.s3.S3HttpHandler;
 
+import com.amazonaws.AbortedException;
 import com.amazonaws.DnsResolver;
 import com.amazonaws.SdkClientException;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.util.Base16;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
@@ -21,6 +23,8 @@ import com.sun.net.httpserver.HttpHandler;
 import org.apache.http.HttpStatus;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.BackoffPolicy;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.OperationPurpose;
@@ -62,15 +66,18 @@ import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.IntConsumer;
 
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomNonDataPurpose;
 import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
@@ -98,6 +105,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
 @SuppressForbidden(reason = "use a http server")
 public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
 
+    private static final int MAX_NUMBER_SNAPSHOT_DELETE_RETRIES = 10;
     private S3Service service;
     private AtomicBoolean shouldErrorOnDns;
    private RecordingMeterRegistry recordingMeterRegistry;
@@ -196,7 +204,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
             repositoryMetadata,
             BigArrays.NON_RECYCLING_INSTANCE,
             new DeterministicTaskQueue().getThreadPool(),
-            new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry))
+            new S3RepositoriesMetrics(new RepositoriesMetrics(recordingMeterRegistry)),
+            BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(1), MAX_NUMBER_SNAPSHOT_DELETE_RETRIES)
         );
         return new S3BlobContainer(randomBoolean() ? BlobPath.EMPTY : BlobPath.EMPTY.add("foo"), s3BlobStore) {
             @Override
@@ -771,6 +780,171 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
         assertThat(getRetryHistogramMeasurements(), empty());
     }
 
+    public void testSnapshotDeletesRetryOnThrottlingError() throws IOException {
+        // disable AWS-client retries
+        final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
+
+        int numBlobsToDelete = randomIntBetween(500, 3000);
+        List<String> blobsToDelete = new ArrayList<>();
+        for (int i = 0; i < numBlobsToDelete; i++) {
+            blobsToDelete.add(randomIdentifier());
+        }
+        int throttleTimesBeforeSuccess = randomIntBetween(1, MAX_NUMBER_SNAPSHOT_DELETE_RETRIES);
+        logger.info("--> Throttling {} times before success", throttleTimesBeforeSuccess);
+        ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(throttleTimesBeforeSuccess, attempt -> {});
+        httpServer.createContext("/", handler);
+        blobContainer.deleteBlobsIgnoringIfNotExists(randomFrom(operationPurposesThatRetryOnDelete()), blobsToDelete.iterator());
+
+        int expectedNumberOfBatches = expectedNumberOfBatches(numBlobsToDelete);
+        assertThat(handler.numberOfDeleteAttempts.get(), equalTo(throttleTimesBeforeSuccess + expectedNumberOfBatches));
+        assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(expectedNumberOfBatches));
+    }
+
+    public void testSnapshotDeletesAbortRetriesWhenThreadIsInterrupted() {
+        // disable AWS-client retries
+        final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
+
+        int numBlobsToDelete = randomIntBetween(500, 3000);
+        List<String> blobsToDelete = new ArrayList<>();
+        for (int i = 0; i < numBlobsToDelete; i++) {
+            blobsToDelete.add(randomIdentifier());
+        }
+
+        final Thread clientThread = Thread.currentThread();
+        int interruptBeforeAttempt = randomIntBetween(0, randomIntBetween(1, 10));
+        logger.info("--> Deleting {} blobs, interrupting before attempt {}", numBlobsToDelete, interruptBeforeAttempt);
+        ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {
+            if (attempt == interruptBeforeAttempt) {
+                clientThread.interrupt();
+            }
+        });
+        httpServer.createContext("/", handler);
+
+        try {
+            IOException exception = assertThrows(
+                IOException.class,
+                () -> blobContainer.deleteBlobsIgnoringIfNotExists(
+                    randomFrom(operationPurposesThatRetryOnDelete()),
+                    blobsToDelete.iterator()
+                )
+            );
+            assertThat(exception.getCause(), instanceOf(AbortedException.class));
+            assertThat(handler.numberOfDeleteAttempts.get(), equalTo(interruptBeforeAttempt + 1));
+            assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
+        } finally {
+            // interrupt should be preserved, clear it to prevent it leaking between tests
+            assertTrue(Thread.interrupted());
+        }
+    }
+
+    public void testNonSnapshotDeletesAreNotRetried() {
+        // disable AWS-client retries
+        final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
+
+        int numBlobsToDelete = randomIntBetween(500, 3000);
+        List<String> blobsToDelete = new ArrayList<>();
+        for (int i = 0; i < numBlobsToDelete; i++) {
+            blobsToDelete.add(randomIdentifier());
+        }
+        ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {});
+        httpServer.createContext("/", handler);
+        IOException exception = assertThrows(
+            IOException.class,
+            () -> blobContainer.deleteBlobsIgnoringIfNotExists(
+                randomValueOtherThanMany(
+                    op -> operationPurposesThatRetryOnDelete().contains(op),
+                    () -> randomFrom(OperationPurpose.values())
+                ),
+                blobsToDelete.iterator()
+            )
+        );
+        assertEquals(
+            ThrottlingDeleteHandler.THROTTLING_ERROR_CODE,
+            asInstanceOf(AmazonS3Exception.class, exception.getCause()).getErrorCode()
+        );
+        assertThat(handler.numberOfDeleteAttempts.get(), equalTo(expectedNumberOfBatches(numBlobsToDelete)));
+        assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
+    }
+
+    public void testNonThrottlingErrorsAreNotRetried() {
+        // disable AWS-client retries
+        final BlobContainer blobContainer = createBlobContainer(0, null, true, null);
+
+        int numBlobsToDelete = randomIntBetween(500, 3000);
+        List<String> blobsToDelete = new ArrayList<>();
+        for (int i = 0; i < numBlobsToDelete; i++) {
+            blobsToDelete.add(randomIdentifier());
+        }
+        ThrottlingDeleteHandler handler = new ThrottlingDeleteHandler(Integer.MAX_VALUE, attempt -> {}, "NotThrottling");
+        httpServer.createContext("/", handler);
+        assertThrows(
+            IOException.class,
+            () -> blobContainer.deleteBlobsIgnoringIfNotExists(randomFrom(operationPurposesThatRetryOnDelete()), blobsToDelete.iterator())
+        );
+        assertThat(handler.numberOfDeleteAttempts.get(), equalTo(expectedNumberOfBatches(numBlobsToDelete)));
+        assertThat(handler.numberOfSuccessfulDeletes.get(), equalTo(0));
+    }
+
+    private int expectedNumberOfBatches(int blobsToDelete) {
+        return (blobsToDelete / 1_000) + (blobsToDelete % 1_000 == 0 ? 0 : 1);
+    }
+
+    @SuppressForbidden(reason = "use a http server")
+    private class ThrottlingDeleteHandler extends S3HttpHandler {
+
+        private static final String THROTTLING_ERROR_CODE = "SlowDown";
+
+        private final AtomicInteger throttleTimesBeforeSuccess;
+        private final AtomicInteger numberOfDeleteAttempts;
+        private final AtomicInteger numberOfSuccessfulDeletes;
+        private final IntConsumer onAttemptCallback;
+        private final String errorCode;
+
+        ThrottlingDeleteHandler(int throttleTimesBeforeSuccess, IntConsumer onAttemptCallback) {
+            this(throttleTimesBeforeSuccess, onAttemptCallback, THROTTLING_ERROR_CODE);
+        }
+
+        ThrottlingDeleteHandler(int throttleTimesBeforeSuccess, IntConsumer onAttemptCallback, String errorCode) {
+            super("bucket");
+            this.numberOfDeleteAttempts = new AtomicInteger();
+            this.numberOfSuccessfulDeletes = new AtomicInteger();
+            this.throttleTimesBeforeSuccess = new AtomicInteger(throttleTimesBeforeSuccess);
+            this.onAttemptCallback = onAttemptCallback;
+            this.errorCode = errorCode;
+        }
+
+        @Override
+        public void handle(HttpExchange exchange) throws IOException {
+            if (exchange.getRequestMethod().equals("POST") && exchange.getRequestURI().toString().startsWith("/bucket/?delete")) {
+                onAttemptCallback.accept(numberOfDeleteAttempts.get());
+                numberOfDeleteAttempts.incrementAndGet();
+                if (throttleTimesBeforeSuccess.getAndDecrement() > 0) {
+                    final byte[] responseBytes = Strings.format("""
+                        <?xml version="1.0" encoding="UTF-8"?>
+                        <Error>
+                        <Code>%s</Code>
+                        <Message>This is a throttling message</Message>
+                        <Resource>/bucket/</Resource>
+                        <RequestId>4442587FB7D0A2F9</RequestId>
+                        </Error>""", errorCode).getBytes(StandardCharsets.UTF_8);
+
+                    exchange.sendResponseHeaders(HttpStatus.SC_SERVICE_UNAVAILABLE, responseBytes.length);
+                    exchange.getResponseBody().write(responseBytes);
+                    exchange.close();
+                } else {
+                    numberOfSuccessfulDeletes.incrementAndGet();
+                    super.handle(exchange);
+                }
+            } else {
+                super.handle(exchange);
+            }
+        }
+    }
+
+    private Set<OperationPurpose> operationPurposesThatRetryOnDelete() {
+        return Set.of(OperationPurpose.SNAPSHOT_DATA, OperationPurpose.SNAPSHOT_METADATA);
+    }
+
     @Override
     protected Matcher<Integer> getMaxRetriesMatcher(int maxRetries) {
         // some attempts make meaningful progress and do not count towards the max retry limit
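The ceiling division in expectedNumberOfBatches above mirrors the 1,000-keys-per-request limit of the S3 DeleteObjects API used by the blob store: for example, 2,500 blobs are expected to produce (2500 / 1000) + 1 = 3 delete requests, while exactly 3,000 blobs produce 3 with no remainder term.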
diff --git a/server/src/main/java/org/elasticsearch/common/BackoffPolicy.java b/server/src/main/java/org/elasticsearch/common/BackoffPolicy.java
index 27d98f9ade20..cacad64ab1d4 100644
--- a/server/src/main/java/org/elasticsearch/common/BackoffPolicy.java
+++ b/server/src/main/java/org/elasticsearch/common/BackoffPolicy.java
@@ -8,6 +8,7 @@
  */
 package org.elasticsearch.common;
 
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 
 import java.util.Collections;
@@ -81,6 +82,18 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
         return new ExponentialBackoff((int) checkDelay(initialDelay).millis(), maxNumberOfRetries);
     }
 
+    /**
+     * Creates a new linear backoff policy with the provided configuration
+     *
+     * @param delayIncrement The amount by which to increment the delay on each retry
+     * @param maxNumberOfRetries The maximum number of retries
+     * @param maximumDelay The maximum delay
+     * @return A backoff policy with linear increase in wait time for retries.
+     */
+    public static BackoffPolicy linearBackoff(TimeValue delayIncrement, int maxNumberOfRetries, TimeValue maximumDelay) {
+        return new LinearBackoff(delayIncrement, maxNumberOfRetries, maximumDelay);
+    }
+
     /**
      * Wraps the backoff policy in one that calls a method every time a new backoff is taken from the policy.
      */
@@ -100,6 +113,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
         public Iterator<TimeValue> iterator() {
             return Collections.emptyIterator();
         }
+
+        @Override
+        public String toString() {
+            return "NoBackoff";
+        }
     }
 
     private static class ExponentialBackoff extends BackoffPolicy {
@@ -118,6 +136,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
         public Iterator<TimeValue> iterator() {
             return new ExponentialBackoffIterator(start, numberOfElements);
         }
+
+        @Override
+        public String toString() {
+            return "ExponentialBackoff{start=" + start + ", numberOfElements=" + numberOfElements + '}';
+        }
     }
 
     private static class ExponentialBackoffIterator implements Iterator<TimeValue> {
@@ -163,6 +186,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
         public Iterator<TimeValue> iterator() {
             return new ConstantBackoffIterator(delay, numberOfElements);
         }
+
+        @Override
+        public String toString() {
+            return "ConstantBackoff{delay=" + delay + ", numberOfElements=" + numberOfElements + '}';
+        }
     }
 
     private static final class ConstantBackoffIterator implements Iterator<TimeValue> {
@@ -203,6 +231,11 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
         public Iterator<TimeValue> iterator() {
             return new WrappedBackoffIterator(delegate.iterator(), onBackoff);
         }
+
+        @Override
+        public String toString() {
+            return "WrappedBackoffPolicy{delegate=" + delegate + ", onBackoff=" + onBackoff + '}';
+        }
     }
 
     private static final class WrappedBackoffIterator implements Iterator<TimeValue> {
@@ -228,4 +261,60 @@ public abstract class BackoffPolicy implements Iterable<TimeValue> {
             return delegate.next();
         }
     }
+
+    private static final class LinearBackoff extends BackoffPolicy {
+
+        private final TimeValue delayIncrement;
+        private final int maxNumberOfRetries;
+        private final TimeValue maximumDelay;
+
+        private LinearBackoff(TimeValue delayIncrement, int maxNumberOfRetries, @Nullable TimeValue maximumDelay) {
+            this.delayIncrement = delayIncrement;
+            this.maxNumberOfRetries = maxNumberOfRetries;
+            this.maximumDelay = maximumDelay;
+        }
+
+        @Override
+        public Iterator<TimeValue> iterator() {
+            return new LinearBackoffIterator(delayIncrement, maxNumberOfRetries, maximumDelay);
+        }
+
+        @Override
+        public String toString() {
+            return "LinearBackoff{"
+                + "delayIncrement="
+                + delayIncrement
+                + ", maxNumberOfRetries="
+                + maxNumberOfRetries
+                + ", maximumDelay="
+                + maximumDelay
+                + '}';
+        }
+    }
+
+    private static final class LinearBackoffIterator implements Iterator<TimeValue> {
+
+        private final TimeValue delayIncrement;
+        private final int maxNumberOfRetries;
+        private final TimeValue maximumDelay;
+        private int curr;
+
+        private LinearBackoffIterator(TimeValue delayIncrement, int maxNumberOfRetries, @Nullable TimeValue maximumDelay) {
+            this.delayIncrement = delayIncrement;
+            this.maxNumberOfRetries = maxNumberOfRetries;
+            this.maximumDelay = maximumDelay;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return curr < maxNumberOfRetries;
+        }
+
+        @Override
+        public TimeValue next() {
+            curr++;
+            TimeValue timeValue = TimeValue.timeValueMillis(curr * delayIncrement.millis());
+            return maximumDelay == null ? timeValue : timeValue.compareTo(maximumDelay) < 0 ? timeValue : maximumDelay;
+        }
+    }
 }
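Since BackoffPolicy is an Iterable of TimeValue, the new linear policy can be exercised directly with a for-each loop. A minimal sketch, with values chosen arbitrarily for illustration:

    // Delays grow by 50ms per retry and are capped at 200ms.
    BackoffPolicy policy = BackoffPolicy.linearBackoff(TimeValue.timeValueMillis(50), 6, TimeValue.timeValueMillis(200));
    for (TimeValue delay : policy) {
        System.out.println(delay); // should print 50ms, 100ms, 150ms, 200ms, 200ms, 200ms
    }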
diff --git a/server/src/test/java/org/elasticsearch/common/BackoffPolicyTests.java b/server/src/test/java/org/elasticsearch/common/BackoffPolicyTests.java
index 0cbbcdc0f167..9ffd05a5336d 100644
--- a/server/src/test/java/org/elasticsearch/common/BackoffPolicyTests.java
+++ b/server/src/test/java/org/elasticsearch/common/BackoffPolicyTests.java
@@ -77,6 +77,37 @@ public class BackoffPolicyTests extends ESTestCase {
         }
     }
 
+    public void testLinearBackoffWithLimit() {
+        long incrementMillis = randomIntBetween(10, 500);
+        long limitMillis = randomIntBetween(1000, 5000);
+        int maxNumberOfRetries = randomIntBetween(0, 30);
+        BackoffPolicy timeValues = BackoffPolicy.linearBackoff(
+            timeValueMillis(incrementMillis),
+            maxNumberOfRetries,
+            timeValueMillis(limitMillis)
+        );
+        int counter = 0;
+        for (TimeValue timeValue : timeValues) {
+            counter++;
+            long unlimitedValue = counter * incrementMillis;
+            long expectedValue = Math.min(unlimitedValue, limitMillis);
+            assertEquals(timeValueMillis(expectedValue), timeValue);
+        }
+        assertEquals(counter, maxNumberOfRetries);
+    }
+
+    public void testLinearBackoffWithoutLimit() {
+        long incrementMillis = randomIntBetween(10, 500);
+        int maxNumberOfRetries = randomIntBetween(0, 30);
+        BackoffPolicy timeValues = BackoffPolicy.linearBackoff(timeValueMillis(incrementMillis), maxNumberOfRetries, null);
+        int counter = 0;
+        for (TimeValue timeValue : timeValues) {
+            counter++;
+            assertEquals(timeValueMillis(counter * incrementMillis), timeValue);
+        }
+        assertEquals(counter, maxNumberOfRetries);
+    }
+
     public void testNoBackoff() {
         BackoffPolicy noBackoff = BackoffPolicy.noBackoff();
         int numberOfBackoffsToPerform = randomIntBetween(1, 3);