Implement test for GCS metrics (#122909)

This commit is contained in:
Nick Tindall 2025-02-21 09:06:28 +11:00 committed by GitHub
parent de41d5704b
commit b3959b6642
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 652 additions and 107 deletions

View file

@ -710,6 +710,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
Storage.BlobTargetOption.generationMatch()
)
);
stats.trackPostOperation();
return OptionalBytesReference.of(expected);
} catch (Exception e) {
final var serviceException = unwrapServiceException(e);

View file

@ -0,0 +1,239 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.repositories.gcs;
import fixture.gcs.FakeOAuth2HttpHandler;
import fixture.gcs.GoogleCloudStorageHttpHandler;
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.sun.net.httpserver.HttpServer;
import org.elasticsearch.common.BackoffPolicy;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.mocksocket.MockHttpServer;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.Closeable;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase {
private static final String BUCKET = "bucket";
private static final ByteSizeValue BUFFER_SIZE = ByteSizeValue.ofKb(128);
private HttpServer httpServer;
private ThreadPool threadPool;
private GoogleCloudStorageService googleCloudStorageService;
private GoogleCloudStorageHttpHandler googleCloudStorageHttpHandler;
private ContainerAndBlobStore containerAndStore;
@Before
public void createStorageService() throws Exception {
threadPool = new TestThreadPool(getTestClass().getName());
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
httpServer.start();
googleCloudStorageService = new GoogleCloudStorageService();
googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET);
httpServer.createContext("/", googleCloudStorageHttpHandler);
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
containerAndStore = createBlobContainer(randomIdentifier());
}
@After
public void stopHttpServer() {
IOUtils.closeWhileHandlingException(containerAndStore);
httpServer.stop(0);
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
}
@Test
public void testSingleMultipartWrite() throws Exception {
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
final String blobName = randomIdentifier();
final int blobLength = randomIntBetween(1, (int) store.getLargeBlobThresholdInBytes() - 1);
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(blobLength));
container.writeBlob(randomPurpose(), blobName, blobContents, true);
assertEquals(createStats(1, 0, 0), store.stats());
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
assertEquals(blobContents, Streams.readFully(is));
}
assertEquals(createStats(1, 0, 1), store.stats());
}
@Test
public void testResumableWrite() throws Exception {
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
final String blobName = randomIdentifier();
final int size = randomIntBetween((int) store.getLargeBlobThresholdInBytes(), (int) store.getLargeBlobThresholdInBytes() * 2);
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(size));
container.writeBlob(randomPurpose(), blobName, blobContents, true);
assertEquals(createStats(1, 0, 0), store.stats());
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
assertEquals(blobContents, Streams.readFully(is));
}
assertEquals(createStats(1, 0, 1), store.stats());
}
@Test
public void testDeleteDirectory() throws Exception {
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
final String directoryName = randomIdentifier();
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
final int numberOfFiles = randomIntBetween(1, 20);
for (int i = 0; i < numberOfFiles; i++) {
container.writeBlob(randomPurpose(), String.format("%s/file_%d", directoryName, i), contents, true);
}
assertEquals(createStats(numberOfFiles, 0, 0), store.stats());
container.delete(randomPurpose());
// We only count the list because we can't track the bulk delete
assertEquals(createStats(numberOfFiles, 1, 0), store.stats());
}
@Test
public void testListBlobsAccountsForPaging() throws Exception {
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
final int pageSize = randomIntBetween(3, 20);
googleCloudStorageHttpHandler.setDefaultPageLimit(pageSize);
final int numberOfPages = randomIntBetween(1, 10);
final int numberOfObjects = randomIntBetween((numberOfPages - 1) * pageSize, numberOfPages * pageSize - 1);
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
for (int i = 0; i < numberOfObjects; i++) {
container.writeBlob(randomPurpose(), String.format("file_%d", i), contents, true);
}
assertEquals(createStats(numberOfObjects, 0, 0), store.stats());
final Map<String, BlobMetadata> stringBlobMetadataMap = container.listBlobs(randomPurpose());
assertEquals(numberOfObjects, stringBlobMetadataMap.size());
// There should be {numberOfPages} pages of blobs
assertEquals(createStats(numberOfObjects, numberOfPages, 0), store.stats());
}
public void testCompareAndSetRegister() {
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
// update from empty (adds a single insert)
final BytesArray contents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
final String registerName = randomIdentifier();
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, BytesArray.EMPTY, contents, l)));
assertEquals(createStats(1, 0, 0), store.stats());
// successful update from non-null (adds two gets, one insert)
final BytesArray nextContents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, contents, nextContents, l)));
assertEquals(createStats(2, 0, 2), store.stats());
// failed update (adds two gets, zero inserts)
final BytesArray wrongContents = randomValueOtherThan(
nextContents,
() -> new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH))
);
assertFalse(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, wrongContents, contents, l)));
assertEquals(createStats(2, 0, 4), store.stats());
}
private Map<String, BlobStoreActionStats> createStats(int insertCount, int listCount, int getCount) {
return Map.of(
"GetObject",
new BlobStoreActionStats(getCount, getCount),
"ListObjects",
new BlobStoreActionStats(listCount, listCount),
"InsertObject",
new BlobStoreActionStats(insertCount, insertCount)
);
}
private record ContainerAndBlobStore(GoogleCloudStorageBlobContainer blobContainer, GoogleCloudStorageBlobStore blobStore)
implements
Closeable {
@Override
public void close() {
blobStore.close();
}
}
private ContainerAndBlobStore createBlobContainer(final String repositoryName) throws Exception {
final String clientName = randomIdentifier();
final Tuple<ServiceAccountCredentials, byte[]> serviceAccountCredentialsTuple = GoogleCloudStorageTestUtilities.randomCredential(
clientName
);
final GoogleCloudStorageClientSettings clientSettings = new GoogleCloudStorageClientSettings(
serviceAccountCredentialsTuple.v1(),
getEndpointForServer(httpServer),
PROJECT_ID_SETTING.getDefault(Settings.EMPTY),
CONNECT_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY),
new URI(getEndpointForServer(httpServer) + "/token"),
null
);
googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings));
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore(
BUCKET,
clientName,
repositoryName,
googleCloudStorageService,
BigArrays.NON_RECYCLING_INSTANCE,
Math.toIntExact(BUFFER_SIZE.getBytes()),
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10)
);
final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer(
BlobPath.EMPTY,
blobStore
);
return new ContainerAndBlobStore(googleCloudStorageBlobContainer, blobStore);
}
protected String getEndpointForServer(final HttpServer server) {
final InetSocketAddress address = server.getAddress();
return "http://" + address.getHostString() + ":" + address.getPort();
}
}

View file

@ -8,7 +8,6 @@
*/
package org.elasticsearch.repositories.gcs;
import com.google.api.services.storage.StorageScopes;
import com.google.auth.oauth2.ServiceAccountCredentials;
import org.apache.http.HttpRequest;
@ -29,11 +28,9 @@ import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@ -47,6 +44,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.loadCredential;
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageTestUtilities.randomCredential;
import static org.hamcrest.Matchers.equalTo;
public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
@ -292,32 +290,6 @@ public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
);
}
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
private static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
credentialBuilder.setClientId("id_" + clientName);
credentialBuilder.setClientEmail(clientName);
credentialBuilder.setProjectId("project_id_" + clientName);
credentialBuilder.setPrivateKey(keyPair.getPrivate());
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
credentialBuilder.setTokenServerUri(tokenServerUri);
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
final String serviceAccount = Strings.format("""
{
"type": "service_account",
"project_id": "project_id_%s",
"private_key_id": "private_key_id_%s",
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
"client_email": "%s",
"client_id": "id_%s",
"token_uri": "%s"
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
}
private static TimeValue randomTimeout() {
return randomFrom(TimeValue.MINUS_ONE, TimeValue.ZERO, randomPositiveTimeValue());
}

View file

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.repositories.gcs;
import com.google.api.services.storage.StorageScopes;
import com.google.auth.oauth2.ServiceAccountCredentials;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.util.Base64;
import java.util.Collections;
public class GoogleCloudStorageTestUtilities {
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
public static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
credentialBuilder.setClientId("id_" + clientName);
credentialBuilder.setClientEmail(clientName);
credentialBuilder.setProjectId("project_id_" + clientName);
credentialBuilder.setPrivateKey(keyPair.getPrivate());
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
credentialBuilder.setTokenServerUri(tokenServerUri);
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
final String serviceAccount = Strings.format("""
{
"type": "service_account",
"project_id": "project_id_%s",
"private_key_id": "private_key_id_%s",
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
"client_email": "%s",
"client_id": "id_%s",
"token_uri": "%s"
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
}
}

