Mirror of https://github.com/elastic/elasticsearch.git, synced 2025-06-28 01:22:26 -04:00
Setting for estimated shard heap allocation decider (#128722)
This PR adds a new setting to toggle the collection of shard heap usages, and wires ShardHeapUsage into ClusterInfoSimulator. Relates: #128723. (A usage sketch for the new setting follows the changed-files summary below.)
parent 7fb130c53b
commit adf4d1005f
8 changed files with 90 additions and 17 deletions
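
Before the per-file hunks, here is a minimal sketch of how the new dynamic setting could be enabled from client-side Java, mirroring what the updated tests in this commit do. The setting key, its false default, and the ClusterUpdateSettingsRequest/persistentSettings usage are taken from the diff below; the surrounding class, method name, and timeout value are illustrative assumptions only.

```java
// Sketch only: not part of this commit. Package names follow recent Elasticsearch releases.
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

public final class EnableShardHeapCollectionSketch {

    private EnableShardHeapCollectionSketch() {}

    /**
     * Builds a persistent cluster-settings update that turns on collection of shard heap usages.
     * The key "cluster.routing.allocation.shard_heap.threshold_enabled" is the one registered by
     * this PR; it is dynamic and defaults to false, so collection stays off unless enabled.
     */
    public static ClusterUpdateSettingsRequest enableShardHeapCollection() {
        Settings enable = Settings.builder()
            .put("cluster.routing.allocation.shard_heap.threshold_enabled", true)
            .build();
        // Illustrative timeout; the tests use their own TEST_REQUEST_TIMEOUT constant instead.
        TimeValue timeout = TimeValue.timeValueSeconds(30);
        return new ClusterUpdateSettingsRequest(timeout, timeout).persistentSettings(enable);
    }
}
```

The IndexShardIT test below applies the same toggle through updateClusterSettings(...) and, because the setting is dynamic, clears it again in a finally block with Settings.builder().putNull(...).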

@@ -118,7 +118,8 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase {
         updateIndexSettings(indexSettings);
     }
 
-    private void updateClusterSettings(Settings settings) {
+    @Override
+    protected void updateClusterSettings(Settings settings) {
         clusterAdmin().updateSettings(
             new ClusterUpdateSettingsRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).persistentSettings(settings)
         ).actionGet();

@@ -264,14 +264,33 @@ public class IndexShardIT extends ESSingleNodeTestCase {
     public void testHeapUsageEstimateIsPresent() {
         InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
         ClusterInfoServiceUtils.refresh(clusterInfoService);
-        ClusterState state = getInstanceFromNode(ClusterService.class).state();
         Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
         assertNotNull(shardHeapUsages);
-        assertEquals(state.nodes().size(), shardHeapUsages.size());
-        for (DiscoveryNode node : state.nodes()) {
-            assertTrue(shardHeapUsages.containsKey(node.getId()));
-            ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
-            assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
+        // Not collecting yet because it is disabled
+        assertTrue(shardHeapUsages.isEmpty());
+
+        // Enable collection for shard heap usages
+        updateClusterSettings(
+            Settings.builder()
+                .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true)
+                .build()
+        );
+        try {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            ClusterState state = getInstanceFromNode(ClusterService.class).state();
+            shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
+            assertEquals(state.nodes().size(), shardHeapUsages.size());
+            for (DiscoveryNode node : state.nodes()) {
+                assertTrue(shardHeapUsages.containsKey(node.getId()));
+                ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
+                assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
+            }
+        } finally {
+            updateClusterSettings(
+                Settings.builder()
+                    .putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey())
+                    .build()
+            );
         }
     }

@@ -33,6 +33,7 @@ public class ClusterInfoSimulator {
     private final CopyOnFirstWriteMap<String, Long> shardSizes;
     private final Map<ShardId, Long> shardDataSetSizes;
     private final Map<NodeAndShard, String> dataPath;
+    private final Map<String, ShardHeapUsage> shardHeapUsages;
 
     public ClusterInfoSimulator(RoutingAllocation allocation) {
         this.allocation = allocation;

@@ -41,6 +42,7 @@ public class ClusterInfoSimulator {
         this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes);
         this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes);
         this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath);
+        this.shardHeapUsages = allocation.clusterInfo().getShardHeapUsages();
     }
 
     /**

@@ -154,7 +156,7 @@ public class ClusterInfoSimulator {
             shardDataSetSizes,
             dataPath,
             Map.of(),
-            Map.of()
+            shardHeapUsages
         );
     }
 }

@@ -83,7 +83,15 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
         Property.NodeScope
     );
 
+    public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting(
+        "cluster.routing.allocation.shard_heap.threshold_enabled",
+        false,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     private volatile boolean diskThresholdEnabled;
+    private volatile boolean shardHeapThresholdEnabled;
     private volatile TimeValue updateFrequency;
     private volatile TimeValue fetchTimeout;

@@ -130,12 +138,20 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
             this::setDiskThresholdEnabled
         );
+        clusterSettings.initializeAndWatch(
+            CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED,
+            this::setShardHeapThresholdEnabled
+        );
     }
 
     private void setDiskThresholdEnabled(boolean diskThresholdEnabled) {
         this.diskThresholdEnabled = diskThresholdEnabled;
     }
 
+    private void setShardHeapThresholdEnabled(boolean shardHeapThresholdEnabled) {
+        this.shardHeapThresholdEnabled = shardHeapThresholdEnabled;
+    }
+
     private void setFetchTimeout(TimeValue fetchTimeout) {
         this.fetchTimeout = fetchTimeout;
     }

@@ -185,20 +201,44 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
             logger.trace("starting async refresh");
 
             try (var ignoredRefs = fetchRefs) {
-                if (diskThresholdEnabled) {
-                    try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
-                        fetchIndicesStats();
-                    }
-                } else {
-                    logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info");
-                    indicesStatsSummary = IndicesStatsSummary.EMPTY;
+                maybeFetchIndicesStats(diskThresholdEnabled);
+                maybeFetchNodeStats(diskThresholdEnabled || shardHeapThresholdEnabled);
+                maybeFetchNodesHeapUsage(shardHeapThresholdEnabled);
             }
         }
 
+        private void maybeFetchIndicesStats(boolean shouldFetch) {
+            if (shouldFetch) {
+                try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
+                    fetchIndicesStats();
+                }
+            } else {
+                logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty indices stats");
+                indicesStatsSummary = IndicesStatsSummary.EMPTY;
+            }
+        }
+
+        private void maybeFetchNodeStats(boolean shouldFetch) {
+            if (shouldFetch) {
+                try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
+                    fetchNodeStats();
+                }
+            } else {
+                logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats");
+                leastAvailableSpaceUsages = Map.of();
+                mostAvailableSpaceUsages = Map.of();
+                maxHeapPerNode = Map.of();
+            }
+        }
+
+        private void maybeFetchNodesHeapUsage(boolean shouldFetch) {
+            if (shouldFetch) {
+                try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
+                    fetchNodesHeapUsage();
+                }
+            } else {
+                logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage");
+                shardHeapUsagePerNode = Map.of();
+            }
+        }

@@ -45,6 +45,10 @@ public record ShardHeapUsage(String nodeId, long totalBytes, long estimatedUsage
     }
 
     public double estimatedUsageAsPercentage() {
-        return 100.0 * estimatedUsageBytes / (double) totalBytes;
+        return 100.0 * estimatedUsageAsRatio();
     }
+
+    public double estimatedUsageAsRatio() {
+        return estimatedUsageBytes / (double) totalBytes;
+    }
 }

@@ -292,6 +292,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
             DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING,
+            InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED,
             SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING,
             InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING,
             InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING,

@@ -53,7 +53,9 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
         final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build();
         final DiscoveryNodes localMaster = noMaster.withMasterNodeId(discoveryNode.getId());
 
-        final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName());
+        final Settings.Builder settingsBuilder = Settings.builder()
+            .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName())
+            .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true);
         if (randomBoolean()) {
             settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms");
         }

@@ -536,4 +536,8 @@ public abstract class ESSingleNodeTestCase extends ESTestCase {
             )
         );
     }
+
+    protected void updateClusterSettings(Settings settings) {
+        safeGet(clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).execute());
+    }
 }