From 0702e429f045ccd9ad09cd0ae58c04beaa0c500c Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Thu, 12 Jun 2025 13:45:57 +1000 Subject: [PATCH] Add heap usage estimate to ClusterInfo (#128723) Co-authored-by: ywangd Co-authored-by: rjernst Relates: ES-11445 --- .../index/shard/IndexShardIT.java | 59 ++++++++++++- ...sticsearch.cluster.ShardHeapUsageCollector | 10 +++ .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/cluster/ClusterInfo.java | 29 ++++++- .../cluster/ClusterInfoSimulator.java | 1 + .../cluster/InternalClusterInfoService.java | 84 ++++++++++++++----- .../elasticsearch/cluster/ShardHeapUsage.java | 50 +++++++++++ .../cluster/ShardHeapUsageCollector.java | 36 ++++++++ .../node/NodeServiceProvider.java | 13 ++- .../cluster/ClusterInfoTests.java | 15 +++- ...rnalClusterInfoServiceSchedulingTests.java | 24 +++++- .../cluster/ShardHeapUsageTests.java | 37 ++++++++ .../ExpectedShardSizeEstimatorTests.java | 1 + .../AllocationStatsServiceTests.java | 1 + .../allocation/DiskThresholdMonitorTests.java | 2 +- .../ExpectedShardSizeAllocationTests.java | 3 +- .../BalancedShardsAllocatorTests.java | 3 +- .../ClusterAllocationSimulationTests.java | 2 +- .../allocator/ClusterBalanceStatsTests.java | 1 + .../allocator/ClusterInfoSimulatorTests.java | 10 ++- .../DesiredBalanceComputerTests.java | 4 +- .../DesiredBalanceReconcilerTests.java | 1 + .../decider/DiskThresholdDeciderTests.java | 2 +- .../DiskThresholdDeciderUnitTests.java | 14 +++- .../MockInternalClusterInfoService.java | 2 +- .../ReactiveStorageDeciderService.java | 1 + ...oscalingCalculateCapacityServiceTests.java | 4 +- .../FrozenStorageDeciderServiceTests.java | 2 +- .../ProactiveStorageDeciderServiceTests.java | 2 +- .../ReactiveStorageDeciderServiceTests.java | 4 +- ...nsportNodeDeprecationCheckActionTests.java | 1 + 31 files changed, 376 insertions(+), 43 deletions(-) create mode 100644 server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ShardHeapUsageCollector create mode 100644 server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java create mode 100644 server/src/main/java/org/elasticsearch/cluster/ShardHeapUsageCollector.java create mode 100644 server/src/test/java/org/elasticsearch/cluster/ShardHeapUsageTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index 11e8b15432d3..4967888e021c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -10,6 +10,7 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; @@ -19,6 +20,8 @@ import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoServiceUtils; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.ShardHeapUsage; +import org.elasticsearch.cluster.ShardHeapUsageCollector; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -62,6 +65,7 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.test.DummyShardLock; @@ -82,6 +86,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -90,6 +95,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; @@ -111,12 +117,13 @@ import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; public class IndexShardIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return pluginList(InternalSettingsPlugin.class); + return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class); } public void testLockTryingToDelete() throws Exception { @@ -254,6 +261,20 @@ public class IndexShardIT extends ESSingleNodeTestCase { assertThat(dataSetSize.get(), greaterThan(0L)); } + public void testHeapUsageEstimateIsPresent() { + InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class); + ClusterInfoServiceUtils.refresh(clusterInfoService); + ClusterState state = getInstanceFromNode(ClusterService.class).state(); + Map shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages(); + assertNotNull(shardHeapUsages); + assertEquals(state.nodes().size(), shardHeapUsages.size()); + for (DiscoveryNode node : state.nodes()) { + assertTrue(shardHeapUsages.containsKey(node.getId())); + ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId()); + assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes())); + } + } + public void testIndexCanChangeCustomDataPath() throws Exception { final String index = "test-custom-data-path"; final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); @@ -797,4 +818,40 @@ public class IndexShardIT extends ESSingleNodeTestCase { assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES); } } + + public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector { + + private final BogusShardHeapUsagePlugin plugin; + + public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) { + this.plugin = plugin; + } + + @Override + public void collectClusterHeapUsage(ActionListener> listener) { + ActionListener.completeWith( + listener, + () -> plugin.getClusterService() + .state() + .nodes() + .stream() + .collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong())) + ); + } + } + + public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin { + + private final SetOnce clusterService = new SetOnce<>(); + + @Override + public Collection createComponents(PluginServices services) { + clusterService.set(services.clusterService()); + return List.of(); + } + + public ClusterService getClusterService() { + return clusterService.get(); + } + } } diff --git a/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ShardHeapUsageCollector b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ShardHeapUsageCollector new file mode 100644 index 000000000000..15b62c8240f2 --- /dev/null +++ b/server/src/internalClusterTest/resources/META-INF/services/org.elasticsearch.cluster.ShardHeapUsageCollector @@ -0,0 +1,10 @@ +# +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the "Elastic License +# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side +# Public License v 1"; you may not use this file except in compliance with, at +# your election, the "Elastic License 2.0", the "GNU Affero General Public +# License v3.0 only", or the "Server Side Public License, v 1". +# + +org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageCollector diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index bca87f8dd875..6e942cdc263e 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -293,6 +293,7 @@ public class TransportVersions { public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00); public static final TransportVersion ML_INFERENCE_ELASTIC_RERANK = def(9_094_0_00); public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_095_0_00); + public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_096_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index a2c260e8699e..460ed5e119c1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { final Map shardDataSetSizes; final Map dataPath; final Map reservedSpace; + final Map shardHeapUsages; protected ClusterInfo() { - this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); + this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); } /** @@ -71,6 +72,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { * @param shardDataSetSizes a shard id to data set size in bytes mapping per shard * @param dataPath the shard routing to datapath mapping * @param reservedSpace reserved space per shard broken down by node and data path + * @param shardHeapUsages shard heap usage broken down by node * @see #shardIdentifierFromRouting */ public ClusterInfo( @@ -79,7 +81,8 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { Map shardSizes, Map shardDataSetSizes, Map dataPath, - Map reservedSpace + Map reservedSpace, + Map shardHeapUsages ) { this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); @@ -87,6 +90,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { this.shardDataSetSizes = Map.copyOf(shardDataSetSizes); this.dataPath = Map.copyOf(dataPath); this.reservedSpace = Map.copyOf(reservedSpace); + this.shardHeapUsages = Map.copyOf(shardHeapUsages); } public ClusterInfo(StreamInput in) throws IOException { @@ -98,6 +102,11 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { ? in.readImmutableMap(NodeAndShard::new, StreamInput::readString) : in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString); this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new); + if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { + this.shardHeapUsages = in.readImmutableMap(ShardHeapUsage::new); + } else { + this.shardHeapUsages = Map.of(); + } } @Override @@ -112,6 +121,9 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString); } out.writeMap(this.reservedSpace); + if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) { + out.writeMap(this.shardHeapUsages, StreamOutput::writeWriteable); + } } /** @@ -192,9 +204,22 @@ public class ClusterInfo implements ChunkedToXContent, Writeable { return builder.endObject(); // NodeAndPath }), endArray() // end "reserved_sizes" + // NOTE: We don't serialize shardHeapUsages at this stage, to avoid + // committing to API payloads until the feature is settled ); } + /** + * Returns a node id to estimated heap usage mapping for all nodes that we have such data for. + * Note that these estimates should be considered minimums. They may be used to determine whether + * there IS NOT capacity to do something, but not to determine that there IS capacity to do something. + * Also note that the map may not be complete, it may contain none, or a subset of the nodes in + * the cluster at any time. It may also contain entries for nodes that have since left the cluster. + */ + public Map getShardHeapUsages() { + return shardHeapUsages; + } + /** * Returns a node id to disk usage mapping for the path that has the least available space on the node. * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java index 17cf5c7b8b7c..0536322b1d73 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java @@ -153,6 +153,7 @@ public class ClusterInfoSimulator { shardSizes.toImmutableMap(), shardDataSetSizes, dataPath, + Map.of(), Map.of() ); } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 17bd913ffb72..4c8655118dd8 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.shard.ShardId; @@ -82,12 +83,14 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt Property.NodeScope ); - private volatile boolean enabled; + private volatile boolean diskThresholdEnabled; private volatile TimeValue updateFrequency; private volatile TimeValue fetchTimeout; private volatile Map leastAvailableSpaceUsages; private volatile Map mostAvailableSpaceUsages; + private volatile Map maxHeapPerNode; + private volatile Map shardHeapUsagePerNode; private volatile IndicesStatsSummary indicesStatsSummary; private final ThreadPool threadPool; @@ -96,31 +99,41 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt private final Object mutex = new Object(); private final List> nextRefreshListeners = new ArrayList<>(); + private final ShardHeapUsageCollector shardHeapUsageCollector; private AsyncRefresh currentRefresh; private RefreshScheduler refreshScheduler; @SuppressWarnings("this-escape") - public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) { + public InternalClusterInfoService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + ShardHeapUsageCollector shardHeapUsageCollector + ) { this.leastAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of(); + this.maxHeapPerNode = Map.of(); + this.shardHeapUsagePerNode = Map.of(); this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.threadPool = threadPool; this.client = client; + this.shardHeapUsageCollector = shardHeapUsageCollector; this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); - this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); + this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); ClusterSettings clusterSettings = clusterService.getClusterSettings(); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency); clusterSettings.addSettingsUpdateConsumer( DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, - this::setEnabled + this::setDiskThresholdEnabled ); } - private void setEnabled(boolean enabled) { - this.enabled = enabled; + private void setDiskThresholdEnabled(boolean diskThresholdEnabled) { + this.diskThresholdEnabled = diskThresholdEnabled; } private void setFetchTimeout(TimeValue fetchTimeout) { @@ -169,27 +182,41 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt } void execute() { - if (enabled == false) { - logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info"); - leastAvailableSpaceUsages = Map.of(); - mostAvailableSpaceUsages = Map.of(); - indicesStatsSummary = IndicesStatsSummary.EMPTY; - callListeners(); - return; - } - logger.trace("starting async refresh"); try (var ignoredRefs = fetchRefs) { + if (diskThresholdEnabled) { + try (var ignored = threadPool.getThreadContext().clearTraceContext()) { + fetchIndicesStats(); + } + } else { + logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info"); + indicesStatsSummary = IndicesStatsSummary.EMPTY; + } try (var ignored = threadPool.getThreadContext().clearTraceContext()) { fetchNodeStats(); } try (var ignored = threadPool.getThreadContext().clearTraceContext()) { - fetchIndicesStats(); + fetchNodesHeapUsage(); } } } + private void fetchNodesHeapUsage() { + shardHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() { + @Override + public void onResponse(Map currentShardHeapUsages) { + shardHeapUsagePerNode = currentShardHeapUsages; + } + + @Override + public void onFailure(Exception e) { + logger.warn("failed to fetch heap usage for nodes", e); + shardHeapUsagePerNode = Map.of(); + } + }, fetchRefs.acquire())); + } + private void fetchIndicesStats() { final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); indicesStatsRequest.clear(); @@ -285,6 +312,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt nodesStatsRequest.setIncludeShardsStats(false); nodesStatsRequest.clear(); nodesStatsRequest.addMetric(NodesStatsRequestParameters.Metric.FS); + nodesStatsRequest.addMetric(NodesStatsRequestParameters.Metric.JVM); nodesStatsRequest.setTimeout(fetchTimeout); client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.releaseAfter(new ActionListener<>() { @Override @@ -297,13 +325,16 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt Map leastAvailableUsagesBuilder = new HashMap<>(); Map mostAvailableUsagesBuilder = new HashMap<>(); - fillDiskUsagePerNode( + Map maxHeapPerNodeBuilder = new HashMap<>(); + processNodeStatsArray( adjustNodesStats(nodesStatsResponse.getNodes()), leastAvailableUsagesBuilder, - mostAvailableUsagesBuilder + mostAvailableUsagesBuilder, + maxHeapPerNodeBuilder ); leastAvailableSpaceUsages = Map.copyOf(leastAvailableUsagesBuilder); mostAvailableSpaceUsages = Map.copyOf(mostAvailableUsagesBuilder); + maxHeapPerNode = Map.copyOf(maxHeapPerNodeBuilder); } @Override @@ -315,6 +346,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt } leastAvailableSpaceUsages = Map.of(); mostAvailableSpaceUsages = Map.of(); + maxHeapPerNode = Map.of(); } }, fetchRefs.acquire())); } @@ -407,13 +439,21 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt @Override public ClusterInfo getClusterInfo() { final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read + final Map shardHeapUsages = new HashMap<>(); + maxHeapPerNode.forEach((nodeId, maxHeapSize) -> { + final Long estimatedHeapUsage = shardHeapUsagePerNode.get(nodeId); + if (estimatedHeapUsage != null) { + shardHeapUsages.put(nodeId, new ShardHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage)); + } + }); return new ClusterInfo( leastAvailableSpaceUsages, mostAvailableSpaceUsages, indicesStatsSummary.shardSizes, indicesStatsSummary.shardDataSetSizes, indicesStatsSummary.dataPath, - indicesStatsSummary.reservedSpace + indicesStatsSummary.reservedSpace, + shardHeapUsages ); } @@ -476,10 +516,11 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt } } - private static void fillDiskUsagePerNode( + private static void processNodeStatsArray( List nodeStatsArray, Map newLeastAvailableUsages, - Map newMostAvailableUsages + Map newMostAvailableUsages, + Map maxHeapPerNodeBuilder ) { for (NodeStats nodeStats : nodeStatsArray) { DiskUsage leastAvailableUsage = DiskUsage.findLeastAvailablePath(nodeStats); @@ -490,6 +531,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt if (mostAvailableUsage != null) { newMostAvailableUsages.put(nodeStats.getNode().getId(), mostAvailableUsage); } + maxHeapPerNodeBuilder.put(nodeStats.getNode().getId(), nodeStats.getJvm().getMem().getHeapMax()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java new file mode 100644 index 000000000000..3da97ac946f5 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsage.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * Record representing an estimate of the heap used by allocated shards and ongoing merges on a particular node + */ +public record ShardHeapUsage(String nodeId, long totalBytes, long estimatedUsageBytes) implements Writeable { + + public ShardHeapUsage { + assert totalBytes >= 0; + assert estimatedUsageBytes >= 0; + } + + public ShardHeapUsage(StreamInput in) throws IOException { + this(in.readString(), in.readVLong(), in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); + out.writeVLong(this.totalBytes); + out.writeVLong(this.estimatedUsageBytes); + } + + public long estimatedFreeBytes() { + return totalBytes - estimatedUsageBytes; + } + + public double estimatedFreeBytesAsPercentage() { + return 100.0 - estimatedUsageAsPercentage(); + } + + public double estimatedUsageAsPercentage() { + return 100.0 * estimatedUsageBytes / (double) totalBytes; + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsageCollector.java b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsageCollector.java new file mode 100644 index 000000000000..c3f3213e035a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/cluster/ShardHeapUsageCollector.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.action.ActionListener; + +import java.util.Map; + +/** + * Collect the shard heap usage for each node in the cluster. + *

+ * Results are returned as a map of node ID to estimated heap usage in bytes + * + * @see ShardHeapUsage + */ +public interface ShardHeapUsageCollector { + + /** + * This will be used when there is no ShardHeapUsageCollector available + */ + ShardHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of()); + + /** + * Collect the shard heap usage for every node in the cluster + * + * @param listener The listener which will receive the results + */ + void collectClusterHeapUsage(ActionListener> listener); +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 1c3f19b20b4d..8ef81eb35543 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.ShardHeapUsageCollector; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; @@ -74,7 +75,17 @@ class NodeServiceProvider { ThreadPool threadPool, NodeClient client ) { - final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client); + final ShardHeapUsageCollector shardHeapUsageCollector = pluginsService.loadSingletonServiceProvider( + ShardHeapUsageCollector.class, + () -> ShardHeapUsageCollector.EMPTY + ); + final InternalClusterInfoService service = new InternalClusterInfoService( + settings, + clusterService, + threadPool, + client, + shardHeapUsageCollector + ); if (DiscoveryNode.isMasterNode(settings)) { // listen for state changes (this node starts/stops being the elected master, or new nodes are added) clusterService.addListener(service); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java index 70adfa61dd85..532f9b649fbf 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterInfoTests.java @@ -41,10 +41,23 @@ public class ClusterInfoTests extends AbstractWireSerializingTestCase randomNodeHeapUsage() { + int numEntries = randomIntBetween(0, 128); + Map nodeHeapUsage = new HashMap<>(numEntries); + for (int i = 0; i < numEntries; i++) { + String key = randomAlphaOfLength(32); + final int totalBytes = randomIntBetween(0, Integer.MAX_VALUE); + final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomAlphaOfLength(4), totalBytes, randomIntBetween(0, totalBytes)); + nodeHeapUsage.put(key, shardHeapUsage); + } + return nodeHeapUsage; + } + private static Map randomDiskUsage() { int numEntries = randomIntBetween(0, 128); Map builder = new HashMap<>(numEntries); diff --git a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java index bcf5be82f6da..67a745e743b0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/InternalClusterInfoServiceSchedulingTests.java @@ -34,12 +34,17 @@ import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.ThreadPool; +import org.mockito.Mockito; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { @@ -71,7 +76,14 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); - final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client); + final ShardHeapUsageCollector mockShardHeapUsageCollector = spy(new StubShardShardHeapUsageCollector()); + final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService( + settings, + clusterService, + threadPool, + client, + mockShardHeapUsageCollector + ); clusterService.addListener(clusterInfoService); clusterInfoService.addListener(ignored -> {}); @@ -107,11 +119,13 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); for (int i = 0; i < 3; i++) { + Mockito.clearInvocations(mockShardHeapUsageCollector); final int initialRequestCount = client.requestCount; final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); runFor(deterministicTaskQueue, duration); deterministicTaskQueue.runAllRunnableTasks(); assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval + verify(mockShardHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval } final AtomicBoolean failMaster2 = new AtomicBoolean(); @@ -128,6 +142,14 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { assertFalse(deterministicTaskQueue.hasDeferredTasks()); } + private static class StubShardShardHeapUsageCollector implements ShardHeapUsageCollector { + + @Override + public void collectClusterHeapUsage(ActionListener> listener) { + listener.onResponse(Map.of()); + } + } + private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; while (deterministicTaskQueue.getCurrentTimeMillis() < endTime diff --git a/server/src/test/java/org/elasticsearch/cluster/ShardHeapUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/ShardHeapUsageTests.java new file mode 100644 index 000000000000..f41cc8fafd88 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/ShardHeapUsageTests.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.cluster; + +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ShardHeapUsageTests extends ESTestCase { + + public void testEstimatedUsageAsPercentage() { + final long totalBytes = randomNonNegativeLong(); + final long estimatedUsageBytes = randomLongBetween(0, totalBytes); + final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomUUID(), totalBytes, estimatedUsageBytes); + assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), greaterThanOrEqualTo(0.0)); + assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), lessThanOrEqualTo(100.0)); + assertEquals(shardHeapUsage.estimatedUsageAsPercentage(), 100.0 * estimatedUsageBytes / totalBytes, 0.0001); + } + + public void testEstimatedFreeBytesAsPercentage() { + final long totalBytes = randomNonNegativeLong(); + final long estimatedUsageBytes = randomLongBetween(0, totalBytes); + final long estimatedFreeBytes = totalBytes - estimatedUsageBytes; + final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomUUID(), totalBytes, estimatedUsageBytes); + assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), greaterThanOrEqualTo(0.0)); + assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), lessThanOrEqualTo(100.0)); + assertEquals(shardHeapUsage.estimatedFreeBytesAsPercentage(), 100.0 * estimatedFreeBytes / totalBytes, 0.0001); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java index a2e2f3326f52..754b4d2b22d0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/ExpectedShardSizeEstimatorTests.java @@ -205,6 +205,7 @@ public class ExpectedShardSizeEstimatorTests extends ESAllocationTestCase { Map.of(ClusterInfo.shardIdentifierFromRouting(shard), size), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java index 4a07b837b08a..14e0aaa25374 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java @@ -78,6 +78,7 @@ public class AllocationStatsServiceTests extends ESAllocationTestCase { Map.of(ClusterInfo.shardIdentifierFromRouting(shardId, true), currentShardSize), Map.of(), Map.of(), + Map.of(), Map.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 6ce417456d30..df0fa875a724 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -1580,7 +1580,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { Map diskUsages, Map reservedSpace ) { - return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace); + return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of()); } private static DiscoveryNode newFrozenOnlyNode(String nodeId) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java index cea6b2468497..f1a2b4b1358f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/ExpectedShardSizeAllocationTests.java @@ -258,11 +258,12 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase { ), Map.of(), Map.of(), + Map.of(), Map.of() ); } private static ClusterInfo createClusterInfo(Map diskUsage, Map shardSizes) { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java index 7e05ae7c57f7..8ab031aa53fe 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java @@ -608,6 +608,7 @@ public class BalancedShardsAllocatorTests extends ESAllocationTestCase { ), Map.of(), Map.of(), + Map.of(), Map.of() ) ); @@ -704,7 +705,7 @@ public class BalancedShardsAllocatorTests extends ESAllocationTestCase { } private static ClusterInfo createClusterInfo(Map indexSizes) { - return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of()); + return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of()); } private static IndexMetadata.Builder anIndex(String name) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java index c75912fda27e..277521c5832a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterAllocationSimulationTests.java @@ -561,7 +561,7 @@ public class ClusterAllocationSimulationTests extends ESAllocationTestCase { dataPath.put(new ClusterInfo.NodeAndShard(shardRouting.currentNodeId(), shardRouting.shardId()), "/data"); } - return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of()); + return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java index 6adb9f1e8ad2..80fe603488fd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterBalanceStatsTests.java @@ -345,6 +345,7 @@ public class ClusterBalanceStatsTests extends ESAllocationTestCase { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), Map.of(), Map.of(), + Map.of(), Map.of() ); } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java index cdb8c5f60203..b67e248999ce 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/ClusterInfoSimulatorTests.java @@ -690,7 +690,15 @@ public class ClusterInfoSimulatorTests extends ESAllocationTestCase { } public ClusterInfo build() { - return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace); + return new ClusterInfo( + leastAvailableSpaceUsage, + mostAvailableSpaceUsage, + shardSizes, + Map.of(), + Map.of(), + reservedSpace, + Map.of() + ); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java index 6e496b85ede9..a0d28ce12458 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java @@ -690,7 +690,7 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase { .stream() .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue()))); - var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of()); + var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of()); var settings = Settings.EMPTY; @@ -1196,7 +1196,7 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase { } public ClusterInfo build() { - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java index 4eed552d5f1a..844912cba4c1 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java @@ -619,6 +619,7 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase { shardSizesBuilder.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), + ImmutableOpenMap.of(), ImmutableOpenMap.of() ); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 9522629de8f0..5467d313834b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -1406,7 +1406,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { Map shardSizes, Map reservedSpace ) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace); + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of()); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index d8cb13d7a7ba..7da75f61da80 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -108,6 +108,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { Map.of("[test][0][p]", 10L), // 10 bytes, Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -179,6 +180,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { Map.of("[test][0][p]", shardSize), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -324,6 +326,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { shardSizes, Map.of(), shardRoutingMap, + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -843,6 +846,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { Map.of("[test][0][p]", 10L), Map.of(), Map.of(), + Map.of(), Map.of() ); RoutingAllocation allocation = new RoutingAllocation( @@ -904,7 +908,15 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase { // bigger than available space final long shardSize = randomIntBetween(1, 10); shardSizes.put("[test][0][p]", shardSize); - ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); + ClusterInfo clusterInfo = new ClusterInfo( + leastAvailableUsages, + mostAvailableUsage, + shardSizes, + Map.of(), + Map.of(), + Map.of(), + Map.of() + ); RoutingAllocation allocation = new RoutingAllocation( new AllocationDeciders(Collections.singleton(decider)), clusterState, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index 49a36f64e281..64cd81e1a9ab 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -43,7 +43,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { private volatile BiFunction diskUsageFunction; public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { - super(settings, clusterService, threadPool, client); + super(settings, clusterService, threadPool, client, ShardHeapUsageCollector.EMPTY); } public void setDiskUsageFunctionAndRefresh(BiFunction diskUsageFn) { diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 77ba019835ec..e451b1d45817 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -959,6 +959,7 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService extraShardSizes, Map.of(), Map.of(), + Map.of(), Map.of() ); this.delegate = info; diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java index 4061d3783218..12f7dde103c9 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityServiceTests.java @@ -262,7 +262,7 @@ public class AutoscalingCalculateCapacityServiceTests extends AutoscalingTestCas } } state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, @@ -311,7 +311,7 @@ public class AutoscalingCalculateCapacityServiceTests extends AutoscalingTestCas ) ); - info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of()); + info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( roleNames, state, diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java index ab09f20e3439..37295ebf4420 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/FrozenStorageDeciderServiceTests.java @@ -109,7 +109,7 @@ public class FrozenStorageDeciderServiceTests extends AutoscalingTestCase { // add irrelevant shards noise for completeness (should not happen IRL). sizes.put(new ShardId(index, i), randomLongBetween(0, Integer.MAX_VALUE)); } - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of()); return Tuple.tuple(totalSize, info); } } diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java index 1960b7f2028e..b252fdf5564d 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ProactiveStorageDeciderServiceTests.java @@ -397,7 +397,7 @@ public class ProactiveStorageDeciderServiceTests extends AutoscalingTestCase { for (var id : state.nodes().getDataNodes().keySet()) { diskUsage.put(id, new DiskUsage(id, id, "/test", Long.MAX_VALUE, Long.MAX_VALUE)); } - return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of()); + return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of()); } private ClusterState.Builder applyCreatedDates( diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java index d159e6748269..2ee94340f6d2 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderServiceTests.java @@ -379,7 +379,7 @@ public class ReactiveStorageDeciderServiceTests extends AutoscalingTestCase { } private ReactiveStorageDeciderService.AllocationState createAllocationState(Map shardSize, ClusterState clusterState) { - ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of()); ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, null, @@ -544,7 +544,7 @@ public class ReactiveStorageDeciderServiceTests extends AutoscalingTestCase { } var diskUsages = Map.of(nodeId, new DiskUsage(nodeId, null, null, ByteSizeUnit.KB.toBytes(100), ByteSizeUnit.KB.toBytes(5))); - ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of()); + ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of()); ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( clusterState, diff --git a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java index a0a37f2bb52d..40a564088aee 100644 --- a/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java +++ b/x-pack/plugin/deprecation/src/test/java/org/elasticsearch/xpack/deprecation/TransportNodeDeprecationCheckActionTests.java @@ -173,6 +173,7 @@ public class TransportNodeDeprecationCheckActionTests extends ESTestCase { Map.of(), Map.of(), Map.of(), + Map.of(), Map.of() ); DeprecationIssue issue = TransportNodeDeprecationCheckAction.checkDiskLowWatermark(