Mirror of https://github.com/elastic/elasticsearch.git (synced 2025-06-28 09:28:55 -04:00)

Add bulk delete method to BlobStore interface and implementations (#98948)

Parent: 01686a8093
Commit: f6a2b5c9ef

17 changed files with 196 additions and 89 deletions
@@ -123,7 +123,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
     @Override
     public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
-        blobStore.deleteBlobs(new Iterator<>() {
+        blobStore.deleteBlobsIgnoringIfNotExists(new Iterator<>() {
             @Override
             public boolean hasNext() {
                 return blobNames.hasNext();
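The hunk above shows only the hasNext() half of the wrapping iterator; the next() override is not visible in this excerpt. Presumably it prefixes each container-relative name with the container's key path before handing it to the store-level bulk delete. A minimal standalone sketch of that pattern, where the keyPath value and the prefixing in next() are assumptions:

import java.util.Iterator;
import java.util.List;

// Sketch of the wrapping-iterator pattern: lazily map each container-relative
// blob name to an absolute key before bulk deletion. The keyPath prefixing in
// next() is an assumption; the hunk above only shows hasNext().
public class PrefixingIteratorSketch {
    public static void main(String[] args) {
        String keyPath = "indices/0/"; // hypothetical container key path
        Iterator<String> blobNames = List.of("segments_1", "meta.dat").iterator();
        Iterator<String> absolute = new Iterator<>() {
            @Override
            public boolean hasNext() {
                return blobNames.hasNext();
            }

            @Override
            public String next() {
                return keyPath + blobNames.next();
            }
        };
        absolute.forEachRemaining(System.out::println); // indices/0/segments_1, indices/0/meta.dat
    }
}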
@@ -263,7 +263,8 @@ public class AzureBlobStore implements BlobStore {
         throw exception;
     }

-    void deleteBlobs(Iterator<String> blobs) throws IOException {
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobs) throws IOException {
         if (blobs.hasNext() == false) {
             return;
         }
@@ -524,7 +524,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
      *
      * @param blobNames names of the blobs to delete
      */
-    void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
         if (blobNames.hasNext() == false) {
             return;
         }
@@ -13,13 +13,11 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
-import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ListNextBatchOfObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.MultipartUpload;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -32,7 +30,6 @@ import com.amazonaws.services.s3.model.UploadPartResult;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.RefCountingListener;
@@ -70,12 +67,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;

 import static org.elasticsearch.common.blobstore.support.BlobContainerUtils.getRegisterUsingConsistentRead;
-import static org.elasticsearch.core.Strings.format;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
 import static org.elasticsearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
 import static org.elasticsearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
@@ -84,12 +79,6 @@ class S3BlobContainer extends AbstractBlobContainer {

     private static final Logger logger = LogManager.getLogger(S3BlobContainer.class);

-    /**
-     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
-     * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
-     */
-    private static final int MAX_BULK_DELETES = 1000;
-
     private final S3BlobStore blobStore;
     private final String keyPath;

@@ -357,55 +346,7 @@ class S3BlobContainer extends AbstractBlobContainer {
             outstanding = blobNames;
         }

-        final List<String> partition = new ArrayList<>();
-        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
-            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
-            final AtomicReference<Exception> aex = new AtomicReference<>();
-            SocketAccess.doPrivilegedVoid(() -> {
-                outstanding.forEachRemaining(key -> {
-                    partition.add(key);
-                    if (partition.size() == MAX_BULK_DELETES) {
-                        deletePartition(clientReference, partition, aex);
-                        partition.clear();
-                    }
-                });
-                if (partition.isEmpty() == false) {
-                    deletePartition(clientReference, partition, aex);
-                }
-            });
-            if (aex.get() != null) {
-                throw aex.get();
-            }
-        } catch (Exception e) {
-            throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
-        }
-    }
-
-    private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
-        try {
-            clientReference.client().deleteObjects(bulkDelete(blobStore, partition));
-        } catch (MultiObjectDeleteException e) {
-            // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
-            // first remove all keys that were sent in the request and then add back those that ran into an exception.
-            logger.warn(
-                () -> format(
-                    "Failed to delete some blobs %s",
-                    e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
-                ),
-                e
-            );
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
-        } catch (AmazonClientException e) {
-            // The AWS client threw any unexpected exception and did not execute the request at all so we do not
-            // remove any keys from the outstanding deletes set.
-            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
-        }
-    }
-
-    private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
-        return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
-            .withQuiet(true)
-            .withRequestMetricCollector(blobStore.deleteMetricCollector);
+        blobStore.deleteBlobsIgnoringIfNotExists(outstanding);
     }

     @Override
@@ -8,16 +8,21 @@

 package org.elasticsearch.repositories.s3;

+import com.amazonaws.AmazonClientException;
 import com.amazonaws.Request;
 import com.amazonaws.Response;
 import com.amazonaws.metrics.RequestMetricCollector;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import com.amazonaws.services.s3.model.StorageClass;
 import com.amazonaws.util.AWSRequestMetrics;

 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
@@ -28,13 +33,25 @@ import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.core.Strings.format;

 class S3BlobStore implements BlobStore {

+    /**
+     * Maximum number of deletes in a {@link DeleteObjectsRequest}.
+     * @see <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html">S3 Documentation</a>.
+     */
+    private static final int MAX_BULK_DELETES = 1000;
+
+    private static final Logger logger = LogManager.getLogger(S3BlobStore.class);
+
     private final S3Service service;
@@ -189,6 +206,59 @@ class S3BlobStore implements BlobStore {
         return new S3BlobContainer(path, this);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        final List<String> partition = new ArrayList<>();
+        try (AmazonS3Reference clientReference = clientReference()) {
+            // S3 API only allows 1k blobs per delete so we split up the given blobs into requests of max. 1k deletes
+            final AtomicReference<Exception> aex = new AtomicReference<>();
+            SocketAccess.doPrivilegedVoid(() -> {
+                blobNames.forEachRemaining(key -> {
+                    partition.add(key);
+                    if (partition.size() == MAX_BULK_DELETES) {
+                        deletePartition(clientReference, partition, aex);
+                        partition.clear();
+                    }
+                });
+                if (partition.isEmpty() == false) {
+                    deletePartition(clientReference, partition, aex);
+                }
+            });
+            if (aex.get() != null) {
+                throw aex.get();
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to delete blobs " + partition.stream().limit(10).toList(), e);
+        }
+    }
+
+    private void deletePartition(AmazonS3Reference clientReference, List<String> partition, AtomicReference<Exception> aex) {
+        try {
+            clientReference.client().deleteObjects(bulkDelete(this, partition));
+        } catch (MultiObjectDeleteException e) {
+            // We are sending quiet mode requests so we can't use the deleted keys entry on the exception and instead
+            // first remove all keys that were sent in the request and then add back those that ran into an exception.
+            logger.warn(
+                () -> format(
+                    "Failed to delete some blobs %s",
+                    e.getErrors().stream().map(err -> "[" + err.getKey() + "][" + err.getCode() + "][" + err.getMessage() + "]").toList()
+                ),
+                e
+            );
+            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+        } catch (AmazonClientException e) {
+            // The AWS client threw any unexpected exception and did not execute the request at all so we do not
+            // remove any keys from the outstanding deletes set.
+            aex.set(ExceptionsHelper.useOrSuppress(aex.get(), e));
+        }
+    }
+
+    private static DeleteObjectsRequest bulkDelete(S3BlobStore blobStore, List<String> blobs) {
+        return new DeleteObjectsRequest(blobStore.bucket()).withKeys(blobs.toArray(Strings.EMPTY_ARRAY))
+            .withQuiet(true)
+            .withRequestMetricCollector(blobStore.deleteMetricCollector);
+    }
+
     @Override
     public void close() throws IOException {
         this.service.close();
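The partitioning loop above never materializes the full key list: it accumulates at most MAX_BULK_DELETES names, flushes a request, and reuses the buffer, so arbitrarily long iterators are deleted in constant memory. A minimal standalone sketch of that batching pattern, with a print standing in for the S3 call:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Standalone sketch of the batching in deleteBlobsIgnoringIfNotExists: consume
// an iterator in chunks of at most 1000 (the S3 multi-object delete limit),
// flushing each full partition and then any remainder.
public class BatchingSketch {
    private static final int MAX_BULK_DELETES = 1000;

    static void deleteAll(Iterator<String> keys) {
        List<String> partition = new ArrayList<>();
        keys.forEachRemaining(key -> {
            partition.add(key);
            if (partition.size() == MAX_BULK_DELETES) {
                flush(partition);
                partition.clear();
            }
        });
        if (partition.isEmpty() == false) {
            flush(partition); // remainder smaller than one full partition
        }
    }

    static void flush(List<String> partition) {
        // stand-in for clientReference.client().deleteObjects(bulkDelete(this, partition))
        System.out.println("deleting " + partition.size() + " keys");
    }

    public static void main(String[] args) {
        deleteAll(IntStream.range(0, 2500).mapToObj(i -> "key-" + i).iterator());
        // prints: deleting 1000 keys, deleting 1000 keys, deleting 500 keys
    }
}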
@@ -21,8 +21,10 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.CheckedFunction;

+import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.Iterator;
 import java.util.List;

 /**
@@ -105,6 +107,11 @@ public class URLBlobStore implements BlobStore {
         }
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        throw new UnsupportedOperationException("Bulk deletes are not supported in URL repositories");
+    }
+
     @Override
     public void close() {
         // nothing to do here...
@@ -17,6 +17,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;

 import java.io.IOException;
+import java.util.Iterator;

 final class HdfsBlobStore implements BlobStore {

@@ -69,6 +70,11 @@ final class HdfsBlobStore implements BlobStore {
         return new HdfsBlobContainer(path, this, buildHdfsPath(path), bufferSize, securityContext, replicationFactor);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        throw new UnsupportedOperationException("Bulk deletes are not supported in Hdfs repositories");
+    }
+
     private Path buildHdfsPath(BlobPath blobPath) {
         final Path path = translateToHdfsPath(blobPath);
         if (readOnly == false) {
@@ -44,6 +44,11 @@ public class HdfsBlobStoreRepositoryTests extends ESBlobStoreRepositoryIntegTestCase {
         testSnapshotAndRestore(false);
     }

+    @Override
+    public void testBlobStoreBulkDeletion() throws Exception {
+        // HDFS does not implement bulk deletion from different BlobContainers
+    }
+
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Collections.singletonList(HdfsPlugin.class);
@@ -8,7 +8,9 @@
 package org.elasticsearch.common.blobstore;

 import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;

 /**
@@ -21,6 +23,12 @@ public interface BlobStore extends Closeable {
      */
     BlobContainer blobContainer(BlobPath path);

+    /**
+     * Delete all the provided blobs from the blob store. Each blob could belong to a different {@code BlobContainer}
+     * @param blobNames the blobs to be deleted
+     */
+    void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException;
+
     /**
      * Returns statistics on the count of operations that have been performed on this blob store
      */
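For callers, the new interface method means a single store-level call can remove blobs that live in different containers. A usage sketch, assuming an already-constructed store (for example an FsBlobStore) and building absolute names the same way the integration test later in this diff does:

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;

// Usage sketch: one store-level call deletes blobs from two different containers.
// The container names and blob names here are illustrative only.
public class BulkDeleteUsageSketch {
    static void deleteAcrossContainers(BlobStore store) throws IOException {
        BlobPath first = BlobPath.EMPTY.add("container-a");
        BlobPath second = BlobPath.EMPTY.add("container-b");
        Iterator<String> names = List.of(
            first.buildAsString() + "blob-1",   // absolute name: container path + blob name
            second.buildAsString() + "blob-2"
        ).iterator();
        store.deleteBlobsIgnoringIfNotExists(names);
    }
}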
@@ -181,32 +181,7 @@ public class FsBlobContainer extends AbstractBlobContainer {

     @Override
     public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
-        IOException ioe = null;
-        long suppressedExceptions = 0;
-        while (blobNames.hasNext()) {
-            try {
-                Path resolve = path.resolve(blobNames.next());
-                IOUtils.rm(resolve);
-            } catch (IOException e) {
-                // IOUtils.rm puts the original exception as a string in the IOException message. Ignore no such file exception.
-                if (e.getMessage().contains("NoSuchFileException") == false) {
-                    // track up to 10 delete exceptions and try to continue deleting on exceptions
-                    if (ioe == null) {
-                        ioe = e;
-                    } else if (ioe.getSuppressed().length < 10) {
-                        ioe.addSuppressed(e);
-                    } else {
-                        ++suppressedExceptions;
-                    }
-                }
-            }
-        }
-        if (ioe != null) {
-            if (suppressedExceptions > 0) {
-                ioe.addSuppressed(new IOException("Failed to delete files, suppressed [" + suppressedExceptions + "] failures"));
-            }
-            throw ioe;
-        }
+        blobStore.deleteBlobsIgnoringIfNotExists(Iterators.map(blobNames, blobName -> path.resolve(blobName).toString()));
     }

     @Override
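The Iterators.map helper used in the replacement line maps lazily: each relative name is resolved to an absolute filesystem path only as FsBlobStore consumes the iterator, so no intermediate list is built. A plain-Java sketch of what such a helper does (the real helper lives in Elasticsearch's utility classes; this equivalent is for illustration):

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of the lazy mapping that Iterators.map performs above:
// each element is transformed on demand as the consumer advances the iterator.
public class LazyMapSketch {
    static <T, R> Iterator<R> map(Iterator<T> input, Function<? super T, ? extends R> fn) {
        return new Iterator<>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public R next() {
                return fn.apply(input.next());
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> absolute = map(List.of("a", "b").iterator(), name -> "/repo/container/" + name);
        absolute.forEachRemaining(System.out::println); // /repo/container/a, /repo/container/b
    }
}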
@@ -12,10 +12,12 @@ import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.core.IOUtils;

 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Iterator;
 import java.util.List;

 public class FsBlobStore implements BlobStore {
@@ -61,6 +63,38 @@ public class FsBlobStore implements BlobStore {
         return new FsBlobContainer(this, path, f);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        IOException ioe = null;
+        long suppressedExceptions = 0;
+        while (blobNames.hasNext()) {
+            try {
+                // FsBlobContainer uses this method to delete blobs; in that case each blob name is already an absolute path meaning that
+                // the resolution done here is effectively a non-op.
+                Path resolve = path.resolve(blobNames.next());
+                IOUtils.rm(resolve);
+            } catch (IOException e) {
+                // IOUtils.rm puts the original exception as a string in the IOException message. Ignore no such file exception.
+                if (e.getMessage().contains("NoSuchFileException") == false) {
+                    // track up to 10 delete exceptions and try to continue deleting on exceptions
+                    if (ioe == null) {
+                        ioe = e;
+                    } else if (ioe.getSuppressed().length < 10) {
+                        ioe.addSuppressed(e);
+                    } else {
+                        ++suppressedExceptions;
+                    }
+                }
+            }
+        }
+        if (ioe != null) {
+            if (suppressedExceptions > 0) {
+                ioe.addSuppressed(new IOException("Failed to delete files, suppressed [" + suppressedExceptions + "] failures"));
+            }
+            throw ioe;
+        }
+    }
+
     @Override
     public void close() {
         // nothing to do here...
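The loop above keeps deleting after individual failures: it remembers the first IOException, attaches up to ten more as suppressed exceptions, and merely counts anything beyond that cap. A minimal standalone sketch of this error-accumulation pattern, with a stand-in delete function:

import java.io.IOException;
import java.util.List;

// Standalone sketch of the error-accumulation pattern above: continue on
// failure, throw the first exception at the end with later ones suppressed
// (capped at 10), and record how many failures were dropped beyond the cap.
public class SuppressedErrorsSketch {
    static void deleteAll(List<String> names) throws IOException {
        IOException ioe = null;
        long suppressedExceptions = 0;
        for (String name : names) {
            try {
                delete(name);
            } catch (IOException e) {
                if (ioe == null) {
                    ioe = e;
                } else if (ioe.getSuppressed().length < 10) {
                    ioe.addSuppressed(e);
                } else {
                    ++suppressedExceptions;
                }
            }
        }
        if (ioe != null) {
            if (suppressedExceptions > 0) {
                ioe.addSuppressed(new IOException("suppressed [" + suppressedExceptions + "] further failures"));
            }
            throw ioe;
        }
    }

    static void delete(String name) throws IOException {
        // stand-in for IOUtils.rm(path.resolve(name))
        if (name.startsWith("bad")) {
            throw new IOException("cannot delete " + name);
        }
    }

    public static void main(String[] args) throws IOException {
        deleteAll(List.of("ok-1", "bad-1", "ok-2")); // attempts all three, then throws
    }
}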
@@ -22,6 +22,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry;

 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Iterator;

 class LatencySimulatingBlobStoreRepository extends FsRepository {

@@ -50,6 +51,11 @@ class LatencySimulatingBlobStoreRepository extends FsRepository {
         return new LatencySimulatingBlobContainer(blobContainer);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        fsBlobStore.deleteBlobsIgnoringIfNotExists(blobNames);
+    }
+
     @Override
     public void close() throws IOException {
         fsBlobStore.close();
@@ -40,6 +40,7 @@ import org.hamcrest.CoreMatchers;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -53,6 +54,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
@@ -481,6 +483,39 @@ public abstract class ESBlobStoreRepositoryIntegTestCase extends ESIntegTestCase {
         assertAcked(clusterAdmin().prepareDeleteSnapshot(repoName, "test-snap2").get());
     }

+    public void testBlobStoreBulkDeletion() throws Exception {
+        Map<BlobPath, List<String>> expectedBlobsPerContainer = new HashMap<>();
+        try (BlobStore store = newBlobStore()) {
+            List<String> blobsToDelete = new ArrayList<>();
+            int numberOfContainers = randomIntBetween(2, 5);
+            for (int i = 0; i < numberOfContainers; i++) {
+                BlobPath containerPath = BlobPath.EMPTY.add(randomIdentifier());
+                final BlobContainer container = store.blobContainer(containerPath);
+                int numberOfBlobsPerContainer = randomIntBetween(5, 10);
+                for (int j = 0; j < numberOfBlobsPerContainer; j++) {
+                    byte[] bytes = randomBytes(randomInt(100));
+                    String blobName = randomAlphaOfLength(10);
+                    container.writeBlob(blobName, new BytesArray(bytes), false);
+                    if (randomBoolean()) {
+                        blobsToDelete.add(containerPath.buildAsString() + blobName);
+                    } else {
+                        expectedBlobsPerContainer.computeIfAbsent(containerPath, unused -> new ArrayList<>()).add(blobName);
+                    }
+                }
+            }
+
+            store.deleteBlobsIgnoringIfNotExists(blobsToDelete.iterator());
+            for (var containerEntry : expectedBlobsPerContainer.entrySet()) {
+                BlobContainer blobContainer = store.blobContainer(containerEntry.getKey());
+                Map<String, BlobMetadata> blobsInContainer = blobContainer.listBlobs();
+                for (String expectedBlob : containerEntry.getValue()) {
+                    assertThat(blobsInContainer, hasKey(expectedBlob));
+                }
+                blobContainer.delete();
+            }
+        }
+    }
+
     protected void addRandomDocuments(String name, int numDocs) throws InterruptedException {
         IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
         for (int i = 0; i < numDocs; i++) {
@@ -12,6 +12,7 @@ import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;

 import java.io.IOException;
+import java.util.Iterator;

 public class BlobStoreWrapper implements BlobStore {

@@ -26,6 +27,11 @@ public class BlobStoreWrapper implements BlobStore {
         return delegate.blobContainer(path);
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        delegate.deleteBlobsIgnoringIfNotExists(blobNames);
+    }
+
     @Override
     public void close() throws IOException {
         delegate.close();
@@ -65,6 +65,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -461,6 +462,11 @@ public class SearchableSnapshotsPrewarmingIntegTests extends ESSingleNodeTestCase {
         return new TrackingFilesBlobContainer(delegate.blobContainer(path));
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) throws IOException {
+        delegate.deleteBlobsIgnoringIfNotExists(blobNames);
+    }
+
     @Override
     public void close() throws IOException {
         delegate.close();
@@ -420,6 +420,9 @@ public class RepositoryAnalysisFailureIT extends AbstractSnapshotIntegTestCase {
         }
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) {}
+
     private void deleteContainer(DisruptableBlobContainer container) {
         blobContainer = null;
     }
@@ -240,6 +240,9 @@ public class RepositoryAnalysisSuccessIT extends AbstractSnapshotIntegTestCase {
         }
     }

+    @Override
+    public void deleteBlobsIgnoringIfNotExists(Iterator<String> blobNames) {}
+
     @Override
     public void close() {}
