diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index b1a8e54ba617..8f7c22dd7e35 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -154,4 +154,19 @@ public class AzureBlobContainer extends AbstractBlobContainer { protected String buildKey(String blobName) { return keyPath + (blobName == null ? "" : blobName); } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public boolean compareAndSetRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public long getRegister(String key) throws IOException { + throw new UnsupportedOperationException(); // TODO + } } diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java index 404831939b9f..876657e7e720 100644 --- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java +++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainer.java @@ -117,4 +117,19 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer { assert blobName != null; return path + blobName; } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public boolean compareAndSetRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public long getRegister(String key) throws IOException { + throw new UnsupportedOperationException(); // TODO + } } diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java index 03c2773999dd..825be0593326 100644 --- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java +++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java @@ -607,4 +607,19 @@ class S3BlobContainer extends AbstractBlobContainer { return Tuple.tuple(parts + 1, remaining); } } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public boolean compareAndSetRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException(); // TODO + } + + @Override + public long getRegister(String key) throws IOException { + throw new UnsupportedOperationException(); // TODO + } } diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java index 709d91e5eebc..3318e02f1fdb 100644 --- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java +++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobContainer.java @@ -147,4 +147,9 @@ public class URLBlobContainer extends AbstractBlobContainer { } } + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException("URL repository doesn't support this operation"); + } + } diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java index e7ec51002610..42673a2bc917 100644 --- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java +++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobContainer.java @@ -316,4 +316,9 @@ final class HdfsBlobContainer extends AbstractBlobContainer { }); } } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + throw new UnsupportedOperationException("HDFS repositories do not support this operation"); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java index 3ef4a46fc042..c1545d11bdfe 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobContainer.java @@ -194,4 +194,40 @@ public interface BlobContainer { * @throws IOException if there were any failures in reading from the blob container. */ Map listBlobsByPrefix(String blobNamePrefix) throws IOException; + + /** + * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. + * Keys not yet used start at initial value 0. Returns the current value (before it was updated). + * + * @param key key of the value to update + * @param expected the expected value + * @param updated the new value + * @return the value read from the register (before it was updated) + */ + long compareAndExchangeRegister(String key, long expected, long updated) throws IOException; + + /** + * Atomically sets the value stored at the given key to {@code updated} if the {@code current value == expected}. + * Keys not yet used start at initial value 0. + * + * @param key key of the value to update + * @param expected the expected value + * @param updated the new value + * @return true if successful, false if the expected value did not match the updated value + */ + default boolean compareAndSetRegister(String key, long expected, long updated) throws IOException { + return compareAndExchangeRegister(key, expected, updated) == expected; + } + + /** + * Gets the value set by {@link #compareAndSetRegister(String, long, long)} for a given key. + * If a key has not yet been used, the initial value is 0. + * + * @param key key of the value to get + * @return value found + */ + default long getRegister(String key) throws IOException { + return compareAndExchangeRegister(key, 0, 0); + } + } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java index fafbc0e4fc2f..11771145494b 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java @@ -20,15 +20,21 @@ import org.elasticsearch.common.blobstore.support.BlobMetadata; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Strings; +import org.elasticsearch.core.SuppressForbidden; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.channels.SeekableByteChannel; import java.nio.file.AccessDeniedException; import java.nio.file.DirectoryStream; @@ -369,4 +375,54 @@ public class FsBlobContainer extends AbstractBlobContainer { private static OutputStream blobOutputStream(Path file) throws IOException { return Files.newOutputStream(file, StandardOpenOption.CREATE_NEW); } + + private static final KeyedLock registerLocks = new KeyedLock<>(); + + @Override + @SuppressForbidden(reason = "write to channel that we have open for locking purposes already directly") + public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException { + try ( + FileChannel channel = openOrCreateAtomic(path.resolve(key)); + FileLock ignored1 = channel.lock(); + Releasable ignored2 = registerLocks.acquire(key) + ) { + final ByteBuffer buf = ByteBuffer.allocate(Long.BYTES); + final long found; + while (buf.remaining() > 0) { + if (channel.read(buf) == -1) { + break; + } + } + if (buf.position() == 0) { + found = 0L; + } else if (buf.position() == Long.BYTES) { + found = buf.getLong(0); + buf.clear(); + if (channel.read(buf) != -1) { + throw new IllegalStateException("Read file of length greater than [" + Long.BYTES + "] for [" + key + "]"); + } + } else { + throw new IllegalStateException("Read file of length [" + buf.position() + "] for [" + key + "]"); + } + if (found == expected) { + buf.clear().putLong(updated).flip(); + while (buf.remaining() > 0) { + channel.write(buf, buf.position()); + } + channel.force(true); + } + return found; + } + } + + private static FileChannel openOrCreateAtomic(Path path) throws IOException { + try { + if (Files.exists(path) == false) { + return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + } + } catch (FileAlreadyExistsException e) { + // ok, created concurrently + } + return FileChannel.open(path, StandardOpenOption.READ, StandardOpenOption.WRITE); + } } diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java index ddd725b82f4e..18cface95caf 100644 --- a/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java +++ b/server/src/main/java/org/elasticsearch/common/blobstore/support/FilterBlobContainer.java @@ -101,4 +101,19 @@ public abstract class FilterBlobContainer implements BlobContainer { public Map listBlobsByPrefix(String blobNamePrefix) throws IOException { return delegate.listBlobsByPrefix(blobNamePrefix); } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException { + return delegate.compareAndExchangeRegister(key, expected, updated); + } + + @Override + public boolean compareAndSetRegister(String key, long expected, long updated) throws IOException { + return delegate.compareAndSetRegister(key, expected, updated); + } + + @Override + public long getRegister(String key) throws IOException { + return delegate.getRegister(key); + } } diff --git a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java index a7578902589b..97f377766354 100644 --- a/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java +++ b/server/src/test/java/org/elasticsearch/common/blobstore/fs/FsBlobContainerTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.tests.mockfile.FilterFileSystemProvider; import org.apache.lucene.tests.mockfile.FilterSeekableByteChannel; import org.apache.lucene.tests.util.LuceneTestCase; import org.elasticsearch.common.blobstore.BlobPath; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.Streams; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.PathUtils; @@ -94,6 +95,48 @@ public class FsBlobContainerTests extends ESTestCase { assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true)); } + public void testCompareAndExchange() throws Exception { + final Path path = PathUtils.get(createTempDir().toString()); + final FsBlobContainer container = new FsBlobContainer( + new FsBlobStore(randomIntBetween(1, 8) * 1024, path, false), + BlobPath.EMPTY, + path + ); + + final String key = randomAlphaOfLength(10); + final AtomicLong expectedValue = new AtomicLong(); + + for (int i = 0; i < 5; i++) { + switch (between(1, 4)) { + case 1 -> assertEquals(expectedValue.get(), container.getRegister(key)); + case 2 -> assertFalse( + container.compareAndSetRegister(key, randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong), randomLong()) + ); + case 3 -> assertEquals( + expectedValue.get(), + container.compareAndExchangeRegister( + key, + randomValueOtherThan(expectedValue.get(), ESTestCase::randomLong), + randomLong() + ) + ); + case 4 -> {/* no-op */} + } + + final var newValue = randomLong(); + if (randomBoolean()) { + assertTrue(container.compareAndSetRegister(key, expectedValue.get(), newValue)); + } else { + assertEquals(expectedValue.get(), container.compareAndExchangeRegister(key, expectedValue.get(), newValue)); + } + expectedValue.set(newValue); + } + + final byte[] corruptContents = new byte[9]; + container.writeBlob(key, new BytesArray(corruptContents, 0, randomFrom(1, 7, 9)), false); + expectThrows(IllegalStateException.class, () -> container.compareAndExchangeRegister(key, expectedValue.get(), 0)); + } + static class MockFileSystemProvider extends FilterFileSystemProvider { final Consumer onRead; diff --git a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java index 6415ff468198..a7f3db69c43d 100644 --- a/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java +++ b/x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/common/TestUtils.java @@ -228,6 +228,11 @@ public final class TestUtils { throw unsupportedException(); } + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) throws IOException { + throw unsupportedException(); + } + private UnsupportedOperationException unsupportedException() { assert false : "this operation is not supported and should have not be called"; return new UnsupportedOperationException("This operation is not supported"); diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java index bafff4e21aba..f05d3ddd02dc 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java @@ -537,6 +537,12 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase { blobMetadataByName.keySet().removeIf(s -> s.startsWith(blobNamePrefix) == false); return blobMetadataByName; } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + assert false : "should not have been called"; + throw new UnsupportedOperationException(); + } } } diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java index defb80ef7893..9a514a24fb4b 100644 --- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java +++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java @@ -404,6 +404,12 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase { blobMetadataByName.keySet().removeIf(s -> s.startsWith(blobNamePrefix) == false); return blobMetadataByName; } + + @Override + public long compareAndExchangeRegister(String key, long expected, long updated) { + assert false : "should not have been called"; + throw new UnsupportedOperationException(); + } } }