From f6a2b5c9ef0060bff1a18b9d7b9dc79b9d54d513 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?=
Date: Tue, 29 Aug 2023 12:25:03 +0200
Subject: [PATCH] Add bulk delete method to BlobStore interface and implementations (#98948)

---
 .../azure/AzureBlobContainer.java             |  2 +-
 .../repositories/azure/AzureBlobStore.java    |  3 +-
 .../gcs/GoogleCloudStorageBlobStore.java      |  3 +-
 .../repositories/s3/S3BlobContainer.java      | 61 +---------------
 .../repositories/s3/S3BlobStore.java          | 70 +++++++++++++++++++
 .../common/blobstore/url/URLBlobStore.java    |  7 ++
 .../repositories/hdfs/HdfsBlobStore.java      |  6 ++
 .../hdfs/HdfsBlobStoreRepositoryTests.java    |  5 ++
 .../common/blobstore/BlobStore.java           |  8 +++
 .../common/blobstore/fs/FsBlobContainer.java  | 27 +------
 .../common/blobstore/fs/FsBlobStore.java      | 34 +++++++++
 .../LatencySimulatingBlobStoreRepository.java |  6 ++
 .../ESBlobStoreRepositoryIntegTestCase.java   | 35 ++++++++++
 .../snapshots/mockstore/BlobStoreWrapper.java |  6 ++
 ...archableSnapshotsPrewarmingIntegTests.java |  6 ++
 .../testkit/RepositoryAnalysisFailureIT.java  |  3 +
 .../testkit/RepositoryAnalysisSuccessIT.java  |  3 +
 17 files changed, 196 insertions(+), 89 deletions(-)

diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
index 828ff053dcbe..b4e6039aae1e 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java
@@ -123,7 +123,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
 
     @Override
     public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
-        blobStore.deleteBlobs(new Iterator<>() {
+        blobStore.deleteBlobsIgnoringIfNotExists(new Iterator<>() {
             @Override
             public boolean hasNext() {
                 return blobNames.hasNext();
diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
index 5f1d22fa9ecc..70789c5568fb 100644
--- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
+++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java
@@ -263,7 +263,8 @@ public class AzureBlobStore implements BlobStore {
         throw exception;
     }
 
-    void deleteBlobs(Iterator<String> blobs) throws IOException {
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobs) throws IOException {
         if (blobs.hasNext() == false) {
             return;
         }
diff --git a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
index 7d6ac118ceb0..76fade3c5afa 100644
--- a/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
+++ b/modules/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java
@@ -524,7 +524,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
      *
      * @param blobNames names of the blobs to delete
      */
-    void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
         if (blobNames.hasNext() == false) {
             return;
         }
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
index 58c6586ccd04..2e98ce33b94d 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java
@@ -13,13 +13,11 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ListNextBatchOfObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.MultipartUpload;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -32,7 +30,6 @@ import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.RefCountingListener;
@@ -70,12 +67,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
-import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
 import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
@@ -84,12 +79,6 @@ class S3BlobContainer extends AbstractBlobContainer {
 
     private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);
 
-    /**
-     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
-     * @see S3 Documentation.
-     */
-    private static final int MAX_BULK_DELETES = 1000;
-
     private final S3BlobStore blobStore;
 
     private final String keyPath;
@@ -357,55 +346,7 @@ class S3BlobContainer extends AbstractBlobContainer {
             outstanding = blobNames;
         }
 
-        final List<String> partition = new ArrayList<>();
-        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
-            final AtomicReference<Exception> aex = new AtomicReference<>();
-            SocketAccess.doPrivilegedVoid(() -> {
-                outstanding.forEachRemaining(key -> {
-                    partition.add(key);
-                    if (partition.size() == MAX_BULK_DELETES) {
-                        deletePartition(clientReference, partition, aex);
-                        partition.clear();
-                    }
-                });
-                if (partition.isEmpty() == false) {
-                    deletePartition(clientReference, partition, aex);
-                }
-            });
-            if (aex.get() != null) {
-                throw aex.get();
-            }
-        } catch (Exception e) {
-            throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
-        }
-    }
-
-    private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
-        try {
-            clientReference.client().deleteObjects(bulkDelete(blobStore, partition));
-        } catch (MultiObjectDeleteException e) {
-            // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
-            // first remove all keys that were sent in the request and then add back those that ran into an exception.
-            logger.warn(
-                () -> format(
-                    "Failed to delete some blobs %s",
-                    e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
-                ),
-                e
-            );
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
-        } catch (AmazonClientException e) {
-            // The AWS client threw any unexpected exception and did not execute the request at all so we do not
-            // remove any keys from the outstanding deletes set.
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
-        }
-    }
-
-    private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
-        return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
-            .withQuiet(true)
-            .withRequestMetricCollector(blobStore.deleteMetricCollector);
+        blobStore.deleteBlobsIgnoringIfNotExists(outstanding);
     }
 
     @Override
diff --git a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
index b1db2b3e0aae..a3fbc2cc13d5 100644
--- a/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
+++ b/modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobStore.java
@@ -8,16 +8,21 @@
 
 package org.elasticsearch.repositories.s3;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.Request;
 import com.amazonaws.Response;
 import com.amazonaws.metrics.RequestMetricCollector;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.StorageClass;
 import com.amazonaws.util.AWSRequestMetrics;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -28,13 +33,25 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.core.Strings.format;
 
 class S3BlobStore implements BlobStore {
 
+    /**
+     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+     * @see S3 Documentation.
+     */
+    private static final int MAX_BULK_DELETES = 1000;
+
     private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
 
     private final S3Service service;
@@ -189,6 +206,59 @@ class S3BlobStore implements BlobStore {
         return new S3BlobContainer(path, this);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        final List<String> partition = new ArrayList<>();
+        try (AmazonS3Reference clientReference = clientReference()) {
+            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+            final AtomicReference<Exception> aex = new AtomicReference<>();
+            SocketAccess.doPrivilegedVoid(() -> {
+                blobNames.forEachRemaining(key -> {
+                    partition.add(key);
+                    if (partition.size() == MAX_BULK_DELETES) {
+                        deletePartition(clientReference, partition, aex);
+                        partition.clear();
+                    }
+                });
+                if (partition.isEmpty() == false) {
+                    deletePartition(clientReference, partition, aex);
+                }
+            });
+            if (aex.get() != null) {
+                throw aex.get();
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
+        }
+    }
+
+    private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
+        try {
+            clientReference.client().deleteObjects(bulkDelete(this, partition));
+        } catch (MultiObjectDeleteException e) {
+            // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
+            // first remove all keys that were sent in the request and then add back those that ran into an exception.
+            logger.warn(
+                () -> format(
+                    "Failed to delete some blobs %s",
+                    e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
+                ),
+                e
+            );
+            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+        } catch (AmazonClientException e) {
+            // The AWS client threw any unexpected exception and did not execute the request at all so we do not
+            // remove any keys from the outstanding deletes set.
+            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+        }
+    }
+
+    private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
+        return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
+            .withQuiet(true)
+            .withRequestMetricCollector(blobStore.deleteMetricCollector);
+    }
+
     @Override
     public void close() throws IOException {
         this.service.close();
diff --git a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java
index ce37aca2d7a3..327a66e94e6d 100644
--- a/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java
+++ b/modules/repository-url/src/main/java/org/elasticsearch/common/blobstore/url/URLBlobStore.java
@@ -21,8 +21,10 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.CheckedFunction;
 
+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Iterator;
 import java.util.List;
 
 /**
@@ -105,6 +107,11 @@ public class URLBlobStore implements BlobStore {
         }
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        throw new UnsupportedOperationException("Bulk deletes are not supported in URL repositories");
+    }
+
     @Override
     public void close() {
         // nothing to do here...
diff --git a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
index b85acf5d328c..1dc246cdeeb6 100644
--- a/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
+++ b/plugins/repository-hdfs/src/main/java/org/elasticsearch/repositories/hdfs/HdfsBlobStore.java
@@ -17,6 +17,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 final class HdfsBlobStore implements BlobStore {
 
@@ -69,6 +70,11 @@ final class HdfsBlobStore implements BlobStore {
         return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext, replicationFactor);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        throw new UnsupportedOperationException("Bulk deletes are not supported in Hdfs repositories");
+    }
+
     private Path buildHdfsPath(BlobPath blobPath) {
         final Path path = translateToHdfsPath(blobPath);
         if (readOnly == false) {
diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java
index 7a9260b38bcd..fed4411f6876 100644
--- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java
+++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsBlobStoreRepositoryTests.java
@@ -44,6 +44,11 @@ public class HdfsBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
         testSnapshotAndRestore(false);
     }
 
+    @Override
+    public void testBlobStoreBulkDeletion() throws Exception {
+        // HDFS does not implement bulk deletion from different BlobContainers
+    }
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Collections.singletonList(HdfsPlugin.class);
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java b/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
index 24e7c43533d6..d66b8b970437 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/BlobStore.java
@@ -8,7 +8,9 @@
 package org.elasticsearch.common.blobstore;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -21,6 +23,12 @@ public interface BlobStore extends Closeable {
      */
     BlobContainer blobContainer(BlobPath path);
 
+    /**
+     * Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer}
+     * @param blobNames the blobs to be deleted
+     */
+    void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException;
+
     /**
      * Returns statistics on the count of operations that have been performed on this blob store
      */
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
index 30a056503901..838d0e3f4d08 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobContainer.java
@@ -181,32 +181,7 @@ public class FsBlobContainer extends AbstractBlobContainer {
 
     @Override
     public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
-        IOException ioe = null;
-        long suppressedExceptions = 0;
-        while (blobNames.hasNext()) {
-            try {
-                Path resolve = path.resolve(blobNames.next());
-                IOUtils.rm(resolve);
-            } catch (IOException e) {
-                // IOUtils.rm puts the original exception as a string in the IOException message. Ignore no such file exception.
-                if (e.getMessage().contains("NoSuchFileException") == false) {
-                    // track up to 10 delete exceptions and try to continue deleting on exceptions
-                    if (ioe == null) {
-                        ioe = e;
-                    } else if (ioe.getSuppressed().length < 10) {
-                        ioe.addSuppressed(e);
-                    } else {
-                        ++suppressedExceptions;
-                    }
-                }
-            }
-        }
-        if (ioe != null) {
-            if (suppressedExceptions > 0) {
-                ioe.addSuppressed(new IOException("Failed to delete files, suppressed [" + suppressedExceptions + "] failures"));
-            }
-            throw ioe;
-        }
+        blobStore.deleteBlobsIgnoringIfNotExists(Iterators.map(blobNames, blobName -> path.resolve(blobName).toString()));
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
index 784f69197672..77553ea21c5b 100644
--- a/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
+++ b/server/src/main/java/org/elasticsearch/common/blobstore/fs/FsBlobStore.java
@@ -12,10 +12,12 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.core.IOUtils;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.List;
 
 public class FsBlobStore implements BlobStore {
@@ -61,6 +63,38 @@ public class FsBlobStore implements BlobStore {
         return new FsBlobContainer(this, path, f);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        IOException ioe = null;
+        long suppressedExceptions = 0;
+        while (blobNames.hasNext()) {
+            try {
+                // FsBlobContainer uses this method to delete blobs; in that case each blob name is already an absolute path meaning that
+                // the resolution done here is effectively a non-op.
+                Path resolve = path.resolve(blobNames.next());
+                IOUtils.rm(resolve);
+            } catch (IOException e) {
+                // IOUtils.rm puts the original exception as a string in the IOException message. Ignore no such file exception.
+                if (e.getMessage().contains("NoSuchFileException") == false) {
+                    // track up to 10 delete exceptions and try to continue deleting on exceptions
+                    if (ioe == null) {
+                        ioe = e;
+                    } else if (ioe.getSuppressed().length < 10) {
+                        ioe.addSuppressed(e);
+                    } else {
+                        ++suppressedExceptions;
+                    }
+                }
+            }
+        }
+        if (ioe != null) {
+            if (suppressedExceptions > 0) {
+                ioe.addSuppressed(new IOException("Failed to delete files, suppressed [" + suppressedExceptions + "] failures"));
+            }
+            throw ioe;
+        }
+    }
+
     @Override
     public void close() {
         // nothing to do here...
diff --git a/test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepository.java b/test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepository.java
index 873577136cc2..c184dca3887b 100644
--- a/test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepository.java
+++ b/test/external-modules/latency-simulating-directory/src/main/java/org/elasticsearch/test/simulatedlatencyrepo/LatencySimulatingBlobStoreRepository.java
@@ -22,6 +22,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Iterator;
 
 class LatencySimulatingBlobStoreRepository extends FsRepository {
 
@@ -50,6 +51,11 @@ class LatencySimulatingBlobStoreRepository extends FsRepository {
                 return new LatencySimulatingBlobContainer(blobContainer);
             }
 
+            @Override
+            public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+                fsBlobStore.deleteBlobsIgnoringIfNotExists(blobNames);
+            }
+
             @Override
             public void close() throws IOException {
                 fsBlobStore.close();
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
index 86ec7c77d14e..d1b211128c10 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESBlobStoreRepositoryIntegTestCase.java
@@ -40,6 +40,7 @@ import org.hamcrest.CoreMatchers;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -53,6 +54,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -481,6 +483,39 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
         assertAcked(clusterAdmin().prepareDeleteSnapshot(repoName, "test-snap2").get());
     }
 
+    public void testBlobStoreBulkDeletion() throws Exception {
+        Map<BlobPath, List<String>> expectedBlobsPerContainer = new HashMap<>();
+        try (BlobStore store = newBlobStore()) {
+            List<String> blobsToDelete = new ArrayList<>();
+            int numberOfContainers = randomIntBetween(2, 5);
+            for (int i = 0; i < numberOfContainers; i++) {
+                BlobPath containerPath = BlobPath.EMPTY.add(randomIdentifier());
+                final BlobContainer container = store.blobContainer(containerPath);
+                int numberOfBlobsPerContainer = randomIntBetween(5, 10);
+                for (int j = 0; j < numberOfBlobsPerContainer; j++) {
+                    byte[] bytes = randomBytes(randomInt(100));
+                    String blobName = randomAlphaOfLength(10);
+                    container.writeBlob(blobName, new BytesArray(bytes), false);
+                    if (randomBoolean()) {
+                        blobsToDelete.add(containerPath.buildAsString() + blobName);
+                    } else {
+                        expectedBlobsPerContainer.computeIfAbsent(containerPath, unused -> new ArrayList<>()).add(blobName);
+                    }
+                }
+            }
+
+            store.deleteBlobsIgnoringIfNotExists(blobsToDelete.iterator());
+            for (var containerEntry : expectedBlobsPerContainer.entrySet()) {
+                BlobContainer blobContainer = store.blobContainer(containerEntry.getKey());
+                Map<String, BlobMetadata> blobsInContainer = blobContainer.listBlobs();
+                for (String expectedBlob : containerEntry.getValue()) {
+                    assertThat(blobsInContainer, hasKey(expectedBlob));
+                }
+                blobContainer.delete();
+            }
+        }
+    }
+
     protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
         IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
         for (int i = 0; i < numDocs; i++) {
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java
index b4aae75b2c8f..926f9dc2b2a8 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobStoreWrapper.java
@@ -12,6 +12,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 
 import java.io.IOException;
+import java.util.Iterator;
 
 public class BlobStoreWrapper implements BlobStore {
 
@@ -26,6 +27,11 @@ public class BlobStoreWrapper implements BlobStore {
         return delegate.blobContainer(path);
     }
 
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        delegate.deleteBlobsIgnoringIfNotExists(blobNames);
+    }
+
     @Override
     public void close() throws IOException {
         delegate.close();
diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java
index 961118d0ab84..1cdfa51bdf77 100644
--- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java
+++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/cache/full/SearchableSnapshotsPrewarmingIntegTests.java
@@ -65,6 +65,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -461,6 +462,11 @@ public class SearchableSnapshotsPrewarmingIntegTests extends ESSingleNodeTestCase {
             return new TrackingFilesBlobContainer(delegate.blobContainer(path));
         }
 
+        @Override
+        public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+            delegate.deleteBlobsIgnoringIfNotExists(blobNames);
+        }
+
         @Override
         public void close() throws IOException {
            delegate.close();
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
index 2d498ff23b8e..a5ce2b49d9c2 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisFailureIT.java
@@ -420,6 +420,9 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
             }
         }
 
+        @Override
+        public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) {}
+
         private void deleteContainer(DisruptableBlobContainer container) {
             blobContainer = null;
         }
diff --git a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
index 6c66ecb0674c..b29940964e94 100644
--- a/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
+++ b/x-pack/plugin/snapshot-repo-test-kit/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/testkit/RepositoryAnalysisSuccessIT.java
@@ -240,6 +240,9 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
             }
         }
 
+        @Override
+        public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) {}
+
         @Override
         public void close() {}
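Note for reviewers: the sketch below is not part of the patch. It is a minimal, self-contained illustration of the batching pattern this change moves into S3BlobStore#deleteBlobsIgnoringIfNotExists, and of the new BlobStore contract that bulk deletes may mix blob names from different containers and must ignore blobs that no longer exist. The InMemoryBlobStore class, its backing map, and the blob names are hypothetical stand-ins for illustration only; they are not Elasticsearch or AWS SDK APIs.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a BlobStore implementation; it only demonstrates the
// partitioning loop the patch adds to S3BlobStore (S3 accepts at most 1000 keys per
// DeleteObjects request) together with the "ignore missing blobs" semantics.
class InMemoryBlobStore {

    private static final int MAX_BULK_DELETES = 1000; // same limit the patch keeps for S3

    private final Map<String, byte[]> backing = new ConcurrentHashMap<>();

    void putBlob(String name, byte[] bytes) {
        backing.put(name, bytes);
    }

    // Mirrors BlobStore#deleteBlobsIgnoringIfNotExists: consume the iterator in batches
    // and silently skip names that are already gone.
    void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) {
        final List<String> partition = new ArrayList<>(MAX_BULK_DELETES);
        while (blobNames.hasNext()) {
            partition.add(blobNames.next());
            if (partition.size() == MAX_BULK_DELETES) {
                deletePartition(partition);
                partition.clear();
            }
        }
        if (partition.isEmpty() == false) {
            deletePartition(partition);
        }
    }

    private void deletePartition(List<String> partition) {
        // In the S3 implementation this is a single quiet-mode DeleteObjectsRequest;
        // here we simply drop the keys, ignoring ones that do not exist.
        partition.forEach(backing::remove);
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();
        store.putBlob("container-a/blob1", new byte[] { 1 });
        store.putBlob("container-b/blob2", new byte[] { 2 });
        // Blob names may come from different containers, matching the new interface javadoc.
        store.deleteBlobsIgnoringIfNotExists(List.of("container-a/blob1", "container-b/blob2", "missing-blob").iterator());
        System.out.println("remaining blobs: " + store.backing.keySet()); // prints: remaining blobs: []
    }
}

The real S3BlobStore additionally funnels per-partition failures into an AtomicReference so one failed batch does not abort the remaining batches, and FsBlobStore keeps the error handling previously in FsBlobContainer (up to 10 suppressed IOExceptions); the sketch leaves both concerns out for brevity.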