diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index d2a52a95c66e..818cc4c0cd56 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -66,6 +66,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.core.Assertions; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; @@ -74,6 +75,7 @@ import org.elasticsearch.repositories.RepositoriesMetrics; import org.elasticsearch.repositories.azure.AzureRepository.Repository; import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.threadpool.ThreadPool; import java.io.FilterInputStream; import java.io.IOException; @@ -101,6 +103,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiPredicate; import java.util.stream.Collectors; @@ -507,7 +510,11 @@ public class AzureBlobStore implements BlobStore { return asyncClient.commitBlockList( multiParts.stream().map(MultiPart::blockId).toList(), failIfAlreadyExists == false - ).doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size())); + ) + .doOnSuccess(unused -> logger.debug("{}: all {} parts committed", blobName, multiParts.size())) + // Note: non-committed uploaded blocks will be deleted by Azure after a week + // (see https://docs.microsoft.com/en-us/rest/api/storageservices/put-block#remarks) + .doOnError(e -> logger.error(() -> format("%s: failed to commit %d parts", blobName, multiParts.size()), e)); }) .block(); } @@ -562,12 +569,13 @@ public class AzureBlobStore implements BlobStore { multiPart.blockOffset() ); try { - var stream = toSynchronizedInputStream(blobName, provider.apply(multiPart.blockOffset(), multiPart.blockSize()), multiPart); + final var stream = provider.apply(multiPart.blockOffset(), multiPart.blockSize()); + assert stream.markSupported() : "provided input stream must support mark and reset"; boolean success = false; try { var stageBlock = asyncClient.stageBlock( multiPart.blockId(), - toFlux(stream, multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE), + toFlux(wrapInputStream(blobName, stream, multiPart), multiPart.blockSize(), DEFAULT_UPLOAD_BUFFERS_SIZE), multiPart.blockSize() ).doOnSuccess(unused -> { logger.debug(() -> format("%s: part [%s] of size [%s] uploaded", blobName, multiPart.part(), multiPart.blockSize())); @@ -760,88 +768,106 @@ public class AzureBlobStore implements BlobStore { // we read the input stream (i.e. when it's rate limited) } - private static InputStream toSynchronizedInputStream(String blobName, InputStream delegate, MultiPart multipart) { - assert delegate.markSupported() : "An InputStream with mark support was expected"; - // We need to introduce a read barrier in order to provide visibility for the underlying - // input stream state as the input stream can be read from different threads. - // TODO See if this is still needed + /** + * Wraps an {@link InputStream} to assert that it is read only by a single thread at a time and to add log traces. + */ + private static InputStream wrapInputStream(final String blobName, final InputStream delegate, final MultiPart multipart) { return new FilterInputStream(delegate) { + private final AtomicReference currentThread = Assertions.ENABLED ? new AtomicReference<>() : null; private final boolean isTraceEnabled = logger.isTraceEnabled(); @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - var result = super.read(b, off, len); - if (isTraceEnabled) { - logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + public int read(byte[] b, int off, int len) throws IOException { + assert assertThread(null, Thread.currentThread()); + assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + try { + var result = super.read(b, off, len); + if (isTraceEnabled) { + logger.trace("{} reads {} bytes from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + } + return result; + } finally { + assert assertThread(Thread.currentThread(), null); } - return result; } @Override - public synchronized int read() throws IOException { - var result = super.read(); - if (isTraceEnabled) { - logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + public int read() throws IOException { + assert assertThread(null, Thread.currentThread()); + assert ThreadPool.assertCurrentThreadPool(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME); + try { + var result = super.read(); + if (isTraceEnabled) { + logger.trace("{} reads {} byte from {} part {}", Thread.currentThread(), result, blobName, multipart.part()); + } + return result; + } finally { + assert assertThread(Thread.currentThread(), null); } - return result; } - @Override - public synchronized void mark(int readlimit) { - if (isTraceEnabled) { - logger.trace("{} marks stream {} part {}", Thread.currentThread(), blobName, multipart.part()); - } - super.mark(readlimit); - } - - @Override - public synchronized void reset() throws IOException { - if (isTraceEnabled) { - logger.trace("{} resets stream {} part {}", Thread.currentThread(), blobName, multipart.part()); - } - super.reset(); - } - - @Override - public synchronized void close() throws IOException { - if (isTraceEnabled) { - logger.trace("{} closes stream {} part {}", Thread.currentThread(), blobName, multipart.part()); - } - super.close(); - } - - @Override - public String toString() { - return blobName + " part [" + multipart.part() + "] of size [" + multipart.blockSize() + ']'; + private boolean assertThread(Thread current, Thread updated) { + final Thread witness = currentThread.compareAndExchange(current, updated); + assert witness == current + : "Unable to set current thread to [" + + updated + + "]: expected thread [" + + current + + "] to be the thread currently accessing the input stream for reading, but thread " + + witness + + " is already reading " + + blobName + + " part " + + multipart.part(); + return true; } }; } - private static Flux toFlux(InputStream stream, long length, int chunkSize) { - assert stream.markSupported() : "An InputStream with mark support was expected"; - // We need to mark the InputStream as it's possible that we need to retry for the same chunk + /** + * Converts an input stream to a Flux of ByteBuffer. This method also checks that the stream has provided the expected number of bytes. + * + * @param stream the input stream that needs to be converted + * @param length the expected length in bytes of the input stream + * @param byteBufferSize the size of the ByteBuffers to be created + **/ + private static Flux toFlux(InputStream stream, long length, final int byteBufferSize) { + assert stream.markSupported() : "input stream must support mark and reset"; + // always marks the input stream in case it needs to be retried stream.mark(Integer.MAX_VALUE); + // defer the creation of the flux until it is subscribed return Flux.defer(() -> { - // TODO Code in this Flux.defer() can be concurrently executed by multiple threads? try { stream.reset(); } catch (IOException e) { - throw new RuntimeException(e); + // Flux.defer() catches and propagates the exception + throw new UncheckedIOException(e); } + // the number of bytes read is updated in a thread pool (repository_azure) and later compared to the expected length in another + // thread pool (azure_event_loop), so we need this to be atomic. final var bytesRead = new AtomicLong(0L); - // This flux is subscribed by a downstream operator that finally queues the - // buffers into netty output queue. Sadly we are not able to get a signal once - // the buffer has been flushed, so we have to allocate those and let the GC to - // reclaim them (see MonoSendMany). Additionally, that very same operator requests - // 128 elements (that's hardcoded) once it's subscribed (later on, it requests - // by 64 elements), that's why we provide 64kb buffers. - // length is at most 100MB so it's safe to cast back to an integer in this case - final int parts = (int) length / chunkSize; - final long remaining = length % chunkSize; - return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * chunkSize).concatMap(pos -> Mono.fromCallable(() -> { - long count = pos + chunkSize > length ? length - pos : chunkSize; + assert length <= ByteSizeValue.ofMb(100L).getBytes() : length; + // length is at most 100MB so it's safe to cast back to an integer + final int parts = Math.toIntExact(length / byteBufferSize); + final long remaining = length % byteBufferSize; + + // This flux is subscribed by a downstream subscriber (reactor.netty.channel.MonoSendMany) that queues the buffers into netty + // output queue. Sadly we are not able to get a signal once the buffer has been flushed, so we have to allocate those and let + // the GC to reclaim them. Additionally, the MonoSendMany subscriber requests 128 elements from the flux when it subscribes to + // it. This 128 value is hardcoded in reactor.netty.channel.MonoSend.MAX_SIZE). After 128 byte buffers have been published by + // the flux, the MonoSendMany subscriber requests 64 more byte buffers (see reactor.netty.channel.MonoSend.REFILL_SIZE) and so + // on. + // + // So this flux instantiates 128 ByteBuffer objects of DEFAULT_UPLOAD_BUFFERS_SIZE bytes in heap every time the NettyOutbound in + // the Azure's Netty event loop requests byte buffers to write to the network channel. That represents 128 * 64kb = 8 mb per + // flux which is aligned with BlobAsyncClient.BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE. The creation of the ByteBuffer objects are + // forked to the repository_azure thread pool, which has a maximum of 15 threads (most of the time, can be less than that for + // nodes with less than 750mb heap). It means that max. 15 * 8 = 120mb bytes are allocated on heap at a time here (omitting the + // ones already created and pending garbage collection). + return Flux.range(0, remaining == 0 ? parts : parts + 1).map(i -> i * byteBufferSize).concatMap(pos -> Mono.fromCallable(() -> { + long count = pos + byteBufferSize > length ? length - pos : byteBufferSize; int numOfBytesRead = 0; int offset = 0; int len = (int) count; @@ -867,9 +893,8 @@ public class AzureBlobStore implements BlobStore { ); } }); - // We need to subscribe on a different scheduler to avoid blocking the io threads when we read the input stream + // subscribe on a different scheduler to avoid blocking the network io threads when reading bytes from disk }).subscribeOn(Schedulers.elastic()); - } /**