BlobContainer: add copyBlob method (#125737)

* BlobContainer: add copyBlob method

If a container implements copyBlob, then the copy is
performed by the store, without client-side IO. If the store
does not provide a copy operation then the default implementation
throws UnsupportedOperationException.

This change provides implementations for the FS and S3 blob containers.
More will follow.

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
Co-authored-by: David Turner <david.turner@elastic.co>
Brendan Cully 2025-04-09 10:33:01 -07:00 committed by GitHub
parent 44507cce04
commit c1a71ff45c
21 changed files with 750 additions and 93 deletions
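For orientation, here is a minimal sketch (not part of this change) of how a caller might use the new API, including a client-side fallback for stores that do not implement copyBlob; the method and names are illustrative only:

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.OperationPurpose;

import java.io.IOException;
import java.io.InputStream;

// Sketch only: copy sourceName to destName within the same blob store, falling back to a
// client-side stream copy when the store does not support server-side copy.
void copyWithFallback(BlobContainer source, BlobContainer destination,
                      String sourceName, String destName, long size) throws IOException {
    try {
        destination.copyBlob(OperationPurpose.SNAPSHOT_DATA, source, sourceName, destName, size);
    } catch (UnsupportedOperationException e) {
        try (InputStream in = source.readBlob(OperationPurpose.SNAPSHOT_DATA, sourceName)) {
            destination.writeBlob(OperationPurpose.SNAPSHOT_DATA, destName, in, size, true);
        }
    }
}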


@ -48,6 +48,7 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
@ -241,7 +242,7 @@ public class AzureBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryInteg
private static class AzureHTTPStatsCollectorHandler extends HttpStatsCollectorHandler {
private AzureHTTPStatsCollectorHandler(HttpHandler delegate) {
super(delegate);
super(delegate, Arrays.stream(AzureBlobStore.Operation.values()).map(AzureBlobStore.Operation::getKey).toArray(String[]::new));
}
@Override


@ -364,7 +364,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe
private static class GoogleCloudStorageStatsCollectorHttpHandler extends HttpStatsCollectorHandler {
GoogleCloudStorageStatsCollectorHttpHandler(final HttpHandler delegate) {
super(delegate);
super(delegate, Arrays.stream(StorageOperation.values()).map(StorageOperation::key).toArray(String[]::new));
}
@Override


@ -228,4 +228,56 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
e -> asInstanceOf(AmazonS3Exception.class, e.getCause()).getStatusCode() == RestStatus.REQUESTED_RANGE_NOT_SATISFIED.getStatus()
);
}
public void testCopy() {
final var sourceBlobName = randomIdentifier();
final var blobBytes = randomBytesReference(randomIntBetween(100, 2_000));
final var destinationBlobName = randomIdentifier();
final var repository = getRepository();
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
final var destinationBlobContainer = repository.blobStore().blobContainer(repository.basePath().add("target"));
destinationBlobContainer.copyBlob(
randomPurpose(),
sourceBlobContainer,
sourceBlobName,
destinationBlobName,
blobBytes.length()
);
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
});
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
}
public void testMultipartCopy() {
final var sourceBlobName = randomIdentifier();
// executeMultipart requires a minimum part size of 5 MiB
final var blobBytes = randomBytesReference(randomIntBetween(5 * 1024 * 1024, 10 * 1024 * 1024));
final var destinationBlobName = randomIdentifier();
final var repository = getRepository();
final var targetBytes = executeOnBlobStore(repository, sourceBlobContainer -> {
sourceBlobContainer.writeBlob(randomPurpose(), sourceBlobName, blobBytes, true);
final S3BlobContainer destinationBlobContainer = (S3BlobContainer) repository.blobStore()
.blobContainer(repository.basePath().add("target"));
destinationBlobContainer.executeMultipartCopy(
randomPurpose(),
(S3BlobContainer) sourceBlobContainer,
sourceBlobName,
destinationBlobName,
blobBytes.length()
);
return destinationBlobContainer.readBlob(randomPurpose(), destinationBlobName).readAllBytes();
});
assertArrayEquals(BytesReference.toBytes(blobBytes), targetBytes);
}
}


@ -71,6 +71,7 @@ import org.elasticsearch.xcontent.XContentFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@ -621,6 +622,12 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
return ByteSizeUnit.MB.toBytes(1L);
}
@Override
long getMaxCopySizeBeforeMultipart() {
// on my laptop 10K exercises this better but larger values should be fine for nightlies
return ByteSizeUnit.MB.toBytes(1L);
}
@Override
void ensureMultiPartUploadSize(long blobSize) {}
};
@ -688,7 +695,7 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
private final Map<S3BlobStore.StatsKey, AtomicLong> metricsCount = ConcurrentCollections.newConcurrentMap();
S3StatsCollectorHttpHandler(final HttpHandler delegate) {
super(delegate);
super(delegate, Arrays.stream(S3BlobStore.Operation.values()).map(S3BlobStore.Operation::getKey).toArray(String[]::new));
}
private S3HttpHandler.S3Request parseRequest(HttpExchange exchange) {
@ -736,9 +743,17 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
k -> new AtomicLong()
).incrementAndGet();
} else if (request.isPutObjectRequest()) {
trackRequest("PutObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
if (exchange.getRequestHeaders().containsKey(S3BlobStore.CUSTOM_QUERY_PARAMETER_COPY_SOURCE)) {
trackRequest("CopyObject");
metricsCount.computeIfAbsent(
new S3BlobStore.StatsKey(S3BlobStore.Operation.COPY_OBJECT, purpose),
k -> new AtomicLong()
).incrementAndGet();
} else {
trackRequest("PutObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.PUT_OBJECT, purpose), k -> new AtomicLong())
.incrementAndGet();
}
} else if (request.isMultiObjectDeleteRequest()) {
trackRequest("DeleteObjects");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.DELETE_OBJECTS, purpose), k -> new AtomicLong())


@ -14,6 +14,8 @@ import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@ -63,12 +65,14 @@ import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
@ -300,6 +304,11 @@ class S3BlobContainer extends AbstractBlobContainer {
return blobStore.bufferSizeInBytes();
}
// package private for testing
long getMaxCopySizeBeforeMultipart() {
return blobStore.maxCopySizeBeforeMultipart();
}
@Override
public void writeBlobAtomic(
OperationPurpose purpose,
@ -317,6 +326,67 @@ class S3BlobContainer extends AbstractBlobContainer {
writeBlob(purpose, blobName, bytes, failIfAlreadyExists);
}
/**
* Perform server-side copy of a blob from a source container
* <p>
* Server-side copy can be done for any size object, but if the object is larger than 5 GB then
* it must be done through a series of part copy operations rather than a single blob copy.
* See <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html">CopyObject</a>.
* Note that this operation will overwrite the destination if it already exists.
* @param purpose The purpose of the operation
* @param sourceBlobContainer The blob container to copy the blob from
* @param sourceBlobName The name of the blob to copy from
* @param blobName The name of the blob to copy to
* @param blobSize The size of the source blob in bytes (needed because some object stores use different implementations
* for very large blobs)
* @throws IOException If the operation fails on the server side
*/
@Override
public void copyBlob(
final OperationPurpose purpose,
final BlobContainer sourceBlobContainer,
final String sourceBlobName,
final String blobName,
final long blobSize
) throws IOException {
assert BlobContainer.assertPurposeConsistency(purpose, sourceBlobName);
assert BlobContainer.assertPurposeConsistency(purpose, blobName);
if (sourceBlobContainer instanceof S3BlobContainer == false) {
throw new IllegalArgumentException("source blob container must be a S3BlobContainer");
}
final var s3SourceBlobContainer = (S3BlobContainer) sourceBlobContainer;
try {
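// Small blobs are copied with a single server-side CopyObject request; blobs above the threshold must be copied as a series of part copies.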
if (blobSize > getMaxCopySizeBeforeMultipart()) {
executeMultipartCopy(purpose, s3SourceBlobContainer, sourceBlobName, blobName, blobSize);
} else {
// metadata is inherited from source, but not canned ACL or storage class
final var blobKey = buildKey(blobName);
final CopyObjectRequest copyRequest = new CopyObjectRequest(
s3SourceBlobContainer.blobStore.bucket(),
s3SourceBlobContainer.buildKey(sourceBlobName),
blobStore.bucket(),
blobKey
).withCannedAccessControlList(blobStore.getCannedACL()).withStorageClass(blobStore.getStorageClass());
S3BlobStore.configureRequestForMetrics(copyRequest, blobStore, Operation.COPY_OBJECT, purpose);
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> { clientReference.client().copyObject(copyRequest); });
}
}
} catch (final AmazonClientException e) {
if (e instanceof AmazonS3Exception amazonS3Exception) {
if (amazonS3Exception.getStatusCode() == RestStatus.NOT_FOUND.getStatus()) {
final var sourceKey = s3SourceBlobContainer.buildKey(sourceBlobName);
throw new NoSuchFileException("Copy source [" + sourceKey + "] not found: " + amazonS3Exception.getMessage());
}
}
throw new IOException("Unable to copy object [" + blobName + "] from [" + sourceBlobContainer + "][" + sourceBlobName + "]", e);
}
}
@Override
public DeleteResult delete(OperationPurpose purpose) throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
@ -475,23 +545,25 @@ class S3BlobContainer extends AbstractBlobContainer {
}
}
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(
OperationPurpose purpose,
private interface PartOperation {
PartETag doPart(String uploadId, int partNum, long partSize, boolean lastPart);
}
// for copy, blobName and s3BlobStore are the destination
private void executeMultipart(
final OperationPurpose purpose,
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
final long partSize,
final long blobSize,
final PartOperation partOperation
) throws IOException {
ensureMultiPartUploadSize(blobSize);
final long partSize = s3BlobStore.bufferSizeInBytes();
final Tuple<Long, Long> multiparts = numberOfMultiparts(blobSize, partSize);
if (multiparts.v1() > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger buffer size?");
throw new IllegalArgumentException("Too many multipart upload requests, maybe try a larger part size?");
}
final int nbParts = multiparts.v1().intValue();
@ -510,7 +582,7 @@ class S3BlobContainer extends AbstractBlobContainer {
);
}
if (Strings.isEmpty(uploadId.get())) {
throw new IOException("Failed to initialize multipart upload " + blobName);
throw new IOException("Failed to initialize multipart operation for " + blobName);
}
final List<PartETag> parts = new ArrayList<>();
@ -518,28 +590,20 @@ class S3BlobContainer extends AbstractBlobContainer {
long bytesCount = 0;
for (int i = 1; i <= nbParts; i++) {
final boolean lastPart = i == nbParts;
final UploadPartRequest uploadRequest = createPartUploadRequest(
purpose,
input,
uploadId.get(),
i,
blobName,
lastPart ? lastPartSize : partSize,
lastPart
);
bytesCount += uploadRequest.getPartSize();
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.client().uploadPart(uploadRequest)
);
parts.add(uploadResponse.getPartETag());
}
final var curPartSize = lastPart ? lastPartSize : partSize;
final var partEtag = partOperation.doPart(uploadId.get(), i, curPartSize, lastPart);
bytesCount += curPartSize;
parts.add(partEtag);
}
if (bytesCount != blobSize) {
throw new IOException(
"Failed to execute multipart upload for [" + blobName + "], expected " + blobSize + "bytes sent but got " + bytesCount
"Failed to execute multipart operation for ["
+ blobName
+ "], expected "
+ blobSize
+ "bytes sent but got "
+ bytesCount
);
}
@ -556,7 +620,7 @@ class S3BlobContainer extends AbstractBlobContainer {
success = true;
} catch (final AmazonClientException e) {
throw new IOException("Unable to upload object [" + blobName + "] using multipart upload", e);
throw new IOException("Unable to upload or copy object [" + blobName + "] using multipart upload", e);
} finally {
if ((success == false) && Strings.hasLength(uploadId.get())) {
abortMultiPartUpload(purpose, uploadId.get(), blobName);
@ -564,6 +628,81 @@ class S3BlobContainer extends AbstractBlobContainer {
}
}
/**
* Uploads a blob using multipart upload requests.
*/
void executeMultipartUpload(
OperationPurpose purpose,
final S3BlobStore s3BlobStore,
final String blobName,
final InputStream input,
final long blobSize
) throws IOException {
executeMultipart(
purpose,
s3BlobStore,
blobName,
s3BlobStore.bufferSizeInBytes(),
blobSize,
(uploadId, partNum, partSize, lastPart) -> {
final UploadPartRequest uploadRequest = createPartUploadRequest(
purpose,
input,
uploadId,
partNum,
blobName,
partSize,
lastPart
);
try (AmazonS3Reference clientReference = s3BlobStore.clientReference()) {
final UploadPartResult uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.client().uploadPart(uploadRequest)
);
return uploadResponse.getPartETag();
}
}
);
}
/**
* Copies a blob using multipart
* <p>
* This is required when the blob size is larger than MAX_FILE_SIZE.
* It must be called on the destination blob container.
* <p>
* It uses MAX_FILE_SIZE as the copy part size, because that minimizes the number of requests needed.
* Smaller part sizes might improve throughput when downloading from multiple parts at once, but we have no measurements
* indicating this would be helpful so we optimize for request count.
*/
void executeMultipartCopy(
OperationPurpose purpose,
final S3BlobContainer sourceContainer,
final String sourceBlobName,
final String destinationBlobName,
final long blobSize
) throws IOException {
final long copyPartSize = MAX_FILE_SIZE.getBytes();
final var destinationKey = buildKey(destinationBlobName);
executeMultipart(purpose, blobStore, destinationKey, copyPartSize, blobSize, ((uploadId, partNum, partSize, lastPart) -> {
final long startOffset = (partNum - 1) * copyPartSize;
final var request = new CopyPartRequest().withSourceBucketName(sourceContainer.blobStore.bucket())
.withSourceKey(sourceContainer.buildKey(sourceBlobName))
.withDestinationBucketName(blobStore.bucket())
.withDestinationKey(destinationKey)
.withUploadId(uploadId)
.withPartNumber(partNum)
.withFirstByte(startOffset)
.withLastByte(startOffset + partSize - 1);
S3BlobStore.configureRequestForMetrics(request, blobStore, Operation.COPY_MULTIPART_OBJECT, purpose);
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final var result = SocketAccess.doPrivileged(() -> clientReference.client().copyPart(request));
return result.getPartETag();
}
}));
}
// non-static, package private for testing
void ensureMultiPartUploadSize(final long blobSize) {
if (blobSize > MAX_FILE_SIZE_USING_MULTIPART.getBytes()) {

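To make the multipart copy arithmetic concrete, here is a rough sketch mirroring the numberOfMultiparts semantics used above; the 12 GiB blob size is just an illustrative value:

// Sketch: a 12 GiB blob copied with 5 GiB parts (MAX_FILE_SIZE) needs three CopyPart requests.
long blobSize = ByteSizeUnit.GB.toBytes(12);
long partSize = ByteSizeUnit.GB.toBytes(5);
long fullParts = blobSize / partSize;                      // 2
long remainder = blobSize % partSize;                      // 2 GiB
long nbParts = remainder == 0 ? fullParts : fullParts + 1; // 3
// The parts cover [0, 5 GiB), [5 GiB, 10 GiB) and [10 GiB, 12 GiB); only the last part is short.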

@ -59,6 +59,7 @@ import static org.elasticsearch.rest.RestStatus.REQUESTED_RANGE_NOT_SATISFIED;
class S3BlobStore implements BlobStore {
public static final String CUSTOM_QUERY_PARAMETER_COPY_SOURCE = "x-amz-copy-source";
public static final String CUSTOM_QUERY_PARAMETER_PURPOSE = "x-purpose";
/**
@ -79,6 +80,8 @@ class S3BlobStore implements BlobStore {
private final ByteSizeValue bufferSize;
private final ByteSizeValue maxCopySizeBeforeMultipart;
private final boolean serverSideEncryption;
private final CannedAccessControlList cannedACL;
@ -103,6 +106,7 @@ class S3BlobStore implements BlobStore {
String bucket,
boolean serverSideEncryption,
ByteSizeValue bufferSize,
ByteSizeValue maxCopySizeBeforeMultipart,
String cannedACL,
String storageClass,
RepositoryMetadata repositoryMetadata,
@ -116,6 +120,7 @@ class S3BlobStore implements BlobStore {
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.maxCopySizeBeforeMultipart = maxCopySizeBeforeMultipart;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
this.repositoryMetadata = repositoryMetadata;
@ -251,10 +256,10 @@ class S3BlobStore implements BlobStore {
case GET_OBJECT, LIST_OBJECTS -> {
return request.getHttpMethod().name().equals("GET");
}
case PUT_OBJECT -> {
case PUT_OBJECT, COPY_OBJECT -> {
return request.getHttpMethod().name().equals("PUT");
}
case PUT_MULTIPART_OBJECT -> {
case PUT_MULTIPART_OBJECT, COPY_MULTIPART_OBJECT -> {
return request.getHttpMethod().name().equals("PUT") || request.getHttpMethod().name().equals("POST");
}
case DELETE_OBJECTS -> {
@ -328,6 +333,10 @@ class S3BlobStore implements BlobStore {
return bufferSize.getBytes();
}
public long maxCopySizeBeforeMultipart() {
return maxCopySizeBeforeMultipart.getBytes();
}
public RepositoryMetadata getRepositoryMetadata() {
return repositoryMetadata;
}
@ -551,7 +560,9 @@ class S3BlobStore implements BlobStore {
PUT_OBJECT("PutObject"),
PUT_MULTIPART_OBJECT("PutMultipartObject"),
DELETE_OBJECTS("DeleteObjects"),
ABORT_MULTIPART_OBJECT("AbortMultipartObject");
ABORT_MULTIPART_OBJECT("AbortMultipartObject"),
COPY_OBJECT("CopyObject"),
COPY_MULTIPART_OBJECT("CopyMultipartObject");
private final String key;


@ -131,6 +131,18 @@ class S3Repository extends MeteredBlobStoreRepository {
MAX_PART_SIZE_USING_MULTIPART
);
/**
* Maximum size allowed for copy without multipart.
* Objects larger than this will be copied using multipart copy. S3 enforces a minimum multipart size of 5 MiB and a maximum
* non-multipart copy size of 5 GiB. The default is to use the maximum allowable size in order to minimize request count.
*/
static final Setting<ByteSizeValue> MAX_COPY_SIZE_BEFORE_MULTIPART = Setting.byteSizeSetting(
"max_copy_size_before_multipart",
MAX_FILE_SIZE,
MIN_PART_SIZE_USING_MULTIPART,
MAX_FILE_SIZE
);
/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 5tb.
*/
@ -241,6 +253,8 @@ class S3Repository extends MeteredBlobStoreRepository {
private final ByteSizeValue chunkSize;
private final ByteSizeValue maxCopySizeBeforeMultipart;
private final boolean serverSideEncryption;
private final String storageClass;
@ -308,6 +322,8 @@ class S3Repository extends MeteredBlobStoreRepository {
);
}
this.maxCopySizeBeforeMultipart = MAX_COPY_SIZE_BEFORE_MULTIPART.get(metadata.settings());
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(metadata.settings());
this.storageClass = STORAGE_CLASS_SETTING.get(metadata.settings());
@ -325,11 +341,13 @@ class S3Repository extends MeteredBlobStoreRepository {
coolDown = COOLDOWN_PERIOD.get(metadata.settings());
logger.debug(
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], cannedACL [{}], storageClass [{}]",
"using bucket [{}], chunk_size [{}], server_side_encryption [{}], buffer_size [{}], "
+ "max_copy_size_before_multipart [{}], cannedACL [{}], storageClass [{}]",
bucket,
chunkSize,
serverSideEncryption,
bufferSize,
maxCopySizeBeforeMultipart,
cannedACL,
storageClass
);
@ -454,6 +472,7 @@ class S3Repository extends MeteredBlobStoreRepository {
bucket,
serverSideEncryption,
bufferSize,
maxCopySizeBeforeMultipart,
cannedACL,
storageClass,
metadata,

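As a usage illustration for the new max_copy_size_before_multipart setting, repository settings that force the multipart copy path for blobs over 5 MiB might look like the following sketch (bucket and base_path values are placeholders; compare the repository analysis REST test further down):

// Sketch: S3 repository settings with a lowered copy threshold so that copies of blobs
// larger than 5 MiB exercise the multipart copy path; only the last setting is new here.
Settings repositorySettings = Settings.builder()
    .put("bucket", "my-bucket")            // placeholder
    .put("base_path", "my/base/path")      // placeholder
    .put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
    .build();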

@ -214,6 +214,7 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes
"bucket",
S3Repository.SERVER_SIDE_ENCRYPTION_SETTING.getDefault(Settings.EMPTY),
bufferSize == null ? S3Repository.BUFFER_SIZE_SETTING.getDefault(Settings.EMPTY) : bufferSize,
S3Repository.MAX_COPY_SIZE_BEFORE_MULTIPART.getDefault(Settings.EMPTY),
S3Repository.CANNED_ACL_SETTING.getDefault(Settings.EMPTY),
S3Repository.STORAGE_CLASS_SETTING.getDefault(Settings.EMPTY),
repositoryMetadata,


@ -15,6 +15,10 @@ import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
@ -159,12 +163,26 @@ public class S3BlobStoreContainerTests extends ESTestCase {
}
public void testExecuteMultipartUpload() throws IOException {
testExecuteMultipart(false);
}
public void testExecuteMultipartCopy() throws IOException {
testExecuteMultipart(true);
}
void testExecuteMultipart(boolean doCopy) throws IOException {
final String bucketName = randomAlphaOfLengthBetween(1, 10);
final String blobName = randomAlphaOfLengthBetween(1, 10);
final String sourceBucketName = randomAlphaOfLengthBetween(1, 10);
final String sourceBlobName = randomAlphaOfLengthBetween(1, 10);
final BlobPath blobPath = BlobPath.EMPTY;
if (randomBoolean()) {
IntStream.of(randomIntBetween(1, 5)).forEach(value -> BlobPath.EMPTY.add("path_" + value));
IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value));
}
final var sourceBlobPath = BlobPath.EMPTY;
if (randomBoolean()) {
IntStream.of(randomIntBetween(1, 5)).forEach(value -> sourceBlobPath.add("path_" + value));
}
final long blobSize = ByteSizeUnit.GB.toBytes(randomIntBetween(1, 128));
@ -174,6 +192,9 @@ public class S3BlobStoreContainerTests extends ESTestCase {
when(blobStore.bucket()).thenReturn(bucketName);
when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize);
final S3BlobStore sourceBlobStore = mock(S3BlobStore.class);
when(sourceBlobStore.bucket()).thenReturn(sourceBucketName);
final boolean serverSideEncryption = randomBoolean();
when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption);
@ -193,29 +214,45 @@ public class S3BlobStoreContainerTests extends ESTestCase {
when(client.initiateMultipartUpload(initArgCaptor.capture())).thenReturn(initResult);
final ArgumentCaptor<UploadPartRequest> uploadArgCaptor = ArgumentCaptor.forClass(UploadPartRequest.class);
final var copyArgCaptor = ArgumentCaptor.forClass(CopyPartRequest.class);
final List<String> expectedEtags = new ArrayList<>();
final long partSize = Math.min(bufferSize, blobSize);
final long partSize = Math.min(doCopy ? ByteSizeUnit.GB.toBytes(5) : bufferSize, blobSize);
long totalBytes = 0;
do {
expectedEtags.add(randomAlphaOfLength(50));
totalBytes += partSize;
} while (totalBytes < blobSize);
when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> {
final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0];
final UploadPartResult response = new UploadPartResult();
response.setPartNumber(request.getPartNumber());
response.setETag(expectedEtags.get(request.getPartNumber() - 1));
return response;
});
if (doCopy) {
when(client.copyPart(copyArgCaptor.capture())).thenAnswer(invocationOnMock -> {
final CopyPartRequest request = (CopyPartRequest) invocationOnMock.getArguments()[0];
final CopyPartResult result = new CopyPartResult();
result.setETag(expectedEtags.get(request.getPartNumber() - 1));
return result;
});
} else {
when(client.uploadPart(uploadArgCaptor.capture())).thenAnswer(invocationOnMock -> {
final UploadPartRequest request = (UploadPartRequest) invocationOnMock.getArguments()[0];
final UploadPartResult response = new UploadPartResult();
response.setPartNumber(request.getPartNumber());
response.setETag(expectedEtags.get(request.getPartNumber() - 1));
return response;
});
}
final ArgumentCaptor<CompleteMultipartUploadRequest> compArgCaptor = ArgumentCaptor.forClass(CompleteMultipartUploadRequest.class);
when(client.completeMultipartUpload(compArgCaptor.capture())).thenReturn(new CompleteMultipartUploadResult());
final ByteArrayInputStream inputStream = new ByteArrayInputStream(new byte[0]);
final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore);
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
final S3BlobContainer sourceContainer = new S3BlobContainer(sourceBlobPath, sourceBlobStore);
if (doCopy) {
blobContainer.executeMultipartCopy(randomPurpose(), sourceContainer, sourceBlobName, blobName, blobSize);
} else {
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, inputStream, blobSize);
}
final InitiateMultipartUploadRequest initRequest = initArgCaptor.getValue();
assertEquals(bucketName, initRequest.getBucketName());
@ -226,26 +263,46 @@ public class S3BlobStoreContainerTests extends ESTestCase {
assertEquals(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION, initRequest.getObjectMetadata().getSSEAlgorithm());
}
final Tuple<Long, Long> numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, bufferSize);
final Tuple<Long, Long> numberOfParts = S3BlobContainer.numberOfMultiparts(blobSize, partSize);
final List<UploadPartRequest> uploadRequests = uploadArgCaptor.getAllValues();
assertEquals(numberOfParts.v1().intValue(), uploadRequests.size());
if (doCopy) {
final var copyRequests = copyArgCaptor.getAllValues();
assertEquals(numberOfParts.v1().intValue(), copyRequests.size());
for (int i = 0; i < uploadRequests.size(); i++) {
final UploadPartRequest uploadRequest = uploadRequests.get(i);
for (int i = 0; i < copyRequests.size(); i++) {
final var request = copyRequests.get(i);
final long startOffset = i * partSize;
final long endOffset = Math.min(startOffset + partSize - 1, blobSize - 1);
assertEquals(bucketName, uploadRequest.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey());
assertEquals(initResult.getUploadId(), uploadRequest.getUploadId());
assertEquals(i + 1, uploadRequest.getPartNumber());
assertEquals(inputStream, uploadRequest.getInputStream());
assertEquals(sourceBucketName, request.getSourceBucketName());
assertEquals(sourceBlobPath.buildAsString() + sourceBlobName, request.getSourceKey());
assertEquals(bucketName, request.getDestinationBucketName());
assertEquals(blobPath.buildAsString() + blobName, request.getDestinationKey());
assertEquals(initResult.getUploadId(), request.getUploadId());
assertEquals(i + 1, request.getPartNumber());
assertEquals(Long.valueOf(startOffset), request.getFirstByte());
assertEquals(Long.valueOf(endOffset), request.getLastByte());
}
} else {
final List<UploadPartRequest> uploadRequests = uploadArgCaptor.getAllValues();
assertEquals(numberOfParts.v1().intValue(), uploadRequests.size());
if (i == (uploadRequests.size() - 1)) {
assertTrue(uploadRequest.isLastPart());
assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize());
} else {
assertFalse(uploadRequest.isLastPart());
assertEquals(bufferSize, uploadRequest.getPartSize());
for (int i = 0; i < uploadRequests.size(); i++) {
final UploadPartRequest uploadRequest = uploadRequests.get(i);
assertEquals(bucketName, uploadRequest.getBucketName());
assertEquals(blobPath.buildAsString() + blobName, uploadRequest.getKey());
assertEquals(initResult.getUploadId(), uploadRequest.getUploadId());
assertEquals(i + 1, uploadRequest.getPartNumber());
assertEquals(inputStream, uploadRequest.getInputStream());
if (i == (uploadRequests.size() - 1)) {
assertTrue(uploadRequest.isLastPart());
assertEquals(numberOfParts.v2().longValue(), uploadRequest.getPartSize());
} else {
assertFalse(uploadRequest.isLastPart());
assertEquals(bufferSize, uploadRequest.getPartSize());
}
}
}
@ -326,7 +383,7 @@ public class S3BlobStoreContainerTests extends ESTestCase {
blobContainer.executeMultipartUpload(randomPurpose(), blobStore, blobName, new ByteArrayInputStream(new byte[0]), blobSize);
});
assertEquals("Unable to upload object [" + blobName + "] using multipart upload", e.getMessage());
assertEquals("Unable to upload or copy object [" + blobName + "] using multipart upload", e.getMessage());
assertThat(e.getCause(), instanceOf(AmazonClientException.class));
assertEquals(exceptions.get(stage).getMessage(), e.getCause().getMessage());
@ -358,6 +415,46 @@ public class S3BlobStoreContainerTests extends ESTestCase {
closeMockClient(blobStore);
}
public void testCopy() throws Exception {
final var sourceBucketName = randomAlphaOfLengthBetween(1, 10);
final var sourceBlobName = randomAlphaOfLengthBetween(1, 10);
final var blobName = randomAlphaOfLengthBetween(1, 10);
final StorageClass storageClass = randomFrom(StorageClass.values());
final CannedAccessControlList cannedAccessControlList = randomBoolean() ? randomFrom(CannedAccessControlList.values()) : null;
final var blobStore = mock(S3BlobStore.class);
when(blobStore.bucket()).thenReturn(sourceBucketName);
when(blobStore.getStorageClass()).thenReturn(storageClass);
if (cannedAccessControlList != null) {
when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList);
}
when(blobStore.maxCopySizeBeforeMultipart()).thenReturn(S3Repository.MIN_PART_SIZE_USING_MULTIPART.getBytes());
final var sourceBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10));
final var sourceBlobContainer = new S3BlobContainer(sourceBlobPath, blobStore);
final var destinationBlobPath = BlobPath.EMPTY.add(randomAlphaOfLengthBetween(1, 10));
final var destinationBlobContainer = new S3BlobContainer(destinationBlobPath, blobStore);
final var client = configureMockClient(blobStore);
final ArgumentCaptor<CopyObjectRequest> captor = ArgumentCaptor.forClass(CopyObjectRequest.class);
when(client.copyObject(captor.capture())).thenReturn(new CopyObjectResult());
destinationBlobContainer.copyBlob(randomPurpose(), sourceBlobContainer, sourceBlobName, blobName, randomLongBetween(1, 10_000));
final CopyObjectRequest request = captor.getValue();
assertEquals(sourceBucketName, request.getSourceBucketName());
assertEquals(sourceBlobPath.buildAsString() + sourceBlobName, request.getSourceKey());
assertEquals(sourceBucketName, request.getDestinationBucketName());
assertEquals(destinationBlobPath.buildAsString() + blobName, request.getDestinationKey());
assertEquals(storageClass.toString(), request.getStorageClass());
assertEquals(cannedAccessControlList, request.getCannedAccessControlList());
closeMockClient(blobStore);
}
private static AmazonS3 configureMockClient(S3BlobStore blobStore) {
final AmazonS3 client = mock(AmazonS3.class);
try (AmazonS3Reference clientReference = new AmazonS3Reference(client)) {

View file

@ -214,6 +214,7 @@ public class TransportVersions {
public static final TransportVersion ESQL_REMOVE_AGGREGATE_TYPE = def(9_045_0_00);
public static final TransportVersion ADD_PROJECT_ID_TO_DSL_ERROR_INFO = def(9_046_0_00);
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG = def(9_047_0_00);
public static final TransportVersion REPO_ANALYSIS_COPY_BLOB = def(9_048_0_00);
/*
* STOP! READ THIS FIRST! No, really,


@ -177,6 +177,32 @@ public interface BlobContainer {
writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
}
/**
* Copy a blob from a source blob container into this container.
* If copy is unavailable then this method throws UnsupportedOperationException.
* It may be unavailable either because this blob container has no copy implementation
* or because the source blob container is not on the same store as this one.
* If the destination blob already exists, this operation will overwrite it.
*
* @param purpose The purpose of the operation
* @param sourceBlobContainer The blob container to copy the blob from
* @param sourceBlobName The name of the blob to copy from
* @param blobName The name of the blob to copy to
* @param blobSize The size of the source blob in bytes (needed because some object stores use different implementations
* for very large blobs)
* @throws NoSuchFileException If the source blob does not exist
* @throws IOException If the operation generates an IO error
*/
default void copyBlob(
OperationPurpose purpose,
BlobContainer sourceBlobContainer,
String sourceBlobName,
String blobName,
long blobSize
) throws IOException {
throw new UnsupportedOperationException("this blob container does not support copy");
}
/**
* Deletes this container and all its contents from the repository.
*


@ -349,6 +349,29 @@ public class FsBlobContainer extends AbstractBlobContainer {
}
}
@Override
public void copyBlob(OperationPurpose purpose, BlobContainer sourceBlobContainer, String sourceBlobName, String blobName, long blobSize)
throws IOException {
if (sourceBlobContainer instanceof FsBlobContainer == false) {
throw new IllegalArgumentException("source blob container must be a FsBlobContainer");
}
final FsBlobContainer sourceContainer = (FsBlobContainer) sourceBlobContainer;
final Path sourceBlobPath = sourceContainer.path.resolve(sourceBlobName);
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
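// Copy into a temporary blob first and then rename it into place so that readers never observe a partially copied blob; the temporary blob is cleaned up if the rename fails.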
Files.copy(sourceBlobPath, tempBlobPath, StandardCopyOption.REPLACE_EXISTING);
try {
moveBlobAtomic(purpose, tempBlob, blobName, false);
} catch (IOException ex) {
try {
deleteBlobsIgnoringIfNotExists(purpose, Iterators.single(tempBlob));
} catch (IOException e) {
ex.addSuppressed(e);
}
throw ex;
}
}
private static void writeToPath(BytesReference bytes, Path tempBlobPath) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
bytes.writeTo(outputStream);


@ -377,6 +377,31 @@ public class FsBlobContainerTests extends ESTestCase {
}
}
public void testCopy() throws Exception {
// without this, on CI the test sometimes fails with
// java.nio.file.ProviderMismatchException: mismatch, expected: class org.elasticsearch.common.blobstore.fs.FsBlobContainerTests$1,
// got: class org.elasticsearch.common.blobstore.fs.FsBlobContainerTests$MockFileSystemProvider
// and I haven't figured out why yet.
restoreFileSystem();
final var path = PathUtils.get(createTempDir().toString());
final var store = new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false);
final var sourcePath = BlobPath.EMPTY.add("source");
final var sourceContainer = store.blobContainer(sourcePath);
final var destinationPath = BlobPath.EMPTY.add("destination");
final var destinationContainer = store.blobContainer(destinationPath);
final var sourceBlobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
final var blobName = randomAlphaOfLengthBetween(1, 20).toLowerCase(Locale.ROOT);
final var contents = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
sourceContainer.writeBlob(randomPurpose(), sourceBlobName, contents, true);
destinationContainer.copyBlob(randomPurpose(), sourceContainer, sourceBlobName, blobName, contents.length());
var sourceContents = Streams.readFully(sourceContainer.readBlob(randomPurpose(), sourceBlobName));
var targetContents = Streams.readFully(destinationContainer.readBlob(randomPurpose(), blobName));
assertEquals(sourceContents, targetContents);
assertEquals(contents, targetContents);
}
static class MockFileSystemProvider extends FilterFileSystemProvider {
final Consumer<Long> onRead;


@ -49,6 +49,7 @@ import java.util.regex.Pattern;
import javax.xml.parsers.DocumentBuilderFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.elasticsearch.test.fixture.HttpHeaderParser.parseRangeHeader;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.w3c.dom.Node.ELEMENT_NODE;
@ -155,10 +156,34 @@ public class S3HttpHandler implements HttpHandler {
if (upload == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
upload.addPart(blob.v1(), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
// CopyPart is UploadPart with an x-amz-copy-source header
final var sourceBlobName = exchange.getRequestHeaders().get("X-amz-copy-source");
if (sourceBlobName != null) {
var sourceBlob = blobs.get(sourceBlobName.getFirst());
if (sourceBlob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
var range = parsePartRange(exchange);
int start = Math.toIntExact(range.start());
int len = Math.toIntExact(range.end() - range.start() + 1);
var part = sourceBlob.slice(start, len);
var etag = UUIDs.randomBase64UUID();
upload.addPart(etag, part);
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CopyPartResult>
<ETag>%s</ETag>
</CopyPartResult>""".formatted(etag)).getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
} else {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
upload.addPart(blob.v1(), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
}
} else if (request.isCompleteMultipartUploadRequest()) {
@ -201,10 +226,28 @@ public class S3HttpHandler implements HttpHandler {
exchange.sendResponseHeaders((upload == null ? RestStatus.NOT_FOUND : RestStatus.NO_CONTENT).getStatus(), -1);
} else if (request.isPutObjectRequest()) {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
blobs.put(request.path(), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
// a copy request is a put request with a copy source header
final var sourceBlobName = exchange.getRequestHeaders().get("X-amz-copy-source");
if (sourceBlobName != null) {
var sourceBlob = blobs.get(sourceBlobName.getFirst());
if (sourceBlob == null) {
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
} else {
blobs.put(request.path(), sourceBlob);
byte[] response = ("""
<?xml version="1.0" encoding="UTF-8"?>
<CopyObjectResult></CopyObjectResult>""").getBytes(StandardCharsets.UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/xml");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
}
} else {
final Tuple<String, BytesReference> blob = parseRequestBody(exchange);
blobs.put(request.path(), blob.v2());
exchange.getResponseHeaders().add("ETag", blob.v1());
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
}
} else if (request.isListObjectsRequest()) {
final StringBuilder list = new StringBuilder();
@ -268,7 +311,7 @@ public class S3HttpHandler implements HttpHandler {
// requests with a header value like "Range: bytes=start-end" where both {@code start} and {@code end} are always defined
// (sometimes to very high value for {@code end}). It would be too tedious to fully support the RFC so S3HttpHandler only
// supports when both {@code start} and {@code end} are defined to match the SDK behavior.
final HttpHeaderParser.Range range = HttpHeaderParser.parseRangeHeader(rangeHeader);
final HttpHeaderParser.Range range = parseRangeHeader(rangeHeader);
if (range == null) {
throw new AssertionError("Bytes range does not match expected pattern: " + rangeHeader);
}
@ -467,6 +510,17 @@ public class S3HttpHandler implements HttpHandler {
}
}
private static HttpHeaderParser.Range parsePartRange(final HttpExchange exchange) {
final var sourceRangeHeaders = exchange.getRequestHeaders().get("X-amz-copy-source-range");
if (sourceRangeHeaders == null) {
throw new IllegalStateException("missing x-amz-copy-source-range header");
}
if (sourceRangeHeaders.size() != 1) {
throw new IllegalStateException("expected 1 x-amz-copy-source-range header, found " + sourceRangeHeaders.size());
}
return parseRangeHeader(sourceRangeHeaders.getFirst());
}
MultipartUpload getUpload(String uploadId) {
return uploads.get(uploadId);
}


@ -137,17 +137,27 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
}
}
public void testWriteRead() throws IOException {
public void testWriteMaybeCopyRead() throws IOException {
try (BlobStore store = newBlobStore()) {
final BlobContainer container = store.blobContainer(BlobPath.EMPTY);
byte[] data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
final String blobName = randomAlphaOfLengthBetween(8, 12);
String readBlobName = blobName;
writeBlob(container, blobName, new BytesArray(data), randomBoolean());
if (randomBoolean()) {
// override file, to check if we get latest contents
data = randomBytes(randomIntBetween(10, scaledRandomIntBetween(1024, 1 << 16)));
writeBlob(container, "foobar", new BytesArray(data), false);
writeBlob(container, blobName, new BytesArray(data), false);
}
try (InputStream stream = container.readBlob(randomPurpose(), "foobar")) {
if (randomBoolean()) {
// server-side copy if supported
try {
final var destinationBlobName = blobName + "_copy";
container.copyBlob(randomPurpose(), container, blobName, destinationBlobName, data.length);
readBlobName = destinationBlobName;
} catch (UnsupportedOperationException ignored) {}
}
try (InputStream stream = container.readBlob(randomPurpose(), readBlobName)) {
BytesRefBuilder target = new BytesRefBuilder();
while (target.length() < data.length) {
byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];


@ -227,10 +227,8 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
}
}).filter(Objects::nonNull).map(Repository::stats).reduce(RepositoryStats::merge).get();
// Since no abort request is made, filter it out from the stats (also ensure it is 0) before comparing to the mock counts
Map<String, Long> sdkRequestCounts = repositoryStats.actionStats.entrySet()
.stream()
.filter(entry -> false == ("AbortMultipartObject".equals(entry.getKey()) && entry.getValue().requests() == 0L))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().requests()));
final Map<String, Long> mockCalls = getMockRequestCounts();
@ -355,8 +353,11 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
private final Map<String, Long> operationCount = new HashMap<>();
public HttpStatsCollectorHandler(HttpHandler delegate) {
public HttpStatsCollectorHandler(HttpHandler delegate, String[] operations) {
this.delegate = delegate;
for (String operation : operations) {
operationCount.put(operation, 0L);
}
}
@Override
@ -369,7 +370,7 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR
}
protected synchronized void trackRequest(final String requestType) {
operationCount.put(requestType, operationCount.getOrDefault(requestType, 0L) + 1);
operationCount.put(requestType, operationCount.get(requestType) + 1);
}
@Override


@ -61,6 +61,7 @@ public class S3RepositoryAnalysisRestIT extends AbstractRepositoryAnalysisRestTe
.put("base_path", basePath)
.put("delete_objects_max_size", between(1, 1000))
.put("buffer_size", ByteSizeValue.ofMb(5)) // so some uploads are multipart ones
.put("max_copy_size_before_multipart", ByteSizeValue.ofMb(5))
.build();
}
}


@ -74,6 +74,7 @@ import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
@ -172,6 +173,31 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
assertAnalysisFailureMessage(analyseRepositoryExpectFailure(request).getMessage());
}
public void testFailsOnCopyAfterWrite() {
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
request.maxBlobSize(ByteSizeValue.ofBytes(10L));
request.abortWritePermitted(false);
final AtomicBoolean failedCopy = new AtomicBoolean();
blobStore.setDisruption(new Disruption() {
@Override
public void onCopy() throws IOException {
failedCopy.set(true);
throw new IOException("simulated");
}
});
safeAwait((ActionListener<RepositoryAnalyzeAction.Response> l) -> analyseRepository(request, l.delegateResponse((ll, e) -> {
if (ExceptionsHelper.unwrapCause(e) instanceof RepositoryVerificationException repositoryVerificationException) {
assertAnalysisFailureMessage(repositoryVerificationException.getMessage());
assertTrue("did not fail a copy operation, so why did the verification fail?", failedCopy.get());
ll.onResponse(null);
} else {
ll.onFailure(e);
}
})));
}
public void testFailsOnChecksumMismatch() {
final RepositoryAnalyzeAction.Request request = new RepositoryAnalyzeAction.Request("test-repo");
request.maxBlobSize(ByteSizeValue.ofBytes(10L));
@ -593,6 +619,8 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
default void onWrite() throws IOException {}
default void onCopy() throws IOException {}
default Map<String, BlobMetadata> onList(Map<String, BlobMetadata> actualListing) throws IOException {
return actualListing;
}
@ -751,6 +779,25 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
blobs.put(blobName, contents);
}
@Override
public void copyBlob(
OperationPurpose purpose,
BlobContainer sourceBlobContainer,
String sourceBlobName,
String blobName,
long blobSize
) throws IOException {
assertThat(sourceBlobContainer, instanceOf(DisruptableBlobContainer.class));
assertPurpose(purpose);
final var source = (DisruptableBlobContainer) sourceBlobContainer;
final var sourceBlob = source.blobs.get(sourceBlobName);
if (sourceBlob == null) {
throw new FileNotFoundException(sourceBlobName + " not found");
}
disruption.onCopy();
blobs.put(blobName, sourceBlob);
}
@Override
public DeleteResult delete(OperationPurpose purpose) throws IOException {
assertPurpose(purpose);


@ -70,6 +70,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.startsWith;
@ -469,6 +470,24 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
}
}
@Override
public void copyBlob(
OperationPurpose purpose,
BlobContainer sourceBlobContainer,
String sourceBlobName,
String blobName,
long blobSize
) throws IOException {
assertPurpose(purpose);
assertThat(sourceBlobContainer, instanceOf(AssertingBlobContainer.class));
final var source = (AssertingBlobContainer) sourceBlobContainer;
final var sourceBlob = source.blobs.get(sourceBlobName);
if (sourceBlob == null) {
throw new FileNotFoundException(sourceBlobName + " not found");
}
blobs.put(blobName, sourceBlob);
}
@Override
public DeleteResult delete(OperationPurpose purpose) {
assertPurpose(purpose);


@ -9,6 +9,7 @@ package org.elasticsearch.repositories.blobstore.testkit.analyze;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
@ -47,7 +48,9 @@ import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@ -70,6 +73,11 @@ import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositor
* version of the blob, but again must not yield partial data). Usually, however, we write once and only read after the write completes, and
* in this case we insist that the read succeeds.
*
* The writer may also attempt to copy the blob, either just before the write completes (which may fail with not found)
* or after (which should not fail). The writer may overwrite the source while the copy is in progress. If a copy is attempted,
* readers will read the copy instead of the original. As above, if the copy succeeds, then readers should see a complete copy.
* If the source is overwritten while the copy is in progress, readers may see either the original blob or the new one but no
* mixture or partial result.
*
* <pre>
*
@ -83,6 +91,12 @@ import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositor
* | Write blob with random content | |
* |-----------------------------------| |
* | | |
* | Copy blob during write (rarely) | |
* |-----------------------------------| |
* | | |
* | Copy complete | |
* |-----------------------------------| |
* | | |
* | Read range during write (rarely) | |
* |----------------------------------------------------------------------------|
* | | |
@ -106,6 +120,18 @@ import static org.elasticsearch.repositories.blobstore.testkit.SnapshotRepositor
* |-| Read phase | | |
* | |------------| | |
* | | |
* | Copy blob (rarely) | |
* |-----------------------------------| |
* | | |
* | TODO: Overwrite source (rarely) | |
* |-----------------------------------| |
* | | |
* | Overwrite complete | |
* |-----------------------------------| |
* | | |
* | Copy complete | |
* |-----------------------------------| |
* | | |
* | Read range [a,b) | |
* |----------------------------------------------------------------------------|
* | | |
@ -199,6 +225,9 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
private final boolean checksumWholeBlob;
private final long checksumStart;
private final long checksumEnd;
// If a copy is requested, do exactly one so that the number of blobs created is controlled by RepositoryAnalyzeAction.
// Doing the copy in step 1 exercises copying before the write completes. Step 2 exercises copying after the write completes (the happy path).
private final boolean doEarlyCopy;
private final List<DiscoveryNode> earlyReadNodes;
private final List<DiscoveryNode> readNodes;
private final GroupedActionListener<NodeResponse> readNodesListener;
@ -230,6 +259,7 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
checksumStart = randomLongBetween(0L, request.targetLength);
checksumEnd = randomLongBetween(checksumStart + 1, request.targetLength + 1);
}
doEarlyCopy = random.nextBoolean();
final ArrayList<DiscoveryNode> nodes = new ArrayList<>(request.nodes); // copy for shuffling purposes
if (request.readEarly) {
@ -368,11 +398,37 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
}
private void onLastReadForInitialWrite() {
var readBlobName = request.blobName;
if (request.copyBlobName != null && doEarlyCopy) {
try {
blobContainer.copyBlob(
OperationPurpose.REPOSITORY_ANALYSIS,
blobContainer,
request.blobName,
request.copyBlobName,
request.targetLength
);
readBlobName = request.copyBlobName;
} catch (UnsupportedOperationException uoe) {
// not all repositories support copy
} catch (NoSuchFileException | FileNotFoundException ignored) {
// assume this is due to copy starting before the source was finished
logger.trace("copy FNF before write completed: {}", request.blobName);
} catch (IOException e) {
if (request.getAbortWrite() == false) {
throw new RepositoryVerificationException(
request.getRepositoryName(),
"failed to copy blob before write: [" + request.blobName + "]",
e
);
}
}
}
if (earlyReadNodes.isEmpty() == false) {
if (logger.isTraceEnabled()) {
logger.trace("sending read request to [{}] for [{}] before write complete", earlyReadNodes, request.getDescription());
}
readOnNodes(earlyReadNodes, true);
readOnNodes(earlyReadNodes, readBlobName, true);
}
if (request.getAbortWrite()) {
throw new BlobWriteAbortedException();
@ -383,10 +439,36 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
if (logger.isTraceEnabled()) {
logger.trace("sending read request to [{}] for [{}] after write complete", readNodes, request.getDescription());
}
readOnNodes(readNodes, false);
var readBlobName = request.blobName;
if (request.copyBlobName != null && doEarlyCopy == false && request.getAbortWrite() == false) {
try {
blobContainer.copyBlob(
OperationPurpose.REPOSITORY_ANALYSIS,
blobContainer,
request.blobName,
request.copyBlobName,
request.targetLength
);
readBlobName = request.copyBlobName;
} catch (UnsupportedOperationException uoe) {
// not all repositories support copy
} catch (IOException e) {
for (int i = 0; i < readNodes.size(); i++) {
readNodesListener.onFailure(
new RepositoryVerificationException(
request.getRepositoryName(),
"failed to copy blob after write: [" + request.blobName + "]",
e
)
);
}
return;
}
}
readOnNodes(readNodes, readBlobName, false);
}
private void readOnNodes(List<DiscoveryNode> nodes, boolean beforeWriteComplete) {
private void readOnNodes(List<DiscoveryNode> nodes, String blobName, boolean beforeWriteComplete) {
for (DiscoveryNode node : nodes) {
if (task.isCancelled()) {
// record dummy response since we're already on the path to failure
@ -396,7 +478,7 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
} else {
// no need for extra synchronization after checking if we were cancelled a couple of lines ago -- we haven't notified
// the outer listener yet so any bans on the children are still in place
final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest();
final GetBlobChecksumAction.Request blobChecksumRequest = getBlobChecksumRequest(blobName);
transportService.sendChildRequest(
node,
GetBlobChecksumAction.NAME,
@ -432,11 +514,11 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
}
}
private GetBlobChecksumAction.Request getBlobChecksumRequest() {
private GetBlobChecksumAction.Request getBlobChecksumRequest(String blobName) {
return new GetBlobChecksumAction.Request(
request.getRepositoryName(),
request.getBlobPath(),
request.getBlobName(),
blobName,
checksumStart,
checksumWholeBlob ? 0L : checksumEnd
);
@ -650,6 +732,8 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
private final boolean readEarly;
private final boolean writeAndOverwrite;
private final boolean abortWrite;
@Nullable
private final String copyBlobName;
Request(
String repositoryName,
@ -662,7 +746,8 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
int earlyReadNodeCount,
boolean readEarly,
boolean writeAndOverwrite,
boolean abortWrite
boolean abortWrite,
@Nullable String copyBlobName
) {
assert 0 < targetLength;
assert targetLength <= MAX_ATOMIC_WRITE_SIZE || (readEarly == false && writeAndOverwrite == false) : "oversized atomic write";
@ -678,6 +763,7 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
this.readEarly = readEarly;
this.writeAndOverwrite = writeAndOverwrite;
this.abortWrite = abortWrite;
this.copyBlobName = copyBlobName;
}
Request(StreamInput in) throws IOException {
@ -693,6 +779,11 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
readEarly = in.readBoolean();
writeAndOverwrite = in.readBoolean();
abortWrite = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)) {
copyBlobName = in.readOptionalString();
} else {
copyBlobName = null;
}
}
@Override
@ -709,6 +800,14 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
out.writeBoolean(readEarly);
out.writeBoolean(writeAndOverwrite);
out.writeBoolean(abortWrite);
if (out.getTransportVersion().onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)) {
out.writeOptionalString(copyBlobName);
} else if (copyBlobName != null) {
assert false : out.getTransportVersion();
throw new IllegalStateException(
"cannot serialize " + this + "] using transport version [" + out.getTransportVersion() + "]"
);
}
}
@Override
@ -734,6 +833,8 @@ class BlobAnalyzeAction extends HandledTransportAction<BlobAnalyzeAction.Request
+ writeAndOverwrite
+ ", abortWrite="
+ abortWrite
+ ", copyBlobName="
+ copyBlobName
+ "]";
}


@ -517,14 +517,27 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
final List<Long> blobSizes = getBlobSizes(request);
Collections.shuffle(blobSizes, random);
for (int i = 0; i < request.getBlobCount(); i++) {
int blobCount = request.getBlobCount();
for (int i = 0; i < blobCount; i++) {
final long targetLength = blobSizes.get(i);
final boolean smallBlob = targetLength <= MAX_ATOMIC_WRITE_SIZE; // avoid the atomic API for larger blobs
final boolean abortWrite = smallBlob && request.isAbortWritePermitted() && rarely(random);
final boolean doCopy = minClusterTransportVersion.onOrAfter(TransportVersions.REPO_ANALYSIS_COPY_BLOB)
&& rarely(random)
&& i > 0;
final String blobName = "test-blob-" + i + "-" + UUIDs.randomBase64UUID(random);
String copyBlobName = null;
if (doCopy) {
copyBlobName = blobName + "-copy";
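// The copy creates an extra blob, so write one fewer original blob to keep the total number of blobs at the requested count.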
blobCount--;
if (i >= blobCount) {
break;
}
}
final BlobAnalyzeAction.Request blobAnalyzeRequest = new BlobAnalyzeAction.Request(
request.getRepositoryName(),
blobPath,
"test-blob-" + i + "-" + UUIDs.randomBase64UUID(random),
blobName,
targetLength,
random.nextLong(),
nodes,
@ -532,7 +545,8 @@ public class RepositoryAnalyzeAction extends HandledTransportAction<RepositoryAn
request.getEarlyReadNodeCount(),
smallBlob && rarely(random),
repository.supportURLRepo() && repository.hasAtomicOverwrites() && smallBlob && rarely(random) && abortWrite == false,
abortWrite
abortWrite,
copyBlobName
);
final DiscoveryNode node = nodes.get(random.nextInt(nodes.size()));
queue.add(ref -> runBlobAnalysis(ref, blobAnalyzeRequest, node));