mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-21 22:00:36 -04:00
Implement test for GCS metrics (#122909)
This commit is contained in:
parent
de41d5704b
commit
b3959b6642
7 changed files with 652 additions and 107 deletions
|
@ -710,6 +710,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
Storage.BlobTargetOption.generationMatch()
|
||||
)
|
||||
);
|
||||
stats.trackPostOperation();
|
||||
return OptionalBytesReference.of(expected);
|
||||
} catch (Exception e) {
|
||||
final var serviceException = unwrapServiceException(e);
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||
import fixture.gcs.GoogleCloudStorageHttpHandler;
|
||||
|
||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
|
||||
import org.elasticsearch.common.BackoffPolicy;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
|
||||
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
|
||||
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.core.IOUtils;
|
||||
import org.elasticsearch.core.SuppressForbidden;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.core.Tuple;
|
||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
||||
|
||||
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||
public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase {
|
||||
private static final String BUCKET = "bucket";
|
||||
private static final ByteSizeValue BUFFER_SIZE = ByteSizeValue.ofKb(128);
|
||||
|
||||
private HttpServer httpServer;
|
||||
private ThreadPool threadPool;
|
||||
private GoogleCloudStorageService googleCloudStorageService;
|
||||
private GoogleCloudStorageHttpHandler googleCloudStorageHttpHandler;
|
||||
private ContainerAndBlobStore containerAndStore;
|
||||
|
||||
@Before
|
||||
public void createStorageService() throws Exception {
|
||||
threadPool = new TestThreadPool(getTestClass().getName());
|
||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||
httpServer.start();
|
||||
googleCloudStorageService = new GoogleCloudStorageService();
|
||||
googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET);
|
||||
httpServer.createContext("/", googleCloudStorageHttpHandler);
|
||||
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
||||
containerAndStore = createBlobContainer(randomIdentifier());
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopHttpServer() {
|
||||
IOUtils.closeWhileHandlingException(containerAndStore);
|
||||
httpServer.stop(0);
|
||||
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleMultipartWrite() throws Exception {
|
||||
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||
|
||||
final String blobName = randomIdentifier();
|
||||
final int blobLength = randomIntBetween(1, (int) store.getLargeBlobThresholdInBytes() - 1);
|
||||
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(blobLength));
|
||||
container.writeBlob(randomPurpose(), blobName, blobContents, true);
|
||||
assertEquals(createStats(1, 0, 0), store.stats());
|
||||
|
||||
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
|
||||
assertEquals(blobContents, Streams.readFully(is));
|
||||
}
|
||||
assertEquals(createStats(1, 0, 1), store.stats());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResumableWrite() throws Exception {
|
||||
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||
|
||||
final String blobName = randomIdentifier();
|
||||
final int size = randomIntBetween((int) store.getLargeBlobThresholdInBytes(), (int) store.getLargeBlobThresholdInBytes() * 2);
|
||||
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(size));
|
||||
container.writeBlob(randomPurpose(), blobName, blobContents, true);
|
||||
assertEquals(createStats(1, 0, 0), store.stats());
|
||||
|
||||
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
|
||||
assertEquals(blobContents, Streams.readFully(is));
|
||||
}
|
||||
assertEquals(createStats(1, 0, 1), store.stats());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteDirectory() throws Exception {
|
||||
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||
|
||||
final String directoryName = randomIdentifier();
|
||||
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
|
||||
final int numberOfFiles = randomIntBetween(1, 20);
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
container.writeBlob(randomPurpose(), String.format("%s/file_%d", directoryName, i), contents, true);
|
||||
}
|
||||
assertEquals(createStats(numberOfFiles, 0, 0), store.stats());
|
||||
|
||||
container.delete(randomPurpose());
|
||||
// We only count the list because we can't track the bulk delete
|
||||
assertEquals(createStats(numberOfFiles, 1, 0), store.stats());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListBlobsAccountsForPaging() throws Exception {
|
||||
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||
|
||||
final int pageSize = randomIntBetween(3, 20);
|
||||
googleCloudStorageHttpHandler.setDefaultPageLimit(pageSize);
|
||||
final int numberOfPages = randomIntBetween(1, 10);
|
||||
final int numberOfObjects = randomIntBetween((numberOfPages - 1) * pageSize, numberOfPages * pageSize - 1);
|
||||
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
|
||||
for (int i = 0; i < numberOfObjects; i++) {
|
||||
container.writeBlob(randomPurpose(), String.format("file_%d", i), contents, true);
|
||||
}
|
||||
assertEquals(createStats(numberOfObjects, 0, 0), store.stats());
|
||||
|
||||
final Map<String, BlobMetadata> stringBlobMetadataMap = container.listBlobs(randomPurpose());
|
||||
assertEquals(numberOfObjects, stringBlobMetadataMap.size());
|
||||
// There should be {numberOfPages} pages of blobs
|
||||
assertEquals(createStats(numberOfObjects, numberOfPages, 0), store.stats());
|
||||
}
|
||||
|
||||
public void testCompareAndSetRegister() {
|
||||
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||
|
||||
// update from empty (adds a single insert)
|
||||
final BytesArray contents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||
final String registerName = randomIdentifier();
|
||||
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, BytesArray.EMPTY, contents, l)));
|
||||
assertEquals(createStats(1, 0, 0), store.stats());
|
||||
|
||||
// successful update from non-null (adds two gets, one insert)
|
||||
final BytesArray nextContents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, contents, nextContents, l)));
|
||||
assertEquals(createStats(2, 0, 2), store.stats());
|
||||
|
||||
// failed update (adds two gets, zero inserts)
|
||||
final BytesArray wrongContents = randomValueOtherThan(
|
||||
nextContents,
|
||||
() -> new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH))
|
||||
);
|
||||
assertFalse(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, wrongContents, contents, l)));
|
||||
assertEquals(createStats(2, 0, 4), store.stats());
|
||||
}
|
||||
|
||||
private Map<String, BlobStoreActionStats> createStats(int insertCount, int listCount, int getCount) {
|
||||
return Map.of(
|
||||
"GetObject",
|
||||
new BlobStoreActionStats(getCount, getCount),
|
||||
"ListObjects",
|
||||
new BlobStoreActionStats(listCount, listCount),
|
||||
"InsertObject",
|
||||
new BlobStoreActionStats(insertCount, insertCount)
|
||||
);
|
||||
}
|
||||
|
||||
private record ContainerAndBlobStore(GoogleCloudStorageBlobContainer blobContainer, GoogleCloudStorageBlobStore blobStore)
|
||||
implements
|
||||
Closeable {
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
blobStore.close();
|
||||
}
|
||||
}
|
||||
|
||||
private ContainerAndBlobStore createBlobContainer(final String repositoryName) throws Exception {
|
||||
final String clientName = randomIdentifier();
|
||||
|
||||
final Tuple<ServiceAccountCredentials, byte[]> serviceAccountCredentialsTuple = GoogleCloudStorageTestUtilities.randomCredential(
|
||||
clientName
|
||||
);
|
||||
final GoogleCloudStorageClientSettings clientSettings = new GoogleCloudStorageClientSettings(
|
||||
serviceAccountCredentialsTuple.v1(),
|
||||
getEndpointForServer(httpServer),
|
||||
PROJECT_ID_SETTING.getDefault(Settings.EMPTY),
|
||||
CONNECT_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
|
||||
READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
|
||||
APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY),
|
||||
new URI(getEndpointForServer(httpServer) + "/token"),
|
||||
null
|
||||
);
|
||||
googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings));
|
||||
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore(
|
||||
BUCKET,
|
||||
clientName,
|
||||
repositoryName,
|
||||
googleCloudStorageService,
|
||||
BigArrays.NON_RECYCLING_INSTANCE,
|
||||
Math.toIntExact(BUFFER_SIZE.getBytes()),
|
||||
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10)
|
||||
);
|
||||
final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer(
|
||||
BlobPath.EMPTY,
|
||||
blobStore
|
||||
);
|
||||
return new ContainerAndBlobStore(googleCloudStorageBlobContainer, blobStore);
|
||||
}
|
||||
|
||||
protected String getEndpointForServer(final HttpServer server) {
|
||||
final InetSocketAddress address = server.getAddress();
|
||||
return "http://" + address.getHostString() + ":" + address.getPort();
|
||||
}
|
||||
}
|
|
@ -8,7 +8,6 @@
|
|||
*/
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.api.services.storage.StorageScopes;
|
||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||
|
||||
import org.apache.http.HttpRequest;
|
||||
|
@ -29,11 +28,9 @@ import java.net.InetSocketAddress;
|
|||
import java.net.Proxy;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyPairGenerator;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -47,6 +44,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
|
|||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.loadCredential;
|
||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageTestUtilities.randomCredential;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
|
||||
|
@ -292,32 +290,6 @@ public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
|
||||
private static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
|
||||
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
||||
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
|
||||
credentialBuilder.setClientId("id_" + clientName);
|
||||
credentialBuilder.setClientEmail(clientName);
|
||||
credentialBuilder.setProjectId("project_id_" + clientName);
|
||||
credentialBuilder.setPrivateKey(keyPair.getPrivate());
|
||||
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
|
||||
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
|
||||
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
|
||||
credentialBuilder.setTokenServerUri(tokenServerUri);
|
||||
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
|
||||
final String serviceAccount = Strings.format("""
|
||||
{
|
||||
"type": "service_account",
|
||||
"project_id": "project_id_%s",
|
||||
"private_key_id": "private_key_id_%s",
|
||||
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
|
||||
"client_email": "%s",
|
||||
"client_id": "id_%s",
|
||||
"token_uri": "%s"
|
||||
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
|
||||
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
private static TimeValue randomTimeout() {
|
||||
return randomFrom(TimeValue.MINUS_ONE, TimeValue.ZERO, randomPositiveTimeValue());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the "Elastic License
|
||||
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||
* Public License v 1"; you may not use this file except in compliance with, at
|
||||
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||
*/
|
||||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.api.services.storage.StorageScopes;
|
||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||
|
||||
import org.elasticsearch.core.Strings;
|
||||
import org.elasticsearch.core.Tuple;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.KeyPair;
|
||||
import java.security.KeyPairGenerator;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
|
||||
public class GoogleCloudStorageTestUtilities {
|
||||
|
||||
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
|
||||
public static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
|
||||
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
||||
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
|
||||
credentialBuilder.setClientId("id_" + clientName);
|
||||
credentialBuilder.setClientEmail(clientName);
|
||||
credentialBuilder.setProjectId("project_id_" + clientName);
|
||||
credentialBuilder.setPrivateKey(keyPair.getPrivate());
|
||||
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
|
||||
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
|
||||
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
|
||||
credentialBuilder.setTokenServerUri(tokenServerUri);
|
||||
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
|
||||
final String serviceAccount = Strings.format("""
|
||||
{
|
||||
"type": "service_account",
|
||||
"project_id": "project_id_%s",
|
||||
"private_key_id": "private_key_id_%s",
|
||||
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
|
||||
"client_email": "%s",
|
||||
"client_id": "id_%s",
|
||||
"token_uri": "%s"
|
||||
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
|
||||
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
|
@ -17,29 +17,30 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.util.Maps;
|
||||
import org.elasticsearch.core.SuppressForbidden;
|
||||
import org.elasticsearch.core.Tuple;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.RestUtils;
|
||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||
import org.elasticsearch.xcontent.ToXContent;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xcontent.XContentFactory;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URLDecoder;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.zip.GZIPInputStream;
|
||||
|
||||
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
|
||||
|
@ -56,6 +57,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
|
||||
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
|
||||
|
||||
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
|
||||
private final MockGcsBlobStore mockGcsBlobStore;
|
||||
private final String bucket;
|
||||
|
||||
|
@ -64,6 +66,15 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
this.mockGcsBlobStore = new MockGcsBlobStore();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the default page limit
|
||||
*
|
||||
* @param limit The new limit
|
||||
*/
|
||||
public void setDefaultPageLimit(final int limit) {
|
||||
this.defaultPageLimit.set(limit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(final HttpExchange exchange) throws IOException {
|
||||
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
||||
|
@ -82,40 +93,31 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
|
||||
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
|
||||
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
|
||||
final byte[] response = buildBlobInfoJson(blob).getBytes(UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
writeBlobVersionAsJson(exchange, blob);
|
||||
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
|
||||
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
|
||||
final Map<String, String> params = new HashMap<>();
|
||||
RestUtils.decodeQueryString(exchange.getRequestURI(), params);
|
||||
final String prefix = params.getOrDefault("prefix", "");
|
||||
final String delimiter = params.get("delimiter");
|
||||
final int maxResults = Integer.parseInt(params.getOrDefault("maxResults", String.valueOf(defaultPageLimit.get())));
|
||||
final String delimiter = params.getOrDefault("delimiter", "");
|
||||
final String pageToken = params.get("pageToken");
|
||||
|
||||
final Set<String> prefixes = new HashSet<>();
|
||||
final List<String> listOfBlobs = new ArrayList<>();
|
||||
|
||||
for (final Map.Entry<String, MockGcsBlobStore.BlobVersion> blob : mockGcsBlobStore.listBlobs().entrySet()) {
|
||||
final String blobName = blob.getKey();
|
||||
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
|
||||
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
|
||||
if (delimiterPos > -1) {
|
||||
prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
|
||||
} else {
|
||||
listOfBlobs.add(buildBlobInfoJson(blob.getValue()));
|
||||
}
|
||||
}
|
||||
final MockGcsBlobStore.PageOfBlobs pageOfBlobs;
|
||||
if (pageToken != null) {
|
||||
pageOfBlobs = mockGcsBlobStore.listBlobs(pageToken);
|
||||
} else {
|
||||
pageOfBlobs = mockGcsBlobStore.listBlobs(maxResults, delimiter, prefix);
|
||||
}
|
||||
|
||||
byte[] response = (String.format(Locale.ROOT, """
|
||||
{"kind":"storage#objects","items":[%s],"prefixes":[%s]}\
|
||||
""", String.join(",", listOfBlobs), String.join(",", prefixes))).getBytes(UTF_8);
|
||||
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
|
||||
ListBlobsResponse response = new ListBlobsResponse(bucket, pageOfBlobs);
|
||||
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
|
||||
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||
BytesReference responseBytes = BytesReference.bytes(builder);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
|
||||
responseBytes.writeTo(exchange.getResponseBody());
|
||||
}
|
||||
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
|
||||
// GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
|
||||
throw new AssertionError("Should not call get bucket API");
|
||||
|
@ -193,10 +195,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
ifGenerationMatch,
|
||||
content.get().v2()
|
||||
);
|
||||
byte[] response = buildBlobInfoJson(newBlobVersion).getBytes(UTF_8);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
||||
exchange.getResponseBody().write(response);
|
||||
writeBlobVersionAsJson(exchange, newBlobVersion);
|
||||
} else {
|
||||
throw new AssertionError(
|
||||
"Could not read multi-part request to ["
|
||||
|
@ -266,6 +265,48 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
}
|
||||
}
|
||||
|
||||
private void writeBlobVersionAsJson(HttpExchange exchange, MockGcsBlobStore.BlobVersion newBlobVersion) throws IOException {
|
||||
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
|
||||
writeBlobAsXContent(newBlobVersion, builder, bucket);
|
||||
BytesReference responseBytes = BytesReference.bytes(builder);
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
|
||||
responseBytes.writeTo(exchange.getResponseBody());
|
||||
}
|
||||
}
|
||||
|
||||
record ListBlobsResponse(String bucket, MockGcsBlobStore.PageOfBlobs pageOfBlobs) implements ToXContent {
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("kind", "storage#objects");
|
||||
if (pageOfBlobs.nextPageToken() != null) {
|
||||
builder.field("nextPageToken", pageOfBlobs.nextPageToken());
|
||||
}
|
||||
builder.startArray("items");
|
||||
for (MockGcsBlobStore.BlobVersion blobVersion : pageOfBlobs().blobs()) {
|
||||
writeBlobAsXContent(blobVersion, builder, bucket);
|
||||
}
|
||||
builder.endArray();
|
||||
builder.field("prefixes", pageOfBlobs.prefixes());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
||||
private static void writeBlobAsXContent(MockGcsBlobStore.BlobVersion blobVersion, XContentBuilder builder, String bucket)
|
||||
throws IOException {
|
||||
builder.startObject();
|
||||
builder.field("kind", "storage#object");
|
||||
builder.field("bucket", bucket);
|
||||
builder.field("name", blobVersion.path());
|
||||
builder.field("id", blobVersion.path());
|
||||
builder.field("size", String.valueOf(blobVersion.contents().length()));
|
||||
builder.field("generation", String.valueOf(blobVersion.generation()));
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
private void sendError(HttpExchange exchange, MockGcsBlobStore.GcsRestException e) throws IOException {
|
||||
final String responseBody = Strings.format("""
|
||||
{
|
||||
|
@ -280,21 +321,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
|||
exchange.getResponseBody().write(responseBody.getBytes(UTF_8));
|
||||
}
|
||||
|
||||
private String buildBlobInfoJson(MockGcsBlobStore.BlobVersion blobReference) {
|
||||
return String.format(
|
||||
Locale.ROOT,
|
||||
"""
|
||||
{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"%s","generation":"%d"}""",
|
||||
bucket,
|
||||
blobReference.path(),
|
||||
blobReference.path(),
|
||||
blobReference.contents().length(),
|
||||
blobReference.generation()
|
||||
);
|
||||
}
|
||||
|
||||
public Map<String, BytesReference> blobs() {
|
||||
return Maps.transformValues(mockGcsBlobStore.listBlobs(), MockGcsBlobStore.BlobVersion::contents);
|
||||
return mockGcsBlobStore.listBlobs()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(MockGcsBlobStore.BlobVersion::path, MockGcsBlobStore.BlobVersion::contents));
|
||||
}
|
||||
|
||||
private static String httpServerUrl(final HttpExchange exchange) {
|
||||
|
|
|
@ -10,22 +10,30 @@
|
|||
package fixture.gcs;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
public class MockGcsBlobStore {
|
||||
|
||||
private static final int RESUME_INCOMPLETE = 308;
|
||||
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentHashMap<>();
|
||||
// we use skip-list map so we can do paging right
|
||||
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentSkipListMap<>();
|
||||
private final ConcurrentMap<String, ResumableUpload> resumableUploads = new ConcurrentHashMap<>();
|
||||
|
||||
record BlobVersion(String path, long generation, BytesReference contents) {}
|
||||
|
@ -164,8 +172,114 @@ public class MockGcsBlobStore {
|
|||
blobs.remove(path);
|
||||
}
|
||||
|
||||
Map<String, BlobVersion> listBlobs() {
|
||||
return Map.copyOf(blobs);
|
||||
private String stripPrefixIfPresent(@Nullable String prefix, String toStrip) {
|
||||
if (prefix != null && toStrip.startsWith(prefix)) {
|
||||
return toStrip.substring(prefix.length());
|
||||
}
|
||||
return toStrip;
|
||||
}
|
||||
|
||||
PageOfBlobs listBlobs(String pageToken) {
|
||||
final PageToken parsedToken = PageToken.fromString(pageToken);
|
||||
return calculatePageOfBlobs(parsedToken);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the requested page of blobs taking into account the request parameters
|
||||
*
|
||||
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">Description of prefix/delimiter</a>
|
||||
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list#parameters">List objects parameters</a>
|
||||
* @param pageToken The token containing the prefix/delimiter/maxResults/pageNumber parameters
|
||||
* @return The filtered list
|
||||
*/
|
||||
private PageOfBlobs calculatePageOfBlobs(PageToken pageToken) {
|
||||
final String previousBlob = pageToken.previousBlob();
|
||||
final int maxResults = pageToken.maxResults();
|
||||
final String prefix = pageToken.prefix();
|
||||
final String delimiter = pageToken.delimiter();
|
||||
final SortedSet<String> prefixes = new TreeSet<>();
|
||||
final List<BlobVersion> matchingBlobs = new ArrayList<>();
|
||||
String lastBlobPath = null;
|
||||
for (BlobVersion blob : blobs.values()) {
|
||||
if (Strings.hasLength(previousBlob) && previousBlob.compareTo(blob.path()) >= 0) {
|
||||
continue;
|
||||
}
|
||||
if (blob.path().startsWith(prefix)) {
|
||||
final String pathWithoutPrefix = stripPrefixIfPresent(prefix, blob.path());
|
||||
if (Strings.hasLength(delimiter) && pathWithoutPrefix.contains(delimiter)) {
|
||||
// This seems counter to what is described in the example at the top of
|
||||
// https://cloud.google.com/storage/docs/json_api/v1/objects/list,
|
||||
// but it's required to make the third party tests pass
|
||||
prefixes.add(prefix + pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(delimiter) + 1));
|
||||
} else {
|
||||
matchingBlobs.add(blob);
|
||||
}
|
||||
}
|
||||
lastBlobPath = blob.path();
|
||||
if (prefixes.size() + matchingBlobs.size() == maxResults) {
|
||||
return new PageOfBlobs(
|
||||
new PageToken(prefix, delimiter, maxResults, previousBlob),
|
||||
new ArrayList<>(prefixes),
|
||||
matchingBlobs,
|
||||
lastBlobPath,
|
||||
false
|
||||
);
|
||||
}
|
||||
}
|
||||
return new PageOfBlobs(
|
||||
new PageToken(prefix, delimiter, maxResults, previousBlob),
|
||||
new ArrayList<>(prefixes),
|
||||
matchingBlobs,
|
||||
lastBlobPath,
|
||||
true
|
||||
);
|
||||
}
|
||||
|
||||
PageOfBlobs listBlobs(int maxResults, String delimiter, String prefix) {
|
||||
final PageToken pageToken = new PageToken(prefix, delimiter, maxResults, "");
|
||||
return calculatePageOfBlobs(pageToken);
|
||||
}
|
||||
|
||||
List<BlobVersion> listBlobs() {
|
||||
return new ArrayList<>(blobs.values());
|
||||
}
|
||||
|
||||
/**
|
||||
* We serialise this as a tuple with base64 encoded components so we don't need to escape the delimiter
|
||||
*/
|
||||
record PageToken(String prefix, String delimiter, int maxResults, String previousBlob) {
|
||||
public static PageToken fromString(String pageToken) {
|
||||
final String[] parts = pageToken.split("\\.");
|
||||
assert parts.length == 4;
|
||||
return new PageToken(decode(parts[0]), decode(parts[1]), Integer.parseInt(decode(parts[2])), decode(parts[3]));
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return encode(prefix) + "." + encode(delimiter) + "." + encode(String.valueOf(maxResults)) + "." + encode(previousBlob);
|
||||
}
|
||||
|
||||
public PageToken nextPageToken(String previousBlob) {
|
||||
return new PageToken(prefix, delimiter, maxResults, previousBlob);
|
||||
}
|
||||
|
||||
private static String encode(String value) {
|
||||
return Base64.getEncoder().encodeToString(value.getBytes());
|
||||
}
|
||||
|
||||
private static String decode(String value) {
|
||||
return new String(Base64.getDecoder().decode(value));
|
||||
}
|
||||
}
|
||||
|
||||
record PageOfBlobs(PageToken pageToken, List<String> prefixes, List<BlobVersion> blobs, String lastBlobIncluded, boolean lastPage) {
|
||||
|
||||
public boolean isLastPage() {
|
||||
return lastPage;
|
||||
}
|
||||
|
||||
public String nextPageToken() {
|
||||
return isLastPage() ? null : pageToken.nextPageToken(lastBlobIncluded).toString();
|
||||
}
|
||||
}
|
||||
|
||||
static class BlobNotFoundException extends GcsRestException {
|
||||
|
|
|
@ -22,8 +22,10 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.XContentTestUtils;
|
||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
@ -33,13 +35,19 @@ import java.net.URI;
|
|||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.zip.GZIPOutputStream;
|
||||
|
||||
import static java.util.Objects.requireNonNull;
|
||||
|
||||
public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||
|
||||
private static final String HOST = "http://127.0.0.1:12345";
|
||||
|
@ -72,7 +80,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
|
||||
assertEquals(
|
||||
new TestHttpResponse(RestStatus.OK, "{\"kind\":\"storage#objects\",\"items\":[],\"prefixes\":[]}"),
|
||||
listBlobs(handler, bucket, null)
|
||||
listBlobs(handler, bucket, null, null)
|
||||
);
|
||||
|
||||
final var body = randomAlphaOfLength(50);
|
||||
|
@ -85,14 +93,14 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
|
||||
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
||||
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null));
|
||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null, null));
|
||||
|
||||
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
||||
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/"));
|
||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/", null));
|
||||
|
||||
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path"));
|
||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null));
|
||||
|
||||
assertEquals(
|
||||
new TestHttpResponse(RestStatus.OK, """
|
||||
|
@ -129,7 +137,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
);
|
||||
|
||||
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/"));
|
||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/", null));
|
||||
}
|
||||
|
||||
public void testGetWithBytesRange() {
|
||||
|
@ -422,6 +430,114 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
public void testListObjectsWithPrefix() {
|
||||
final var bucket = randomIdentifier();
|
||||
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||
|
||||
final int numberOfFiles = randomIntBetween(1, 100);
|
||||
final int numberWithMatchingPrefix = randomIntBetween(0, numberOfFiles);
|
||||
final String prefix = randomIdentifier();
|
||||
|
||||
// Create expected state
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
final String blobName;
|
||||
if (i < numberWithMatchingPrefix) {
|
||||
blobName = prefix + "blob_name_" + i;
|
||||
} else {
|
||||
final String nonMatchingPrefix = randomValueOtherThan(prefix, ESTestCase::randomIdentifier);
|
||||
blobName = nonMatchingPrefix + "blob_name_" + i;
|
||||
}
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
executeUpload(handler, bucket, blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||
);
|
||||
}
|
||||
|
||||
TestHttpResponse response = listBlobs(handler, bucket, prefix, null);
|
||||
assertEquals(RestStatus.OK, response.restStatus());
|
||||
|
||||
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||
);
|
||||
assertEquals(numberWithMatchingPrefix, ((List<?>) jsonMapView.get("items")).size());
|
||||
}
|
||||
|
||||
public void testListObjectsWithPrefixAndDelimiter() {
|
||||
final var bucket = randomIdentifier();
|
||||
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||
final var delimiter = randomFrom("/", ".", "+", "\\");
|
||||
final var prefix = randomBoolean() ? "" : randomIdentifier() + delimiter;
|
||||
|
||||
final int numberOfFiles = randomIntBetween(1, 100);
|
||||
final int numberWithDelimiter = randomIntBetween(0, numberOfFiles);
|
||||
|
||||
// Create expected state
|
||||
final Set<String> topLevelDirectories = new HashSet<>();
|
||||
for (int i = 0; i < numberOfFiles; i++) {
|
||||
final String blobName;
|
||||
if (i < numberWithDelimiter) {
|
||||
final String directory = randomAlphaOfLength(3);
|
||||
blobName = directory + delimiter + "blob_name_" + i;
|
||||
topLevelDirectories.add(directory + delimiter);
|
||||
} else {
|
||||
blobName = randomIdentifier() + "_blob_name_" + i;
|
||||
}
|
||||
assertEquals(
|
||||
RestStatus.OK,
|
||||
executeUpload(handler, bucket, prefix + blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||
);
|
||||
}
|
||||
|
||||
final TestHttpResponse response = listBlobs(handler, bucket, prefix, delimiter);
|
||||
assertEquals(RestStatus.OK, response.restStatus());
|
||||
|
||||
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||
);
|
||||
assertEquals(numberOfFiles - numberWithDelimiter, ((List<?>) jsonMapView.get("items")).size());
|
||||
assertEquals(
|
||||
topLevelDirectories.stream().map(d -> prefix + d).collect(Collectors.toSet()),
|
||||
new HashSet<>(jsonMapView.get("prefixes"))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests the example from <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">The docs</a>
|
||||
*/
|
||||
public void testListObjectsExampleFromDocumentation() {
|
||||
final var bucket = randomIdentifier();
|
||||
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||
|
||||
Stream.of("a/b", "a/c", "d", "e", "e/f", "e/g/h")
|
||||
.forEach(
|
||||
path -> assertEquals(
|
||||
RestStatus.OK,
|
||||
executeUpload(handler, bucket, path, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||
)
|
||||
);
|
||||
|
||||
TestHttpResponse response = listBlobs(handler, bucket, null, "/");
|
||||
assertEquals(RestStatus.OK, response.restStatus());
|
||||
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||
);
|
||||
assertEquals(
|
||||
Set.of("d", "e"),
|
||||
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
|
||||
);
|
||||
assertEquals(Set.of("a/", "e/"), new HashSet<>(jsonMapView.get("prefixes")));
|
||||
|
||||
response = listBlobs(handler, bucket, "e/", "/");
|
||||
assertEquals(RestStatus.OK, response.restStatus());
|
||||
jsonMapView = XContentTestUtils.createJsonMapView(new ByteArrayInputStream(BytesReference.toBytes(response.body())));
|
||||
assertEquals(
|
||||
Set.of("e/f"),
|
||||
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
|
||||
);
|
||||
// note this differs from the example, but third party test indicates this is what we get back
|
||||
assertEquals(Set.of("e/g/"), new HashSet<>(jsonMapView.get("prefixes")));
|
||||
}
|
||||
|
||||
private static TestHttpResponse executeUpload(
|
||||
GoogleCloudStorageHttpHandler handler,
|
||||
String bucket,
|
||||
|
@ -449,9 +565,8 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
"POST",
|
||||
"/upload/storage/v1/b/"
|
||||
+ bucket
|
||||
+ "/?uploadType=resumable&name="
|
||||
+ blobName
|
||||
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : "")
|
||||
+ "/"
|
||||
+ generateQueryString("uploadType", "resumable", "name", blobName, "ifGenerationMatch", ifGenerationMatch)
|
||||
);
|
||||
final var locationHeader = createUploadResponse.headers.getFirst("Location");
|
||||
final var sessionURI = locationHeader.substring(locationHeader.indexOf(HOST) + HOST.length());
|
||||
|
@ -476,10 +591,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
return handleRequest(
|
||||
handler,
|
||||
"POST",
|
||||
"/upload/storage/v1/b/"
|
||||
+ bucket
|
||||
+ "/?uploadType=multipart"
|
||||
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : ""),
|
||||
"/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch),
|
||||
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
|
||||
);
|
||||
}
|
||||
|
@ -494,11 +606,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
return handleRequest(
|
||||
handler,
|
||||
"GET",
|
||||
"/download/storage/v1/b/"
|
||||
+ bucket
|
||||
+ "/o/"
|
||||
+ blobName
|
||||
+ (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : ""),
|
||||
"/download/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch),
|
||||
BytesArray.EMPTY,
|
||||
range != null ? rangeHeader(range.start(), range.end()) : TestHttpExchange.EMPTY_HEADERS
|
||||
);
|
||||
|
@ -513,23 +621,23 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
return handleRequest(
|
||||
handler,
|
||||
"GET",
|
||||
"/storage/v1/b/" + bucket + "/o/" + blobName + (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : "")
|
||||
"/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch)
|
||||
);
|
||||
}
|
||||
|
||||
private static long getCurrentGeneration(GoogleCloudStorageHttpHandler handler, String bucket, String blobName) {
|
||||
TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
|
||||
final TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
|
||||
assertEquals(RestStatus.OK, blobMetadata.restStatus());
|
||||
Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
|
||||
final Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
|
||||
assertTrue(matcher.find());
|
||||
return Long.parseLong(matcher.group(1));
|
||||
}
|
||||
|
||||
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix) {
|
||||
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix, String delimiter) {
|
||||
return handleRequest(
|
||||
handler,
|
||||
"GET",
|
||||
"/storage/v1/b/" + bucket + "/o" + (prefix != null ? "?prefix=" + URLEncoder.encode(prefix, StandardCharsets.UTF_8) : "")
|
||||
"/storage/v1/b/" + bucket + "/o" + generateQueryString("prefix", prefix, "delimiter", delimiter)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -551,7 +659,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
RestStatus restStatus() {
|
||||
return Objects.requireNonNull(RestStatus.fromCode(status));
|
||||
return requireNonNull(RestStatus.fromCode(status));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -601,7 +709,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
fail(e);
|
||||
}
|
||||
assertNotEquals(0, httpExchange.getResponseCode());
|
||||
var responseHeaders = new Headers();
|
||||
final var responseHeaders = new Headers();
|
||||
httpExchange.getResponseHeaders().forEach((header, values) -> {
|
||||
// com.sun.net.httpserver.Headers.Headers() normalize keys
|
||||
if ("Range".equals(header) || "Content-range".equals(header) || "Location".equals(header)) {
|
||||
|
@ -611,6 +719,35 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
|||
return new TestHttpResponse(httpExchange.getResponseCode(), httpExchange.getResponseBodyContents(), responseHeaders);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a query string for the given parameters
|
||||
*
|
||||
* @param parameters The query parameters as alternating key, value pairs
|
||||
* @return The query string including all parameters with a non-null value (e.g.
|
||||
*/
|
||||
public static String generateQueryString(Object... parameters) {
|
||||
if (parameters.length % 2 != 0) {
|
||||
final String message = "Parameters must be represented as alternating key, value pairs";
|
||||
assert false : message;
|
||||
throw new IllegalArgumentException(message);
|
||||
}
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
for (int i = 0; i < parameters.length; i += 2) {
|
||||
final String key = String.valueOf(requireNonNull(parameters[i], "Parameter names must be non-null strings"));
|
||||
final Object value = parameters[i + 1];
|
||||
if (value != null) {
|
||||
if (builder.isEmpty() == false) {
|
||||
builder.append("&");
|
||||
}
|
||||
builder.append(key).append("=").append(URLEncoder.encode(String.valueOf(value), StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
if (builder.isEmpty() == false) {
|
||||
return "?" + builder;
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private static Headers contentRangeHeader(@Nullable Integer startInclusive, @Nullable Integer endInclusive, @Nullable Integer limit) {
|
||||
final String rangeString = startInclusive != null && endInclusive != null ? startInclusive + "-" + endInclusive : "*";
|
||||
final String limitString = limit == null ? "*" : limit.toString();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue