From adf4d1005f70d4df3ba9c70f8a1b597473fcecae Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 17 Jun 2025 13:28:00 +1000 Subject: [PATCH] Setting for estimated shard heap allocation decider (#128722) This PR adds a new setting to toggle the collection of shard heap usages and wires ShardHeapUsage into ClusterInfoSimulator. Relates: #128723 --- .../datastreams/LookAHeadTimeTests.java | 3 +- .../index/shard/IndexShardIT.java | 31 ++++++++--- .../cluster/ClusterInfoSimulator.java | 4 +- .../cluster/InternalClusterInfoService.java | 54 ++++++++++++++++--- .../elasticsearch/cluster/ShardHeapUsage.java | 6 ++- .../common/settings/ClusterSettings.java | 1 + ...rnalClusterInfoServiceSchedulingTests.java | 4 +- .../test/ESSingleNodeTestCase.java | 4 ++ 8 files changed, 90 insertions(+), 17 deletions(-) diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java index 199bc36d833a..20e17720e1ef 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/LookAHeadTimeTests.java @@ -118,7 +118,8 @@ public class LookAHeadTimeTests extends ESSingleNodeTestCase { updateIndexSettings(indexSettings); } - private void updateClusterSettings(Settings settings) { + @Override + protected void updateClusterSettings(Settings settings) { clusterAdmin().updateSettings( new ClusterUpdateSettingsRequest(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).persistentSettings(settings) ).actionGet(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 4967888e021c..5c705569a0b1 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -264,14 +264,33 @@ public class IndexShardIT extends ESSingleNodeTestCase { public void testHeapUsageEstimateIsPresent() { InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); ClusterInfoServiceUtils.refresh(clusterInfoService); - ClusterState state = getInstanceFromNode(ClusterService.class).state(); Map shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); assertNotNull(shardHeapUsages); - assertEquals(state.nodes().size(), shardHeapUsages.size()); - for (DiscoveryNode node : state.nodes()) { - assertTrue(shardHeapUsages.containsKey(node.getId())); - ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); - assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); + // Not collecting yet because it is disabled + assertTrue(shardHeapUsages.isEmpty()); + + // Enable collection for shard heap usages + updateClusterSettings( + Settings.builder() + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true) + .build() + ); + try { + ClusterInfoServiceUtils.refresh(clusterInfoService); + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); + assertEquals(state.nodes().size(), shardHeapUsages.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(shardHeapUsages.containsKey(node.getId())); + ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); + assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); + } + } finally { + updateClusterSettings( + Settings.builder() + .putNull(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey()) + .build() + ); } } diff --git 
a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 0536322b1d73..ac983672642a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -33,6 +33,7 @@ public class ClusterInfoSimulator { private final CopyOnFirstWriteMap shardSizes; private final Map shardDataSetSizes; private final Map dataPath; + private final Map shardHeapUsages; public ClusterInfoSimulator(RoutingAllocation allocation) { this.allocation = allocation; @@ -41,6 +42,7 @@ public class ClusterInfoSimulator { this.shardSizes = new CopyOnFirstWriteMap<>(allocation.clusterInfo().shardSizes); this.shardDataSetSizes = Map.copyOf(allocation.clusterInfo().shardDataSetSizes); this.dataPath = Map.copyOf(allocation.clusterInfo().dataPath); + this.shardHeapUsages = allocation.clusterInfo().getShardHeapUsages(); } /** @@ -154,7 +156,7 @@ public class ClusterInfoSimulator { shardDataSetSizes, dataPath, Map.of(), - Map.of() + shardHeapUsages ); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 4c8655118dd8..c792ce377ef3 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -83,7 +83,15 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt Property.NodeScope ); + public static final Setting CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED = Setting.boolSetting( + "cluster.routing.allocation.shard_heap.threshold_enabled", + false, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean diskThresholdEnabled; + private volatile boolean shardHeapThresholdEnabled; private volatile TimeValue 
updateFrequency; private volatile TimeValue fetchTimeout; @@ -130,12 +138,20 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setDiskThresholdEnabled ); + clusterSettings.initializeAndWatch( + CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED, + this::setShardHeapThresholdEnabled + ); } private void setDiskThresholdEnabled(boolean diskThresholdEnabled) { this.diskThresholdEnabled = diskThresholdEnabled; } + private void setShardHeapThresholdEnabled(boolean shardHeapThresholdEnabled) { + this.shardHeapThresholdEnabled = shardHeapThresholdEnabled; + } + private void setFetchTimeout(TimeValue fetchTimeout) { this.fetchTimeout = fetchTimeout; } @@ -185,20 +201,44 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { - if (diskThresholdEnabled) { - try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchIndicesStats(); - } - } else { - logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info"); - indicesStatsSummary = IndicesStatsSummary.EMPTY; + maybeFetchIndicesStats(diskThresholdEnabled); + maybeFetchNodeStats(diskThresholdEnabled || shardHeapThresholdEnabled); + maybeFetchNodesHeapUsage(shardHeapThresholdEnabled); + } + } + + private void maybeFetchIndicesStats(boolean shouldFetch) { + if (shouldFetch) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchIndicesStats(); } + } else { + logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty indices stats"); + indicesStatsSummary = IndicesStatsSummary.EMPTY; + } + } + + private void maybeFetchNodeStats(boolean shouldFetch) { + if (shouldFetch) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { fetchNodeStats(); } + } 
else { + logger.trace("skipping collecting node stats from cluster, notifying listeners with empty node stats"); + leastAvailableSpaceUsages = Map.of(); + mostAvailableSpaceUsages = Map.of(); + maxHeapPerNode = Map.of(); + } + } + + private void maybeFetchNodesHeapUsage(boolean shouldFetch) { + if (shouldFetch) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) { fetchNodesHeapUsage(); } + } else { + logger.trace("skipping collecting shard heap usage from cluster, notifying listeners with empty shard heap usage"); + shardHeapUsagePerNode = Map.of(); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java index 3da97ac946f5..cc6a00421a29 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java +++ b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java @@ -45,6 +45,10 @@ public record ShardHeapUsage(String nodeId, long totalBytes, long estimatedUsage } public double estimatedUsageAsPercentage() { - return 100.0 * estimatedUsageBytes / (double) totalBytes; + return 100.0 * estimatedUsageAsRatio(); + } + + public double estimatedUsageAsRatio() { + return estimatedUsageBytes / (double) totalBytes; } } diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index e9982b476c52..9b197baef406 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public final class ClusterSettings extends AbstractScopedSettings { DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_FROZEN_MAX_HEADROOM_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, + 
InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED, SameShardAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SAME_HOST_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index 67a745e743b0..f7e667fd5aa9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -53,7 +53,9 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { final DiscoveryNodes noMaster = DiscoveryNodes.builder().add(discoveryNode).localNodeId(discoveryNode.getId()).build(); final DiscoveryNodes localMaster = noMaster.withMasterNodeId(discoveryNode.getId()); - final Settings.Builder settingsBuilder = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()); + final Settings.Builder settingsBuilder = Settings.builder() + .put(Node.NODE_NAME_SETTING.getKey(), discoveryNode.getName()) + .put(InternalClusterInfoService.CLUSTER_ROUTING_ALLOCATION_SHARD_HEAP_THRESHOLD_DECIDER_ENABLED.getKey(), true); if (randomBoolean()) { settingsBuilder.put(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.getKey(), randomIntBetween(10000, 60000) + "ms"); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java index a786eaf4aca5..7ebc5765bda6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESSingleNodeTestCase.java @@ -536,4 +536,8 @@ public abstract class ESSingleNodeTestCase 
extends ESTestCase { ) ); } + + protected void updateClusterSettings(Settings settings) { + safeGet(clusterAdmin().prepareUpdateSettings(TEST_REQUEST_TIMEOUT, TEST_REQUEST_TIMEOUT).setPersistentSettings(settings).execute()); + } }