Add documentation to concurrent multipart upload utility methods (#128821)

In the hope to make more sense of how the flux of byte buffer works.
Also remove the synchronization on the input stream.

Relates ES-11815
This commit is contained in:
Tanguy Leroux 2025-06-04 10:28:37 +02:00 committed by GitHub
parent 28959c2e49
commit f190a69f5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -66,6 +66,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.Assertions;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
@ -74,6 +75,7 @@ import org.elasticsearch.repositories.RepositoriesMetrics;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.FilterInputStream;
import java.io.IOException;
@ -101,6 +103,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
@ -507,7 +510,11 @@ public class AzureBlobStore implements BlobStore {
return asyncClient.commitBlockList(
multiParts.stream().map(MultiPart::blockId).toList(),
failIfAlreadyExists == false
).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()));
)
.doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size()))
// Note: non-committed uploaded blocks will be deleted by Azure after a week
// (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks)
.doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e));
})
.block();
}
@ -562,12 +569,13 @@ public class AzureBlobStore implements BlobStore {
multiPart.blockOffset()
);
try {
var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart);
final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize());
assert stream.markSupported() : "provided input stream must support mark and reset";
boolean success = false;
try {
var stageBlock = asyncClient.stageBlock(
multiPart.blockId(),
toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE),
multiPart.blockSize()
).doOnSuccess(unused -> {
logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize()));
@ -760,88 +768,106 @@ public class AzureBlobStore implements BlobStore {
// we read the input stream (i.e. when it's rate limited)
}
private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) {
assert delegate.markSupported() : "An InputStream with mark support was expected";
// We need to introduce a read barrier in order to provide visibility for the underlying
// input stream state as the input stream can be read from different threads.
// TODO See if this is still needed
/**
* Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces.
*/
private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) {
return new FilterInputStream(delegate) {
private final AtomicReference<Thread> currentThread = Assertions.ENABLED ? new AtomicReference<>() : null;
private final boolean isTraceEnabled = logger.isTraceEnabled();
@Override
public synchronized int read(byte[] b, int off, int len) throws IOException {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
public int read(byte[] b, int off, int len) throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read(b, off, len);
if (isTraceEnabled) {
logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
return result;
}
@Override
public synchronized int read() throws IOException {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
public int read() throws IOException {
assert assertThread(null, Thread.currentThread());
assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
try {
var result = super.read();
if (isTraceEnabled) {
logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part());
}
return result;
} finally {
assert assertThread(Thread.currentThread(), null);
}
return result;
}
@Override
public synchronized void mark(int readlimit) {
if (isTraceEnabled) {
logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part());
}
super.mark(readlimit);
}
@Override
public synchronized void reset() throws IOException {
if (isTraceEnabled) {
logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part());
}
super.reset();
}
@Override
public synchronized void close() throws IOException {
if (isTraceEnabled) {
logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part());
}
super.close();
}
@Override
public String toString() {
return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']';
private boolean assertThread(Thread current, Thread updated) {
final Thread witness = currentThread.compareAndExchange(current, updated);
assert witness == current
: "Unable to set current thread to ["
+ updated
+ "]: expected thread ["
+ current
+ "] to be the thread currently accessing the input stream for reading, but thread "
+ witness
+ " is already reading "
+ blobName
+ " part "
+ multipart.part();
return true;
}
};
}
private static Flux<ByteBuffer> toFlux(InputStream stream, long length, int chunkSize) {
assert stream.markSupported() : "An InputStream with mark support was expected";
// We need to mark the InputStream as it's possible that we need to retry for the same chunk
/**
* Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes.
*
* @param stream the input stream that needs to be converted
* @param length the expected length in bytes of the input stream
* @param byteBufferSize the size of the ByteBuffers to be created
**/
private static Flux<ByteBuffer> toFlux(InputStream stream, long length, final int byteBufferSize) {
assert stream.markSupported() : "input stream must support mark and reset";
// always marks the input stream in case it needs to be retried
stream.mark(Integer.MAX_VALUE);
// defer the creation of the flux until it is subscribed
return Flux.defer(() -> {
// TODO Code in this Flux.defer() can be concurrently executed by multiple threads?
try {
stream.reset();
} catch (IOException e) {
throw new RuntimeException(e);
// Flux.defer() catches and propagates the exception
throw new UncheckedIOException(e);
}
// the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another
// thread pool (azure_event_loop), so we need this to be atomic.
final var bytesRead = new AtomicLong(0L);
// This flux is subscribed by a downstream operator that finally queues the
// buffers into netty output queue. Sadly we are not able to get a signal once
// the buffer has been flushed, so we have to allocate those and let the GC to
// reclaim them (see MonoSendMany). Additionally, that very same operator requests
// 128 elements (that's hardcoded) once it's subscribed (later on, it requests
// by 64 elements), that's why we provide 64kb buffers.
// length is at most 100MB so it's safe to cast back to an integer in this case
final int parts = (int) length / chunkSize;
final long remaining = length % chunkSize;
return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + chunkSize > length ? length - pos : chunkSize;
assert length <= ByteSizeValue.ofMb(100L).getBytes() : length;
// length is at most 100MB so it's safe to cast back to an integer
final int parts = Math.toIntExact(length / byteBufferSize);
final long remaining = length % byteBufferSize;
// This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into netty
// output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those and let
// the GC to reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it subscribes to
// it. This 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been published by
// the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE) and so
// on.
//
// So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in
// the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per
// flux which is aligned with BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE. The creation of the ByteBuffer objects are
// forked to the repository_azure thread pool, which has a maximum of 15 threads (most of the time, can be less than that for
// nodes with less than 750mb heap). It means that max. 15 * 8 = 120mb bytes are allocated on heap at a time here (omitting the
// ones already created and pending garbage collection).
return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> {
long count = pos + byteBufferSize > length ? length - pos : byteBufferSize;
int numOfBytesRead = 0;
int offset = 0;
int len = (int) count;
@ -867,9 +893,8 @@ public class AzureBlobStore implements BlobStore {
);
}
});
// We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream
// subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk
}).subscribeOn(Schedulers.elastic());
}
/**