[8.18] Fix shard size of initializing restored shard (#126783) (#127169)

* Fix shard size of initializing restored shard (#126783)

For shards being restored from a snapshot we use `SnapshotShardSizeInfo`
to track their sizes while they're unassigned, and then use
`ShardRouting#expectedShardSize` when they start to recover. However we
were incorrectly ignoring the `ShardRouting#expectedShardSize` value
when accounting for the movements of shards in the
`ClusterInfoSimulator`, which would sometimes cause us to assign more
shards to a node than its disk space should have allowed.

Closes #105331

* Backport utils from 40095992c2

* Missing throws
David Turner authored on 2025-04-23 10:57:40 +01:00, committed by GitHub
commit 65c6ea6ab1 (parent cbf36bf2e2)
4 changed files with 123 additions and 32 deletions
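
To make the mechanism described in the commit message concrete, here is a minimal illustrative sketch of the size-resolution order involved when accounting for an initializing restored shard. This is not the real Elasticsearch API: the class, the record, and the map-based stand-ins for ClusterInfo and SnapshotShardSizeInfo are assumptions made purely for the example.

    // Illustrative sketch only; simplified stand-ins, not the real Elasticsearch classes.
    import java.util.Map;

    final class ShardSizeResolutionSketch {

        // Stand-in for ShardRouting: once recovery starts, the routing entry carries the expected size.
        record FakeShardRouting(String shardId, long expectedShardSize) {}

        // Pick the size to account for when simulating an initializing restored shard.
        static long resolveSize(
            FakeShardRouting shard,
            Map<String, Long> observedShardSizes,  // stand-in for shard sizes in ClusterInfo
            Map<String, Long> snapshotShardSizes   // stand-in for SnapshotShardSizeInfo
        ) {
            Long observed = observedShardSizes.get(shard.shardId());
            if (observed != null) {
                return observed; // a size already observed on disk wins
            }
            Long snapshotSize = snapshotShardSizes.get(shard.shardId());
            if (snapshotSize != null) {
                return snapshotSize; // while unassigned, restored shards are sized from the snapshot
            }
            // The bug was effectively giving up at this point (treating the size as unavailable),
            // ignoring the expected size carried on the routing entry once the shard starts recovering.
            return shard.expectedShardSize();
        }

        public static void main(String[] args) {
            var shard = new FakeShardRouting("[index][0]", 512L * 1024 * 1024);
            // No observed size and no snapshot entry left: only the routing entry knows the size.
            System.out.println("accounted size = " + resolveSize(shard, Map.of(), Map.of()));
        }
    }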

New changelog entry:

@@ -0,0 +1,6 @@
+pr: 126783
+summary: Fix shard size of initializing restored shard
+area: Allocation
+type: bug
+issues:
+ - 105331

Changes to DiskThresholdDeciderIT:

@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,8 +35,8 @@ import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;

 import java.util.Arrays;
@@ -43,6 +44,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,8 +56,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;

 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
-    )
-    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception {
+    public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() throws Exception {
         internalCluster().startMasterOnlyNode();
+        internalCluster().startDataOnlyNode();
         final String dataNodeName = internalCluster().startDataOnlyNode();
-        internalCluster().startDataOnlyNode();
         ensureStableCluster(3);

         assertAcked(
@@ -185,26 +179,16 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
                 .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean()))
         );

-        final AtomicBoolean allowRelocations = new AtomicBoolean(true);
         final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService();
-        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
-            ClusterInfoServiceUtils.refresh(clusterInfoService);
-            if (allowRelocations.get() == false) {
-                assertThat(
-                    "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
-                    numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
-                    equalTo(0)
-                );
-            }
-        });
-        final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
+            .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService));

         final String indexName = randomIdentifier();
         createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build());
-        var shardSizes = createReasonableSizedShards(indexName);
+        final var shardSizes = createReasonableSizedShards(indexName);

         final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+            .setIndices(indexName)
             .setWaitForCompletion(true)
             .get();
         final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo();
@@ -213,13 +197,64 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertAcked(indicesAdmin().prepareDelete(indexName).get());
         updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString()));
-        allowRelocations.set(false);

-        // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated
-        var usableSpace = shardSizes.sizes().get(1).size();
+        // Verify that from this point on we do not do any rebalancing
+        internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> {
+            assertThat(
+                "Expects no relocating shards but got: " + event.state().getRoutingNodes(),
+                numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING),
+                equalTo(0)
+            );
+        });
+
+        // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the
+        // other data node
+        final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L);
         getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES);
         refreshDiskUsage();
+
+        // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the
+        // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we
+        // trigger the whole chain by starting the first restore.
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kick off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setWaitForCompletion(true)
             .get();
@@ -227,7 +262,17 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));

-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }

     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {

Changes to ClusterInfoSimulator:

@@ -91,7 +91,7 @@ public class ClusterInfoSimulator {
         var size = getExpectedShardSize(
             shard,
-            UNAVAILABLE_EXPECTED_SHARD_SIZE,
+            shard.getExpectedShardSize(),
             getClusterInfo(),
             allocation.snapshotShardSizeInfo(),
             allocation.metadata(),
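
The resolved size feeds the simulator's per-node disk bookkeeping, so that subsequent allocation decisions see the space as already spoken for. A rough sketch of that idea follows; the class and method names are hypothetical and the real ClusterInfoSimulator tracks considerably more state than this.

    // Hypothetical bookkeeping sketch; not the real ClusterInfoSimulator implementation.
    import java.util.HashMap;
    import java.util.Map;

    final class DiskAccountingSketch {

        private final Map<String, Long> freeBytesByNode = new HashMap<>();

        DiskAccountingSketch(Map<String, Long> initialFreeBytes) {
            freeBytesByNode.putAll(initialFreeBytes);
        }

        // Reserve the shard's expected size on the target node when it starts initializing there.
        // If the size were treated as unknown (the pre-fix behaviour for restored shards), nothing
        // would be reserved and further shards could be assigned past the disk watermark.
        void simulateShardStarted(String nodeId, long shardSizeBytes) {
            freeBytesByNode.merge(nodeId, -shardSizeBytes, Long::sum);
        }

        boolean hasRoomFor(String nodeId, long shardSizeBytes, long watermarkBytes) {
            return freeBytesByNode.getOrDefault(nodeId, 0L) - shardSizeBytes >= watermarkBytes;
        }

        public static void main(String[] args) {
            var sim = new DiskAccountingSketch(Map.of("node-0", 1_000L));
            sim.simulateShardStarted("node-0", 400L);
            // With the reservation in place, a second 400-byte shard no longer fits above a 300-byte watermark.
            System.out.println(sim.hasRoomFor("node-0", 400L, 300L)); // prints false
        }
    }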

Changes to ClusterServiceUtils:

@@ -263,6 +263,15 @@ public class ClusterServiceUtils {
         );
     }

+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster state
+     * that satisfies {@code predicate}, at which point it unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
+     *         already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
+     *         completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
+     */
     public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
         final var listener = new SubscribableListener<Void>();
         final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +300,35 @@
         }
         return listener;
     }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
+     * {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current cluster
+     *         state already matches {@code predicate} then the returned listener is already complete. If no matching cluster state is seen
+     *         within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that
+     *         belongs to the chosen node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
+    }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
+     * {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to the node that was the elected master node in the
+     *         {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
+     *         {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
+     *         {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
+     *         the elected master node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
+    }
 }
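
As a usage sketch for the helpers added above: the test class, index name, and assertion here are illustrative and not part of this change; only addMasterTemporaryStateListener and the inherited test utilities are real.

    // Illustrative usage only; the test class and index name are hypothetical.
    import org.elasticsearch.test.ClusterServiceUtils;
    import org.elasticsearch.test.ESIntegTestCase;

    public class TemporaryStateListenerUsageIT extends ESIntegTestCase {

        public void testWaitsForIndexToAppearInMasterState() {
            // Completes once the elected master applies a cluster state containing the test index.
            final var indexAppeared = ClusterServiceUtils.addMasterTemporaryStateListener(
                state -> state.metadata().hasIndex("test-index")
            );

            createIndex("test-index");

            // safeAwait fails the test if no matching state is applied within SAFE_AWAIT_TIMEOUT.
            safeAwait(indexAppeared);
        }
    }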