Add new recovery source for reshard split target shards (#129159)

This commit is contained in:
Oleksandr Kolomiiets 2025-06-13 12:27:18 -07:00 committed by GitHub
parent 4656a53b14
commit b24bb3566e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 91 additions and 11 deletions

View file

@ -58,6 +58,7 @@ public class ExpectedShardSizeEstimator {
// shrink/split/clone operation is going to clone existing locally placed shards using file system hard links
// so no additional space is going to be used until future merges
case LOCAL_SHARDS -> false;
case RESHARD_SPLIT -> false;
};
}

View file

@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
@ -31,6 +32,7 @@ import java.util.Objects;
* - {@link PeerRecoverySource} recovery from a primary on another node
* - {@link SnapshotRecoverySource} recovery from a snapshot
* - {@link LocalShardsRecoverySource} recovery from other shards of another index on the same node
* - {@link ReshardSplitRecoverySource} recovery of a shard that is created as a result of a resharding split
*/
public abstract class RecoverySource implements Writeable, ToXContentObject {
@ -57,6 +59,7 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
case PEER -> PeerRecoverySource.INSTANCE;
case SNAPSHOT -> new SnapshotRecoverySource(in);
case LOCAL_SHARDS -> LocalShardsRecoverySource.INSTANCE;
case RESHARD_SPLIT -> new ReshardSplitRecoverySource(in);
};
}
@ -78,7 +81,8 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
EXISTING_STORE,
PEER,
SNAPSHOT,
LOCAL_SHARDS
LOCAL_SHARDS,
RESHARD_SPLIT
}
public abstract Type getType();
@ -319,4 +323,39 @@ public abstract class RecoverySource implements Writeable, ToXContentObject {
return false;
}
}
/**
* Recovery of a shard that is created as a result of a resharding split.
* Not to be confused with _split API.
*/
public static class ReshardSplitRecoverySource extends RecoverySource {
private final ShardId sourceShardId;
public ReshardSplitRecoverySource(ShardId sourceShardId) {
this.sourceShardId = sourceShardId;
}
ReshardSplitRecoverySource(StreamInput in) throws IOException {
sourceShardId = new ShardId(in);
}
@Override
public Type getType() {
return Type.RESHARD_SPLIT;
}
public ShardId getSourceShardId() {
return sourceShardId;
}
@Override
protected void writeAdditionalFields(StreamOutput out) throws IOException {
sourceShardId.writeTo(out);
}
@Override
public void addAdditionalFields(XContentBuilder builder, Params params) throws IOException {
sourceShardId.toXContent(builder, params);
}
}
}

View file

@ -3507,7 +3507,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// }
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE, EXISTING_STORE -> executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
case EMPTY_STORE, EXISTING_STORE, RESHARD_SPLIT -> executeRecovery(
"from store",
recoveryState,
recoveryListener,
this::recoverFromStore
);
case PEER -> {
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);

View file

@ -89,8 +89,9 @@ public final class StoreRecovery {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE
: "expected store recovery type but was: " + recoveryType;
assert recoveryType == RecoverySource.Type.EMPTY_STORE
|| recoveryType == RecoverySource.Type.EXISTING_STORE
|| recoveryType == RecoverySource.Type.RESHARD_SPLIT : "expected one of store recovery types but was: " + recoveryType;
logger.debug("starting recovery from store ...");
final var recoveryListener = recoveryListener(indexShard, listener);
try {

View file

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
@ -701,6 +702,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
logger.trace("ignoring initializing shard {} - no source node can be found.", shardId);
return;
}
} else if (shardRouting.recoverySource() instanceof RecoverySource.ReshardSplitRecoverySource reshardSplitRecoverySource) {
ShardId sourceShardId = reshardSplitRecoverySource.getSourceShardId();
sourceNode = findSourceNodeForReshardSplitRecovery(state.routingTable(project.id()), state.nodes(), sourceShardId);
if (sourceNode == null) {
logger.trace("ignoring initializing reshard target shard {} - no source node can be found.", shardId);
return;
}
} else {
sourceNode = null;
}
@ -988,6 +996,31 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
return sourceNode;
}
private static DiscoveryNode findSourceNodeForReshardSplitRecovery(
RoutingTable routingTable,
DiscoveryNodes nodes,
ShardId sourceShardId
) {
ShardRouting sourceShardRouting = routingTable.shardRoutingTable(sourceShardId).primaryShard();
if (sourceShardRouting.active() == false) {
assert false : sourceShardRouting;
logger.trace("can't find reshard split source node because source shard {} is not active.", sourceShardRouting);
return null;
}
DiscoveryNode sourceNode = nodes.get(sourceShardRouting.currentNodeId());
if (sourceNode == null) {
assert false : "Source node for reshard does not exist: " + sourceShardRouting.currentNodeId();
logger.trace(
"can't find reshard split source node because source shard {} is assigned to an unknown node.",
sourceShardRouting
);
return null;
}
return sourceNode;
}
private record PendingShardCreation(String clusterStateUUID, long startTimeMillis) {}
private class RecoveryListener implements PeerRecoveryTargetService.RecoveryListener {

View file

@ -110,11 +110,10 @@ public class RecoveryState implements ToXContentFragment, Writeable {
public RecoveryState(ShardRouting shardRouting, DiscoveryNode targetNode, @Nullable DiscoveryNode sourceNode, Index index) {
this(shardRouting.shardId(), shardRouting.primary(), shardRouting.recoverySource(), sourceNode, targetNode, index, new Timer());
assert shardRouting.initializing() : "only allow initializing shard routing to be recovered: " + shardRouting;
assert (shardRouting.recoverySource().getType() == RecoverySource.Type.PEER) == (sourceNode != null)
: "peer recovery requires source node, recovery type: "
+ shardRouting.recoverySource().getType()
+ " source node: "
+ sourceNode;
assert shardRouting.recoverySource().getType() != RecoverySource.Type.PEER || sourceNode != null
: "peer recovery requires source node but it is null";
assert shardRouting.recoverySource().getType() != RecoverySource.Type.RESHARD_SPLIT || sourceNode != null
: "reshard split target recovery requires source node but it is null";
timer.start();
}

View file

@ -34,10 +34,11 @@ public class RecoverySourceTests extends ESTestCase {
assertEquals(RecoverySource.Type.PEER.ordinal(), 2);
assertEquals(RecoverySource.Type.SNAPSHOT.ordinal(), 3);
assertEquals(RecoverySource.Type.LOCAL_SHARDS.ordinal(), 4);
assertEquals(RecoverySource.Type.RESHARD_SPLIT.ordinal(), 5);
// check exhaustiveness
for (RecoverySource.Type type : RecoverySource.Type.values()) {
assertThat(type.ordinal(), greaterThanOrEqualTo(0));
assertThat(type.ordinal(), lessThanOrEqualTo(4));
assertThat(type.ordinal(), lessThanOrEqualTo(5));
}
}
}

View file

@ -275,7 +275,8 @@ public class TestShardRouting {
new Snapshot("repo", new SnapshotId(randomIdentifier(), randomUUID())),
IndexVersion.current(),
new IndexId("some_index", randomUUID())
)
),
new RecoverySource.ReshardSplitRecoverySource(new ShardId("some_index", randomUUID(), randomIntBetween(0, 1000)))
);
}
}