diff --git a/docs/changelog/126783.yaml b/docs/changelog/126783.yaml
new file mode 100644
index 000000000000..ac91c7cfd412
--- /dev/null
+++ b/docs/changelog/126783.yaml
@@ -0,0 +1,6 @@
+pr: 126783
+summary: Fix shard size of initializing restored shard
+area: Allocation
+type: bug
+issues:
+ - 105331
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
index ac1c44de6dcf..93f8ce6bce52 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderIT.java
@@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteUtils;
 import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.support.ActionTestUtils;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterInfoServiceUtils;
 import org.elasticsearch.cluster.DiskUsageIntegTestCase;
@@ -34,8 +35,8 @@ import org.elasticsearch.repositories.fs.FsRepository;
 import org.elasticsearch.snapshots.RestoreInfo;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotState;
+import org.elasticsearch.test.ClusterServiceUtils;
 import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestIssueLogging;
 import org.hamcrest.Matcher;
 
 import java.util.Arrays;
@@ -43,6 +44,7 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -54,8 +56,10 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
@@ -163,20 +167,10 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase {
         assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getSmallestShardIds())));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/105331")
-    @TestIssueLogging(
-        value = "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceReconciler:DEBUG,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders:TRACE,"
-            + "org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider:TRACE",
-        issueUrl = "https://github.com/elastic/elasticsearch/issues/105331"
"https://github.com/elastic/elasticsearch/issues/105331" - ) - public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleShards() throws Exception { + public void testRestoreSnapshotAllocationDoesNotExceedWatermarkWithMultipleRestores() throws Exception { internalCluster().startMasterOnlyNode(); - internalCluster().startDataOnlyNode(); final String dataNodeName = internalCluster().startDataOnlyNode(); + internalCluster().startDataOnlyNode(); ensureStableCluster(3); assertAcked( @@ -185,26 +179,16 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase { .setSettings(Settings.builder().put("location", randomRepoPath()).put("compress", randomBoolean())) ); - final AtomicBoolean allowRelocations = new AtomicBoolean(true); final InternalClusterInfoService clusterInfoService = getInternalClusterInfoService(); - internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { - ClusterInfoServiceUtils.refresh(clusterInfoService); - if (allowRelocations.get() == false) { - assertThat( - "Expects no relocating shards but got: " + event.state().getRoutingNodes(), - numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING), - equalTo(0) - ); - } - }); - - final String dataNode0Id = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId(); + internalCluster().getCurrentMasterNodeInstance(ClusterService.class) + .addListener(event -> ClusterInfoServiceUtils.refresh(clusterInfoService)); final String indexName = randomIdentifier(); createIndex(indexName, indexSettings(6, 0).put(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), "0ms").build()); - var shardSizes = createReasonableSizedShards(indexName); + final var shardSizes = createReasonableSizedShards(indexName); final CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap") + .setIndices(indexName) .setWaitForCompletion(true) .get(); final SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); @@ -213,13 +197,64 @@ public class DiskThresholdDeciderIT extends DiskUsageIntegTestCase { assertAcked(indicesAdmin().prepareDelete(indexName).get()); updateClusterSettings(Settings.builder().put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), Rebalance.NONE.toString())); - allowRelocations.set(false); - // reduce disk size of node 0 so that only 1 of 2 smallest shards can be allocated - var usableSpace = shardSizes.sizes().get(1).size(); + // Verify that from this point on we do not do any rebalancing + internalCluster().getCurrentMasterNodeInstance(ClusterService.class).addListener(event -> { + assertThat( + "Expects no relocating shards but got: " + event.state().getRoutingNodes(), + numberOfShardsWithState(event.state().getRoutingNodes(), ShardRoutingState.RELOCATING), + equalTo(0) + ); + }); + + // reduce disk size of one data node so that only one shard copy fits there, forcing all the other shards to be assigned to the + // other data node + final var usableSpace = randomLongBetween(shardSizes.getSmallestShardSize(), shardSizes.getSmallestShardSize() * 2 - 1L); getTestFileStore(dataNodeName).setTotalSpace(usableSpace + WATERMARK_BYTES); refreshDiskUsage(); + // We're going to restore the index twice in quick succession and verify that we don't assign more than one shard in total to the + // chosen node, but to do this we have to work backwards: first we have to set up listeners to react to events and then finally we + // trigger the whole chain by starting the 
+        final var copyIndexName = indexName + "-copy";
+
+        // set up a listener that explicitly forbids more than one shard to be assigned to the tiny node
+        final var dataNodeId = internalCluster().getInstance(NodeEnvironment.class, dataNodeName).nodeId();
+        final var allShardsActiveListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            assertThat(cs.getRoutingNodes().toString(), cs.getRoutingNodes().node(dataNodeId).size(), lessThanOrEqualTo(1));
+            var seenCopy = false;
+            for (final IndexRoutingTable indexRoutingTable : cs.routingTable()) {
+                if (indexRoutingTable.getIndex().getName().equals(copyIndexName)) {
+                    seenCopy = true;
+                }
+                if (indexRoutingTable.allShardsActive() == false) {
+                    return false;
+                }
+            }
+            return seenCopy; // only remove this listener when we've started both restores and all the resulting shards are complete
+        });
+
+        // set up a listener which waits for the shards from the first restore to start initializing and then kicks off another restore
+        final var secondRestoreCompleteLatch = new CountDownLatch(1);
+        final var secondRestoreStartedListener = ClusterServiceUtils.addTemporaryStateListener(cs -> {
+            final var indexRoutingTable = cs.routingTable().index(indexName);
+            if (indexRoutingTable != null && indexRoutingTable.shardsWithState(ShardRoutingState.INITIALIZING).isEmpty() == false) {
+                clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
+                    .setWaitForCompletion(true)
+                    .setRenamePattern(indexName)
+                    .setRenameReplacement(indexName + "-copy")
+                    .execute(ActionTestUtils.assertNoFailureListener(restoreSnapshotResponse -> {
+                        final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
+                        assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
+                        assertThat(restoreInfo.failedShards(), is(0));
+                        secondRestoreCompleteLatch.countDown();
+                    }));
+                return true;
+            }
+            return false;
+        });
+
+        // now set the ball rolling by doing the first restore, waiting for it to complete
         final RestoreSnapshotResponse restoreSnapshotResponse = clusterAdmin().prepareRestoreSnapshot(TEST_REQUEST_TIMEOUT, "repo", "snap")
             .setWaitForCompletion(true)
             .get();
@@ -227,7 +262,17 @@
         assertThat(restoreInfo.successfulShards(), is(snapshotInfo.totalShards()));
         assertThat(restoreInfo.failedShards(), is(0));
 
-        assertBusyWithDiskUsageRefresh(dataNode0Id, indexName, contains(in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace))));
+        // wait for the second restore to complete too
+        safeAwait(secondRestoreStartedListener);
+        safeAwait(secondRestoreCompleteLatch);
+
+        // wait for all the shards to finish moving
+        safeAwait(allShardsActiveListener);
+        ensureGreen(indexName, indexName + "-copy");
+
+        final var tinyNodeShardIds = getShardIds(dataNodeId, indexName);
+        assertThat(tinyNodeShardIds, hasSize(1));
+        assertThat(tinyNodeShardIds.iterator().next(), in(shardSizes.getShardIdsWithSizeSmallerOrEqual(usableSpace)));
     }
 
     private Set<ShardId> getShardIds(final String nodeId, final String indexName) {
diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
index f1048c7939ef..8412acc739a4 100644
--- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java
@@ -91,7 +91,7 @@ public class ClusterInfoSimulator {
 
         var size = getExpectedShardSize(
             shard,
-            UNAVAILABLE_EXPECTED_SHARD_SIZE,
+            shard.getExpectedShardSize(),
             getClusterInfo(),
             allocation.snapshotShardSizeInfo(),
             allocation.metadata(),
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
index 8c6058b47cf0..52296f1d896c 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java
@@ -263,6 +263,15 @@
         );
     }
 
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the given {@link ClusterService} and waits for it to apply a cluster
+     * state that satisfies {@code predicate}, at which point it unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         given {@code clusterService}. If the current cluster state already matches {@code predicate} then the returned listener is
+     *         already complete. If no matching cluster state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is
+     *         completed exceptionally on the scheduler thread that belongs to {@code clusterService}.
+     */
     public static SubscribableListener<Void> addTemporaryStateListener(ClusterService clusterService, Predicate<ClusterState> predicate) {
         final var listener = new SubscribableListener<Void>();
         final ClusterStateListener clusterStateListener = new ClusterStateListener() {
@@ -291,4 +300,35 @@
         }
         return listener;
     }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of one of the nodes in the
+     * {@link ESIntegTestCase#internalCluster()}. When the chosen {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to one of the nodes in the {@link ESIntegTestCase#internalCluster()}. If the current
+     *         cluster state already matches {@code predicate} then the returned listener is already complete. If no matching cluster
+     *         state is seen within {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler
+     *         thread that belongs to the chosen node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().clusterService(), predicate);
+    }
+
+    /**
+     * Creates a {@link ClusterStateListener} which subscribes to the {@link ClusterService} of the current elected master node in the
+     * {@link ESIntegTestCase#internalCluster()}. When this node's {@link ClusterService} applies a state that satisfies {@code predicate}
+     * the listener unsubscribes itself.
+     *
+     * @return A {@link SubscribableListener} which is completed when the first cluster state matching {@code predicate} is applied by the
+     *         {@link ClusterService} belonging to the node that was the elected master node in the
+     *         {@link ESIntegTestCase#internalCluster()} when this method was first called. If the current cluster state already matches
+     *         {@code predicate} then the returned listener is already complete. If no matching cluster state is seen within
+     *         {@link ESTestCase#SAFE_AWAIT_TIMEOUT} then the listener is completed exceptionally on the scheduler thread that belongs to
+     *         the elected master node's {@link ClusterService}.
+     */
+    public static SubscribableListener<Void> addMasterTemporaryStateListener(Predicate<ClusterState> predicate) {
+        return addTemporaryStateListener(ESIntegTestCase.internalCluster().getCurrentMasterNodeInstance(ClusterService.class), predicate);
+    }
 }
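
Reviewer sketches follow; none of the code below is part of the patch.

The rewritten test draws `usableSpace` from `[smallestShardSize, 2 * smallestShardSize - 1]` and then sets the tiny node's total disk to `usableSpace + WATERMARK_BYTES`, which guarantees that exactly one restored shard copy fits there without breaching the high watermark. A self-contained sketch of that arithmetic, using hypothetical byte counts in place of the test's random values:

```java
// Hypothetical numbers standing in for the test's smallest shard size and WATERMARK_BYTES.
public class UsableSpaceInvariant {
    public static void main(String[] args) {
        final long smallestShardSize = 100;
        final long watermarkBytes = 10;
        // The test draws usableSpace from [smallestShardSize, 2 * smallestShardSize - 1].
        for (long usableSpace = smallestShardSize; usableSpace <= 2 * smallestShardSize - 1; usableSpace++) {
            final long totalSpace = usableSpace + watermarkBytes;
            // One smallest shard fits while leaving the watermark headroom intact...
            if (totalSpace - smallestShardSize < watermarkBytes) {
                throw new AssertionError("even one shard breaches the watermark");
            }
            // ...but any second shard (every shard is at least smallestShardSize) would breach it.
            if (totalSpace - 2 * smallestShardSize >= watermarkBytes) {
                throw new AssertionError("two shards would fit, invariant broken");
            }
        }
        System.out.println("tiny node fits exactly one shard across the whole range");
    }
}
```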
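The production change itself is one line: when `ClusterInfoSimulator` sizes a shard, the fallback passed to `getExpectedShardSize` is now the estimate already recorded on the `ShardRouting` (`shard.getExpectedShardSize()`) rather than `UNAVAILABLE_EXPECTED_SHARD_SIZE`, so an initializing restored shard whose size is not yet known to `ClusterInfo` still counts at its expected size. The sketch below is a hypothetical reduction of that fallback idea; the real resolution order lives in `getExpectedShardSize` and consults more sources than shown here:

```java
import java.util.OptionalLong;

// Hypothetical reduction of the fallback chain; names and structure are illustrative only.
final class ExpectedSizeSketch {
    static final long UNAVAILABLE_EXPECTED_SHARD_SIZE = -1; // sentinel used by ShardRouting

    static long expectedShardSize(OptionalLong clusterInfoSize, OptionalLong snapshotSize, long fallback) {
        if (clusterInfoSize.isPresent()) {
            return clusterInfoSize.getAsLong(); // an observed store size wins
        }
        if (snapshotSize.isPresent()) {
            return snapshotSize.getAsLong(); // size recorded for the shard in the snapshot
        }
        return fallback; // before the fix: always UNAVAILABLE_EXPECTED_SHARD_SIZE (-1);
                         // after the fix: the estimate carried on the initializing ShardRouting
    }

    public static void main(String[] args) {
        // A freshly initializing restored shard: no ClusterInfo entry, no snapshot size cached yet.
        long before = expectedShardSize(OptionalLong.empty(), OptionalLong.empty(), UNAVAILABLE_EXPECTED_SHARD_SIZE);
        long after = expectedShardSize(OptionalLong.empty(), OptionalLong.empty(), 50_000_000L /* routing estimate */);
        System.out.println("before: " + before + " bytes, after: " + after + " bytes");
    }
}
```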
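The new `ClusterServiceUtils` overloads are designed to pair with `safeAwait`, exactly as the rewritten test uses them. A minimal hypothetical usage inside an `ESIntegTestCase` subclass (the index name and test method are invented for illustration):

```java
// Hypothetical ESIntegTestCase snippet: block (up to SAFE_AWAIT_TIMEOUT) until the master
// applies a cluster state in which every shard of "my-index" is active.
public void testWaitsForAllShardsActive() {
    final var allActiveListener = ClusterServiceUtils.addMasterTemporaryStateListener(cs -> {
        final var indexRoutingTable = cs.routingTable().index("my-index");
        return indexRoutingTable != null && indexRoutingTable.allShardsActive();
    });
    createIndex("my-index");
    safeAwait(allActiveListener); // fails the test if no matching state is applied in time
    // The listener has unsubscribed itself by this point; no cleanup needed.
}
```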