mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
More efficient sort in tryRelocateShard
(#128063)
No need to do this via an allocation-heavy `Stream`, we can just put the objects straight into an array, sort them in-place, and keep hold of the array to avoid having to allocate anything on the next iteration. Also slims down `BY_DESCENDING_SHARD_ID`: it's always sorting the same index so we don't need to look at `ShardId#index` in the comparison, nor do we really need multiple layers of vtable lookups, we can just compare the shard IDs directly. Relates #128021
This commit is contained in:
parent
923a8e1c6f
commit
a84dff876e
2 changed files with 28 additions and 7 deletions
5
docs/changelog/128063.yaml
Normal file
5
docs/changelog/128063.yaml
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
pr: 128063
|
||||||
|
summary: More efficient sort in `tryRelocateShard`
|
||||||
|
area: Allocation
|
||||||
|
type: enhancement
|
||||||
|
issues: []
|
|
@ -54,7 +54,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.BiFunction;
|
import java.util.function.BiFunction;
|
||||||
import java.util.stream.StreamSupport;
|
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
|
import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.REPLACE;
|
||||||
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
|
import static org.elasticsearch.cluster.routing.ExpectedShardSizeEstimator.getExpectedShardSize;
|
||||||
|
@ -1091,7 +1090,13 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||||
return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions);
|
return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.routingNode.node() : null, nodeDecisions);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed();
|
private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = (s1, s2) -> Integer.compare(s2.id(), s1.id());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scratch space for accumulating/sorting the {@link ShardRouting} instances when contemplating moving the shards away from a node
|
||||||
|
* in {@link #tryRelocateShard} - re-used to avoid extraneous allocations etc.
|
||||||
|
*/
|
||||||
|
private ShardRouting[] shardRoutingsOnMaxWeightNode;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
|
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
|
||||||
|
@ -1102,13 +1107,24 @@ public class BalancedShardsAllocator implements ShardsAllocator {
|
||||||
final ModelIndex index = maxNode.getIndex(idx);
|
final ModelIndex index = maxNode.getIndex(idx);
|
||||||
if (index != null) {
|
if (index != null) {
|
||||||
logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
|
logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
|
||||||
final Iterable<ShardRouting> shardRoutings = StreamSupport.stream(index.spliterator(), false)
|
if (shardRoutingsOnMaxWeightNode == null || shardRoutingsOnMaxWeightNode.length < index.numShards()) {
|
||||||
.filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway
|
shardRoutingsOnMaxWeightNode = new ShardRouting[index.numShards() * 2]; // oversized so reuse is more likely
|
||||||
.sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic
|
}
|
||||||
::iterator;
|
|
||||||
|
int startedShards = 0;
|
||||||
|
for (final var shardRouting : index) {
|
||||||
|
if (shardRouting.started()) { // cannot rebalance unassigned, initializing or relocating shards anyway
|
||||||
|
shardRoutingsOnMaxWeightNode[startedShards] = shardRouting;
|
||||||
|
startedShards += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// check in descending order of shard id so that the decision is deterministic
|
||||||
|
ArrayUtil.timSort(shardRoutingsOnMaxWeightNode, 0, startedShards, BY_DESCENDING_SHARD_ID);
|
||||||
|
|
||||||
final AllocationDeciders deciders = allocation.deciders();
|
final AllocationDeciders deciders = allocation.deciders();
|
||||||
for (ShardRouting shard : shardRoutings) {
|
for (int shardIndex = 0; shardIndex < startedShards; shardIndex++) {
|
||||||
|
final ShardRouting shard = shardRoutingsOnMaxWeightNode[shardIndex];
|
||||||
|
|
||||||
final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
||||||
if (rebalanceDecision.type() == Type.NO) {
|
if (rebalanceDecision.type() == Type.NO) {
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue