mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-29 09:54:06 -04:00
Implement test for GCS metrics (#122909)
This commit is contained in:
parent
de41d5704b
commit
b3959b6642
7 changed files with 652 additions and 107 deletions
|
@ -710,6 +710,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
Storage.BlobTargetOption.generationMatch()
|
Storage.BlobTargetOption.generationMatch()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
stats.trackPostOperation();
|
||||||
return OptionalBytesReference.of(expected);
|
return OptionalBytesReference.of(expected);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
final var serviceException = unwrapServiceException(e);
|
final var serviceException = unwrapServiceException(e);
|
||||||
|
|
|
@ -0,0 +1,239 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the "Elastic License
|
||||||
|
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||||
|
* Public License v 1"; you may not use this file except in compliance with, at
|
||||||
|
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||||
|
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
|
import fixture.gcs.FakeOAuth2HttpHandler;
|
||||||
|
import fixture.gcs.GoogleCloudStorageHttpHandler;
|
||||||
|
|
||||||
|
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||||
|
import com.sun.net.httpserver.HttpServer;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.BackoffPolicy;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobPath;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobStoreActionStats;
|
||||||
|
import org.elasticsearch.common.blobstore.support.BlobContainerUtils;
|
||||||
|
import org.elasticsearch.common.blobstore.support.BlobMetadata;
|
||||||
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
|
import org.elasticsearch.common.io.Streams;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
|
import org.elasticsearch.core.IOUtils;
|
||||||
|
import org.elasticsearch.core.SuppressForbidden;
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
|
import org.elasticsearch.core.Tuple;
|
||||||
|
import org.elasticsearch.mocksocket.MockHttpServer;
|
||||||
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.elasticsearch.repositories.blobstore.BlobStoreTestUtil.randomPurpose;
|
||||||
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.APPLICATION_NAME_SETTING;
|
||||||
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.CONNECT_TIMEOUT_SETTING;
|
||||||
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.PROJECT_ID_SETTING;
|
||||||
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
||||||
|
|
||||||
|
@SuppressForbidden(reason = "Uses a HttpServer to emulate a Google Cloud Storage endpoint")
|
||||||
|
public class GoogleCloudStorageBlobContainerStatsTests extends ESTestCase {
|
||||||
|
private static final String BUCKET = "bucket";
|
||||||
|
private static final ByteSizeValue BUFFER_SIZE = ByteSizeValue.ofKb(128);
|
||||||
|
|
||||||
|
private HttpServer httpServer;
|
||||||
|
private ThreadPool threadPool;
|
||||||
|
private GoogleCloudStorageService googleCloudStorageService;
|
||||||
|
private GoogleCloudStorageHttpHandler googleCloudStorageHttpHandler;
|
||||||
|
private ContainerAndBlobStore containerAndStore;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void createStorageService() throws Exception {
|
||||||
|
threadPool = new TestThreadPool(getTestClass().getName());
|
||||||
|
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0);
|
||||||
|
httpServer.start();
|
||||||
|
googleCloudStorageService = new GoogleCloudStorageService();
|
||||||
|
googleCloudStorageHttpHandler = new GoogleCloudStorageHttpHandler(BUCKET);
|
||||||
|
httpServer.createContext("/", googleCloudStorageHttpHandler);
|
||||||
|
httpServer.createContext("/token", new FakeOAuth2HttpHandler());
|
||||||
|
containerAndStore = createBlobContainer(randomIdentifier());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopHttpServer() {
|
||||||
|
IOUtils.closeWhileHandlingException(containerAndStore);
|
||||||
|
httpServer.stop(0);
|
||||||
|
ThreadPool.terminate(threadPool, 10L, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleMultipartWrite() throws Exception {
|
||||||
|
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||||
|
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||||
|
|
||||||
|
final String blobName = randomIdentifier();
|
||||||
|
final int blobLength = randomIntBetween(1, (int) store.getLargeBlobThresholdInBytes() - 1);
|
||||||
|
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(blobLength));
|
||||||
|
container.writeBlob(randomPurpose(), blobName, blobContents, true);
|
||||||
|
assertEquals(createStats(1, 0, 0), store.stats());
|
||||||
|
|
||||||
|
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
|
||||||
|
assertEquals(blobContents, Streams.readFully(is));
|
||||||
|
}
|
||||||
|
assertEquals(createStats(1, 0, 1), store.stats());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResumableWrite() throws Exception {
|
||||||
|
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||||
|
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||||
|
|
||||||
|
final String blobName = randomIdentifier();
|
||||||
|
final int size = randomIntBetween((int) store.getLargeBlobThresholdInBytes(), (int) store.getLargeBlobThresholdInBytes() * 2);
|
||||||
|
final BytesArray blobContents = new BytesArray(randomByteArrayOfLength(size));
|
||||||
|
container.writeBlob(randomPurpose(), blobName, blobContents, true);
|
||||||
|
assertEquals(createStats(1, 0, 0), store.stats());
|
||||||
|
|
||||||
|
try (InputStream is = container.readBlob(randomPurpose(), blobName)) {
|
||||||
|
assertEquals(blobContents, Streams.readFully(is));
|
||||||
|
}
|
||||||
|
assertEquals(createStats(1, 0, 1), store.stats());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeleteDirectory() throws Exception {
|
||||||
|
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||||
|
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||||
|
|
||||||
|
final String directoryName = randomIdentifier();
|
||||||
|
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
|
||||||
|
final int numberOfFiles = randomIntBetween(1, 20);
|
||||||
|
for (int i = 0; i < numberOfFiles; i++) {
|
||||||
|
container.writeBlob(randomPurpose(), String.format("%s/file_%d", directoryName, i), contents, true);
|
||||||
|
}
|
||||||
|
assertEquals(createStats(numberOfFiles, 0, 0), store.stats());
|
||||||
|
|
||||||
|
container.delete(randomPurpose());
|
||||||
|
// We only count the list because we can't track the bulk delete
|
||||||
|
assertEquals(createStats(numberOfFiles, 1, 0), store.stats());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListBlobsAccountsForPaging() throws Exception {
|
||||||
|
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||||
|
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||||
|
|
||||||
|
final int pageSize = randomIntBetween(3, 20);
|
||||||
|
googleCloudStorageHttpHandler.setDefaultPageLimit(pageSize);
|
||||||
|
final int numberOfPages = randomIntBetween(1, 10);
|
||||||
|
final int numberOfObjects = randomIntBetween((numberOfPages - 1) * pageSize, numberOfPages * pageSize - 1);
|
||||||
|
final BytesArray contents = new BytesArray(randomByteArrayOfLength(50));
|
||||||
|
for (int i = 0; i < numberOfObjects; i++) {
|
||||||
|
container.writeBlob(randomPurpose(), String.format("file_%d", i), contents, true);
|
||||||
|
}
|
||||||
|
assertEquals(createStats(numberOfObjects, 0, 0), store.stats());
|
||||||
|
|
||||||
|
final Map<String, BlobMetadata> stringBlobMetadataMap = container.listBlobs(randomPurpose());
|
||||||
|
assertEquals(numberOfObjects, stringBlobMetadataMap.size());
|
||||||
|
// There should be {numberOfPages} pages of blobs
|
||||||
|
assertEquals(createStats(numberOfObjects, numberOfPages, 0), store.stats());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testCompareAndSetRegister() {
|
||||||
|
final GoogleCloudStorageBlobContainer container = containerAndStore.blobContainer();
|
||||||
|
final GoogleCloudStorageBlobStore store = containerAndStore.blobStore();
|
||||||
|
|
||||||
|
// update from empty (adds a single insert)
|
||||||
|
final BytesArray contents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||||
|
final String registerName = randomIdentifier();
|
||||||
|
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, BytesArray.EMPTY, contents, l)));
|
||||||
|
assertEquals(createStats(1, 0, 0), store.stats());
|
||||||
|
|
||||||
|
// successful update from non-null (adds two gets, one insert)
|
||||||
|
final BytesArray nextContents = new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH));
|
||||||
|
assertTrue(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, contents, nextContents, l)));
|
||||||
|
assertEquals(createStats(2, 0, 2), store.stats());
|
||||||
|
|
||||||
|
// failed update (adds two gets, zero inserts)
|
||||||
|
final BytesArray wrongContents = randomValueOtherThan(
|
||||||
|
nextContents,
|
||||||
|
() -> new BytesArray(randomByteArrayOfLength(BlobContainerUtils.MAX_REGISTER_CONTENT_LENGTH))
|
||||||
|
);
|
||||||
|
assertFalse(safeAwait(l -> container.compareAndSetRegister(randomPurpose(), registerName, wrongContents, contents, l)));
|
||||||
|
assertEquals(createStats(2, 0, 4), store.stats());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, BlobStoreActionStats> createStats(int insertCount, int listCount, int getCount) {
|
||||||
|
return Map.of(
|
||||||
|
"GetObject",
|
||||||
|
new BlobStoreActionStats(getCount, getCount),
|
||||||
|
"ListObjects",
|
||||||
|
new BlobStoreActionStats(listCount, listCount),
|
||||||
|
"InsertObject",
|
||||||
|
new BlobStoreActionStats(insertCount, insertCount)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
private record ContainerAndBlobStore(GoogleCloudStorageBlobContainer blobContainer, GoogleCloudStorageBlobStore blobStore)
|
||||||
|
implements
|
||||||
|
Closeable {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
blobStore.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ContainerAndBlobStore createBlobContainer(final String repositoryName) throws Exception {
|
||||||
|
final String clientName = randomIdentifier();
|
||||||
|
|
||||||
|
final Tuple<ServiceAccountCredentials, byte[]> serviceAccountCredentialsTuple = GoogleCloudStorageTestUtilities.randomCredential(
|
||||||
|
clientName
|
||||||
|
);
|
||||||
|
final GoogleCloudStorageClientSettings clientSettings = new GoogleCloudStorageClientSettings(
|
||||||
|
serviceAccountCredentialsTuple.v1(),
|
||||||
|
getEndpointForServer(httpServer),
|
||||||
|
PROJECT_ID_SETTING.getDefault(Settings.EMPTY),
|
||||||
|
CONNECT_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
|
||||||
|
READ_TIMEOUT_SETTING.getDefault(Settings.EMPTY),
|
||||||
|
APPLICATION_NAME_SETTING.getDefault(Settings.EMPTY),
|
||||||
|
new URI(getEndpointForServer(httpServer) + "/token"),
|
||||||
|
null
|
||||||
|
);
|
||||||
|
googleCloudStorageService.refreshAndClearCache(Map.of(clientName, clientSettings));
|
||||||
|
final GoogleCloudStorageBlobStore blobStore = new GoogleCloudStorageBlobStore(
|
||||||
|
BUCKET,
|
||||||
|
clientName,
|
||||||
|
repositoryName,
|
||||||
|
googleCloudStorageService,
|
||||||
|
BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
|
Math.toIntExact(BUFFER_SIZE.getBytes()),
|
||||||
|
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(10), 10)
|
||||||
|
);
|
||||||
|
final GoogleCloudStorageBlobContainer googleCloudStorageBlobContainer = new GoogleCloudStorageBlobContainer(
|
||||||
|
BlobPath.EMPTY,
|
||||||
|
blobStore
|
||||||
|
);
|
||||||
|
return new ContainerAndBlobStore(googleCloudStorageBlobContainer, blobStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getEndpointForServer(final HttpServer server) {
|
||||||
|
final InetSocketAddress address = server.getAddress();
|
||||||
|
return "http://" + address.getHostString() + ":" + address.getPort();
|
||||||
|
}
|
||||||
|
}
|
|
@ -8,7 +8,6 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.repositories.gcs;
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
import com.google.api.services.storage.StorageScopes;
|
|
||||||
import com.google.auth.oauth2.ServiceAccountCredentials;
|
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||||
|
|
||||||
import org.apache.http.HttpRequest;
|
import org.apache.http.HttpRequest;
|
||||||
|
@ -29,11 +28,9 @@ import java.net.InetSocketAddress;
|
||||||
import java.net.Proxy;
|
import java.net.Proxy;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.security.KeyPair;
|
|
||||||
import java.security.KeyPairGenerator;
|
import java.security.KeyPairGenerator;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Base64;
|
import java.util.Base64;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
|
@ -47,6 +44,7 @@ import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSetting
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.READ_TIMEOUT_SETTING;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.getClientSettings;
|
||||||
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.loadCredential;
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageClientSettings.loadCredential;
|
||||||
|
import static org.elasticsearch.repositories.gcs.GoogleCloudStorageTestUtilities.randomCredential;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
|
public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
|
||||||
|
@ -292,32 +290,6 @@ public class GoogleCloudStorageClientSettingsTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
|
|
||||||
private static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
|
|
||||||
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
|
||||||
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
|
|
||||||
credentialBuilder.setClientId("id_" + clientName);
|
|
||||||
credentialBuilder.setClientEmail(clientName);
|
|
||||||
credentialBuilder.setProjectId("project_id_" + clientName);
|
|
||||||
credentialBuilder.setPrivateKey(keyPair.getPrivate());
|
|
||||||
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
|
|
||||||
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
|
|
||||||
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
|
|
||||||
credentialBuilder.setTokenServerUri(tokenServerUri);
|
|
||||||
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
|
|
||||||
final String serviceAccount = Strings.format("""
|
|
||||||
{
|
|
||||||
"type": "service_account",
|
|
||||||
"project_id": "project_id_%s",
|
|
||||||
"private_key_id": "private_key_id_%s",
|
|
||||||
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
|
|
||||||
"client_email": "%s",
|
|
||||||
"client_id": "id_%s",
|
|
||||||
"token_uri": "%s"
|
|
||||||
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
|
|
||||||
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static TimeValue randomTimeout() {
|
private static TimeValue randomTimeout() {
|
||||||
return randomFrom(TimeValue.MINUS_ONE, TimeValue.ZERO, randomPositiveTimeValue());
|
return randomFrom(TimeValue.MINUS_ONE, TimeValue.ZERO, randomPositiveTimeValue());
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the "Elastic License
|
||||||
|
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||||
|
* Public License v 1"; you may not use this file except in compliance with, at
|
||||||
|
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||||
|
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
|
import com.google.api.services.storage.StorageScopes;
|
||||||
|
import com.google.auth.oauth2.ServiceAccountCredentials;
|
||||||
|
|
||||||
|
import org.elasticsearch.core.Strings;
|
||||||
|
import org.elasticsearch.core.Tuple;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.security.KeyPair;
|
||||||
|
import java.security.KeyPairGenerator;
|
||||||
|
import java.util.Base64;
|
||||||
|
import java.util.Collections;
|
||||||
|
|
||||||
|
public class GoogleCloudStorageTestUtilities {
|
||||||
|
|
||||||
|
/** Generates a random GoogleCredential along with its corresponding Service Account file provided as a byte array **/
|
||||||
|
public static Tuple<ServiceAccountCredentials, byte[]> randomCredential(final String clientName) throws Exception {
|
||||||
|
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
||||||
|
final ServiceAccountCredentials.Builder credentialBuilder = ServiceAccountCredentials.newBuilder();
|
||||||
|
credentialBuilder.setClientId("id_" + clientName);
|
||||||
|
credentialBuilder.setClientEmail(clientName);
|
||||||
|
credentialBuilder.setProjectId("project_id_" + clientName);
|
||||||
|
credentialBuilder.setPrivateKey(keyPair.getPrivate());
|
||||||
|
credentialBuilder.setPrivateKeyId("private_key_id_" + clientName);
|
||||||
|
credentialBuilder.setScopes(Collections.singleton(StorageScopes.DEVSTORAGE_FULL_CONTROL));
|
||||||
|
URI tokenServerUri = URI.create("http://localhost/oauth2/token");
|
||||||
|
credentialBuilder.setTokenServerUri(tokenServerUri);
|
||||||
|
final String encodedPrivateKey = Base64.getEncoder().encodeToString(keyPair.getPrivate().getEncoded());
|
||||||
|
final String serviceAccount = Strings.format("""
|
||||||
|
{
|
||||||
|
"type": "service_account",
|
||||||
|
"project_id": "project_id_%s",
|
||||||
|
"private_key_id": "private_key_id_%s",
|
||||||
|
"private_key": "-----BEGIN PRIVATE KEY-----\\n%s\\n-----END PRIVATE KEY-----\\n",
|
||||||
|
"client_email": "%s",
|
||||||
|
"client_id": "id_%s",
|
||||||
|
"token_uri": "%s"
|
||||||
|
}""", clientName, clientName, encodedPrivateKey, clientName, clientName, tokenServerUri);
|
||||||
|
return Tuple.tuple(credentialBuilder.build(), serviceAccount.getBytes(StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,29 +17,30 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
import org.elasticsearch.common.io.Streams;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.common.util.Maps;
|
|
||||||
import org.elasticsearch.core.SuppressForbidden;
|
import org.elasticsearch.core.SuppressForbidden;
|
||||||
import org.elasticsearch.core.Tuple;
|
import org.elasticsearch.core.Tuple;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.RestUtils;
|
import org.elasticsearch.rest.RestUtils;
|
||||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||||
|
import org.elasticsearch.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.xcontent.XContentFactory;
|
||||||
|
import org.elasticsearch.xcontent.XContentType;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.net.URLDecoder;
|
import java.net.URLDecoder;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import java.util.zip.GZIPInputStream;
|
import java.util.zip.GZIPInputStream;
|
||||||
|
|
||||||
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
|
import static fixture.gcs.MockGcsBlobStore.failAndThrow;
|
||||||
|
@ -56,6 +57,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
|
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageHttpHandler.class);
|
||||||
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
|
private static final String IF_GENERATION_MATCH = "ifGenerationMatch";
|
||||||
|
|
||||||
|
private final AtomicInteger defaultPageLimit = new AtomicInteger(1_000);
|
||||||
private final MockGcsBlobStore mockGcsBlobStore;
|
private final MockGcsBlobStore mockGcsBlobStore;
|
||||||
private final String bucket;
|
private final String bucket;
|
||||||
|
|
||||||
|
@ -64,6 +66,15 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
this.mockGcsBlobStore = new MockGcsBlobStore();
|
this.mockGcsBlobStore = new MockGcsBlobStore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the default page limit
|
||||||
|
*
|
||||||
|
* @param limit The new limit
|
||||||
|
*/
|
||||||
|
public void setDefaultPageLimit(final int limit) {
|
||||||
|
this.defaultPageLimit.set(limit);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(final HttpExchange exchange) throws IOException {
|
public void handle(final HttpExchange exchange) throws IOException {
|
||||||
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
final String request = exchange.getRequestMethod() + " " + exchange.getRequestURI().toString();
|
||||||
|
@ -82,40 +93,31 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
|
final String key = exchange.getRequestURI().getPath().replace("/storage/v1/b/" + bucket + "/o/", "");
|
||||||
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
|
final Long ifGenerationMatch = parseOptionalLongParameter(exchange, IF_GENERATION_MATCH);
|
||||||
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
|
final MockGcsBlobStore.BlobVersion blob = mockGcsBlobStore.getBlob(key, ifGenerationMatch);
|
||||||
final byte[] response = buildBlobInfoJson(blob).getBytes(UTF_8);
|
writeBlobVersionAsJson(exchange, blob);
|
||||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
|
||||||
exchange.getResponseBody().write(response);
|
|
||||||
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
|
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
|
||||||
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
|
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
|
||||||
final Map<String, String> params = new HashMap<>();
|
final Map<String, String> params = new HashMap<>();
|
||||||
RestUtils.decodeQueryString(exchange.getRequestURI(), params);
|
RestUtils.decodeQueryString(exchange.getRequestURI(), params);
|
||||||
final String prefix = params.getOrDefault("prefix", "");
|
final String prefix = params.getOrDefault("prefix", "");
|
||||||
final String delimiter = params.get("delimiter");
|
final int maxResults = Integer.parseInt(params.getOrDefault("maxResults", String.valueOf(defaultPageLimit.get())));
|
||||||
|
final String delimiter = params.getOrDefault("delimiter", "");
|
||||||
|
final String pageToken = params.get("pageToken");
|
||||||
|
|
||||||
final Set<String> prefixes = new HashSet<>();
|
final MockGcsBlobStore.PageOfBlobs pageOfBlobs;
|
||||||
final List<String> listOfBlobs = new ArrayList<>();
|
if (pageToken != null) {
|
||||||
|
pageOfBlobs = mockGcsBlobStore.listBlobs(pageToken);
|
||||||
for (final Map.Entry<String, MockGcsBlobStore.BlobVersion> blob : mockGcsBlobStore.listBlobs().entrySet()) {
|
} else {
|
||||||
final String blobName = blob.getKey();
|
pageOfBlobs = mockGcsBlobStore.listBlobs(maxResults, delimiter, prefix);
|
||||||
if (prefix.isEmpty() || blobName.startsWith(prefix)) {
|
|
||||||
int delimiterPos = (delimiter != null) ? blobName.substring(prefix.length()).indexOf(delimiter) : -1;
|
|
||||||
if (delimiterPos > -1) {
|
|
||||||
prefixes.add("\"" + blobName.substring(0, prefix.length() + delimiterPos + 1) + "\"");
|
|
||||||
} else {
|
|
||||||
listOfBlobs.add(buildBlobInfoJson(blob.getValue()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
byte[] response = (String.format(Locale.ROOT, """
|
ListBlobsResponse response = new ListBlobsResponse(bucket, pageOfBlobs);
|
||||||
{"kind":"storage#objects","items":[%s],"prefixes":[%s]}\
|
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
|
||||||
""", String.join(",", listOfBlobs), String.join(",", prefixes))).getBytes(UTF_8);
|
response.toXContent(builder, ToXContent.EMPTY_PARAMS);
|
||||||
|
BytesReference responseBytes = BytesReference.bytes(builder);
|
||||||
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
|
||||||
exchange.getResponseBody().write(response);
|
responseBytes.writeTo(exchange.getResponseBody());
|
||||||
|
}
|
||||||
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
|
} else if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "*", request)) {
|
||||||
// GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
|
// GET Bucket https://cloud.google.com/storage/docs/json_api/v1/buckets/get
|
||||||
throw new AssertionError("Should not call get bucket API");
|
throw new AssertionError("Should not call get bucket API");
|
||||||
|
@ -193,10 +195,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
ifGenerationMatch,
|
ifGenerationMatch,
|
||||||
content.get().v2()
|
content.get().v2()
|
||||||
);
|
);
|
||||||
byte[] response = buildBlobInfoJson(newBlobVersion).getBytes(UTF_8);
|
writeBlobVersionAsJson(exchange, newBlobVersion);
|
||||||
exchange.getResponseHeaders().add("Content-Type", "application/json");
|
|
||||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
|
|
||||||
exchange.getResponseBody().write(response);
|
|
||||||
} else {
|
} else {
|
||||||
throw new AssertionError(
|
throw new AssertionError(
|
||||||
"Could not read multi-part request to ["
|
"Could not read multi-part request to ["
|
||||||
|
@ -266,6 +265,48 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void writeBlobVersionAsJson(HttpExchange exchange, MockGcsBlobStore.BlobVersion newBlobVersion) throws IOException {
|
||||||
|
try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON)) {
|
||||||
|
writeBlobAsXContent(newBlobVersion, builder, bucket);
|
||||||
|
BytesReference responseBytes = BytesReference.bytes(builder);
|
||||||
|
exchange.getResponseHeaders().add("Content-Type", "application/json; charset=utf-8");
|
||||||
|
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), responseBytes.length());
|
||||||
|
responseBytes.writeTo(exchange.getResponseBody());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
record ListBlobsResponse(String bucket, MockGcsBlobStore.PageOfBlobs pageOfBlobs) implements ToXContent {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("kind", "storage#objects");
|
||||||
|
if (pageOfBlobs.nextPageToken() != null) {
|
||||||
|
builder.field("nextPageToken", pageOfBlobs.nextPageToken());
|
||||||
|
}
|
||||||
|
builder.startArray("items");
|
||||||
|
for (MockGcsBlobStore.BlobVersion blobVersion : pageOfBlobs().blobs()) {
|
||||||
|
writeBlobAsXContent(blobVersion, builder, bucket);
|
||||||
|
}
|
||||||
|
builder.endArray();
|
||||||
|
builder.field("prefixes", pageOfBlobs.prefixes());
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void writeBlobAsXContent(MockGcsBlobStore.BlobVersion blobVersion, XContentBuilder builder, String bucket)
|
||||||
|
throws IOException {
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("kind", "storage#object");
|
||||||
|
builder.field("bucket", bucket);
|
||||||
|
builder.field("name", blobVersion.path());
|
||||||
|
builder.field("id", blobVersion.path());
|
||||||
|
builder.field("size", String.valueOf(blobVersion.contents().length()));
|
||||||
|
builder.field("generation", String.valueOf(blobVersion.generation()));
|
||||||
|
builder.endObject();
|
||||||
|
}
|
||||||
|
|
||||||
private void sendError(HttpExchange exchange, MockGcsBlobStore.GcsRestException e) throws IOException {
|
private void sendError(HttpExchange exchange, MockGcsBlobStore.GcsRestException e) throws IOException {
|
||||||
final String responseBody = Strings.format("""
|
final String responseBody = Strings.format("""
|
||||||
{
|
{
|
||||||
|
@ -280,21 +321,10 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
|
||||||
exchange.getResponseBody().write(responseBody.getBytes(UTF_8));
|
exchange.getResponseBody().write(responseBody.getBytes(UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
private String buildBlobInfoJson(MockGcsBlobStore.BlobVersion blobReference) {
|
|
||||||
return String.format(
|
|
||||||
Locale.ROOT,
|
|
||||||
"""
|
|
||||||
{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"%s","generation":"%d"}""",
|
|
||||||
bucket,
|
|
||||||
blobReference.path(),
|
|
||||||
blobReference.path(),
|
|
||||||
blobReference.contents().length(),
|
|
||||||
blobReference.generation()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, BytesReference> blobs() {
|
public Map<String, BytesReference> blobs() {
|
||||||
return Maps.transformValues(mockGcsBlobStore.listBlobs(), MockGcsBlobStore.BlobVersion::contents);
|
return mockGcsBlobStore.listBlobs()
|
||||||
|
.stream()
|
||||||
|
.collect(Collectors.toMap(MockGcsBlobStore.BlobVersion::path, MockGcsBlobStore.BlobVersion::contents));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String httpServerUrl(final HttpExchange exchange) {
|
private static String httpServerUrl(final HttpExchange exchange) {
|
||||||
|
|
|
@ -10,22 +10,30 @@
|
||||||
package fixture.gcs;
|
package fixture.gcs;
|
||||||
|
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.UUIDs;
|
import org.elasticsearch.common.UUIDs;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
import org.elasticsearch.common.bytes.CompositeBytesReference;
|
||||||
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||||
|
|
||||||
import java.util.Map;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Base64;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ConcurrentSkipListMap;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
public class MockGcsBlobStore {
|
public class MockGcsBlobStore {
|
||||||
|
|
||||||
private static final int RESUME_INCOMPLETE = 308;
|
private static final int RESUME_INCOMPLETE = 308;
|
||||||
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentHashMap<>();
|
// we use skip-list map so we can do paging right
|
||||||
|
private final ConcurrentMap<String, BlobVersion> blobs = new ConcurrentSkipListMap<>();
|
||||||
private final ConcurrentMap<String, ResumableUpload> resumableUploads = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, ResumableUpload> resumableUploads = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
record BlobVersion(String path, long generation, BytesReference contents) {}
|
record BlobVersion(String path, long generation, BytesReference contents) {}
|
||||||
|
@ -164,8 +172,114 @@ public class MockGcsBlobStore {
|
||||||
blobs.remove(path);
|
blobs.remove(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
Map<String, BlobVersion> listBlobs() {
|
private String stripPrefixIfPresent(@Nullable String prefix, String toStrip) {
|
||||||
return Map.copyOf(blobs);
|
if (prefix != null && toStrip.startsWith(prefix)) {
|
||||||
|
return toStrip.substring(prefix.length());
|
||||||
|
}
|
||||||
|
return toStrip;
|
||||||
|
}
|
||||||
|
|
||||||
|
PageOfBlobs listBlobs(String pageToken) {
|
||||||
|
final PageToken parsedToken = PageToken.fromString(pageToken);
|
||||||
|
return calculatePageOfBlobs(parsedToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Calculate the requested page of blobs taking into account the request parameters
|
||||||
|
*
|
||||||
|
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">Description of prefix/delimiter</a>
|
||||||
|
* @see <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list#parameters">List objects parameters</a>
|
||||||
|
* @param pageToken The token containing the prefix/delimiter/maxResults/pageNumber parameters
|
||||||
|
* @return The filtered list
|
||||||
|
*/
|
||||||
|
private PageOfBlobs calculatePageOfBlobs(PageToken pageToken) {
|
||||||
|
final String previousBlob = pageToken.previousBlob();
|
||||||
|
final int maxResults = pageToken.maxResults();
|
||||||
|
final String prefix = pageToken.prefix();
|
||||||
|
final String delimiter = pageToken.delimiter();
|
||||||
|
final SortedSet<String> prefixes = new TreeSet<>();
|
||||||
|
final List<BlobVersion> matchingBlobs = new ArrayList<>();
|
||||||
|
String lastBlobPath = null;
|
||||||
|
for (BlobVersion blob : blobs.values()) {
|
||||||
|
if (Strings.hasLength(previousBlob) && previousBlob.compareTo(blob.path()) >= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (blob.path().startsWith(prefix)) {
|
||||||
|
final String pathWithoutPrefix = stripPrefixIfPresent(prefix, blob.path());
|
||||||
|
if (Strings.hasLength(delimiter) && pathWithoutPrefix.contains(delimiter)) {
|
||||||
|
// This seems counter to what is described in the example at the top of
|
||||||
|
// https://cloud.google.com/storage/docs/json_api/v1/objects/list,
|
||||||
|
// but it's required to make the third party tests pass
|
||||||
|
prefixes.add(prefix + pathWithoutPrefix.substring(0, pathWithoutPrefix.indexOf(delimiter) + 1));
|
||||||
|
} else {
|
||||||
|
matchingBlobs.add(blob);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastBlobPath = blob.path();
|
||||||
|
if (prefixes.size() + matchingBlobs.size() == maxResults) {
|
||||||
|
return new PageOfBlobs(
|
||||||
|
new PageToken(prefix, delimiter, maxResults, previousBlob),
|
||||||
|
new ArrayList<>(prefixes),
|
||||||
|
matchingBlobs,
|
||||||
|
lastBlobPath,
|
||||||
|
false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return new PageOfBlobs(
|
||||||
|
new PageToken(prefix, delimiter, maxResults, previousBlob),
|
||||||
|
new ArrayList<>(prefixes),
|
||||||
|
matchingBlobs,
|
||||||
|
lastBlobPath,
|
||||||
|
true
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
PageOfBlobs listBlobs(int maxResults, String delimiter, String prefix) {
|
||||||
|
final PageToken pageToken = new PageToken(prefix, delimiter, maxResults, "");
|
||||||
|
return calculatePageOfBlobs(pageToken);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<BlobVersion> listBlobs() {
|
||||||
|
return new ArrayList<>(blobs.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We serialise this as a tuple with base64 encoded components so we don't need to escape the delimiter
|
||||||
|
*/
|
||||||
|
record PageToken(String prefix, String delimiter, int maxResults, String previousBlob) {
|
||||||
|
public static PageToken fromString(String pageToken) {
|
||||||
|
final String[] parts = pageToken.split("\\.");
|
||||||
|
assert parts.length == 4;
|
||||||
|
return new PageToken(decode(parts[0]), decode(parts[1]), Integer.parseInt(decode(parts[2])), decode(parts[3]));
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return encode(prefix) + "." + encode(delimiter) + "." + encode(String.valueOf(maxResults)) + "." + encode(previousBlob);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PageToken nextPageToken(String previousBlob) {
|
||||||
|
return new PageToken(prefix, delimiter, maxResults, previousBlob);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String encode(String value) {
|
||||||
|
return Base64.getEncoder().encodeToString(value.getBytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String decode(String value) {
|
||||||
|
return new String(Base64.getDecoder().decode(value));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
record PageOfBlobs(PageToken pageToken, List<String> prefixes, List<BlobVersion> blobs, String lastBlobIncluded, boolean lastPage) {
|
||||||
|
|
||||||
|
public boolean isLastPage() {
|
||||||
|
return lastPage;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String nextPageToken() {
|
||||||
|
return isLastPage() ? null : pageToken.nextPageToken(lastBlobIncluded).toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class BlobNotFoundException extends GcsRestException {
|
static class BlobNotFoundException extends GcsRestException {
|
||||||
|
|
|
@ -22,8 +22,10 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.core.Nullable;
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.test.XContentTestUtils;
|
||||||
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
import org.elasticsearch.test.fixture.HttpHeaderParser;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
@ -33,13 +35,19 @@ import java.net.URI;
|
||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import java.util.zip.GZIPOutputStream;
|
import java.util.zip.GZIPOutputStream;
|
||||||
|
|
||||||
|
import static java.util.Objects.requireNonNull;
|
||||||
|
|
||||||
public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
|
|
||||||
private static final String HOST = "http://127.0.0.1:12345";
|
private static final String HOST = "http://127.0.0.1:12345";
|
||||||
|
@ -72,7 +80,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
new TestHttpResponse(RestStatus.OK, "{\"kind\":\"storage#objects\",\"items\":[],\"prefixes\":[]}"),
|
new TestHttpResponse(RestStatus.OK, "{\"kind\":\"storage#objects\",\"items\":[],\"prefixes\":[]}"),
|
||||||
listBlobs(handler, bucket, null)
|
listBlobs(handler, bucket, null, null)
|
||||||
);
|
);
|
||||||
|
|
||||||
final var body = randomAlphaOfLength(50);
|
final var body = randomAlphaOfLength(50);
|
||||||
|
@ -85,14 +93,14 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
|
|
||||||
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
||||||
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
||||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null));
|
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, null, null));
|
||||||
|
|
||||||
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
assertEquals(new TestHttpResponse(RestStatus.OK, Strings.format("""
|
||||||
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
{"kind":"storage#objects","items":[{"kind":"storage#object","bucket":"%s","name":"%s","id":"%s","size":"50",\
|
||||||
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/"));
|
"generation":"1"}],"prefixes":[]}""", bucket, blobName, blobName)), listBlobs(handler, bucket, "path/", null));
|
||||||
|
|
||||||
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
||||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path"));
|
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "some/other/path", null));
|
||||||
|
|
||||||
assertEquals(
|
assertEquals(
|
||||||
new TestHttpResponse(RestStatus.OK, """
|
new TestHttpResponse(RestStatus.OK, """
|
||||||
|
@ -129,7 +137,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
|
|
||||||
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
assertEquals(new TestHttpResponse(RestStatus.OK, """
|
||||||
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/"));
|
{"kind":"storage#objects","items":[],"prefixes":[]}"""), listBlobs(handler, bucket, "path/", null));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetWithBytesRange() {
|
public void testGetWithBytesRange() {
|
||||||
|
@ -422,6 +430,114 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testListObjectsWithPrefix() {
|
||||||
|
final var bucket = randomIdentifier();
|
||||||
|
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||||
|
|
||||||
|
final int numberOfFiles = randomIntBetween(1, 100);
|
||||||
|
final int numberWithMatchingPrefix = randomIntBetween(0, numberOfFiles);
|
||||||
|
final String prefix = randomIdentifier();
|
||||||
|
|
||||||
|
// Create expected state
|
||||||
|
for (int i = 0; i < numberOfFiles; i++) {
|
||||||
|
final String blobName;
|
||||||
|
if (i < numberWithMatchingPrefix) {
|
||||||
|
blobName = prefix + "blob_name_" + i;
|
||||||
|
} else {
|
||||||
|
final String nonMatchingPrefix = randomValueOtherThan(prefix, ESTestCase::randomIdentifier);
|
||||||
|
blobName = nonMatchingPrefix + "blob_name_" + i;
|
||||||
|
}
|
||||||
|
assertEquals(
|
||||||
|
RestStatus.OK,
|
||||||
|
executeUpload(handler, bucket, blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
TestHttpResponse response = listBlobs(handler, bucket, prefix, null);
|
||||||
|
assertEquals(RestStatus.OK, response.restStatus());
|
||||||
|
|
||||||
|
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||||
|
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||||
|
);
|
||||||
|
assertEquals(numberWithMatchingPrefix, ((List<?>) jsonMapView.get("items")).size());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testListObjectsWithPrefixAndDelimiter() {
|
||||||
|
final var bucket = randomIdentifier();
|
||||||
|
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||||
|
final var delimiter = randomFrom("/", ".", "+", "\\");
|
||||||
|
final var prefix = randomBoolean() ? "" : randomIdentifier() + delimiter;
|
||||||
|
|
||||||
|
final int numberOfFiles = randomIntBetween(1, 100);
|
||||||
|
final int numberWithDelimiter = randomIntBetween(0, numberOfFiles);
|
||||||
|
|
||||||
|
// Create expected state
|
||||||
|
final Set<String> topLevelDirectories = new HashSet<>();
|
||||||
|
for (int i = 0; i < numberOfFiles; i++) {
|
||||||
|
final String blobName;
|
||||||
|
if (i < numberWithDelimiter) {
|
||||||
|
final String directory = randomAlphaOfLength(3);
|
||||||
|
blobName = directory + delimiter + "blob_name_" + i;
|
||||||
|
topLevelDirectories.add(directory + delimiter);
|
||||||
|
} else {
|
||||||
|
blobName = randomIdentifier() + "_blob_name_" + i;
|
||||||
|
}
|
||||||
|
assertEquals(
|
||||||
|
RestStatus.OK,
|
||||||
|
executeUpload(handler, bucket, prefix + blobName, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
final TestHttpResponse response = listBlobs(handler, bucket, prefix, delimiter);
|
||||||
|
assertEquals(RestStatus.OK, response.restStatus());
|
||||||
|
|
||||||
|
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||||
|
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||||
|
);
|
||||||
|
assertEquals(numberOfFiles - numberWithDelimiter, ((List<?>) jsonMapView.get("items")).size());
|
||||||
|
assertEquals(
|
||||||
|
topLevelDirectories.stream().map(d -> prefix + d).collect(Collectors.toSet()),
|
||||||
|
new HashSet<>(jsonMapView.get("prefixes"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the example from <a href="https://cloud.google.com/storage/docs/json_api/v1/objects/list">The docs</a>
|
||||||
|
*/
|
||||||
|
public void testListObjectsExampleFromDocumentation() {
|
||||||
|
final var bucket = randomIdentifier();
|
||||||
|
final var handler = new GoogleCloudStorageHttpHandler(bucket);
|
||||||
|
|
||||||
|
Stream.of("a/b", "a/c", "d", "e", "e/f", "e/g/h")
|
||||||
|
.forEach(
|
||||||
|
path -> assertEquals(
|
||||||
|
RestStatus.OK,
|
||||||
|
executeUpload(handler, bucket, path, randomBytesReference(randomIntBetween(100, 5_000)), null).restStatus()
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHttpResponse response = listBlobs(handler, bucket, null, "/");
|
||||||
|
assertEquals(RestStatus.OK, response.restStatus());
|
||||||
|
XContentTestUtils.JsonMapView jsonMapView = XContentTestUtils.createJsonMapView(
|
||||||
|
new ByteArrayInputStream(BytesReference.toBytes(response.body()))
|
||||||
|
);
|
||||||
|
assertEquals(
|
||||||
|
Set.of("d", "e"),
|
||||||
|
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
|
||||||
|
);
|
||||||
|
assertEquals(Set.of("a/", "e/"), new HashSet<>(jsonMapView.get("prefixes")));
|
||||||
|
|
||||||
|
response = listBlobs(handler, bucket, "e/", "/");
|
||||||
|
assertEquals(RestStatus.OK, response.restStatus());
|
||||||
|
jsonMapView = XContentTestUtils.createJsonMapView(new ByteArrayInputStream(BytesReference.toBytes(response.body())));
|
||||||
|
assertEquals(
|
||||||
|
Set.of("e/f"),
|
||||||
|
((List<?>) jsonMapView.get("items")).stream().map(i -> ((Map<?, ?>) i).get("name")).collect(Collectors.toSet())
|
||||||
|
);
|
||||||
|
// note this differs from the example, but third party test indicates this is what we get back
|
||||||
|
assertEquals(Set.of("e/g/"), new HashSet<>(jsonMapView.get("prefixes")));
|
||||||
|
}
|
||||||
|
|
||||||
private static TestHttpResponse executeUpload(
|
private static TestHttpResponse executeUpload(
|
||||||
GoogleCloudStorageHttpHandler handler,
|
GoogleCloudStorageHttpHandler handler,
|
||||||
String bucket,
|
String bucket,
|
||||||
|
@ -449,9 +565,8 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
"POST",
|
"POST",
|
||||||
"/upload/storage/v1/b/"
|
"/upload/storage/v1/b/"
|
||||||
+ bucket
|
+ bucket
|
||||||
+ "/?uploadType=resumable&name="
|
+ "/"
|
||||||
+ blobName
|
+ generateQueryString("uploadType", "resumable", "name", blobName, "ifGenerationMatch", ifGenerationMatch)
|
||||||
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : "")
|
|
||||||
);
|
);
|
||||||
final var locationHeader = createUploadResponse.headers.getFirst("Location");
|
final var locationHeader = createUploadResponse.headers.getFirst("Location");
|
||||||
final var sessionURI = locationHeader.substring(locationHeader.indexOf(HOST) + HOST.length());
|
final var sessionURI = locationHeader.substring(locationHeader.indexOf(HOST) + HOST.length());
|
||||||
|
@ -476,10 +591,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
return handleRequest(
|
return handleRequest(
|
||||||
handler,
|
handler,
|
||||||
"POST",
|
"POST",
|
||||||
"/upload/storage/v1/b/"
|
"/upload/storage/v1/b/" + bucket + "/" + generateQueryString("uploadType", "multipart", "ifGenerationMatch", ifGenerationMatch),
|
||||||
+ bucket
|
|
||||||
+ "/?uploadType=multipart"
|
|
||||||
+ (ifGenerationMatch != null ? "&ifGenerationMatch=" + ifGenerationMatch : ""),
|
|
||||||
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
|
createGzipCompressedMultipartUploadBody(bucket, blobName, bytes)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -494,11 +606,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
return handleRequest(
|
return handleRequest(
|
||||||
handler,
|
handler,
|
||||||
"GET",
|
"GET",
|
||||||
"/download/storage/v1/b/"
|
"/download/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch),
|
||||||
+ bucket
|
|
||||||
+ "/o/"
|
|
||||||
+ blobName
|
|
||||||
+ (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : ""),
|
|
||||||
BytesArray.EMPTY,
|
BytesArray.EMPTY,
|
||||||
range != null ? rangeHeader(range.start(), range.end()) : TestHttpExchange.EMPTY_HEADERS
|
range != null ? rangeHeader(range.start(), range.end()) : TestHttpExchange.EMPTY_HEADERS
|
||||||
);
|
);
|
||||||
|
@ -513,23 +621,23 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
return handleRequest(
|
return handleRequest(
|
||||||
handler,
|
handler,
|
||||||
"GET",
|
"GET",
|
||||||
"/storage/v1/b/" + bucket + "/o/" + blobName + (ifGenerationMatch != null ? "?ifGenerationMatch=" + ifGenerationMatch : "")
|
"/storage/v1/b/" + bucket + "/o/" + blobName + generateQueryString("ifGenerationMatch", ifGenerationMatch)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static long getCurrentGeneration(GoogleCloudStorageHttpHandler handler, String bucket, String blobName) {
|
private static long getCurrentGeneration(GoogleCloudStorageHttpHandler handler, String bucket, String blobName) {
|
||||||
TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
|
final TestHttpResponse blobMetadata = getBlobMetadata(handler, bucket, blobName, null);
|
||||||
assertEquals(RestStatus.OK, blobMetadata.restStatus());
|
assertEquals(RestStatus.OK, blobMetadata.restStatus());
|
||||||
Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
|
final Matcher matcher = GENERATION_PATTERN.matcher(blobMetadata.body.utf8ToString());
|
||||||
assertTrue(matcher.find());
|
assertTrue(matcher.find());
|
||||||
return Long.parseLong(matcher.group(1));
|
return Long.parseLong(matcher.group(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix) {
|
private static TestHttpResponse listBlobs(GoogleCloudStorageHttpHandler handler, String bucket, String prefix, String delimiter) {
|
||||||
return handleRequest(
|
return handleRequest(
|
||||||
handler,
|
handler,
|
||||||
"GET",
|
"GET",
|
||||||
"/storage/v1/b/" + bucket + "/o" + (prefix != null ? "?prefix=" + URLEncoder.encode(prefix, StandardCharsets.UTF_8) : "")
|
"/storage/v1/b/" + bucket + "/o" + generateQueryString("prefix", prefix, "delimiter", delimiter)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -551,7 +659,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
RestStatus restStatus() {
|
RestStatus restStatus() {
|
||||||
return Objects.requireNonNull(RestStatus.fromCode(status));
|
return requireNonNull(RestStatus.fromCode(status));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -601,7 +709,7 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
fail(e);
|
fail(e);
|
||||||
}
|
}
|
||||||
assertNotEquals(0, httpExchange.getResponseCode());
|
assertNotEquals(0, httpExchange.getResponseCode());
|
||||||
var responseHeaders = new Headers();
|
final var responseHeaders = new Headers();
|
||||||
httpExchange.getResponseHeaders().forEach((header, values) -> {
|
httpExchange.getResponseHeaders().forEach((header, values) -> {
|
||||||
// com.sun.net.httpserver.Headers.Headers() normalize keys
|
// com.sun.net.httpserver.Headers.Headers() normalize keys
|
||||||
if ("Range".equals(header) || "Content-range".equals(header) || "Location".equals(header)) {
|
if ("Range".equals(header) || "Content-range".equals(header) || "Location".equals(header)) {
|
||||||
|
@ -611,6 +719,35 @@ public class GoogleCloudStorageHttpHandlerTests extends ESTestCase {
|
||||||
return new TestHttpResponse(httpExchange.getResponseCode(), httpExchange.getResponseBodyContents(), responseHeaders);
|
return new TestHttpResponse(httpExchange.getResponseCode(), httpExchange.getResponseBodyContents(), responseHeaders);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a query string for the given parameters
|
||||||
|
*
|
||||||
|
* @param parameters The query parameters as alternating key, value pairs
|
||||||
|
* @return The query string including all parameters with a non-null value (e.g.
|
||||||
|
*/
|
||||||
|
public static String generateQueryString(Object... parameters) {
|
||||||
|
if (parameters.length % 2 != 0) {
|
||||||
|
final String message = "Parameters must be represented as alternating key, value pairs";
|
||||||
|
assert false : message;
|
||||||
|
throw new IllegalArgumentException(message);
|
||||||
|
}
|
||||||
|
final StringBuilder builder = new StringBuilder();
|
||||||
|
for (int i = 0; i < parameters.length; i += 2) {
|
||||||
|
final String key = String.valueOf(requireNonNull(parameters[i], "Parameter names must be non-null strings"));
|
||||||
|
final Object value = parameters[i + 1];
|
||||||
|
if (value != null) {
|
||||||
|
if (builder.isEmpty() == false) {
|
||||||
|
builder.append("&");
|
||||||
|
}
|
||||||
|
builder.append(key).append("=").append(URLEncoder.encode(String.valueOf(value), StandardCharsets.UTF_8));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (builder.isEmpty() == false) {
|
||||||
|
return "?" + builder;
|
||||||
|
}
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
private static Headers contentRangeHeader(@Nullable Integer startInclusive, @Nullable Integer endInclusive, @Nullable Integer limit) {
|
private static Headers contentRangeHeader(@Nullable Integer startInclusive, @Nullable Integer endInclusive, @Nullable Integer limit) {
|
||||||
final String rangeString = startInclusive != null && endInclusive != null ? startInclusive + "-" + endInclusive : "*";
|
final String rangeString = startInclusive != null && endInclusive != null ? startInclusive + "-" + endInclusive : "*";
|
||||||
final String limitString = limit == null ? "*" : limit.toString();
|
final String limitString = limit == null ? "*" : limit.toString();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue