diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index a5259ee73afc..1f92c9242638 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -105,6 +105,17 @@ public class AzureBlobContainer extends AbstractBlobContainer { blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index f2f2e7f63491..047549cc893e 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -95,6 +95,17 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index 4eb5363d9025..429a81b02bd5 100644 --- a/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -428,9 +428,21 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes final BytesReference serialized = BytesReference.bytes( modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT) ); - repository.blobStore() - .blobContainer(repository.basePath()) - .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true); + if (randomBoolean()) { + repository.blobStore() + .blobContainer(repository.basePath()) + .writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true); + } else { + repository.blobStore() + .blobContainer(repository.basePath()) + .writeBlobAtomic( + randomNonDataPurpose(), + getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), + serialized.streamInput(), + serialized.length(), + true + ); + } final String newSnapshotName = "snapshot-new"; final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos(); diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 8e71cb995904..49df07845332 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -297,10 +297,20 @@ class S3BlobContainer extends AbstractBlobContainer { return blobStore.bufferSizeInBytes(); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { - assert BlobContainer.assertPurposeConsistency(purpose, blobName); writeBlob(purpose, blobName, bytes, failIfAlreadyExists); } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 419261d12c38..4aa0ef450ef8 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -139,6 +139,17 @@ public class URLBlobContainer extends AbstractBlobContainer { throw new UnsupportedOperationException("URL repository doesn't support this operation"); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index c8554c04ff23..54a855964758 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -221,6 +221,28 @@ final class HdfsBlobContainer extends AbstractBlobContainer { } } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + final String tempBlob = FsBlobContainer.tempBlobName(blobName); + final Path tempBlobPath = new Path(path, tempBlob); + final Path blob = new Path(path, blobName); + store.execute((Operation) fileContext -> { + writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK)); + try { + fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE); + } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) { + throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage()); + } + return null; + }); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { @@ -260,6 +282,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer { while ((bytesRead = inputStream.read(buffer)) != -1) { stream.write(buffer, 0, bytesRead); } + assert stream.size() == blobSize : "Expected to write [" + blobSize + "] bytes but wrote [" + stream.size() + "] bytes"; } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryOperationPurposeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryOperationPurposeIT.java index 9177944ffef3..c0a2c83f7fe1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryOperationPurposeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryOperationPurposeIT.java @@ -190,6 +190,19 @@ public class BlobStoreRepositoryOperationPurposeIT extends AbstractSnapshotInteg super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + assertEquals(blobName, OperationPurpose.SNAPSHOT_METADATA, purpose); + assertPurposeConsistency(purpose, blobName); + super.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index d54a1868040d..8f6ee42339e6 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -99,8 +99,8 @@ public interface BlobContainer { * @param purpose The purpose of the operation * @param blobName The name of the blob to write the contents of the input stream to. * @param inputStream The input stream from which to retrieve the bytes to write to the blob. - * @param blobSize The size of the blob to be written, in bytes. It is implementation dependent whether - * this value is used in writing the blob to the repository. + * @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is + * implementation dependent whether this value is used in writing the blob to the repository. * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. @@ -144,6 +144,22 @@ public interface BlobContainer { CheckedConsumer writer ) throws IOException; + /** + * Reads blob content from the input stream and writes it to the container in a new blob with the given name, + * using an atomic write operation if the implementation supports it. + * + * @param purpose The purpose of the operation + * @param blobName The name of the blob to write the contents of the input stream to. + * @param inputStream The input stream from which to retrieve the bytes to write to the blob. + * @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is + * implementation dependent whether this value is used in writing the blob to the repository. + * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists + * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists + * @throws IOException if the input stream could not be read, or the target blob could not be written to. + */ + void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException; + /** * Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name, * using an atomic write operation if the implementation supports it. @@ -155,7 +171,11 @@ public interface BlobContainer { * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists * @throws IOException if the input stream could not be read, or the target blob could not be written to. */ - void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException; + default void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) + throws IOException { + assert assertPurposeConsistency(purpose, blobName); + writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); + } /** * Deletes this container and all its contents from the repository. diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index b4359d721b72..b5118d8a289a 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -302,6 +302,32 @@ public class FsBlobContainer extends AbstractBlobContainer { IOUtils.fsync(file, false); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + assert purpose != OperationPurpose.SNAPSHOT_DATA && BlobContainer.assertPurposeConsistency(purpose, blobName) : purpose; + final String tempBlob = tempBlobName(blobName); + final Path tempBlobPath = path.resolve(tempBlob); + try { + writeToPath(inputStream, tempBlobPath, blobSize); + moveBlobAtomic(purpose, tempBlob, blobName, failIfAlreadyExists); + } catch (IOException ex) { + try { + deleteBlobsIgnoringIfNotExists(purpose, Iterators.single(tempBlob)); + } catch (IOException e) { + ex.addSuppressed(e); + } + throw ex; + } finally { + IOUtils.fsync(path, true); + } + } + @Override public void writeBlobAtomic(OperationPurpose purpose, final String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { @@ -333,11 +359,12 @@ public class FsBlobContainer extends AbstractBlobContainer { private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException { try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) { final int bufferSize = blobStore.bufferSizeInBytes(); - org.elasticsearch.core.Streams.copy( + long bytesWritten = org.elasticsearch.core.Streams.copy( inputStream, outputStream, new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize] ); + assert bytesWritten == blobSize : "expected [" + blobSize + "] bytes but wrote [" + bytesWritten + "]"; } IOUtils.fsync(tempBlobPath, false); } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java index fff4421772f8..4de563a9e292 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java @@ -88,6 +88,17 @@ public abstract class FilterBlobContainer implements BlobContainer { delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + delegate.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index ee588a0621d9..6cb1c00dab0e 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -350,14 +350,18 @@ public class FsBlobContainerTests extends ESTestCase { BlobPath.EMPTY, path ); - container.writeBlobAtomic( - randomNonDataPurpose(), - blobName, - new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))), - true - ); + final var randomData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))); + if (randomBoolean()) { + container.writeBlobAtomic(randomNonDataPurpose(), blobName, randomData, true); + } else { + container.writeBlobAtomic(randomNonDataPurpose(), blobName, randomData.streamInput(), randomData.length(), true); + } final var blobData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))); - container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData, false); + if (randomBoolean()) { + container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData, false); + } else { + container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData.streamInput(), blobData.length(), false); + } assertEquals(blobData, Streams.readFully(container.readBlob(randomPurpose(), blobName))); expectThrows( FileAlreadyExistsException.class, diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java index 115b984d89bc..b85ee970664e 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java @@ -236,7 +236,17 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase if (randomBoolean()) { container.writeBlob(randomPurpose(), blobName, bytesArray, failIfAlreadyExists); } else { - container.writeBlobAtomic(randomNonDataPurpose(), blobName, bytesArray, failIfAlreadyExists); + if (randomBoolean()) { + container.writeBlobAtomic(randomNonDataPurpose(), blobName, bytesArray, failIfAlreadyExists); + } else { + container.writeBlobAtomic( + randomNonDataPurpose(), + blobName, + bytesArray.streamInput(), + bytesArray.length(), + failIfAlreadyExists + ); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 09c445403cad..89d02068b56d 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -664,16 +664,17 @@ public class MockRepository extends FsRepository { @Override public void writeBlobAtomic( - final OperationPurpose purpose, - final String blobName, - final BytesReference bytes, - final boolean failIfAlreadyExists + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists ) throws IOException { final Random random = beforeAtomicWrite(blobName); if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) { // Simulate a failure between the write and move operation in FsBlobContainer final String tempBlobName = FsBlobContainer.tempBlobName(blobName); - super.writeBlob(purpose, tempBlobName, bytes, failIfAlreadyExists); + super.writeBlob(purpose, tempBlobName, inputStream, blobSize, failIfAlreadyExists); maybeIOExceptionOrBlock(blobName); final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate(); fsBlobContainer.moveBlobAtomic(purpose, tempBlobName, blobName, failIfAlreadyExists); @@ -681,10 +682,20 @@ public class MockRepository extends FsRepository { // Atomic write since it is potentially supported // by the delegating blob container maybeIOExceptionOrBlock(blobName); - super.writeBlobAtomic(purpose, blobName, bytes, failIfAlreadyExists); + super.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); } } + @Override + public void writeBlobAtomic( + final OperationPurpose purpose, + final String blobName, + final BytesReference bytes, + final boolean failIfAlreadyExists + ) throws IOException { + writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists); + } + private Random beforeAtomicWrite(String blobName) throws IOException { final Random random = RandomizedContext.current().getRandom(); if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java index 9cdb683c3628..0f9c8502a036 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java @@ -222,6 +222,17 @@ public final class TestUtils { throw unsupportedException(); } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + throw unsupportedException(); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) { throw unsupportedException(); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java index e61f883abd60..1e9b7f23c60d 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisFailureIT.java @@ -699,12 +699,28 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase { final BytesStreamOutput out = new BytesStreamOutput(); writer.accept(out); if (atomic) { - writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists); + if (randomBoolean()) { + writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists); + } else { + writeBlobAtomic(purpose, blobName, out.bytes().streamInput(), out.bytes().length(), failIfAlreadyExists); + } } else { writeBlob(purpose, blobName, out.bytes(), failIfAlreadyExists); } } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + assertPurpose(purpose); + writeBlobAtomic(blobName, inputStream, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java index bb452ad2a64c..1f8b247e7617 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/analyze/RepositoryAnalysisSuccessIT.java @@ -421,12 +421,28 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase { final BytesStreamOutput out = new BytesStreamOutput(); writer.accept(out); if (atomic) { - writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists); + if (randomBoolean()) { + writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists); + } else { + writeBlobAtomic(purpose, blobName, out.bytes().streamInput(), out.bytes().length(), failIfAlreadyExists); + } } else { writeBlob(purpose, blobName, out.bytes(), failIfAlreadyExists); } } + @Override + public void writeBlobAtomic( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) throws IOException { + assertPurpose(purpose); + writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists); + } + @Override public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {