diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java b/server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java index 4fd8cc113fcf..793127bf9527 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimator.java @@ -58,6 +58,7 @@ public class ExpectedShardSizeEstimator { // shrink/split/clone operation is going to clone existing locally placed shards using file system hard links // so no additional space is going to be used until future merges case LOCAL_SHARDS -> false; + case RESHARD_SPLIT -> false; }; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java index a6d46ce1ff7b..838d3cf539b3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RecoverySource.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.xcontent.ToXContent; @@ -31,6 +32,7 @@ import java.util.Objects; * - {@link PeerRecoverySource} recovery from a primary on another node * - {@link SnapshotRecoverySource} recovery from a snapshot * - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node + * - {@link ReshardSplitRecoverySource} recovery of a shard that is created as a result of a resharding split */ public abstract class RecoverySource implements Writeable, ToXContentObject { @@ -57,6 +59,7 @@ public abstract class 
RecoverySource implements Writeable, ToXContentObject { case PEER -> PeerRecoverySource.INSTANCE; case SNAPSHOT -> new SnapshotRecoverySource(in); case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE; + case RESHARD_SPLIT -> new ReshardSplitRecoverySource(in); }; } @@ -78,7 +81,8 @@ public abstract class RecoverySource implements Writeable, ToXContentObject { EXISTING_STORE, PEER, SNAPSHOT, - LOCAL_SHARDS + LOCAL_SHARDS, + RESHARD_SPLIT } public abstract Type getType(); @@ -319,4 +323,39 @@ public abstract class RecoverySource implements Writeable, ToXContentObject { return false; } } + + /** + * Recovery of a shard that is created as a result of a resharding split. + * Not to be confused with the {@code _split} API. + */ + public static class ReshardSplitRecoverySource extends RecoverySource { + private final ShardId sourceShardId; + + public ReshardSplitRecoverySource(ShardId sourceShardId) { + this.sourceShardId = sourceShardId; + } + + ReshardSplitRecoverySource(StreamInput in) throws IOException { + sourceShardId = new ShardId(in); + } + + @Override + public Type getType() { + return Type.RESHARD_SPLIT; + } + + public ShardId getSourceShardId() { + return sourceShardId; + } + + @Override + protected void writeAdditionalFields(StreamOutput out) throws IOException { + sourceShardId.writeTo(out); + } + + @Override + public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException { + sourceShardId.toXContent(builder, params); + } + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 56dc710f9a75..01b7e63280d7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -3507,7 +3507,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // } assert 
recoveryState.getRecoverySource().equals(shardRouting.recoverySource()); switch (recoveryState.getRecoverySource().getType()) { - case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); + case EMPTY_STORE, EXISTING_STORE, RESHARD_SPLIT -> executeRecovery( + "from store", + recoveryState, + recoveryListener, + this::recoverFromStore + ); case PEER -> { try { markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState); diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 89d9a780728f..290d0e98ef63 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -89,8 +89,9 @@ public final class StoreRecovery { void recoverFromStore(final IndexShard indexShard, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); - assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE - : "expected store recovery type but was: " + recoveryType; + assert recoveryType == RecoverySource.Type.EMPTY_STORE + || recoveryType == RecoverySource.Type.EXISTING_STORE + || recoveryType == RecoverySource.Type.RESHARD_SPLIT : "expected one of store recovery types but was: " + recoveryType; logger.debug("starting recovery from store ..."); final var recoveryListener = recoveryListener(indexShard, listener); try { diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index ff1a83d26592..f1cb74e6d0e4 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ 
b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.Type; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingTable; @@ -701,6 +702,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple logger.trace("ignoring initializing shard {} - no source node can be found.", shardId); return; } + } else if (shardRouting.recoverySource() instanceof RecoverySource.ReshardSplitRecoverySource reshardSplitRecoverySource) { + ShardId sourceShardId = reshardSplitRecoverySource.getSourceShardId(); + sourceNode = findSourceNodeForReshardSplitRecovery(state.routingTable(project.id()), state.nodes(), sourceShardId); + if (sourceNode == null) { + logger.trace("ignoring initializing reshard target shard {} - no source node can be found.", shardId); + return; + } } else { sourceNode = null; } @@ -988,6 +996,31 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple return sourceNode; } + private static DiscoveryNode findSourceNodeForReshardSplitRecovery( + RoutingTable routingTable, + DiscoveryNodes nodes, + ShardId sourceShardId + ) { + ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard(); + + if (sourceShardRouting.active() == false) { + assert false : sourceShardRouting; + logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting); + return null; + } + + DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId()); + if (sourceNode == null) { + assert false : "Source node for 
reshard does not exist: " + sourceShardRouting.currentNodeId(); + logger.trace( + "can't find reshard split source node because source shard {} is assigned to an unknown node.", + sourceShardRouting + ); + return null; + } + return sourceNode; + } + private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {} private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener { diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 7cab02c8a3c8..96cebbb68251 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -110,11 +110,10 @@ public class RecoveryState implements ToXContentFragment, Writeable { public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) { this(shardRouting.shardId(), shardRouting.primary(), shardRouting.recoverySource(), sourceNode, targetNode, index, new Timer()); assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting; - assert (shardRouting.recoverySource().getType() == RecoverySource.Type.PEER) == (sourceNode != null) - : "peer recovery requires source node, recovery type: " - + shardRouting.recoverySource().getType() - + " source node: " - + sourceNode; + assert shardRouting.recoverySource().getType() != RecoverySource.Type.PEER || sourceNode != null + : "peer recovery requires source node but it is null"; + assert shardRouting.recoverySource().getType() != RecoverySource.Type.RESHARD_SPLIT || sourceNode != null + : "reshard split target recovery requires source node but it is null"; timer.start(); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java 
b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java index 1856773000a0..47358ed2574b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RecoverySourceTests.java @@ -34,10 +34,11 @@ public class RecoverySourceTests extends ESTestCase { assertEquals(RecoverySource.Type.PEER.ordinal(), 2); assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3); assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4); + assertEquals(RecoverySource.Type.RESHARD_SPLIT.ordinal(), 5); // check exhaustiveness for (RecoverySource.Type type : RecoverySource.Type.values()) { assertThat(type.ordinal(), greaterThanOrEqualTo(0)); - assertThat(type.ordinal(), lessThanOrEqualTo(4)); + assertThat(type.ordinal(), lessThanOrEqualTo(5)); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index 6d3fe05620f4..731faccdeede 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -275,7 +275,8 @@ public class TestShardRouting { new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())), IndexVersion.current(), new IndexId("some_index", randomUUID()) - ) + ), + new RecoverySource.ReshardSplitRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000))) ); } }