Make GoogleCloudStorageRetryingInputStream request same generation on resume (#127626)

This commit is contained in:
Nick Tindall 2025-05-22 17:00:20 +10:00 committed by GitHub
parent 61faf42c28
commit 268e39b05b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 197 additions and 20 deletions

View file

@ -15,19 +15,27 @@ import fixture.gcs.TestUtils;
import com.google.cloud.storage.StorageException;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Booleans;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.rest.RestStatus;
import org.junit.ClassRule;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Base64;
import java.util.Collection;
import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ -95,4 +103,37 @@ public class GoogleCloudStorageThirdPartyTests extends AbstractThirdPartyReposit
e -> asInstanceOf(StorageException.class, e.getCause()).getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
);
}
/**
 * Checks that a partially consumed blob stream fails with a {@link NoSuchFileException} when the blob is
 * overwritten between the initial request and the request issued to resume the download.
 */
public void testResumeAfterUpdate() {
    // Size the blob so that the first request cannot buffer all of it
    final int blobSize = Math.toIntExact(ByteSizeValue.ofMb(5).getBytes());
    final BlobStoreRepository repository = getRepository();
    final String blobName = randomIdentifier();
    final byte[] originalContents = randomByteArrayOfLength(blobSize);
    executeOnBlobStore(repository, blobContainer -> {
        blobContainer.writeBlob(randomPurpose(), blobName, new BytesArray(originalContents), true);
        try (InputStream blobStream = blobContainer.readBlob(randomPurpose(), blobName)) {
            // Reading a single byte issues the first request and leaves the rest of the blob unread
            final int firstByte = blobStream.read();
            assert firstByte != -1;

            // Drop the underlying stream so that the next read has to resume the download
            final GoogleCloudStorageRetryingInputStream retryingStream = asInstanceOf(
                GoogleCloudStorageRetryingInputStream.class,
                blobStream
            );
            retryingStream.closeCurrentStream();

            // Overwrite the blob with different contents
            final byte[] replacementContents = randomByteArrayOfLength(blobSize);
            blobContainer.writeBlob(randomPurpose(), blobName, new BytesArray(replacementContents), false);

            // Draining the stream must now fail, because the contents it started reading are gone
            final NoSuchFileException thrown = assertThrows(NoSuchFileException.class, () -> readFully(blobStream));
            assertThat(thrown.getMessage(), containsString("unavailable on resume (contents changed, or object deleted):"));
        } catch (Exception e) {
            fail(e);
        }
        return null;
    });
}
}

View file

@ -52,6 +52,7 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
private long currentOffset;
private boolean closed;
private Long lastGeneration;
// Used for testing only
GoogleCloudStorageRetryingInputStream(OperationPurpose purpose, MeteredStorage client, BlobId blobId) throws IOException {
@ -83,6 +84,9 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
try {
final var meteredGet = client.meteredObjectsGet(purpose, blobId.getBucket(), blobId.getName());
meteredGet.setReturnRawInputStream(true);
if (lastGeneration != null) {
meteredGet.setGeneration(lastGeneration);
}
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
if (meteredGet.getRequestHeaders() != null) {
@ -90,6 +94,12 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
}
final HttpResponse resp = meteredGet.executeMedia();
// Remember the generation of the first response we receive so that, if we later need to resume,
// we can request that same generation and detect whether the object has changed
if (lastGeneration == null) {
lastGeneration = parseGenerationHeader(resp);
}
final Long contentLength = resp.getHeaders().getContentLength();
InputStream content = resp.getContent();
if (contentLength != null) {
@ -105,10 +115,23 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
} catch (StorageException storageException) {
if (storageException.getCode() == RestStatus.NOT_FOUND.getStatus()) {
if (lastGeneration != null) {
throw addSuppressedExceptions(
new NoSuchFileException(
"Blob object ["
+ blobId.getName()
+ "] generation ["
+ lastGeneration
+ "] unavailable on resume (contents changed, or object deleted): "
+ storageException.getMessage()
)
);
} else {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + storageException.getMessage())
);
}
}
if (storageException.getCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()) {
long currentPosition = Math.addExact(start, currentOffset);
throw addSuppressedExceptions(
@ -124,6 +147,24 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
}
/**
 * Reads the object generation from the {@code x-goog-generation} header of a download response.
 *
 * @param response the response whose headers are inspected
 * @return the parsed generation, or {@code null} if the header is absent or is not a valid {@code long};
 *         in both of those cases a warning is logged and an assertion is tripped
 */
private Long parseGenerationHeader(HttpResponse response) {
    final String rawGeneration = response.getHeaders().getFirstHeaderStringValue("x-goog-generation");
    if (rawGeneration == null) {
        final String problem = "Missing x-goog-generation header";
        logger.warn(problem);
        assert false : problem;
        return null;
    }
    try {
        return Long.parseLong(rawGeneration);
    } catch (NumberFormatException e) {
        final String problem = "Unexpected value for x-goog-generation header: " + rawGeneration;
        logger.warn(problem);
        assert false : problem;
        return null;
    }
}
// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
// We have to implement our own validation logic here
static final class ContentLengthValidatingInputStream extends FilterInputStream {
@ -203,6 +244,14 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
}
/**
* Close the stream currently being read ({@code currentStream}) without touching any other state of this
* object in this method. Tests call this after a partial read so that the following read has to resume
* the download.
*
* @throws IOException if closing the current stream fails
*/
// @VisibleForTesting
void closeCurrentStream() throws IOException {
currentStream.close();
}
private void ensureOpen() {
if (closed) {
assert false : "using GoogleCloudStorageRetryingInputStream after close";
@ -210,7 +259,6 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
}
// TODO: check that object did not change when stream is reopened (e.g. based on etag)
private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);

View file

@ -140,6 +140,10 @@ public class MeteredStorage {
get.setReturnRawInputStream(b);
}
/**
* Forwards the given generation to the wrapped {@code get} request.
* The retrying input stream calls this when resuming, passing the generation it recorded from its first response.
*
* @param generation the generation value passed through to {@code get.setGeneration}
*/
public void setGeneration(Long generation) {
get.setGeneration(generation);
}
/**
* Returns the request headers of the wrapped {@code get} request.
*
* @return whatever {@code get.getRequestHeaders()} returns; the retrying input stream null-checks this value before using it
*/
public HttpHeaders getRequestHeaders() {
return get.getRequestHeaders();
}

View file

@ -19,6 +19,7 @@ import com.google.cloud.ServiceOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.http.HttpStatus;
@ -45,6 +46,7 @@ import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.http.ResponseInjectingHttpHandler;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.repositories.blobstore.AbstractBlobContainerRetriesTestCase;
import org.elasticsearch.repositories.blobstore.ESMockAPIBasedRepositoryIntegTestCase;
import org.elasticsearch.rest.RestStatus;
@ -56,6 +58,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
@ -71,6 +74,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static fixture.gcs.TestUtils.createServiceAccount;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.common.io.Streams.readFully;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase.randomBytes;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageBlobStore.MAX_DELETES_PER_BATCH;
@ -86,6 +90,7 @@ import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
@SuppressForbidden(reason = "use a http server")
public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobContainerRetriesTestCase {
@ -212,6 +217,11 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
);
}
/**
 * Adds an {@code x-goog-generation} response header carrying a random non-negative integer to a
 * successful download response.
 */
@Override
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {
    final String generationValue = String.valueOf(randomNonNegativeInt());
    exchange.getResponseHeaders().add("x-goog-generation", generationValue);
}
public void testShouldRetryOnConnectionRefused() {
// port 1 should never be open
endpointUrlOverride = "http://127.0.0.1:1";
@ -242,6 +252,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
httpServer.createContext(downloadStorageEndpoint(blobContainer, "large_blob_retries"), exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
addSuccessfulDownloadHeaders(exchange);
final HttpHeaderParser.Range range = getRange(exchange);
final int offset = Math.toIntExact(range.start());
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.end() + 1, bytes.length)));
@ -570,6 +581,55 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
container.delete(randomPurpose());
}
/**
 * Checks that a download which has to be resumed after the blob was overwritten fails with a
 * {@link NoSuchFileException} naming the generation that was originally being read.
 */
