Support writeBlobAtomic from InputStream in the repository BlobContainer interface (#112754)

Mostly for fs and hdfs repos, similar to how writeBlobAtomic from
bytes is implemented (write a temp file, then rename it atomically).

Relates ES-9248
This commit is contained in:
Iraklis Psaroudakis 2024-09-17 15:08:51 +02:00 committed by GitHub
parent f211f6a65b
commit 32937109ac
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 241 additions and 24 deletions

View file

@ -105,6 +105,17 @@ public class AzureBlobContainer extends AbstractBlobContainer {
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}
/**
 * Stream-based variant of the atomic blob write.
 * <p>
 * No temp-file/rename step is performed here: the call is forwarded unchanged to
 * {@code writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists)}.
 * NOTE(review): presumably the regular upload of this store is already all-or-nothing - confirm.
 *
 * @param purpose             the purpose of the operation
 * @param blobName            name of the blob to write
 * @param inputStream         stream supplying the blob contents
 * @param blobSize            size of the blob in bytes
 * @param failIfAlreadyExists passed through to {@code writeBlob}
 * @throws IOException if the delegated write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -95,6 +95,17 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
}
/**
 * Stream-based variant of the atomic blob write.
 * <p>
 * This container adds nothing of its own: all arguments are handed straight to
 * {@code writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists)}.
 * NOTE(review): presumably a regular write to this store is already atomic - confirm.
 *
 * @param purpose             the purpose of the operation
 * @param blobName            name of the blob to write
 * @param inputStream         stream supplying the blob contents
 * @param blobSize            size of the blob in bytes
 * @param failIfAlreadyExists passed through to {@code writeBlob}
 * @throws IOException if the delegated write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -428,9 +428,21 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
final BytesReference serialized = BytesReference.bytes(
modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), SnapshotsService.OLD_SNAPSHOT_FORMAT)
);
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
if (randomBoolean()) {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(randomNonDataPurpose(), getRepositoryDataBlobName(modifiedRepositoryData.getGenId()), serialized, true);
} else {
repository.blobStore()
.blobContainer(repository.basePath())
.writeBlobAtomic(
randomNonDataPurpose(),
getRepositoryDataBlobName(modifiedRepositoryData.getGenId()),
serialized.streamInput(),
serialized.length(),
true
);
}
final String newSnapshotName = "snapshot-new";
final long beforeThrottledSnapshot = repository.threadPool().relativeTimeInNanos();

View file

@ -297,10 +297,20 @@ class S3BlobContainer extends AbstractBlobContainer {
return blobStore.bufferSizeInBytes();
}
/**
 * Stream-based variant of the atomic blob write.
 * <p>
 * Forwards every argument to {@code writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists)};
 * no additional staging or rename is done in this method.
 *
 * @param purpose             the purpose of the operation
 * @param blobName            name of the blob to write
 * @param inputStream         stream supplying the blob contents
 * @param blobSize            size of the blob in bytes
 * @param failIfAlreadyExists passed through to {@code writeBlob}
 * @throws IOException if the delegated write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    writeBlob(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
/**
 * {@link BytesReference}-based variant of the atomic blob write.
 * <p>
 * When assertions are enabled, first checks that {@code purpose} is consistent with {@code blobName};
 * then forwards to the bytes-based {@code writeBlob}.
 *
 * @param purpose             the purpose of the operation
 * @param blobName            name of the blob to write
 * @param bytes               the blob contents
 * @param failIfAlreadyExists passed through to {@code writeBlob}
 * @throws IOException if the delegated write fails
 */
@Override
public void writeBlobAtomic(
    OperationPurpose purpose,
    String blobName,
    BytesReference bytes,
    boolean failIfAlreadyExists
) throws IOException {
    assert BlobContainer.assertPurposeConsistency(purpose, blobName);
    writeBlob(purpose, blobName, bytes, failIfAlreadyExists);
}

View file

@ -139,6 +139,17 @@ public class URLBlobContainer extends AbstractBlobContainer {
throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
/**
 * Stream-based atomic write; not available on this container.
 *
 * @throws UnsupportedOperationException always - no argument is inspected
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    throw new UnsupportedOperationException("URL repository doesn't support this operation");
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -221,6 +221,28 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
}
}
/**
 * Writes the contents of {@code inputStream} to a temporary blob and then renames it to {@code blobName}.
 * <p>
 * If the write or the rename fails with an {@link IOException}, a best-effort attempt is made to delete the
 * temporary blob so that it does not linger in the repository; a failure of that cleanup is attached to the
 * original exception as a suppressed exception and the original exception is rethrown.
 *
 * @param purpose             the purpose of the operation (not used by this method)
 * @param blobName            name of the final blob
 * @param inputStream         stream supplying the blob contents
 * @param blobSize            expected size of the blob in bytes, forwarded to {@code writeToPath}
 * @param failIfAlreadyExists if {@code true} the rename uses {@code Options.Rename.NONE}, otherwise
 *                            {@code Options.Rename.OVERWRITE}
 * @throws FileAlreadyExistsException if the rename reports that the target already exists
 * @throws IOException                if writing the temporary blob or renaming it fails
 */
@Override
public void writeBlobAtomic(
    OperationPurpose purpose,
    String blobName,
    InputStream inputStream,
    long blobSize,
    boolean failIfAlreadyExists
) throws IOException {
    final String tempBlob = FsBlobContainer.tempBlobName(blobName);
    final Path tempBlobPath = new Path(path, tempBlob);
    final Path blob = new Path(path, blobName);
    store.execute((Operation<Void>) fileContext -> {
        try {
            writeToPath(inputStream, blobSize, fileContext, tempBlobPath, EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK));
            try {
                fileContext.rename(tempBlobPath, blob, failIfAlreadyExists ? Options.Rename.NONE : Options.Rename.OVERWRITE);
            } catch (org.apache.hadoop.fs.FileAlreadyExistsException faee) {
                // translate the Hadoop exception into the java.nio one that callers of the blob container expect
                throw new FileAlreadyExistsException(blob.toString(), null, faee.getMessage());
            }
        } catch (IOException ex) {
            // mirror the fs container: do not leave the temporary blob behind when the atomic write fails
            try {
                fileContext.delete(tempBlobPath, false);
            } catch (IOException e) {
                ex.addSuppressed(e);
            }
            throw ex;
        }
        return null;
    });
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {
@ -260,6 +282,7 @@ final class HdfsBlobContainer extends AbstractBlobContainer {
while ((bytesRead = inputStream.read(buffer)) != -1) {
stream.write(buffer, 0, bytesRead);
}
assert stream.size() == blobSize : "Expected to write [" + blobSize + "] bytes but wrote [" + stream.size() + "] bytes";
}
}

View file

@ -190,6 +190,19 @@ public class BlobStoreRepositoryOperationPurposeIT extends AbstractSnapshotInteg
super.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
}
/**
 * Test hook for the stream-based atomic write.
 * <p>
 * Asserts that {@code purpose} equals {@code OperationPurpose.SNAPSHOT_METADATA} (the blob name is passed as the
 * first argument of {@code assertEquals}), runs {@code assertPurposeConsistency}, and only then forwards the
 * call to the superclass implementation.
 *
 * @throws IOException if the superclass write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    // checks first, in this order, so a wrong purpose is reported before anything is written
    assertEquals(blobName, OperationPurpose.SNAPSHOT_METADATA, purpose);
    assertPurposeConsistency(purpose, blobName);
    super.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -99,8 +99,8 @@ public interface BlobContainer {
* @param purpose The purpose of the operation
* @param blobName The name of the blob to write the contents of the input stream to.
* @param inputStream The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is
* implementation dependent whether this value is used in writing the blob to the repository.
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
@ -144,6 +144,22 @@ public interface BlobContainer {
CheckedConsumer<OutputStream, IOException> writer
) throws IOException;
/**
 * Reads blob content from the input stream and writes it to the container in a new blob with the given name,
 * using an atomic write operation if the implementation supports it.
 *
 * @param purpose             The purpose of the operation.
 * @param blobName            The name of the blob to write the contents of the input stream to.
 * @param inputStream         The input stream from which to retrieve the bytes to write to the blob.
 * @param blobSize            The size of the blob to be written, in bytes. Must equal the number of bytes provided by the
 *                            input stream. Whether this value is actually used when writing the blob to the repository
 *                            depends on the implementation.
 * @param failIfAlreadyExists Whether to throw a FileAlreadyExistsException if the given blob already exists.
 * @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
 * @throws IOException if the input stream could not be read, or the target blob could not be written to.
 */
void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException;
/**
* Reads blob content from a {@link BytesReference} and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it.
@ -155,7 +171,11 @@ public interface BlobContainer {
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException;
default void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {
assert assertPurposeConsistency(purpose, blobName);
writeBlobAtomic(purpose, blobName, bytes.streamInput(), bytes.length(), failIfAlreadyExists);
}
/**
* Deletes this container and all its contents from the repository.

View file

@ -302,6 +302,32 @@ public class FsBlobContainer extends AbstractBlobContainer {
IOUtils.fsync(file, false);
}
/**
 * Writes the contents of {@code inputStream} to a temporary blob and then moves it onto {@code blobName}
 * via {@code moveBlobAtomic}.
 * <p>
 * If either step throws an {@link IOException}, the temporary blob is deleted (ignoring the case where it does
 * not exist); a failure of that cleanup is added to the original exception as suppressed, and the original
 * exception is rethrown. In every case {@code IOUtils.fsync(path, true)} is invoked on the way out.
 *
 * @param purpose             the purpose of the operation; asserted not to be {@code SNAPSHOT_DATA}
 * @param blobName            name of the final blob
 * @param inputStream         stream supplying the blob contents
 * @param blobSize            expected number of bytes, forwarded to {@code writeToPath}
 * @param failIfAlreadyExists forwarded to {@code moveBlobAtomic}
 * @throws IOException if writing, moving or the final fsync fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    assert purpose != OperationPurpose.SNAPSHOT_DATA && BlobContainer.assertPurposeConsistency(purpose, blobName) : purpose;
    final String stagingName = tempBlobName(blobName);
    final Path stagingPath = path.resolve(stagingName);
    try {
        // stage the full content first, then publish it under its final name in one move
        writeToPath(inputStream, stagingPath, blobSize);
        moveBlobAtomic(purpose, stagingName, blobName, failIfAlreadyExists);
    } catch (IOException writeFailure) {
        try {
            deleteBlobsIgnoringIfNotExists(purpose, Iterators.single(stagingName));
        } catch (IOException cleanupFailure) {
            // keep the original failure as the primary one
            writeFailure.addSuppressed(cleanupFailure);
        }
        throw writeFailure;
    } finally {
        IOUtils.fsync(path, true);
    }
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, final String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {
@ -333,11 +359,12 @@ public class FsBlobContainer extends AbstractBlobContainer {
private void writeToPath(InputStream inputStream, Path tempBlobPath, long blobSize) throws IOException {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
final int bufferSize = blobStore.bufferSizeInBytes();
org.elasticsearch.core.Streams.copy(
long bytesWritten = org.elasticsearch.core.Streams.copy(
inputStream,
outputStream,
new byte[blobSize < bufferSize ? Math.toIntExact(blobSize) : bufferSize]
);
assert bytesWritten == blobSize : "expected [" + blobSize + "] bytes but wrote [" + bytesWritten + "]";
}
IOUtils.fsync(tempBlobPath, false);
}

View file

@ -88,6 +88,17 @@ public abstract class FilterBlobContainer implements BlobContainer {
delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
}
/**
 * Stream-based atomic write; passes all arguments through to the wrapped {@code delegate} container unchanged.
 *
 * @throws IOException if the delegate's write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    delegate.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -350,14 +350,18 @@ public class FsBlobContainerTests extends ESTestCase {
BlobPath.EMPTY,
path
);
container.writeBlobAtomic(
randomNonDataPurpose(),
blobName,
new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512))),
true
);
final var randomData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
if (randomBoolean()) {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, randomData, true);
} else {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, randomData.streamInput(), randomData.length(), true);
}
final var blobData = new BytesArray(randomByteArrayOfLength(randomIntBetween(1, 512)));
container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData, false);
if (randomBoolean()) {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData, false);
} else {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, blobData.streamInput(), blobData.length(), false);
}
assertEquals(blobData, Streams.readFully(container.readBlob(randomPurpose(), blobName)));
expectThrows(
FileAlreadyExistsException.class,

View file

@ -236,7 +236,17 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase
if (randomBoolean()) {
container.writeBlob(randomPurpose(), blobName, bytesArray, failIfAlreadyExists);
} else {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, bytesArray, failIfAlreadyExists);
if (randomBoolean()) {
container.writeBlobAtomic(randomNonDataPurpose(), blobName, bytesArray, failIfAlreadyExists);
} else {
container.writeBlobAtomic(
randomNonDataPurpose(),
blobName,
bytesArray.streamInput(),
bytesArray.length(),
failIfAlreadyExists
);
}
}
}

View file

@ -664,16 +664,17 @@ public class MockRepository extends FsRepository {
@Override
public void writeBlobAtomic(
final OperationPurpose purpose,
final String blobName,
final BytesReference bytes,
final boolean failIfAlreadyExists
OperationPurpose purpose,
String blobName,
InputStream inputStream,
long blobSize,
boolean failIfAlreadyExists
) throws IOException {
final Random random = beforeAtomicWrite(blobName);
if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
// Simulate a failure between the write and move operation in FsBlobContainer
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
super.writeBlob(purpose, tempBlobName, bytes, failIfAlreadyExists);
super.writeBlob(purpose, tempBlobName, inputStream, blobSize, failIfAlreadyExists);
maybeIOExceptionOrBlock(blobName);
final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
fsBlobContainer.moveBlobAtomic(purpose, tempBlobName, blobName, failIfAlreadyExists);
@ -681,10 +682,20 @@ public class MockRepository extends FsRepository {
// Atomic write since it is potentially supported
// by the delegating blob container
maybeIOExceptionOrBlock(blobName);
super.writeBlobAtomic(purpose, blobName, bytes, failIfAlreadyExists);
super.writeBlobAtomic(purpose, blobName, inputStream, blobSize, failIfAlreadyExists);
}
}
/**
 * {@link BytesReference}-based atomic write.
 * <p>
 * Converts {@code bytes} into a stream plus its length and routes the call through the stream-based
 * {@code writeBlobAtomic} overload of this class.
 *
 * @throws IOException if opening the stream or the delegated write fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
    throws IOException {
    final InputStream content = bytes.streamInput();
    final long contentLength = bytes.length();
    writeBlobAtomic(purpose, blobName, content, contentLength, failIfAlreadyExists);
}
private Random beforeAtomicWrite(String blobName) throws IOException {
final Random random = RandomizedContext.current().getRandom();
if (failOnIndexLatest && BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) {

View file

@ -222,6 +222,17 @@ public final class TestUtils {
throw unsupportedException();
}
/**
 * Stream-based atomic write; not supported here - unconditionally throws the exception produced by
 * {@code unsupportedException()} without looking at any argument.
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    throw unsupportedException();
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
throw unsupportedException();

View file

@ -699,12 +699,28 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
final BytesStreamOutput out = new BytesStreamOutput();
writer.accept(out);
if (atomic) {
writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists);
if (randomBoolean()) {
writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists);
} else {
writeBlobAtomic(purpose, blobName, out.bytes().streamInput(), out.bytes().length(), failIfAlreadyExists);
}
} else {
writeBlob(purpose, blobName, out.bytes(), failIfAlreadyExists);
}
}
/**
 * Stream-based atomic write for this test container.
 * <p>
 * Runs {@code assertPurpose(purpose)} and then hands off to the three-argument
 * {@code writeBlobAtomic(blobName, inputStream, failIfAlreadyExists)} helper. {@code blobSize} is not forwarded.
 *
 * @throws IOException if the helper fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    assertPurpose(purpose);
    writeBlobAtomic(blobName, inputStream, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {

View file

@ -421,12 +421,28 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
final BytesStreamOutput out = new BytesStreamOutput();
writer.accept(out);
if (atomic) {
writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists);
if (randomBoolean()) {
writeBlobAtomic(purpose, blobName, out.bytes(), failIfAlreadyExists);
} else {
writeBlobAtomic(purpose, blobName, out.bytes().streamInput(), out.bytes().length(), failIfAlreadyExists);
}
} else {
writeBlob(purpose, blobName, out.bytes(), failIfAlreadyExists);
}
}
/**
 * Stream-based atomic write for this test container.
 * <p>
 * Runs {@code assertPurpose(purpose)} and then hands off to the four-argument
 * {@code writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists)} helper.
 *
 * @throws IOException if the helper fails
 */
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
    throws IOException {
    assertPurpose(purpose);
    writeBlobAtomic(blobName, inputStream, blobSize, failIfAlreadyExists);
}
@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {