Add heap usage estimate to ClusterInfo (#128723)

Co-authored-by: ywangd <yang.wang@elastic.co>
Co-authored-by: rjernst <ryan@elastic.co>
Relates: ES-11445
This commit is contained in:
Nick Tindall 2025-06-12 13:45:57 +10:00 committed by GitHub
parent be703a034f
commit 0702e429f0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 376 additions and 43 deletions

View file

@ -10,6 +10,7 @@ package org.elasticsearch.index.shard;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException; import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
@ -19,6 +20,8 @@ import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterInfoServiceUtils; import org.elasticsearch.cluster.ClusterInfoServiceUtils;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.ShardHeapUsage;
import org.elasticsearch.cluster.ShardHeapUsageCollector;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@ -62,6 +65,7 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.DummyShardLock;
@ -82,6 +86,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -90,6 +95,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength; import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@ -111,12 +117,13 @@ import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class IndexShardIT extends ESSingleNodeTestCase { public class IndexShardIT extends ESSingleNodeTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> getPlugins() { protected Collection<Class<? extends Plugin>> getPlugins() {
return pluginList(InternalSettingsPlugin.class); return pluginList(InternalSettingsPlugin.class, BogusShardHeapUsagePlugin.class);
} }
public void testLockTryingToDelete() throws Exception { public void testLockTryingToDelete() throws Exception {
@ -254,6 +261,20 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertThat(dataSetSize.get(), greaterThan(0L)); assertThat(dataSetSize.get(), greaterThan(0L));
} }
public void testHeapUsageEstimateIsPresent() {
InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
ClusterInfoServiceUtils.refresh(clusterInfoService);
ClusterState state = getInstanceFromNode(ClusterService.class).state();
Map<String, ShardHeapUsage> shardHeapUsages = clusterInfoService.getClusterInfo().getShardHeapUsages();
assertNotNull(shardHeapUsages);
assertEquals(state.nodes().size(), shardHeapUsages.size());
for (DiscoveryNode node : state.nodes()) {
assertTrue(shardHeapUsages.containsKey(node.getId()));
ShardHeapUsage shardHeapUsage = shardHeapUsages.get(node.getId());
assertThat(shardHeapUsage.estimatedFreeBytes(), lessThanOrEqualTo(shardHeapUsage.totalBytes()));
}
}
public void testIndexCanChangeCustomDataPath() throws Exception { public void testIndexCanChangeCustomDataPath() throws Exception {
final String index = "test-custom-data-path"; final String index = "test-custom-data-path";
final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10)); final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
@ -797,4 +818,40 @@ public class IndexShardIT extends ESSingleNodeTestCase {
assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES); assertBusy(() -> assertFalse(indicesService.hasUncompletedPendingDeletes()), 1, TimeUnit.MINUTES);
} }
} }
public static class BogusShardShardHeapUsageCollector implements ShardHeapUsageCollector {
private final BogusShardHeapUsagePlugin plugin;
public BogusShardShardHeapUsageCollector(BogusShardHeapUsagePlugin plugin) {
this.plugin = plugin;
}
@Override
public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
ActionListener.completeWith(
listener,
() -> plugin.getClusterService()
.state()
.nodes()
.stream()
.collect(Collectors.toUnmodifiableMap(DiscoveryNode::getId, node -> randomNonNegativeLong()))
);
}
}
public static class BogusShardHeapUsagePlugin extends Plugin implements ClusterPlugin {
private final SetOnce<ClusterService> clusterService = new SetOnce<>();
@Override
public Collection<?> createComponents(PluginServices services) {
clusterService.set(services.clusterService());
return List.of();
}
public ClusterService getClusterService() {
return clusterService.get();
}
}
} }

View file

@ -0,0 +1,10 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the "Elastic License
# 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
# Public License v 1"; you may not use this file except in compliance with, at
# your election, the "Elastic License 2.0", the "GNU Affero General Public
# License v3.0 only", or the "Server Side Public License, v 1".
#
org.elasticsearch.index.shard.IndexShardIT$BogusShardShardHeapUsageCollector

View file

@ -293,6 +293,7 @@ public class TransportVersions {
public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00); public static final TransportVersion SNAPSHOT_INDEX_SHARD_STATUS_MISSING_STATS = def(9_093_0_00);
public static final TransportVersion ML_INFERENCE_ELASTIC_RERANK = def(9_094_0_00); public static final TransportVersion ML_INFERENCE_ELASTIC_RERANK = def(9_094_0_00);
public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_095_0_00); public static final TransportVersion SEARCH_LOAD_PER_INDEX_STATS = def(9_095_0_00);
public static final TransportVersion HEAP_USAGE_IN_CLUSTER_INFO = def(9_096_0_00);
/* /*
* STOP! READ THIS FIRST! No, really, * STOP! READ THIS FIRST! No, really,

View file

@ -57,9 +57,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
final Map<ShardId, Long> shardDataSetSizes; final Map<ShardId, Long> shardDataSetSizes;
final Map<NodeAndShard, String> dataPath; final Map<NodeAndShard, String> dataPath;
final Map<NodeAndPath, ReservedSpace> reservedSpace; final Map<NodeAndPath, ReservedSpace> reservedSpace;
final Map<String, ShardHeapUsage> shardHeapUsages;
protected ClusterInfo() { protected ClusterInfo() {
this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of()); this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
} }
/** /**
@ -71,6 +72,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
* @param shardDataSetSizes a shard id to data set size in bytes mapping per shard * @param shardDataSetSizes a shard id to data set size in bytes mapping per shard
* @param dataPath the shard routing to datapath mapping * @param dataPath the shard routing to datapath mapping
* @param reservedSpace reserved space per shard broken down by node and data path * @param reservedSpace reserved space per shard broken down by node and data path
* @param shardHeapUsages shard heap usage broken down by node
* @see #shardIdentifierFromRouting * @see #shardIdentifierFromRouting
*/ */
public ClusterInfo( public ClusterInfo(
@ -79,7 +81,8 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
Map<String, Long> shardSizes, Map<String, Long> shardSizes,
Map<ShardId, Long> shardDataSetSizes, Map<ShardId, Long> shardDataSetSizes,
Map<NodeAndShard, String> dataPath, Map<NodeAndShard, String> dataPath,
Map<NodeAndPath, ReservedSpace> reservedSpace Map<NodeAndPath, ReservedSpace> reservedSpace,
Map<String, ShardHeapUsage> shardHeapUsages
) { ) {
this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage); this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage); this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@ -87,6 +90,7 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
this.shardDataSetSizes = Map.copyOf(shardDataSetSizes); this.shardDataSetSizes = Map.copyOf(shardDataSetSizes);
this.dataPath = Map.copyOf(dataPath); this.dataPath = Map.copyOf(dataPath);
this.reservedSpace = Map.copyOf(reservedSpace); this.reservedSpace = Map.copyOf(reservedSpace);
this.shardHeapUsages = Map.copyOf(shardHeapUsages);
} }
public ClusterInfo(StreamInput in) throws IOException { public ClusterInfo(StreamInput in) throws IOException {
@ -98,6 +102,11 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
? in.readImmutableMap(NodeAndShard::new, StreamInput::readString) ? in.readImmutableMap(NodeAndShard::new, StreamInput::readString)
: in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString); : in.readImmutableMap(nested -> NodeAndShard.from(new ShardRouting(nested)), StreamInput::readString);
this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new); this.reservedSpace = in.readImmutableMap(NodeAndPath::new, ReservedSpace::new);
if (in.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
this.shardHeapUsages = in.readImmutableMap(ShardHeapUsage::new);
} else {
this.shardHeapUsages = Map.of();
}
} }
@Override @Override
@ -112,6 +121,9 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString); out.writeMap(this.dataPath, (o, k) -> createFakeShardRoutingFromNodeAndShard(k).writeTo(o), StreamOutput::writeString);
} }
out.writeMap(this.reservedSpace); out.writeMap(this.reservedSpace);
if (out.getTransportVersion().onOrAfter(TransportVersions.HEAP_USAGE_IN_CLUSTER_INFO)) {
out.writeMap(this.shardHeapUsages, StreamOutput::writeWriteable);
}
} }
/** /**
@ -192,9 +204,22 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
return builder.endObject(); // NodeAndPath return builder.endObject(); // NodeAndPath
}), }),
endArray() // end "reserved_sizes" endArray() // end "reserved_sizes"
// NOTE: We don't serialize shardHeapUsages at this stage, to avoid
// committing to API payloads until the feature is settled
); );
} }
/**
* Returns a node id to estimated heap usage mapping for all nodes that we have such data for.
* Note that these estimates should be considered minimums. They may be used to determine whether
* there IS NOT capacity to do something, but not to determine that there IS capacity to do something.
* Also note that the map may not be complete, it may contain none, or a subset of the nodes in
* the cluster at any time. It may also contain entries for nodes that have since left the cluster.
*/
public Map<String, ShardHeapUsage> getShardHeapUsages() {
return shardHeapUsages;
}
/** /**
* Returns a node id to disk usage mapping for the path that has the least available space on the node. * Returns a node id to disk usage mapping for the path that has the least available space on the node.
* Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space.

View file

@ -153,6 +153,7 @@ public class ClusterInfoSimulator {
shardSizes.toImmutableMap(), shardSizes.toImmutableMap(),
shardDataSetSizes, shardDataSetSizes,
dataPath, dataPath,
Map.of(),
Map.of() Map.of()
); );
} }

View file

@ -34,6 +34,7 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -82,12 +83,14 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
Property.NodeScope Property.NodeScope
); );
private volatile boolean enabled; private volatile boolean diskThresholdEnabled;
private volatile TimeValue updateFrequency; private volatile TimeValue updateFrequency;
private volatile TimeValue fetchTimeout; private volatile TimeValue fetchTimeout;
private volatile Map<String, DiskUsage> leastAvailableSpaceUsages; private volatile Map<String, DiskUsage> leastAvailableSpaceUsages;
private volatile Map<String, DiskUsage> mostAvailableSpaceUsages; private volatile Map<String, DiskUsage> mostAvailableSpaceUsages;
private volatile Map<String, ByteSizeValue> maxHeapPerNode;
private volatile Map<String, Long> shardHeapUsagePerNode;
private volatile IndicesStatsSummary indicesStatsSummary; private volatile IndicesStatsSummary indicesStatsSummary;
private final ThreadPool threadPool; private final ThreadPool threadPool;
@ -96,31 +99,41 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
private final Object mutex = new Object(); private final Object mutex = new Object();
private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>(); private final List<ActionListener<ClusterInfo>> nextRefreshListeners = new ArrayList<>();
private final ShardHeapUsageCollector shardHeapUsageCollector;
private AsyncRefresh currentRefresh; private AsyncRefresh currentRefresh;
private RefreshScheduler refreshScheduler; private RefreshScheduler refreshScheduler;
@SuppressWarnings("this-escape") @SuppressWarnings("this-escape")
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) { public InternalClusterInfoService(
Settings settings,
ClusterService clusterService,
ThreadPool threadPool,
Client client,
ShardHeapUsageCollector shardHeapUsageCollector
) {
this.leastAvailableSpaceUsages = Map.of(); this.leastAvailableSpaceUsages = Map.of();
this.mostAvailableSpaceUsages = Map.of(); this.mostAvailableSpaceUsages = Map.of();
this.maxHeapPerNode = Map.of();
this.shardHeapUsagePerNode = Map.of();
this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.indicesStatsSummary = IndicesStatsSummary.EMPTY;
this.threadPool = threadPool; this.threadPool = threadPool;
this.client = client; this.client = client;
this.shardHeapUsageCollector = shardHeapUsageCollector;
this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings); this.updateFrequency = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings);
this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings); this.fetchTimeout = INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING.get(settings);
this.enabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings); this.diskThresholdEnabled = DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
ClusterSettings clusterSettings = clusterService.getClusterSettings(); ClusterSettings clusterSettings = clusterService.getClusterSettings();
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency); clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
clusterSettings.addSettingsUpdateConsumer( clusterSettings.addSettingsUpdateConsumer(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
this::setEnabled this::setDiskThresholdEnabled
); );
} }
private void setEnabled(boolean enabled) { private void setDiskThresholdEnabled(boolean diskThresholdEnabled) {
this.enabled = enabled; this.diskThresholdEnabled = diskThresholdEnabled;
} }
private void setFetchTimeout(TimeValue fetchTimeout) { private void setFetchTimeout(TimeValue fetchTimeout) {
@ -169,27 +182,41 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
} }
void execute() { void execute() {
if (enabled == false) {
logger.trace("skipping collecting info from cluster, notifying listeners with empty cluster info");
leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of();
indicesStatsSummary = IndicesStatsSummary.EMPTY;
callListeners();
return;
}
logger.trace("starting async refresh"); logger.trace("starting async refresh");
try (var ignoredRefs = fetchRefs) { try (var ignoredRefs = fetchRefs) {
if (diskThresholdEnabled) {
try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats();
}
} else {
logger.trace("skipping collecting disk usage info from cluster, notifying listeners with empty cluster info");
indicesStatsSummary = IndicesStatsSummary.EMPTY;
}
try (var ignored = threadPool.getThreadContext().clearTraceContext()) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchNodeStats(); fetchNodeStats();
} }
try (var ignored = threadPool.getThreadContext().clearTraceContext()) { try (var ignored = threadPool.getThreadContext().clearTraceContext()) {
fetchIndicesStats(); fetchNodesHeapUsage();
} }
} }
} }
private void fetchNodesHeapUsage() {
shardHeapUsageCollector.collectClusterHeapUsage(ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Map<String, Long> currentShardHeapUsages) {
shardHeapUsagePerNode = currentShardHeapUsages;
}
@Override
public void onFailure(Exception e) {
logger.warn("failed to fetch heap usage for nodes", e);
shardHeapUsagePerNode = Map.of();
}
}, fetchRefs.acquire()));
}
private void fetchIndicesStats() { private void fetchIndicesStats() {
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.clear(); indicesStatsRequest.clear();
@ -285,6 +312,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
nodesStatsRequest.setIncludeShardsStats(false); nodesStatsRequest.setIncludeShardsStats(false);
nodesStatsRequest.clear(); nodesStatsRequest.clear();
nodesStatsRequest.addMetric(NodesStatsRequestParameters.Metric.FS); nodesStatsRequest.addMetric(NodesStatsRequestParameters.Metric.FS);
nodesStatsRequest.addMetric(NodesStatsRequestParameters.Metric.JVM);
nodesStatsRequest.setTimeout(fetchTimeout); nodesStatsRequest.setTimeout(fetchTimeout);
client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.releaseAfter(new ActionListener<>() { client.admin().cluster().nodesStats(nodesStatsRequest, ActionListener.releaseAfter(new ActionListener<>() {
@Override @Override
@ -297,13 +325,16 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
Map<String, DiskUsage> leastAvailableUsagesBuilder = new HashMap<>(); Map<String, DiskUsage> leastAvailableUsagesBuilder = new HashMap<>();
Map<String, DiskUsage> mostAvailableUsagesBuilder = new HashMap<>(); Map<String, DiskUsage> mostAvailableUsagesBuilder = new HashMap<>();
fillDiskUsagePerNode( Map<String, ByteSizeValue> maxHeapPerNodeBuilder = new HashMap<>();
processNodeStatsArray(
adjustNodesStats(nodesStatsResponse.getNodes()), adjustNodesStats(nodesStatsResponse.getNodes()),
leastAvailableUsagesBuilder, leastAvailableUsagesBuilder,
mostAvailableUsagesBuilder mostAvailableUsagesBuilder,
maxHeapPerNodeBuilder
); );
leastAvailableSpaceUsages = Map.copyOf(leastAvailableUsagesBuilder); leastAvailableSpaceUsages = Map.copyOf(leastAvailableUsagesBuilder);
mostAvailableSpaceUsages = Map.copyOf(mostAvailableUsagesBuilder); mostAvailableSpaceUsages = Map.copyOf(mostAvailableUsagesBuilder);
maxHeapPerNode = Map.copyOf(maxHeapPerNodeBuilder);
} }
@Override @Override
@ -315,6 +346,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
} }
leastAvailableSpaceUsages = Map.of(); leastAvailableSpaceUsages = Map.of();
mostAvailableSpaceUsages = Map.of(); mostAvailableSpaceUsages = Map.of();
maxHeapPerNode = Map.of();
} }
}, fetchRefs.acquire())); }, fetchRefs.acquire()));
} }
@ -407,13 +439,21 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
@Override @Override
public ClusterInfo getClusterInfo() { public ClusterInfo getClusterInfo() {
final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
final Map<String, ShardHeapUsage> shardHeapUsages = new HashMap<>();
maxHeapPerNode.forEach((nodeId, maxHeapSize) -> {
final Long estimatedHeapUsage = shardHeapUsagePerNode.get(nodeId);
if (estimatedHeapUsage != null) {
shardHeapUsages.put(nodeId, new ShardHeapUsage(nodeId, maxHeapSize.getBytes(), estimatedHeapUsage));
}
});
return new ClusterInfo( return new ClusterInfo(
leastAvailableSpaceUsages, leastAvailableSpaceUsages,
mostAvailableSpaceUsages, mostAvailableSpaceUsages,
indicesStatsSummary.shardSizes, indicesStatsSummary.shardSizes,
indicesStatsSummary.shardDataSetSizes, indicesStatsSummary.shardDataSetSizes,
indicesStatsSummary.dataPath, indicesStatsSummary.dataPath,
indicesStatsSummary.reservedSpace indicesStatsSummary.reservedSpace,
shardHeapUsages
); );
} }
@ -476,10 +516,11 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
} }
} }
private static void fillDiskUsagePerNode( private static void processNodeStatsArray(
List<NodeStats> nodeStatsArray, List<NodeStats> nodeStatsArray,
Map<String, DiskUsage> newLeastAvailableUsages, Map<String, DiskUsage> newLeastAvailableUsages,
Map<String, DiskUsage> newMostAvailableUsages Map<String, DiskUsage> newMostAvailableUsages,
Map<String, ByteSizeValue> maxHeapPerNodeBuilder
) { ) {
for (NodeStats nodeStats : nodeStatsArray) { for (NodeStats nodeStats : nodeStatsArray) {
DiskUsage leastAvailableUsage = DiskUsage.findLeastAvailablePath(nodeStats); DiskUsage leastAvailableUsage = DiskUsage.findLeastAvailablePath(nodeStats);
@ -490,6 +531,7 @@ public class InternalClusterInfoService implements ClusterInfoService, ClusterSt
if (mostAvailableUsage != null) { if (mostAvailableUsage != null) {
newMostAvailableUsages.put(nodeStats.getNode().getId(), mostAvailableUsage); newMostAvailableUsages.put(nodeStats.getNode().getId(), mostAvailableUsage);
} }
maxHeapPerNodeBuilder.put(nodeStats.getNode().getId(), nodeStats.getJvm().getMem().getHeapMax());
} }
} }

View file

@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
/**
* Record representing an estimate of the heap used by allocated shards and ongoing merges on a particular node
*/
public record ShardHeapUsage(String nodeId, long totalBytes, long estimatedUsageBytes) implements Writeable {
public ShardHeapUsage {
assert totalBytes >= 0;
assert estimatedUsageBytes >= 0;
}
public ShardHeapUsage(StreamInput in) throws IOException {
this(in.readString(), in.readVLong(), in.readVLong());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(this.nodeId);
out.writeVLong(this.totalBytes);
out.writeVLong(this.estimatedUsageBytes);
}
public long estimatedFreeBytes() {
return totalBytes - estimatedUsageBytes;
}
public double estimatedFreeBytesAsPercentage() {
return 100.0 - estimatedUsageAsPercentage();
}
public double estimatedUsageAsPercentage() {
return 100.0 * estimatedUsageBytes / (double) totalBytes;
}
}

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster;
import org.elasticsearch.action.ActionListener;
import java.util.Map;
/**
* Collect the shard heap usage for each node in the cluster.
* <p>
* Results are returned as a map of node ID to estimated heap usage in bytes
*
* @see ShardHeapUsage
*/
public interface ShardHeapUsageCollector {
/**
* This will be used when there is no ShardHeapUsageCollector available
*/
ShardHeapUsageCollector EMPTY = listener -> listener.onResponse(Map.of());
/**
* Collect the shard heap usage for every node in the cluster
*
* @param listener The listener which will receive the results
*/
void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener);
}

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.search.OnlinePrewarmingService;
import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.ShardHeapUsageCollector;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreaker;
@ -74,7 +75,17 @@ class NodeServiceProvider {
ThreadPool threadPool, ThreadPool threadPool,
NodeClient client NodeClient client
) { ) {
final InternalClusterInfoService service = new InternalClusterInfoService(settings, clusterService, threadPool, client); final ShardHeapUsageCollector shardHeapUsageCollector = pluginsService.loadSingletonServiceProvider(
ShardHeapUsageCollector.class,
() -> ShardHeapUsageCollector.EMPTY
);
final InternalClusterInfoService service = new InternalClusterInfoService(
settings,
clusterService,
threadPool,
client,
shardHeapUsageCollector
);
if (DiscoveryNode.isMasterNode(settings)) { if (DiscoveryNode.isMasterNode(settings)) {
// listen for state changes (this node starts/stops being the elected master, or new nodes are added) // listen for state changes (this node starts/stops being the elected master, or new nodes are added)
clusterService.addListener(service); clusterService.addListener(service);

View file

@ -41,10 +41,23 @@ public class ClusterInfoTests extends AbstractWireSerializingTestCase<ClusterInf
randomShardSizes(), randomShardSizes(),
randomDataSetSizes(), randomDataSetSizes(),
randomRoutingToDataPath(), randomRoutingToDataPath(),
randomReservedSpace() randomReservedSpace(),
randomNodeHeapUsage()
); );
} }
private static Map<String, ShardHeapUsage> randomNodeHeapUsage() {
int numEntries = randomIntBetween(0, 128);
Map<String, ShardHeapUsage> nodeHeapUsage = new HashMap<>(numEntries);
for (int i = 0; i < numEntries; i++) {
String key = randomAlphaOfLength(32);
final int totalBytes = randomIntBetween(0, Integer.MAX_VALUE);
final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomAlphaOfLength(4), totalBytes, randomIntBetween(0, totalBytes));
nodeHeapUsage.put(key, shardHeapUsage);
}
return nodeHeapUsage;
}
private static Map<String, DiskUsage> randomDiskUsage() { private static Map<String, DiskUsage> randomDiskUsage() {
int numEntries = randomIntBetween(0, 128); int numEntries = randomIntBetween(0, 128);
Map<String, DiskUsage> builder = new HashMap<>(numEntries); Map<String, DiskUsage> builder = new HashMap<>(numEntries);

View file

@ -34,12 +34,17 @@ import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.Mockito;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING; import static org.elasticsearch.cluster.InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
public class InternalClusterInfoServiceSchedulingTests extends ESTestCase { public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
@ -71,7 +76,14 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService); final ClusterService clusterService = new ClusterService(settings, clusterSettings, masterService, clusterApplierService);
final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool); final FakeClusterInfoServiceClient client = new FakeClusterInfoServiceClient(threadPool);
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(settings, clusterService, threadPool, client); final ShardHeapUsageCollector mockShardHeapUsageCollector = spy(new StubShardShardHeapUsageCollector());
final InternalClusterInfoService clusterInfoService = new InternalClusterInfoService(
settings,
clusterService,
threadPool,
client,
mockShardHeapUsageCollector
);
clusterService.addListener(clusterInfoService); clusterService.addListener(clusterInfoService);
clusterInfoService.addListener(ignored -> {}); clusterInfoService.addListener(ignored -> {});
@ -107,11 +119,13 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
Mockito.clearInvocations(mockShardHeapUsageCollector);
final int initialRequestCount = client.requestCount; final int initialRequestCount = client.requestCount;
final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis(); final long duration = INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING.get(settings).millis();
runFor(deterministicTaskQueue, duration); runFor(deterministicTaskQueue, duration);
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval assertThat(client.requestCount, equalTo(initialRequestCount + 2)); // should have run two client requests per interval
verify(mockShardHeapUsageCollector).collectClusterHeapUsage(any()); // Should poll for heap usage once per interval
} }
final AtomicBoolean failMaster2 = new AtomicBoolean(); final AtomicBoolean failMaster2 = new AtomicBoolean();
@ -128,6 +142,14 @@ public class InternalClusterInfoServiceSchedulingTests extends ESTestCase {
assertFalse(deterministicTaskQueue.hasDeferredTasks()); assertFalse(deterministicTaskQueue.hasDeferredTasks());
} }
private static class StubShardShardHeapUsageCollector implements ShardHeapUsageCollector {
@Override
public void collectClusterHeapUsage(ActionListener<Map<String, Long>> listener) {
listener.onResponse(Map.of());
}
}
private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) { private static void runFor(DeterministicTaskQueue deterministicTaskQueue, long duration) {
final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration; final long endTime = deterministicTaskQueue.getCurrentTimeMillis() + duration;
while (deterministicTaskQueue.getCurrentTimeMillis() < endTime while (deterministicTaskQueue.getCurrentTimeMillis() < endTime

View file

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ShardHeapUsageTests extends ESTestCase {
public void testEstimatedUsageAsPercentage() {
final long totalBytes = randomNonNegativeLong();
final long estimatedUsageBytes = randomLongBetween(0, totalBytes);
final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomUUID(), totalBytes, estimatedUsageBytes);
assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), greaterThanOrEqualTo(0.0));
assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), lessThanOrEqualTo(100.0));
assertEquals(shardHeapUsage.estimatedUsageAsPercentage(), 100.0 * estimatedUsageBytes / totalBytes, 0.0001);
}
public void testEstimatedFreeBytesAsPercentage() {
final long totalBytes = randomNonNegativeLong();
final long estimatedUsageBytes = randomLongBetween(0, totalBytes);
final long estimatedFreeBytes = totalBytes - estimatedUsageBytes;
final ShardHeapUsage shardHeapUsage = new ShardHeapUsage(randomUUID(), totalBytes, estimatedUsageBytes);
assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), greaterThanOrEqualTo(0.0));
assertThat(shardHeapUsage.estimatedFreeBytesAsPercentage(), lessThanOrEqualTo(100.0));
assertEquals(shardHeapUsage.estimatedFreeBytesAsPercentage(), 100.0 * estimatedFreeBytes / totalBytes, 0.0001);
}
}

View file

@ -205,6 +205,7 @@ public class ExpectedShardSizeEstimatorTests extends ESAllocationTestCase {
Map.of(ClusterInfo.shardIdentifierFromRouting(shard), size), Map.of(ClusterInfo.shardIdentifierFromRouting(shard), size),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
} }

View file

@ -78,6 +78,7 @@ public class AllocationStatsServiceTests extends ESAllocationTestCase {
Map.of(ClusterInfo.shardIdentifierFromRouting(shardId, true), currentShardSize), Map.of(ClusterInfo.shardIdentifierFromRouting(shardId, true), currentShardSize),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );

View file

@ -1580,7 +1580,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
Map<String, DiskUsage> diskUsages, Map<String, DiskUsage> diskUsages,
Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace Map<ClusterInfo.NodeAndPath, ClusterInfo.ReservedSpace> reservedSpace
) { ) {
return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace); return new ClusterInfo(diskUsages, Map.of(), Map.of(), Map.of(), Map.of(), reservedSpace, Map.of());
} }
private static DiscoveryNode newFrozenOnlyNode(String nodeId) { private static DiscoveryNode newFrozenOnlyNode(String nodeId) {

View file

@ -258,11 +258,12 @@ public class ExpectedShardSizeAllocationTests extends ESAllocationTestCase {
), ),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
} }
private static ClusterInfo createClusterInfo(Map<String, DiskUsage> diskUsage, Map<String, Long> shardSizes) { private static ClusterInfo createClusterInfo(Map<String, DiskUsage> diskUsage, Map<String, Long> shardSizes) {
return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of()); return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of());
} }
} }

View file

@ -608,6 +608,7 @@ public class BalancedShardsAllocatorTests extends ESAllocationTestCase {
), ),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
) )
); );
@ -704,7 +705,7 @@ public class BalancedShardsAllocatorTests extends ESAllocationTestCase {
} }
private static ClusterInfo createClusterInfo(Map<String, Long> indexSizes) { private static ClusterInfo createClusterInfo(Map<String, Long> indexSizes) {
return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of()); return new ClusterInfo(Map.of(), Map.of(), indexSizes, Map.of(), Map.of(), Map.of(), Map.of());
} }
private static IndexMetadata.Builder anIndex(String name) { private static IndexMetadata.Builder anIndex(String name) {

View file

@ -561,7 +561,7 @@ public class ClusterAllocationSimulationTests extends ESAllocationTestCase {
dataPath.put(new ClusterInfo.NodeAndShard(shardRouting.currentNodeId(), shardRouting.shardId()), "/data"); dataPath.put(new ClusterInfo.NodeAndShard(shardRouting.currentNodeId(), shardRouting.shardId()), "/data");
} }
return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of()); return new ClusterInfo(diskSpaceUsage, diskSpaceUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of());
} }
} }

View file

@ -345,6 +345,7 @@ public class ClusterBalanceStatsTests extends ESAllocationTestCase {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)), .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
} }

View file

@ -690,7 +690,15 @@ public class ClusterInfoSimulatorTests extends ESAllocationTestCase {
} }
public ClusterInfo build() { public ClusterInfo build() {
return new ClusterInfo(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace); return new ClusterInfo(
leastAvailableSpaceUsage,
mostAvailableSpaceUsage,
shardSizes,
Map.of(),
Map.of(),
reservedSpace,
Map.of()
);
} }
} }

View file

@ -690,7 +690,7 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase {
.stream() .stream()
.collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue()))); .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue())));
var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of()); var clusterInfo = new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), dataPath, Map.of(), Map.of());
var settings = Settings.EMPTY; var settings = Settings.EMPTY;
@ -1196,7 +1196,7 @@ public class DesiredBalanceComputerTests extends ESAllocationTestCase {
} }
public ClusterInfo build() { public ClusterInfo build() {
return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace); return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of());
} }
} }

View file

@ -619,6 +619,7 @@ public class DesiredBalanceReconcilerTests extends ESAllocationTestCase {
shardSizesBuilder.build(), shardSizesBuilder.build(),
ImmutableOpenMap.of(), ImmutableOpenMap.of(),
ImmutableOpenMap.of(), ImmutableOpenMap.of(),
ImmutableOpenMap.of(),
ImmutableOpenMap.of() ImmutableOpenMap.of()
); );

View file

@ -1406,7 +1406,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
Map<String, Long> shardSizes, Map<String, Long> shardSizes,
Map<NodeAndPath, ReservedSpace> reservedSpace Map<NodeAndPath, ReservedSpace> reservedSpace
) { ) {
super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace); super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, Map.of(), Map.of(), reservedSpace, Map.of());
} }
@Override @Override

View file

@ -108,6 +108,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
Map.of("[test][0][p]", 10L), // 10 bytes, Map.of("[test][0][p]", 10L), // 10 bytes,
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
RoutingAllocation allocation = new RoutingAllocation( RoutingAllocation allocation = new RoutingAllocation(
@ -179,6 +180,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
Map.of("[test][0][p]", shardSize), Map.of("[test][0][p]", shardSize),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
RoutingAllocation allocation = new RoutingAllocation( RoutingAllocation allocation = new RoutingAllocation(
@ -324,6 +326,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
shardSizes, shardSizes,
Map.of(), Map.of(),
shardRoutingMap, shardRoutingMap,
Map.of(),
Map.of() Map.of()
); );
RoutingAllocation allocation = new RoutingAllocation( RoutingAllocation allocation = new RoutingAllocation(
@ -843,6 +846,7 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
Map.of("[test][0][p]", 10L), Map.of("[test][0][p]", 10L),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
RoutingAllocation allocation = new RoutingAllocation( RoutingAllocation allocation = new RoutingAllocation(
@ -904,7 +908,15 @@ public class DiskThresholdDeciderUnitTests extends ESAllocationTestCase {
// bigger than available space // bigger than available space
final long shardSize = randomIntBetween(1, 10); final long shardSize = randomIntBetween(1, 10);
shardSizes.put("[test][0][p]", shardSize); shardSizes.put("[test][0][p]", shardSize);
ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages, mostAvailableUsage, shardSizes, Map.of(), Map.of(), Map.of()); ClusterInfo clusterInfo = new ClusterInfo(
leastAvailableUsages,
mostAvailableUsage,
shardSizes,
Map.of(),
Map.of(),
Map.of(),
Map.of()
);
RoutingAllocation allocation = new RoutingAllocation( RoutingAllocation allocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(decider)), new AllocationDeciders(Collections.singleton(decider)),
clusterState, clusterState,

View file

@ -43,7 +43,7 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService {
private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction; private volatile BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFunction;
public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { public MockInternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
super(settings, clusterService, threadPool, client); super(settings, clusterService, threadPool, client, ShardHeapUsageCollector.EMPTY);
} }
public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFn) { public void setDiskUsageFunctionAndRefresh(BiFunction<DiscoveryNode, FsInfo.Path, FsInfo.Path> diskUsageFn) {

View file

@ -959,6 +959,7 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService
extraShardSizes, extraShardSizes,
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
this.delegate = info; this.delegate = info;

View file

@ -262,7 +262,7 @@ public class AutoscalingCalculateCapacityServiceTests extends AutoscalingTestCas
} }
} }
state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build(); state = ClusterState.builder(ClusterName.DEFAULT).nodes(nodes).build();
info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of()); info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext(
roleNames, roleNames,
state, state,
@ -311,7 +311,7 @@ public class AutoscalingCalculateCapacityServiceTests extends AutoscalingTestCas
) )
); );
info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of()); info = new ClusterInfo(leastUsages, mostUsages, Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext( context = new AutoscalingCalculateCapacityService.DefaultAutoscalingDeciderContext(
roleNames, roleNames,
state, state,

View file

@ -109,7 +109,7 @@ public class FrozenStorageDeciderServiceTests extends AutoscalingTestCase {
// add irrelevant shards noise for completeness (should not happen IRL). // add irrelevant shards noise for completeness (should not happen IRL).
sizes.put(new ShardId(index, i), randomLongBetween(0, Integer.MAX_VALUE)); sizes.put(new ShardId(index, i), randomLongBetween(0, Integer.MAX_VALUE));
} }
ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of()); ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), Map.of(), sizes, Map.of(), Map.of(), Map.of());
return Tuple.tuple(totalSize, info); return Tuple.tuple(totalSize, info);
} }
} }

View file

@ -397,7 +397,7 @@ public class ProactiveStorageDeciderServiceTests extends AutoscalingTestCase {
for (var id : state.nodes().getDataNodes().keySet()) { for (var id : state.nodes().getDataNodes().keySet()) {
diskUsage.put(id, new DiskUsage(id, id, "/test", Long.MAX_VALUE, Long.MAX_VALUE)); diskUsage.put(id, new DiskUsage(id, id, "/test", Long.MAX_VALUE, Long.MAX_VALUE));
} }
return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of()); return new ClusterInfo(diskUsage, diskUsage, shardSizes, Map.of(), Map.of(), Map.of(), Map.of());
} }
private ClusterState.Builder applyCreatedDates( private ClusterState.Builder applyCreatedDates(

View file

@ -379,7 +379,7 @@ public class ReactiveStorageDeciderServiceTests extends AutoscalingTestCase {
} }
private ReactiveStorageDeciderService.AllocationState createAllocationState(Map<String, Long> shardSize, ClusterState clusterState) { private ReactiveStorageDeciderService.AllocationState createAllocationState(Map<String, Long> shardSize, ClusterState clusterState) {
ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of()); ClusterInfo info = new ClusterInfo(Map.of(), Map.of(), shardSize, Map.of(), Map.of(), Map.of(), Map.of());
ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState(
clusterState, clusterState,
null, null,
@ -544,7 +544,7 @@ public class ReactiveStorageDeciderServiceTests extends AutoscalingTestCase {
} }
var diskUsages = Map.of(nodeId, new DiskUsage(nodeId, null, null, ByteSizeUnit.KB.toBytes(100), ByteSizeUnit.KB.toBytes(5))); var diskUsages = Map.of(nodeId, new DiskUsage(nodeId, null, null, ByteSizeUnit.KB.toBytes(100), ByteSizeUnit.KB.toBytes(5)));
ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of()); ClusterInfo info = new ClusterInfo(diskUsages, diskUsages, shardSize, Map.of(), Map.of(), Map.of(), Map.of());
ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState( ReactiveStorageDeciderService.AllocationState allocationState = new ReactiveStorageDeciderService.AllocationState(
clusterState, clusterState,

View file

@ -173,6 +173,7 @@ public class TransportNodeDeprecationCheckActionTests extends ESTestCase {
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(), Map.of(),
Map.of(),
Map.of() Map.of()
); );
DeprecationIssue issue = TransportNodeDeprecationCheckAction.checkDiskLowWatermark( DeprecationIssue issue = TransportNodeDeprecationCheckAction.checkDiskLowWatermark(