View file

@ -17,29 +17,30 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.test.fixture.HttpHeaderParser;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
@ -56,6 +57,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
private final MockGcsBlobStore mockGcsBlobStore;
private final String bucket;
@ -64,6 +66,15 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
this.mockGcsBlobStore = new MockGcsBlobStore();
}
/**
* Set the default page limit
*
* @param limit The new limit
*/
public void setDefaultPageLimit(final int limit) {
this.defaultPageLimit.set(limit);
}
@Override
public void handle(final HttpExchange exchange) throws IOException {
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
@ -82,40 +93,31 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
final byte[] response = buildBlobInfoJson(blob).getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
writeBlobVersionAsJson(exchange, blob);
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
RestUtils.decodeQueryString(exchange.getRequestURI(), params);
final String prefix = params.getOrDefault("prefix", "");
final String delimiter = params.get("delimiter");
final int maxResults = Integer.parseInt(params.getOrDefault("maxResults", String.valueOf(defaultPageLimit.get())));
final String delimiter = params.getOrDefault("delimiter", "");
final String pageToken = params.get("pageToken");
final Set<String> prefixes = new HashSet<>();
final List<String> listOfBlobs = new ArrayList<>();
for (final Map.Entry<String, MockGcsBlobStore.BlobVersion> blob : mockGcsBlobStore.listBlobs().entrySet()) {
final String blobName = blob.getKey();
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
if (delimiterPos > -1) {
prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
} else {
listOfBlobs.add(buildBlobInfoJson(blob.getValue()));
}
}
final MockGcsBlobStore.PageOfBlobs pageOfBlobs;
if (pageToken != null) {
pageOfBlobs = mockGcsBlobStore.listBlobs(pageToken);
} else {
pageOfBlobs = mockGcsBlobStore.listBlobs(maxResults, delimiter, prefix);
}
byte[] response = (String.format(Locale.ROOT, """
{"kind":"storage#objects","items":[%s],"prefixes":[%s]}\
""", String.join(",", listOfBlobs), String.join(",", prefixes))).getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
ListBlobsResponse response = new ListBlobsResponse(bucket, pageOfBlobs);
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
BytesReference responseBytes = BytesReference.bytes(builder);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
responseBytes.writeTo(exchange.getResponseBody());
}
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
// GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
throw new AssertionError("Should not call get bucket API");
@ -193,10 +195,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
ifGenerationMatch,
content.get().v2()
);
byte[] response = buildBlobInfoJson(newBlobVersion).getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
writeBlobVersionAsJson(exchange, newBlobVersion);
} else {
throw new AssertionError(
"Could not read multi-part request to ["
@ -266,6 +265,48 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
}
}
private void writeBlobVersionAsJson(HttpExchange exchange, MockGcsBlobStore.BlobVersion newBlobVersion) throws IOException {
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
writeBlobAsXContent(newBlobVersion, builder, bucket);
BytesReference responseBytes = BytesReference.bytes(builder);
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
responseBytes.writeTo(exchange.getResponseBody());
}
}
record ListBlobsResponse(String bucket, MockGcsBlobStore.PageOfBlobs pageOfBlobs) implements ToXContent {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("kind", "storage#objects");
if (pageOfBlobs.nextPageToken() != null) {
builder.field("nextPageToken", pageOfBlobs.nextPageToken());
}
builder.startArray("items");
for (MockGcsBlobStore.BlobVersion blobVersion : pageOfBlobs().blobs()) {
writeBlobAsXContent(blobVersion, builder, bucket);
}
builder.endArray();
builder.field("prefixes", pageOfBlobs.prefixes());
builder.endObject();
return builder;
}
}
private static void writeBlobAsXContent(MockGcsBlobStore.BlobVersion blobVersion, XContentBuilder builder, String bucket)
throws IOException {
builder.startObject();
builder.field("kind", "storage#object");
builder.field("bucket", bucket);
builder.field("name", blobVersion.path());
builder.field("id", blobVersion.path());
builder.field("size", String.valueOf(blobVersion.contents().length()));
builder.field("generation", String.valueOf(blobVersion.generation()));
builder.endObject();
}
private void sendError(HttpExchange exchange, MockGcsBlobStore.GcsRestException e) throws IOException {
final String responseBody = Strings.format("""
{
@ -280,21 +321,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
exchange.getResponseBody().write(responseBody.getBytes(UTF_8));
}
private String buildBlobInfoJson(MockGcsBlobStore.BlobVersion blobReference) {
return String.format(
Locale.ROOT,
"""
{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"%s","generation":"%d"}""",
bucket,
blobReference.path(),
blobReference.path(),
blobReference.contents().length(),
blobReference.generation()
);
}
public Map<String, BytesReference> blobs() {
return Maps.transformValues(mockGcsBlobStore.listBlobs(), MockGcsBlobStore.BlobVersion::contents);
return mockGcsBlobStore.listBlobs()
.stream()
.collect(Collectors.toMap(MockGcsBlobStore.BlobVersion::path, MockGcsBlobStore.BlobVersion::contents));
}
private static String httpServerUrl(final HttpExchange exchange) {

View file

@ -10,22 +10,30 @@
package fixture.gcs;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.fixture.HttpHeaderParser;
import java.util.Map;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicReference;
public class MockGcsBlobStore {
private static final int RESUME_INCOMPLETE = 308;
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentHashMap<>();
// we use skip-list map so we can do paging right
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentSkipListMap<>();
private final ConcurrentMap<String, ResumableUpload> resumableUploads = new ConcurrentHashMap<>();
record BlobVersion(String path, long generation, BytesReference contents) {}
@ -164,8 +172,114 @@ public class MockGcsBlobStore {
blobs.remove(path);
}
Map<String, BlobVersion> listBlobs() {
return Map.copyOf(blobs);
private String stripPrefixIfPresent(@Nullable String prefix, String toStrip) {
if (prefix != null && toStrip.startsWith(prefix)) {
return toStrip.substring(prefix.length());
}
return toStrip;
}
PageOfBlobs listBlobs(String pageToken) {
final PageToken parsedToken = PageToken.fromString(pageToken);
return calculatePageOfBlobs(parsedToken);
}
/**
* Calculate the requested page of blobs taking into account the request parameters
*
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">Description of prefix/delimiter</a>
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list#parameters">List objects parameters</a>
* @param pageToken The token containing the prefix/delimiter/maxResults/pageNumber parameters
* @return The filtered list
*/
private PageOfBlobs calculatePageOfBlobs(PageToken pageToken) {
final String previousBlob = pageToken.previousBlob();
final int maxResults = pageToken.maxResults();
final String prefix = pageToken.prefix();
final String delimiter = pageToken.delimiter();
final SortedSet<String> prefixes = new TreeSet<>();
final List<BlobVersion> matchingBlobs = new ArrayList<>();
String lastBlobPath = null;
for (BlobVersion blob : blobs.values()) {
if (Strings.hasLength(previousBlob) && previousBlob.compareTo(blob.path()) >= 0) {
continue;
}
if (blob.path().startsWith(prefix)) {
final String pathWithoutPrefix = stripPrefixIfPresent(prefix, blob.path());
if (Strings.hasLength(delimiter) && pathWithoutPrefix.contains(delimiter)) {
// This seems counter to what is described in the example at the top of
// https://cloud.google.com/storage/docs/json_api/v1/objects/list,
// but it's required to make the third party tests pass
prefixes.add(prefix + pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(delimiter) + 1));
} else {
matchingBlobs.add(blob);
}
}
lastBlobPath = blob.path();
if (prefixes.size() + matchingBlobs.size() == maxResults) {
return new PageOfBlobs(
new PageToken(prefix, delimiter, maxResults, previousBlob),
new ArrayList<>(prefixes),
matchingBlobs,
lastBlobPath,
false
);
}
}
return new PageOfBlobs(
new PageToken(prefix, delimiter, maxResults, previousBlob),
new ArrayList<>(prefixes),
matchingBlobs,
lastBlobPath,
true
);
}
PageOfBlobs listBlobs(int maxResults, String delimiter, String prefix) {
final PageToken pageToken = new PageToken(prefix, delimiter, maxResults, "");
return calculatePageOfBlobs(pageToken);
}
List<BlobVersion> listBlobs() {
return new ArrayList<>(blobs.values());
}
/**
* We serialise this as a tuple with base64 encoded components so we don't need to escape the delimiter
*/
record PageToken(String prefix, String delimiter, int maxResults, String previousBlob) {
public static PageToken fromString(String pageToken) {
final String[] parts = pageToken.split("\\.");
assert parts.length == 4;
return new PageToken(decode(parts[0]), decode(parts[1]), Integer.parseInt(decode(parts[2])), decode(parts[3]));
}
public String toString() {
return encode(prefix) + "." + encode(delimiter) + "." + encode(String.valueOf(maxResults)) + "." + encode(previousBlob);
}
public PageToken nextPageToken(String previousBlob) {
return new PageToken(prefix, delimiter, maxResults, previousBlob);
}
private static String encode(String value) {
return Base64.getEncoder().encodeToString(value.getBytes());
}
private static String decode(String value) {
return new String(Base64.getDecoder().decode(value));
}
}
record PageOfBlobs(PageToken pageToken, List<String> prefixes, List<BlobVersion> blobs, String lastBlobIncluded, boolean lastPage) {
public boolean isLastPage() {
return lastPage;
}
public String nextPageToken() {
return isLastPage() ? null : pageToken.nextPageToken(lastBlobIncluded).toString();
}
}
static class BlobNotFoundException extends GcsRestException {

View file

@ -22,8 +22,10 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.XContentTestUtils;
import org.elasticsearch.test.fixture.HttpHeaderParser;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -33,13 +35,19 @@ import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.GZIPOutputStream;
import static java.util.Objects.requireNonNull;
public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
private static final String HOST = "http://127.0.0.1:12345";
@ -72,7 +80,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
assertEquals(
new TestHttpResponse(RestStatus.OK, "{\"kind\":\"storage#objects\",\"items\":[],\"prefixes\":[]}"),
listBlobs(handler, bucket, null)
listBlobs(handler, bucket, null, null)
);
final var body = randomAlphaOfLength(50);
@ -85,14 +93,14 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null));
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null, null));
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/"));
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/", null));
assertEquals(new TestHttpResponse(RestStatus.OK, """
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path"));
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null));
assertEquals(
new TestHttpResponse(RestStatus.OK, """
@ -129,7 +137,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
);
assertEquals(new TestHttpResponse(RestStatus.OK, """
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/"));
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/", null));
}
public void testGetWithBytesRange() {
@ -422,6 +430,114 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
);
}
public void testListObjectsWithPrefix() {
final var bucket = randomIdentifier();
final var handler = new GoogleCloudStorageHttpHandler(bucket);
final int numberOfFiles = randomIntBetween(1, 100);
final int numberWithMatchingPrefix = randomIntBetween(0, numberOfFiles);
final String prefix = randomIdentifier();
// Create expected state
for (int i = 0; i < numberOfFiles; i++) {
final String blobName;
if (i < numberWithMatchingPrefix) {
blobName = prefix + "blob_name_" + i;
} else {
final String nonMatchingPrefix = randomValueOtherThan(prefix, ESTestCase::randomIdentifier);
blobName = nonMatchingPrefix + "blob_name_" + i;
}
assertEquals(
RestStatus.OK,
executeUpload(handler, bucket, blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
);
}
TestHttpResponse response = listBlobs(handler, bucket, prefix, null);
assertEquals(RestStatus.OK, response.restStatus());
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
);
assertEquals(numberWithMatchingPrefix, ((List<?>) jsonMapView.get("items")).size());
}
public void testListObjectsWithPrefixAndDelimiter() {
final var bucket = randomIdentifier();
final var handler = new GoogleCloudStorageHttpHandler(bucket);
final var delimiter = randomFrom("/", ".", "+", "\\");
final var prefix = randomBoolean() ? "" : randomIdentifier() + delimiter;
final int numberOfFiles = randomIntBetween(1, 100);
final int numberWithDelimiter = randomIntBetween(0, numberOfFiles);
// Create expected state
final Set<String> topLevelDirectories = new HashSet<>();
for (int i = 0; i < numberOfFiles; i++) {
final String blobName;
if (i < numberWithDelimiter) {
final String directory = randomAlphaOfLength(3);
blobName = directory + delimiter + "blob_name_" + i;
topLevelDirectories.add(directory + delimiter);
} else {
blobName = randomIdentifier() + "_blob_name_" + i;
}
assertEquals(
RestStatus.OK,
executeUpload(handler, bucket, prefix + blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
);
}
final TestHttpResponse response = listBlobs(handler, bucket, prefix, delimiter);
assertEquals(RestStatus.OK, response.restStatus());
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
);
assertEquals(numberOfFiles - numberWithDelimiter, ((List<?>) jsonMapView.get("items")).size());
assertEquals(
topLevelDirectories.stream().map(d -> prefix + d).collect(Collectors.toSet()),
new HashSet<>(jsonMapView.get("prefixes"))
);
}
/**
* Tests the example from <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">The docs</a>
*/
public void testListObjectsExampleFromDocumentation() {
final var bucket = randomIdentifier();
final var handler = new GoogleCloudStorageHttpHandler(bucket);
Stream.of("a/b", "a/c", "d", "e", "e/f", "e/g/h")
.forEach(
path -> assertEquals(
RestStatus.OK,
executeUpload(handler, bucket, path, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
)
);
TestHttpResponse response = listBlobs(handler, bucket, null, "/");
assertEquals(RestStatus.OK, response.restStatus());
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
);
assertEquals(
Set.of("d", "e"),
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
);
assertEquals(Set.of("a/", "e/"), new HashSet<>(jsonMapView.get("prefixes")));
response = listBlobs(handler, bucket, "e/", "/");
assertEquals(RestStatus.OK, response.restStatus());
jsonMapView = XContentTestUtils.createJsonMapView(new ByteArrayInputStream(BytesReference.toBytes(response.body())));
assertEquals(
Set.of("e/f"),
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
);
// note this differs from the example, but third party test indicates this is what we get back
assertEquals(Set.of("e/g/"), new HashSet<>(jsonMapView.get("prefixes")));
}
private static TestHttpResponse executeUpload(
GoogleCloudStorageHttpHandler handler,
String bucket,
@ -449,9 +565,8 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
"POST",
"/upload/storage/v1/b/"
+ bucket
+ "/?uploadType=resumable&name="
+ blobName
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : "")
+ "/"
+ generateQueryString("uploadType", "resumable", "name", blobName, "ifGenerationMatch", ifGenerationMatch)
);
final var locationHeader = createUploadResponse.headers.getFirst("Location");
final var sessionURI = locationHeader.substring(locationHeader.indexOf(HOST) + HOST.length());
@ -476,10 +591,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return handleRequest(
handler,
"POST",
"/upload/storage/v1/b/"
+ bucket
+ "/?uploadType=multipart"
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : ""),
"/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch),
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
);
}
@ -494,11 +606,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return handleRequest(
handler,
"GET",
"/download/storage/v1/b/"
+ bucket
+ "/o/"
+ blobName
+ (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : ""),
"/download/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch),
BytesArray.EMPTY,
range != null ? rangeHeader(range.start(), range.end()) : TestHttpExchange.EMPTY_HEADERS
);
@ -513,23 +621,23 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return handleRequest(
handler,
"GET",
"/storage/v1/b/" + bucket + "/o/" + blobName + (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : "")
"/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch)
);
}
private static long getCurrentGeneration(GoogleCloudStorageHttpHandler handler, String bucket, String blobName) {
TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
final TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
assertEquals(RestStatus.OK, blobMetadata.restStatus());
Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
final Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
assertTrue(matcher.find());
return Long.parseLong(matcher.group(1));
}
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix) {
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix, String delimiter) {
return handleRequest(
handler,
"GET",
"/storage/v1/b/" + bucket + "/o" + (prefix != null ? "?prefix=" + URLEncoder.encode(prefix, StandardCharsets.UTF_8) : "")
"/storage/v1/b/" + bucket + "/o" + generateQueryString("prefix", prefix, "delimiter", delimiter)
);
}
@ -551,7 +659,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
}
RestStatus restStatus() {
return Objects.requireNonNull(RestStatus.fromCode(status));
return requireNonNull(RestStatus.fromCode(status));
}
@Override
@ -601,7 +709,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
fail(e);
}
assertNotEquals(0, httpExchange.getResponseCode());
var responseHeaders = new Headers();
final var responseHeaders = new Headers();
httpExchange.getResponseHeaders().forEach((header, values) -> {
// com.sun.net.httpserver.Headers.Headers() normalize keys
if ("Range".equals(header) || "Content-range".equals(header) || "Location".equals(header)) {
@ -611,6 +719,35 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
return new TestHttpResponse(httpExchange.getResponseCode(), httpExchange.getResponseBodyContents(), responseHeaders);
}
/**
* Generate a query string for the given parameters
*
* @param parameters The query parameters as alternating key, value pairs
* @return The query string including all parameters with a non-null value (e.g.
*/
public static String generateQueryString(Object... parameters) {
if (parameters.length % 2 != 0) {
final String message = "Parameters must be represented as alternating key, value pairs";
assert false : message;
throw new IllegalArgumentException(message);
}
final StringBuilder builder = new StringBuilder();
for (int i = 0; i < parameters.length; i += 2) {
final String key = String.valueOf(requireNonNull(parameters[i], "Parameter names must be non-null strings"));
final Object value = parameters[i + 1];
if (value != null) {
if (builder.isEmpty() == false) {
builder.append("&");
}
builder.append(key).append("=").append(URLEncoder.encode(String.valueOf(value), StandardCharsets.UTF_8));
}
}
if (builder.isEmpty() == false) {
return "?" + builder;
}
return "";
}
private static Headers contentRangeHeader(@Nullable Integer startInclusive, @Nullable Integer endInclusive, @Nullable Integer limit) {
final String rangeString = startInclusive != null && endInclusive != null ? startInclusive + "-" + endInclusive : "*";
final String limitString = limit == null ? "*" : limit.toString();