mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
Add test to show snapshot doesn't block shard close/fail (#113788)
Relates ES-9635
This commit is contained in:
parent
ab520d9a65
commit
4471e82dcc
1 changed file with 86 additions and 0 deletions
|
@ -63,6 +63,7 @@ import org.elasticsearch.repositories.RepositoryData;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||||
|
import org.elasticsearch.test.ClusterServiceUtils;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.nio.channels.SeekableByteChannel;
|
import java.nio.channels.SeekableByteChannel;
|
||||||
|
@ -76,8 +77,10 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
|
@ -87,6 +90,7 @@ import java.util.stream.IntStream;
|
||||||
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
|
||||||
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
|
import static org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY;
|
||||||
import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
|
import static org.elasticsearch.index.shard.IndexShardTests.getEngineFromShard;
|
||||||
|
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.METADATA_NAME_FORMAT;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.READONLY_SETTING_KEY;
|
||||||
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
|
import static org.elasticsearch.repositories.blobstore.BlobStoreRepository.SNAPSHOT_NAME_FORMAT;
|
||||||
|
@ -1405,6 +1409,88 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
|
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCloseOrReallocateDuringPartialSnapshot() {
|
||||||
|
final var repoName = randomIdentifier();
|
||||||
|
createRepository(repoName, "mock");
|
||||||
|
|
||||||
|
final var blockingNode = internalCluster().startNode(
|
||||||
|
Settings.builder().put(NODE_ROLES_SETTING.getKey(), "data").put("thread_pool." + ThreadPool.Names.SNAPSHOT + ".max", 1)
|
||||||
|
);
|
||||||
|
|
||||||
|
// blocking the snapshot thread pool to ensure that we only retain the shard lock while actively running snapshot tasks
|
||||||
|
final var barrier = new CyclicBarrier(2);
|
||||||
|
final var keepGoing = new AtomicBoolean(true);
|
||||||
|
final var blockingNodeExecutor = internalCluster().getInstance(ThreadPool.class, blockingNode).executor(ThreadPool.Names.SNAPSHOT);
|
||||||
|
blockingNodeExecutor.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
safeAwait(barrier);
|
||||||
|
safeAwait(barrier);
|
||||||
|
if (keepGoing.get()) {
|
||||||
|
blockingNodeExecutor.execute(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final var indexName = randomIdentifier();
|
||||||
|
createIndex(indexName, indexSettings(1, 0).put(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", blockingNode).build());
|
||||||
|
indexRandomDocs(indexName, between(1, 100));
|
||||||
|
flushAndRefresh(indexName);
|
||||||
|
|
||||||
|
safeAwait(barrier);
|
||||||
|
|
||||||
|
final var snapshotName = randomIdentifier();
|
||||||
|
final var partialSnapshot = randomBoolean();
|
||||||
|
ActionFuture<CreateSnapshotResponse> snapshotFuture = clusterAdmin().prepareCreateSnapshot(
|
||||||
|
TEST_REQUEST_TIMEOUT,
|
||||||
|
repoName,
|
||||||
|
snapshotName
|
||||||
|
).setIndices(indexName).setWaitForCompletion(true).setPartial(partialSnapshot).execute();
|
||||||
|
|
||||||
|
// we have currently blocked the start-snapshot task from running, and it will be followed by at least three blob uploads
|
||||||
|
// (segments_N, .cfe, .cfs), executed one-at-a-time because of throttling to the max threadpool size, so it's safe to let up to
|
||||||
|
// three tasks through without the snapshot being able to complete
|
||||||
|
final var snapshotTasks = between(0, 3);
|
||||||
|
logger.info("--> running (at most) {} tasks", snapshotTasks);
|
||||||
|
for (int i = 0; i < snapshotTasks; i++) {
|
||||||
|
safeAwait(barrier);
|
||||||
|
safeAwait(barrier);
|
||||||
|
}
|
||||||
|
assertFalse(snapshotFuture.isDone());
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (partialSnapshot && randomBoolean()) {
|
||||||
|
logger.info("--> closing index [{}]", indexName);
|
||||||
|
safeGet(indicesAdmin().prepareClose(indexName).execute());
|
||||||
|
ensureGreen(indexName);
|
||||||
|
} else {
|
||||||
|
logger.info("--> failing index [{}] to trigger recovery", indexName);
|
||||||
|
IndexShard indexShard = null;
|
||||||
|
for (IndexService indexService : internalCluster().getInstance(IndicesService.class, blockingNode)) {
|
||||||
|
if (indexService.index().getName().equals(indexName)) {
|
||||||
|
indexShard = indexService.getShard(0);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull(indexShard);
|
||||||
|
final var primaryTerm = indexShard.getOperationPrimaryTerm();
|
||||||
|
indexShard.failShard("simulated", new ElasticsearchException("simulated"));
|
||||||
|
safeAwait(
|
||||||
|
ClusterServiceUtils.addTemporaryStateListener(
|
||||||
|
internalCluster().getInstance(ClusterService.class),
|
||||||
|
cs -> cs.metadata().index(indexName).primaryTerm(0) > primaryTerm
|
||||||
|
)
|
||||||
|
);
|
||||||
|
ensureGreen(indexName);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
keepGoing.set(false);
|
||||||
|
safeAwait(barrier);
|
||||||
|
}
|
||||||
|
|
||||||
|
safeGet(snapshotFuture);
|
||||||
|
}
|
||||||
|
|
||||||
public void testCloseIndexDuringRestore() throws Exception {
|
public void testCloseIndexDuringRestore() throws Exception {
|
||||||
Client client = client();
|
Client client = client();
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue