From 268e39b05b3cc69d7787f4c8b91ef2b23faeca0c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 22 May 2025 17:00:20 +1000 Subject: [PATCH] Make GoogleCloudStorageRetryingInputStream request same generation on resume (#127626) --- .../GoogleCloudStorageThirdPartyTests.java | 41 +++++++++++++ ...GoogleCloudStorageRetryingInputStream.java | 56 +++++++++++++++-- .../repositories/gcs/MeteredStorage.java | 4 ++ ...CloudStorageBlobContainerRetriesTests.java | 60 +++++++++++++++++++ ...eCloudStorageRetryingInputStreamTests.java | 1 + .../gcs/GoogleCloudStorageHttpHandler.java | 11 ++-- .../java/fixture/gcs/MockGcsBlobStore.java | 36 +++++++---- .../AbstractBlobContainerRetriesTestCase.java | 8 +++ 8 files changed, 197 insertions(+), 20 deletions(-) diff --git a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java index f99c488a86b0..520f922280eb 100644 --- a/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java +++ b/modules/repository-gcs/src/internalClusterTest/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageThirdPartyTests.java @@ -15,19 +15,27 @@ import fixture.gcs.TestUtils; import com.google.cloud.storage.StorageException; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.core.Booleans; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.RestStatus; import org.junit.ClassRule; +import java.io.InputStream; +import java.nio.file.NoSuchFileException; import java.util.Base64; import java.util.Collection; +import static org.elasticsearch.common.io.Streams.readFully; +import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; @@ -95,4 +103,37 @@ public class GoogleCloudStorageThirdPartyTests extends AbstractThirdPartyReposit e -> asInstanceOf(StorageException.class, e.getCause()).getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus() ); } + + public void testResumeAfterUpdate() { + + // The blob needs to be large enough that it won't be entirely buffered on the first request + final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes()); + + final BlobStoreRepository repo = getRepository(); + final String blobKey = randomIdentifier(); + final byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); + executeOnBlobStore(repo, container -> { + container.writeBlob(randomPurpose(), blobKey, new BytesArray(initialValue), true); + + try (InputStream inputStream = container.readBlob(randomPurpose(), blobKey)) { + // Trigger the first request for the blob, partially read it + int read = inputStream.read(); + assert read != -1; + + // Close the current underlying stream (this will force a resume) + asInstanceOf(GoogleCloudStorageRetryingInputStream.class, inputStream).closeCurrentStream(); + + // Update the file + byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); + container.writeBlob(randomPurpose(), blobKey, new BytesArray(updatedValue), false); + + // Read the rest of the stream, it should throw because the contents changed + String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage(); + assertThat(message, containsString("unavailable on resume (contents changed, or object deleted):")); + } catch (Exception e) { + fail(e); + } + return null; + }); + } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index 69ee604a065d..a74e86d8ee67 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { private List failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS); private long currentOffset; private boolean closed; + private Long lastGeneration; // Used for testing only GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException { @@ -83,6 +84,9 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { try { final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName()); meteredGet.setReturnRawInputStream(true); + if (lastGeneration != null) { + meteredGet.setGeneration(lastGeneration); + } if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { if (meteredGet.getRequestHeaders() != null) { @@ -90,6 +94,12 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } final HttpResponse resp = meteredGet.executeMedia(); + // Store the generation of the first response we received, so we can detect + // if the file has changed if we need to resume + if (lastGeneration == null) { + lastGeneration = parseGenerationHeader(resp); + } + final Long contentLength = resp.getHeaders().getContentLength(); InputStream content = resp.getContent(); if (contentLength != null) { @@ -105,9 +115,22 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } catch (StorageException storageException) { if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) { - throw addSuppressedExceptions( - new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()) - ); + if (lastGeneration != null) { + throw addSuppressedExceptions( + new NoSuchFileException( + "Blob object [" + + blobId.getName() + + "] generation [" + + lastGeneration + + "] unavailable on resume (contents changed, or object deleted): " + + storageException.getMessage() + ) + ); + } else { + throw addSuppressedExceptions( + new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage()) + ); + } } if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) { long currentPosition = Math.addExact(start, currentOffset); @@ -124,6 +147,24 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } + private Long parseGenerationHeader(HttpResponse response) { + final String generationHeader = response.getHeaders().getFirstHeaderStringValue("x-goog-generation"); + if (generationHeader != null) { + try { + return Long.parseLong(generationHeader); + } catch (NumberFormatException e) { + final String message = "Unexpected value for x-goog-generation header: " + generationHeader; + logger.warn(message); + assert false : message; + } + } else { + String message = "Missing x-goog-generation header"; + logger.warn(message); + assert false : message; + } + return null; + } + // Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream // We have to implement our own validation logic here static final class ContentLengthValidatingInputStream extends FilterInputStream { @@ -203,6 +244,14 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } + /** + * Close the current stream, used to test resume + */ + // @VisibleForTesting + void closeCurrentStream() throws IOException { + currentStream.close(); + } + private void ensureOpen() { if (closed) { assert false : "using GoogleCloudStorageRetryingInputStream after close"; @@ -210,7 +259,6 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } - // TODO: check that object did not change when stream is reopened (e.g. based on etag) private void reopenStreamOrFail(StorageException e) throws IOException { if (attempt >= maxAttempts) { throw addSuppressedExceptions(e); diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java index 95b5254f1a40..c951deae54f8 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/MeteredStorage.java @@ -140,6 +140,10 @@ public class MeteredStorage { get.setReturnRawInputStream(b); } + public void setGeneration(Long generation) { + get.setGeneration(generation); + } + public HttpHeaders getRequestHeaders() { return get.getRequestHeaders(); } diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 2d6590a17f1d..7b25d6b8ef4e 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -19,6 +19,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.http.HttpTransportOptions; import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; +import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import org.apache.http.HttpStatus; @@ -45,6 +46,7 @@ import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.core.TimeValue; import org.elasticsearch.http.ResponseInjectingHttpHandler; +import org.elasticsearch.mocksocket.MockHttpServer; import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase; import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase; import org.elasticsearch.rest.RestStatus; @@ -56,6 +58,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; @@ -71,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference; import static fixture.gcs.TestUtils.createServiceAccount; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.elasticsearch.common.io.Streams.readFully; import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose; import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes; import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH; @@ -86,6 +90,7 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; @SuppressForbidden(reason = "use a http server") public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase { @@ -212,6 +217,11 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon ); } + @Override + protected void addSuccessfulDownloadHeaders(HttpExchange exchange) { + exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(randomNonNegativeInt())); + } + public void testShouldRetryOnConnectionRefused() { // port 1 should never be open endpointUrlOverride = "http://127.0.0.1:1"; @@ -242,6 +252,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> { Streams.readFully(exchange.getRequestBody()); exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); + addSuccessfulDownloadHeaders(exchange); final HttpHeaderParser.Range range = getRange(exchange); final int offset = Math.toIntExact(range.start()); final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.end() + 1, bytes.length))); @@ -570,6 +581,55 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon container.delete(randomPurpose()); } + public void testContentsChangeWhileStreaming() throws IOException { + GoogleCloudStorageHttpHandler handler = new GoogleCloudStorageHttpHandler("bucket"); + httpServer.createContext("/", handler); + // The blob needs to be large enough that it won't be entirely buffered on the first request + final int enoughBytesToNotBeEntirelyBuffered = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes()); + + final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null); + + final String key = randomIdentifier(); + byte[] initialValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); + container.writeBlob(randomPurpose(), key, new BytesArray(initialValue), true); + + BytesReference reference = readFully(container.readBlob(randomPurpose(), key)); + assertEquals(new BytesArray(initialValue), reference); + + try (InputStream inputStream = container.readBlob(randomPurpose(), key)) { + // Trigger the first chunk to load + int read = inputStream.read(); + assert read != -1; + + // Restart the server (this triggers a retry) + restartHttpServer(); + httpServer.createContext("/", handler); + + // Update the file + byte[] updatedValue = randomByteArrayOfLength(enoughBytesToNotBeEntirelyBuffered); + container.writeBlob(randomPurpose(), key, new BytesArray(updatedValue), false); + + // Read the rest of the stream, it should throw because the contents changed + String message = assertThrows(NoSuchFileException.class, () -> readFully(inputStream)).getMessage(); + assertThat( + message, + startsWith( + "Blob object [" + + container.path().buildAsString() + + key + + "] generation [1] unavailable on resume (contents changed, or object deleted):" + ) + ); + } + } + + private void restartHttpServer() throws IOException { + InetSocketAddress currentAddress = httpServer.getAddress(); + httpServer.stop(0); + httpServer = MockHttpServer.createHttp(currentAddress, 0); + httpServer.start(); + } + private HttpHandler safeHandler(HttpHandler handler) { final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger); return exchange -> { diff --git a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java index 91ea4930c742..2b223c7a1634 100644 --- a/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java +++ b/modules/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStreamTests.java @@ -170,6 +170,7 @@ public class GoogleCloudStorageRetryingInputStreamTests extends ESTestCase { result.setContent(content); result.setContentLength(contentLength); result.setContentType("application/octet-stream"); + result.addHeader("x-goog-generation", String.valueOf(randomNonNegativeInt())); result.setStatusCode(RestStatus.OK.getStatus()); return result; } diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index b83d3381b6f9..7a888afac206 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -11,8 +11,6 @@ package fixture.gcs; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; @@ -44,8 +42,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; @SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint") public class GoogleCloudStorageHttpHandler implements HttpHandler { - private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class); private static final String IF_GENERATION_MATCH = "ifGenerationMatch"; + private static final String GENERATION = "generation"; private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000); private final MockGcsBlobStore mockGcsBlobStore; @@ -82,7 +80,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { } else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) { final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", ""); final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); - final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch); + final Long generation = parseOptionalLongParameter(exchange, GENERATION); + final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation); writeBlobVersionAsJson(exchange, blob); } else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) { // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list @@ -116,7 +115,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { // Download Object https://cloud.google.com/storage/docs/request-body final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""); final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH); - final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch); + final Long generation = parseOptionalLongParameter(exchange, GENERATION); + final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation); if (blob != null) { final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); final BytesReference response; @@ -144,6 +144,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { // we implement "metageneration", at that point we must incorporate both // See: https://cloud.google.com/storage/docs/metadata#etags exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation())); + exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(blob.generation())); exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); exchange.sendResponseHeaders(statusCode, response.length()); response.writeTo(exchange.getResponseBody()); diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlobStore.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlobStore.java index b6c4f4ec3d88..3aa62d410927 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlobStore.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/MockGcsBlobStore.java @@ -88,21 +88,31 @@ public class MockGcsBlobStore { } } - BlobVersion getBlob(String path, Long ifGenerationMatch) { + /** + * Get the blob at the specified path + * + * @param path The path + * @param ifGenerationMatch The ifGenerationMatch parameter value (if present) + * @param generation The generation parameter value (if present) + * @return The blob if it exists + * @throws BlobNotFoundException if there is no blob at the path, or its generation does not match the generation parameter + * @throws GcsRestException if the blob's generation does not match the ifGenerationMatch parameter + */ + BlobVersion getBlob(String path, Long ifGenerationMatch, Long generation) { final BlobVersion blob = blobs.get(path); if (blob == null) { throw new BlobNotFoundException(path); - } else { - if (ifGenerationMatch != null) { - if (blob.generation != ifGenerationMatch) { - throw new GcsRestException( - RestStatus.PRECONDITION_FAILED, - "Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation - ); - } - } - return blob; } + if (generation != null && generation != blob.generation) { + throw new BlobNotFoundException(blob.path, blob.generation); + } + if (ifGenerationMatch != null && ifGenerationMatch != blob.generation) { + throw new GcsRestException( + RestStatus.PRECONDITION_FAILED, + "Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation + ); + } + return blob; } BlobVersion updateBlob(String path, Long ifGenerationMatch, BytesReference contents) { @@ -324,6 +334,10 @@ public class MockGcsBlobStore { BlobNotFoundException(String path) { super(RestStatus.NOT_FOUND, "Blob not found: " + path); } + + BlobNotFoundException(String path, long generation) { + super(RestStatus.NOT_FOUND, "Blob not found: " + path + ", generation " + generation); + } } static class GcsRestException extends RuntimeException { diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index 398a7b3db3ee..a70ce9340ad0 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -73,6 +73,11 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { super.tearDown(); } + /** + * Override to add any headers you expect on a successful download + */ + protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {} + protected abstract String downloadStorageEndpoint(BlobContainer container, String blob); protected abstract String bytesContentType(); @@ -118,6 +123,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { if (countDown.countDown()) { final int rangeStart = getRangeStart(exchange); assertThat(rangeStart, lessThan(bytes.length)); + addSuccessfulDownloadHeaders(exchange); exchange.getResponseHeaders().add("Content-Type", bytesContentType()); exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart); exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart); @@ -183,6 +189,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd); final int length = (effectiveRangeEnd - rangeStart) + 1; exchange.getResponseHeaders().add("Content-Type", bytesContentType()); + addSuccessfulDownloadHeaders(exchange); exchange.sendResponseHeaders(HttpStatus.SC_OK, length); exchange.getResponseBody().write(bytes, rangeStart, length); exchange.close(); @@ -401,6 +408,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { length = bytes.length - rangeStart; } exchange.getResponseHeaders().add("Content-Type", bytesContentType()); + addSuccessfulDownloadHeaders(exchange); exchange.sendResponseHeaders(HttpStatus.SC_OK, length); int minSend = Math.min(0, length - 1); final int bytesToSend = randomIntBetween(minSend, length - 1);