Small changes in concurrent multipart upload interfaces (#128977)

Small changes to the BlobContainer interface and its filter wrapper.

Relates ES-11815
Tanguy Leroux 2025-06-06 16:58:39 +02:00 committed by GitHub
parent 49f8e5c0ae
commit fa383afcc5
5 changed files with 55 additions and 7 deletions
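
In short, the multipart overload of writeBlobAtomic now takes a dedicated BlobMultiPartInputStreamProvider instead of a generic CheckedBiFunction<Long, Long, InputStream, IOException>, presumably so the per-part stream factory has a self-documenting type with named offset/length parameters. A before/after sketch of the signature, reconstructed from the diff below:

// before
default void writeBlobAtomic(..., CheckedBiFunction<Long, Long, InputStream, IOException> provider, ...)

// after
default void writeBlobAtomic(..., BlobMultiPartInputStreamProvider provider, ...)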

AzureBlobContainer.java

@@ -15,7 +15,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -116,7 +115,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
         OperationPurpose purpose,
         String blobName,
         long blobSize,
-        CheckedBiFunction<Long, Long, InputStream, IOException> provider,
+        BlobMultiPartInputStreamProvider provider,
         boolean failIfAlreadyExists
     ) throws IOException {
         blobStore.writeBlobAtomic(purpose, buildKey(blobName), blobSize, provider, failIfAlreadyExists);

AzureBlobStore.java

@@ -50,7 +50,6 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.util.Throwables;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
-import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
@@ -477,7 +476,7 @@ public class AzureBlobStore implements BlobStore {
         final OperationPurpose purpose,
         final String blobName,
         final long blobSize,
-        final CheckedBiFunction<Long, Long, InputStream, IOException> provider,
+        final BlobContainer.BlobMultiPartInputStreamProvider provider,
         final boolean failIfAlreadyExists
     ) throws IOException {
         try {
@@ -559,7 +558,7 @@ public class AzureBlobStore implements BlobStore {
         BlockBlobAsyncClient asyncClient,
         String blobName,
         MultiPart multiPart,
-        CheckedBiFunction<Long, Long, InputStream, IOException> provider
+        BlobContainer.BlobMultiPartInputStreamProvider provider
     ) {
         logger.debug(
             "{}: staging part [{}] of size [{}] from offset [{}]",

BlobContainer.java

@@ -10,7 +10,6 @@
 package org.elasticsearch.common.blobstore;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.CheckedBiFunction;
 import org.elasticsearch.common.blobstore.support.BlobMetadata;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -153,11 +152,42 @@ public interface BlobContainer {
         return false;
     }
 
+    /**
+     * Provides an {@link InputStream} to read a part of the blob content.
+     */
+    interface BlobMultiPartInputStreamProvider {
+        /**
+         * Provides an {@link InputStream} to read a part of the blob content.
+         *
+         * @param offset the offset in the blob content to start reading bytes from
+         * @param length the number of bytes to read
+         * @return an {@link InputStream} to read a part of the blob content
+         * @throws IOException if something goes wrong opening the input stream
+         */
+        InputStream apply(long offset, long length) throws IOException;
+    }
+
+    /**
+     * Reads the blob's content by calling an input stream provider multiple times, in order to split the blob's content into multiple
+     * parts that can be written to the container concurrently before being assembled into the final blob, using an atomic write
+     * operation if the implementation supports it. The number and size of the parts depend on the implementation.
+     *
+     * Note: the method {@link #supportsConcurrentMultipartUploads()} must be checked before calling this method.
+     *
+     * @param purpose             The purpose of the operation
+     * @param blobName            The name of the blob to write the contents of the input stream to.
+     * @param blobSize            The size of the blob to be written, in bytes. Must be the total number of bytes in the input streams.
+     *                            It is implementation dependent whether this value is used in writing the blob to the repository.
+     * @param provider            The input stream provider that is used to read the blob content
+     * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
+     * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
+     * @throws IOException if the input stream could not be read, or the target blob could not be written to
+     */
     default void writeBlobAtomic(
         OperationPurpose purpose,
         String blobName,
         long blobSize,
-        CheckedBiFunction<Long, Long, InputStream, IOException> provider,
+        BlobMultiPartInputStreamProvider provider,
         boolean failIfAlreadyExists
     ) throws IOException {
         throw new UnsupportedOperationException();

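The new provider has a single abstract method, so callers can pass a lambda. A minimal caller-side sketch, not part of this commit: the MultipartUploadExample class, the SNAPSHOT_DATA purpose constant, and the blob name are assumptions for illustration; the provider contract (an independent stream per part, positioned at the given offset and limited to the given length) follows the javadoc above.

import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.OperationPurpose;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;

class MultipartUploadExample {
    static void uploadConcurrently(BlobContainer container, Path localFile) throws IOException {
        // The javadoc requires checking this capability before calling the multipart overload.
        if (container.supportsConcurrentMultipartUploads() == false) {
            throw new UnsupportedOperationException("container does not support concurrent multipart uploads");
        }
        container.writeBlobAtomic(
            OperationPurpose.SNAPSHOT_DATA, // assumed purpose constant
            "example-blob",                 // hypothetical blob name
            Files.size(localFile),          // blobSize: total bytes across all parts
            (offset, length) -> {
                // Each call opens an independent stream for one part, so parts can be
                // read and uploaded concurrently. The JDK has no bounded InputStream,
                // so this sketch reads the slice eagerly; fine for an illustration,
                // not for very large parts.
                try (SeekableByteChannel channel = Files.newByteChannel(localFile)) {
                    channel.position(offset);
                    ByteBuffer buffer = ByteBuffer.allocate(Math.toIntExact(length));
                    while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
                    }
                    return new ByteArrayInputStream(buffer.array(), 0, buffer.position());
                }
            },
            true // failIfAlreadyExists
        );
    }
}

The provider indirection, as opposed to a single InputStream argument, is what makes concurrency possible: each part gets its own stream, so implementations such as the Azure blob store can stage several parts at once.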
FsBlobContainer.java

@@ -89,6 +89,10 @@ public class FsBlobContainer extends AbstractBlobContainer {
         this.path = path;
     }
 
+    public Path getPath() {
+        return path;
+    }
+
     @Override
     public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOException {
         return listBlobsByPrefix(purpose, null);

FilterBlobContainer.java

@@ -88,6 +88,22 @@ public abstract class FilterBlobContainer implements BlobContainer {
         delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
     }
 
+    @Override
+    public boolean supportsConcurrentMultipartUploads() {
+        return delegate.supportsConcurrentMultipartUploads();
+    }
+
+    @Override
+    public void writeBlobAtomic(
+        OperationPurpose purpose,
+        String blobName,
+        long blobSize,
+        BlobMultiPartInputStreamProvider provider,
+        boolean failIfAlreadyExists
+    ) throws IOException {
+        delegate.writeBlobAtomic(purpose, blobName, blobSize, provider, failIfAlreadyExists);
+    }
+
     @Override
     public void writeBlobAtomic(
         OperationPurpose purpose,