public void testContentsChangeWhileStreaming() throws IOException {
    final GoogleCloudStorageHttpHandler gcsHandler = new GoogleCloudStorageHttpHandler("bucket");
    httpServer.createContext("/", gcsHandler);

    // Size the blob so that the first request cannot buffer all of it
    final int blobSize = Math.toIntExact(ByteSizeValue.ofMb(30).getBytes());
    final BlobContainer container = createBlobContainer(1, null, null, null, null, null, null);
    final String blobName = randomIdentifier();

    // Write the first version and confirm it reads back intact
    final byte[] originalContents = randomByteArrayOfLength(blobSize);
    container.writeBlob(randomPurpose(), blobName, new BytesArray(originalContents), true);
    final BytesReference readBack = readFully(container.readBlob(randomPurpose(), blobName));
    assertEquals(new BytesArray(originalContents), readBack);

    try (InputStream blobStream = container.readBlob(randomPurpose(), blobName)) {
        // Reading a single byte makes the stream fetch its first chunk
        final int firstByte = blobStream.read();
        assert firstByte != -1;

        // Bounce the server so that the in-flight download has to be retried
        restartHttpServer();
        httpServer.createContext("/", gcsHandler);

        // Overwrite the blob with different contents
        final byte[] replacementContents = randomByteArrayOfLength(blobSize);
        container.writeBlob(randomPurpose(), blobName, new BytesArray(replacementContents), false);

        // Draining the stream must now fail, because the contents it started reading are gone
        final NoSuchFileException thrown = assertThrows(NoSuchFileException.class, () -> readFully(blobStream));
        final String expectedPrefix = "Blob object ["
            + container.path().buildAsString()
            + blobName
            + "] generation [1] unavailable on resume (contents changed, or object deleted):";
        assertThat(thrown.getMessage(), startsWith(expectedPrefix));
    }
}
/**
 * Stops the current mock HTTP server and starts a fresh one on the same address.
 * No contexts are registered on the replacement here; callers register their handlers again afterwards.
 */
