From 4471e82dcca711bf2b4fe4e0b51f455a617efa30 Mon Sep 17 00:00:00 2001
From: David Turner
Date: Tue, 1 Oct 2024 08:13:20 +0100
Subject: [PATCH] Add test to show snapshot doesn't block shard close/fail
 (#113788)

Relates ES-9635
---
 .../SharedClusterSnapshotRestoreIT.java       | 86 +++++++++++++++++++
 1 file changed, 86 insertions(+)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 386dd7f9587f..2b00efa49ca5 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -63,6 +63,7 @@ import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.RepositoryException;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.nio.channels.SeekableByteChannel;
@@ -76,8 +77,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -87,6 +90,7 @@ import java.util.stream.IntStream;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
 import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
 import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
@@ -1405,6 +1409,88 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
     }
 
+    public void testCloseOrReallocateDuringPartialSnapshot() {
+        final var repoName = randomIdentifier();
+        createRepository(repoName, "mock");
+
+        final var blockingNode = internalCluster().startNode(
+            Settings.builder().put(NODE_ROLES_SETTING.getKey(), "data").put("thread_pool." + ThreadPool.Names.SNAPSHOT + ".max", 1)
+        );
+
+        // blocking the snapshot thread pool to ensure that we only retain the shard lock while actively running snapshot tasks
+        final var barrier = new CyclicBarrier(2);
+        final var keepGoing = new AtomicBoolean(true);
+        final var blockingNodeExecutor = internalCluster().getInstance(ThreadPool.class, blockingNode).executor(ThreadPool.Names.SNAPSHOT);
+        blockingNodeExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                safeAwait(barrier);
+                safeAwait(barrier);
+                if (keepGoing.get()) {
+                    blockingNodeExecutor.execute(this);
+                }
+            }
+        });
+
+        final var indexName = randomIdentifier();
+        createIndex(indexName, indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", blockingNode).build());
+        indexRandomDocs(indexName, between(1, 100));
+        flushAndRefresh(indexName);
+
+        safeAwait(barrier);
+
+        final var snapshotName = randomIdentifier();
+        final var partialSnapshot = randomBoolean();
+        ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(
+            TEST_REQUEST_TIMEOUT,
+            repoName,
+            snapshotName
+        ).setIndices(indexName).setWaitForCompletion(true).setPartial(partialSnapshot).execute();
+
+        // we have currently blocked the start-snapshot task from running, and it will be followed by at least three blob uploads
+        // (segments_N, .cfe, .cfs), executed one-at-a-time because of throttling to the max threadpool size, so it's safe to let up to
+        // three tasks through without the snapshot being able to complete
+        final var snapshotTasks = between(0, 3);
+        logger.info("--> running (at most) {} tasks", snapshotTasks);
+        for (int i = 0; i < snapshotTasks; i++) {
+            safeAwait(barrier);
+            safeAwait(barrier);
+        }
+        assertFalse(snapshotFuture.isDone());
+
+        try {
+            if (partialSnapshot && randomBoolean()) {
+                logger.info("--> closing index [{}]", indexName);
+                safeGet(indicesAdmin().prepareClose(indexName).execute());
+                ensureGreen(indexName);
+            } else {
+                logger.info("--> failing index [{}] to trigger recovery", indexName);
+                IndexShard indexShard = null;
+                for (IndexService indexService : internalCluster().getInstance(IndicesService.class, blockingNode)) {
+                    if (indexService.index().getName().equals(indexName)) {
+                        indexShard = indexService.getShard(0);
+                        break;
+                    }
+                }
+                assertNotNull(indexShard);
+                final var primaryTerm = indexShard.getOperationPrimaryTerm();
+                indexShard.failShard("simulated", new ElasticsearchException("simulated"));
+                safeAwait(
+                    ClusterServiceUtils.addTemporaryStateListener(
+                        internalCluster().getInstance(ClusterService.class),
+                        cs -> cs.metadata().index(indexName).primaryTerm(0) > primaryTerm
+                    )
+                );
+                ensureGreen(indexName);
+            }
+        } finally {
+            keepGoing.set(false);
+            safeAwait(barrier);
+        }
+
+        safeGet(snapshotFuture);
+    }
+
     public void testCloseIndexDuringRestore() throws Exception {
         Client client = client();