private void restartHttpServer() throws IOException {
    // Capture the address first so the replacement server listens where the old one did
    final InetSocketAddress boundAddress = httpServer.getAddress();
    httpServer.stop(0);
    httpServer = MockHttpServer.createHttp(boundAddress, 0);
    httpServer.start();
}
private HttpHandler safeHandler(HttpHandler handler) {
final HttpHandler loggingHandler = ESMockAPIBasedRepositoryIntegTestCase.wrap(handler, logger);
return exchange -> {

View file

@ -170,6 +170,7 @@ public class GoogleCloudStorageRetryingInputStreamTests extends ESTestCase {
result.setContent(content);
result.setContentLength(contentLength);
result.setContentType("application/octet-stream");
result.addHeader("x-goog-generation", String.valueOf(randomNonNegativeInt()));
result.setStatusCode(RestStatus.OK.getStatus());
return result;
}

View file

@ -11,8 +11,6 @@ package fixture.gcs;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
@ -44,8 +42,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageHttpHandler implements HttpHandler {
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
private static final String GENERATION = "generation";
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
private final MockGcsBlobStore mockGcsBlobStore;
@ -82,7 +80,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o/*", request)) {
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch, generation);
writeBlobVersionAsJson(exchange, blob);
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
@ -116,7 +115,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
// Download Object https://cloud.google.com/storage/docs/request-body
final String path = exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch);
final Long generation = parseOptionalLongParameter(exchange, GENERATION);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(path, ifGenerationMatch, generation);
if (blob != null) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
final BytesReference response;
@ -144,6 +144,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
// we implement "metageneration", at that point we must incorporate both
// See: https://cloud.google.com/storage/docs/metadata#etags
exchange.getResponseHeaders().add("ETag", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("x-goog-generation", String.valueOf(blob.generation()));
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
exchange.sendResponseHeaders(statusCode, response.length());
response.writeTo(exchange.getResponseBody());

View file

@ -88,22 +88,32 @@ public class MockGcsBlobStore {
}
}
BlobVersion getBlob(String path, Long ifGenerationMatch) {
/**
* Get the blob at the specified path
*
* @param path The path
* @param ifGenerationMatch The ifGenerationMatch parameter value (if present)
* @param generation The generation parameter value (if present)
* @return The blob if it exists
* @throws BlobNotFoundException if there is no blob at the path, or its generation does not match the generation parameter
* @throws GcsRestException if the blob's generation does not match the ifGenerationMatch parameter
*/
BlobVersion getBlob(String path, Long ifGenerationMatch, Long generation) {
final BlobVersion blob = blobs.get(path);
if (blob == null) {
throw new BlobNotFoundException(path);
} else {
if (ifGenerationMatch != null) {
if (blob.generation != ifGenerationMatch) {
}
if (generation != null && generation != blob.generation) {
throw new BlobNotFoundException(blob.path, blob.generation);
}
if (ifGenerationMatch != null && ifGenerationMatch != blob.generation) {
throw new GcsRestException(
RestStatus.PRECONDITION_FAILED,
"Generation mismatch, expected " + ifGenerationMatch + " but got " + blob.generation
);
}
}
return blob;
}
}
BlobVersion updateBlob(String path, Long ifGenerationMatch, BytesReference contents) {
return blobs.compute(path, (name, existing) -> {
@ -324,6 +334,10 @@ public class MockGcsBlobStore {
/**
* Creates a {@code RestStatus.NOT_FOUND} error for a path at which no blob exists.
*
* @param path the blob path, included in the error message
*/
BlobNotFoundException(String path) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path);
}
/**
* Creates a {@code RestStatus.NOT_FOUND} error whose message names both a path and a generation.
*
* @param path the blob path, included in the error message
* @param generation the generation reported in the error message
*/
// NOTE(review): the caller visible in getBlob passes the stored blob's generation rather than the
// requested one, so the message may name a generation that does exist — confirm which is intended
BlobNotFoundException(String path, long generation) {
super(RestStatus.NOT_FOUND, "Blob not found: " + path + ", generation " + generation);
}
}
static class GcsRestException extends RuntimeException {

View file

@ -73,6 +73,11 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
super.tearDown();
}
/**
* Override to add any headers you expect on a successful download.
* The default implementation adds nothing. The download handlers in this test case call this hook
* before sending the response headers of a successful download.
*
* @param exchange the exchange whose response headers may be extended
*/
protected void addSuccessfulDownloadHeaders(HttpExchange exchange) {}
protected abstract String downloadStorageEndpoint(BlobContainer container, String blob);
protected abstract String bytesContentType();
@ -118,6 +123,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
addSuccessfulDownloadHeaders(exchange);
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
@ -183,6 +189,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
final int effectiveRangeEnd = Math.min(bytes.length - 1, rangeEnd);
final int length = (effectiveRangeEnd - rangeStart) + 1;
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
addSuccessfulDownloadHeaders(exchange);
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
exchange.getResponseBody().write(bytes, rangeStart, length);
exchange.close();
@ -401,6 +408,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
length = bytes.length - rangeStart;
}
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
addSuccessfulDownloadHeaders(exchange);
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
int minSend = Math.min(0, length - 1);
final int bytesToSend = randomIntBetween(minSend, length - 1);