Adding DiskHealthIndicatorService (#90041)
Adds a new health indicator that reports problems if indexes have a block placed on them, or if any nodes in the cluster are running low on disk space.
Parent: 15932d5168
Commit: 2566cd1738
6 changed files with 1549 additions and 4 deletions
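The new indicator has a small surface area: it is constructed with a ClusterService and turns the per-node disk reports in a HealthInfo into a single HealthIndicatorResult. A minimal sketch of how it can be exercised, mirroring the unit tests added in this commit (the mocked ClusterService and the helper name are illustrative assumptions, not part of the change):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.node.DiskHealthIndicatorService;
import org.elasticsearch.health.node.HealthInfo;

class DiskIndicatorSketch {
    // Returns the disk indicator result for the given cluster state and per-node disk reports.
    static HealthIndicatorResult check(ClusterState state, HealthInfo healthInfo) {
        ClusterService clusterService = mock(ClusterService.class);
        when(clusterService.state()).thenReturn(state); // the indicator reads index blocks and node roles from here
        DiskHealthIndicatorService disk = new DiskHealthIndicatorService(clusterService);
        return disk.calculate(true, healthInfo); // explain=true also includes per-node details
    }
}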
@@ -64,6 +64,9 @@ step by step troubleshooting guide to fix the diagnosed problem.
`shards_availability`::
Reports health issues regarding shard assignments.

`disk`::
Reports health issues caused by lack of disk space.

`ilm`::
Reports health issues related to
Indexing Lifecycle Management.
@@ -311,6 +314,28 @@ details have contents and a structure that is unique to each indicator.
`started_replicas`::
(int) The number of replica shards that are active and available on the system.

[[health-api-response-details-disk]]
===== disk

`nodes`::
(Optional, array) A list of nodes that have reported disk usage information. This field
is present if any node has reported disk usage.
+
.Properties of `nodes`
[%collapsible%open]
====
`node_id`::
(string) The node id of the node reporting disk usage.

`name`::
(Optional, string) The node name of the node reporting disk usage.

`status`::
(string) The status of the disk indicator on the node.

`cause`::
(Optional, string) The cause for the status not being GREEN, if known.
====

[[health-api-response-details-repository-integrity]]
===== repository_integrity
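For reference, the documented field layout can be reproduced with a few XContentBuilder calls. This is only an illustration of the shape described above; the class name and field values are invented for the example and are not part of this commit:

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;

class DiskDetailsShapeSketch {
    public static void main(String[] args) throws Exception {
        XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
        builder.startObject();
        builder.startArray("nodes");
        builder.startObject();
        builder.field("node_id", "example-node-id"); // id of the node reporting disk usage
        builder.field("name", "data-node-0");        // optional: node name
        builder.field("status", "YELLOW");           // per-node status of the disk indicator
        builder.field("cause", "example cause");     // optional: only present when the status is not GREEN
        builder.endObject();
        builder.endArray();
        builder.endObject();
        System.out.println(BytesReference.bytes(builder).utf8ToString());
    }
}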
@@ -137,7 +137,7 @@ public class StableMasterDisruptionIT extends ESIntegTestCase {
    assertBusy(() -> {
        GetHealthAction.Response healthResponse = client.execute(GetHealthAction.INSTANCE, new GetHealthAction.Request(true)).get();
        String debugInformation = xContentToString(healthResponse);
        assertThat(debugInformation, healthResponse.getStatus(), equalTo(expectedStatus));
        assertThat(debugInformation, healthResponse.findIndicator("master_is_stable").status(), equalTo(expectedStatus));
        assertThat(debugInformation, healthResponse.findIndicator("master_is_stable").symptom(), expectedMatcher);
    });
}
@@ -0,0 +1,109 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.health.node;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.node.selection.HealthNode;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class DiskHealthIndicatorServiceIT extends ESIntegTestCase {

    public void testGreen() throws Exception {
        try (InternalTestCluster internalCluster = internalCluster()) {
            internalCluster.startMasterOnlyNode();
            internalCluster.startDataOnlyNode();
            ensureStableCluster(internalCluster.getNodeNames().length);
            waitForAllNodesToReportHealth();
            for (String node : internalCluster.getNodeNames()) {
                HealthService healthService = internalCluster.getInstance(HealthService.class, node);
                List<HealthIndicatorResult> resultList = getHealthServiceResults(healthService, node);
                assertNotNull(resultList);
                assertThat(resultList.size(), equalTo(1));
                HealthIndicatorResult testIndicatorResult = resultList.get(0);
                assertThat(testIndicatorResult.status(), equalTo(HealthStatus.GREEN));
                assertThat(testIndicatorResult.symptom(), equalTo("The cluster has enough available disk space."));
            }
        }
    }

    public void testRed() throws Exception {
        try (InternalTestCluster internalCluster = internalCluster()) {
            internalCluster.startMasterOnlyNode(getVeryLowWatermarksSettings());
            internalCluster.startDataOnlyNode(getVeryLowWatermarksSettings());
            ensureStableCluster(internalCluster.getNodeNames().length);
            waitForAllNodesToReportHealth();
            for (String node : internalCluster.getNodeNames()) {
                HealthService healthService = internalCluster.getInstance(HealthService.class, node);
                List<HealthIndicatorResult> resultList = getHealthServiceResults(healthService, node);
                assertNotNull(resultList);
                assertThat(resultList.size(), equalTo(1));
                HealthIndicatorResult testIndicatorResult = resultList.get(0);
                assertThat(testIndicatorResult.status(), equalTo(HealthStatus.RED));
                assertThat(
                    testIndicatorResult.symptom(),
                    equalTo("2 nodes with roles: [data, master] are out of disk or running low on disk space.")
                );
            }
        }
    }

    private List<HealthIndicatorResult> getHealthServiceResults(HealthService healthService, String node) throws Exception {
        AtomicReference<List<HealthIndicatorResult>> resultListReference = new AtomicReference<>();
        ActionListener<List<HealthIndicatorResult>> listener = new ActionListener<>() {
            @Override
            public void onResponse(List<HealthIndicatorResult> healthIndicatorResults) {
                resultListReference.set(healthIndicatorResults);
            }

            @Override
            public void onFailure(Exception e) {
                throw new RuntimeException(e);
            }
        };
        healthService.getHealth(internalCluster().client(node), DiskHealthIndicatorService.NAME, true, listener);
        assertBusy(() -> assertNotNull(resultListReference.get()));
        return resultListReference.get();
    }

    private Settings getVeryLowWatermarksSettings() {
        return Settings.builder()
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), "0.5%")
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), "0.5%")
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0.5%")
            .put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms")
            .build();
    }

    private void waitForAllNodesToReportHealth() throws Exception {
        assertBusy(() -> {
            ClusterState state = internalCluster().clusterService().state();
            DiscoveryNode healthNode = HealthNode.findHealthNode(state);
            assertNotNull(healthNode);
            Map<String, DiskHealthInfo> healthInfoCache = internalCluster().getInstance(HealthInfoCache.class, healthNode.getName())
                .getHealthInfo()
                .diskInfoByNode();
            assertThat(healthInfoCache.size(), equalTo(state.getNodes().getNodes().keySet().size()));
        });
    }
}
@@ -0,0 +1,448 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0 and the Server Side Public License, v 1; you may not use this file except
 * in compliance with, at your election, the Elastic License 2.0 or the Server
 * Side Public License, v 1.
 */

package org.elasticsearch.health.node;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.health.Diagnosis;
import org.elasticsearch.health.HealthIndicatorDetails;
import org.elasticsearch.health.HealthIndicatorImpact;
import org.elasticsearch.health.HealthIndicatorResult;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.ImpactArea;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DiskHealthIndicatorService implements HealthIndicatorService {
    public static final String NAME = "disk";

    private static final Logger logger = LogManager.getLogger(DiskHealthIndicatorService.class);

    private final ClusterService clusterService;

    public DiskHealthIndicatorService(ClusterService clusterService) {
        this.clusterService = clusterService;
    }

    @Override
    public String name() {
        return NAME;
    }

    @Override
    public HealthIndicatorResult calculate(boolean explain, HealthInfo healthInfo) {
        Map<String, DiskHealthInfo> diskHealthInfoMap = healthInfo.diskInfoByNode();
        if (diskHealthInfoMap == null || diskHealthInfoMap.isEmpty()) {
            /*
             * If there is no disk health info, that either means that a new health node was just elected, or something is seriously
             * wrong with health data collection on the health node. Either way, we immediately return UNKNOWN. If there are at least
             * some health info results then we work with what we have (and log any missing ones at debug level immediately below this).
             */
            return createIndicator(
                HealthStatus.UNKNOWN,
                "No disk usage data.",
                HealthIndicatorDetails.EMPTY,
                Collections.emptyList(),
                Collections.emptyList()
            );
        }
        ClusterState clusterState = clusterService.state();
        logMissingHealthInfoData(diskHealthInfoMap, clusterState);

        /*
         * If there are any index blocks in the cluster state, that makes the overall status automatically red, regardless of the statuses
         * returned by the nodes. If there is no cluster block, we just use the merged statuses of the nodes.
         */
        Set<String> indicesWithBlock = clusterState.blocks()
            .indices()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().contains(IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        boolean clusterHasBlockedIndex = indicesWithBlock.isEmpty() == false;
        HealthIndicatorDetails details = getDetails(explain, diskHealthInfoMap, clusterState);
        final HealthStatus healthStatusFromNodes = HealthStatus.merge(
            diskHealthInfoMap.values().stream().map(DiskHealthInfo::healthStatus)
        );
        final HealthStatus healthStatus = clusterHasBlockedIndex ? HealthStatus.RED : healthStatusFromNodes;

        final HealthIndicatorResult healthIndicatorResult;
        if (HealthStatus.GREEN.equals(healthStatus)) {
            healthIndicatorResult = createIndicator(
                healthStatus,
                "The cluster has enough available disk space.",
                details,
                List.of(),
                List.of()
            );
        } else {
            /*
             * In this case the status is either RED or YELLOW. So we collect information about red and yellow indices (including indices
             * with blocks placed on them), and red and yellow nodes (including those with a blocked index). We then use that
             * information to get the symptom, impacts, and diagnoses for the result.
             */
            Set<String> nodesWithBlockedIndices = getNodeIdsForIndices(indicesWithBlock, clusterState);
            Set<String> nodesReportingRed = getNodeIdsReportingStatus(diskHealthInfoMap, HealthStatus.RED);
            Set<String> indicesOnRedNodes = getIndicesForNodes(nodesReportingRed, clusterState);
            Set<String> nodesReportingYellow = getNodeIdsReportingStatus(diskHealthInfoMap, HealthStatus.YELLOW);
            Set<String> indicesOnYellowNodes = getIndicesForNodes(nodesReportingYellow, clusterState);
            Set<String> redDataNodes = getNodesWithDataRole(nodesReportingRed, clusterState);
            Set<String> yellowDataNodes = getNodesWithDataRole(nodesReportingYellow, clusterState);
            Set<String> redMasterNodes = getNodesWithMasterRole(nodesReportingRed, clusterState);
            Set<String> yellowMasterNodes = getNodesWithMasterRole(nodesReportingYellow, clusterState);
            Set<String> redNonDataNonMasterNodes = getNodesWithNonDataNonMasterRoles(nodesReportingRed, clusterState);
            Set<String> yellowNonDataNonMasterNodes = getNodesWithNonDataNonMasterRoles(nodesReportingYellow, clusterState);

            String symptom = getSymptom(
                clusterHasBlockedIndex,
                indicesWithBlock,
                nodesWithBlockedIndices,
                nodesReportingRed,
                nodesReportingYellow,
                clusterState
            );
            List<HealthIndicatorImpact> impacts = getImpacts(
                indicesWithBlock,
                indicesOnRedNodes,
                indicesOnYellowNodes,
                nodesWithBlockedIndices,
                redDataNodes,
                yellowDataNodes,
                redMasterNodes,
                yellowMasterNodes,
                redNonDataNonMasterNodes,
                yellowNonDataNonMasterNodes
            );
            List<Diagnosis> diagnosisList = getDiagnoses(
                indicesWithBlock,
                indicesOnRedNodes,
                indicesOnYellowNodes,
                nodesWithBlockedIndices,
                redDataNodes,
                yellowDataNodes,
                redMasterNodes,
                yellowMasterNodes,
                redNonDataNonMasterNodes,
                yellowNonDataNonMasterNodes
            );
            healthIndicatorResult = createIndicator(healthStatus, symptom, details, impacts, diagnosisList);
        }
        return healthIndicatorResult;
    }

    private String getSymptom(
        boolean clusterHasBlockedIndex,
        Set<String> blockedIndices,
        Set<String> nodesWithBlockedIndices,
        Set<String> nodesReportingRed,
        Set<String> nodesReportingYellow,
        ClusterState clusterState
    ) {
        Set<String> allUnhealthyNodes = (Stream.concat(
            Stream.concat(nodesWithBlockedIndices.stream(), nodesReportingRed.stream()),
            nodesReportingYellow.stream()
        )).collect(Collectors.toSet());
        Set<String> allRolesOnUnhealthyNodes = getRolesOnNodes(allUnhealthyNodes, clusterState).stream()
            .map(DiscoveryNodeRole::roleName)
            .collect(Collectors.toSet());
        final String symptom;
        if (clusterHasBlockedIndex && allUnhealthyNodes.isEmpty()) {
            // In this case the disk issue has been resolved but the index block has not been automatically removed yet:
            symptom = String.format(
                Locale.ROOT,
                "%d %s blocked and cannot be updated but 0 nodes are currently out of space.",
                blockedIndices.size(),
                blockedIndices.size() == 1 ? "index is" : "indices are"
            );
        } else {
            symptom = String.format(
                Locale.ROOT,
                "%d node%s with roles: [%s] %s out of disk or running low on disk space.",
                allUnhealthyNodes.size(),
                allUnhealthyNodes.size() == 1 ? "" : "s",
                allRolesOnUnhealthyNodes.stream().sorted().collect(Collectors.joining(", ")),
                allUnhealthyNodes.size() == 1 ? "is" : "are"
            );
        }
        return symptom;
    }

    private List<HealthIndicatorImpact> getImpacts(
        Set<String> indicesWithBlock,
        Set<String> indicesOnRedNodes,
        Set<String> indicesOnYellowNodes,
        Set<String> nodesWithBlockedIndices,
        Set<String> redDataNodes,
        Set<String> yellowDataNodes,
        Set<String> redMasterNodes,
        Set<String> yellowMasterNodes,
        Set<String> redNonDataNonMasterNodes,
        Set<String> yellowNonDataNonMasterNodes
    ) {
        List<HealthIndicatorImpact> impacts = new ArrayList<>();
        if (indicesWithBlock.isEmpty() == false
            || indicesOnRedNodes.isEmpty() == false
            || nodesWithBlockedIndices.isEmpty() == false
            || redDataNodes.isEmpty() == false) {
            impacts.add(
                new HealthIndicatorImpact(1, "Cannot insert or update documents in the affected indices.", List.of(ImpactArea.INGEST))
            );
        } else if (indicesOnYellowNodes.isEmpty() == false || yellowDataNodes.isEmpty() == false) {
            impacts.add(
                new HealthIndicatorImpact(
                    1,
                    "At risk of not being able to insert or update documents in the affected indices.",
                    List.of(ImpactArea.INGEST)
                )
            );
        }
        if (redMasterNodes.isEmpty() == false || yellowMasterNodes.isEmpty() == false) {
            impacts.add(new HealthIndicatorImpact(2, "Cluster stability might be impaired.", List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));
        }
        if (redNonDataNonMasterNodes.isEmpty() == false || yellowNonDataNonMasterNodes.isEmpty() == false) {
            impacts.add(
                new HealthIndicatorImpact(2, "Some cluster functionality might be unavailable.", List.of(ImpactArea.DEPLOYMENT_MANAGEMENT))
            );
        }
        return impacts;
    }

    private List<Diagnosis> getDiagnoses(
        Set<String> indicesWithBlock,
        Set<String> indicesOnRedNodes,
        Set<String> indicesOnYellowNodes,
        Set<String> nodesWithBlockedIndices,
        Set<String> redDataNodes,
        Set<String> yellowDataNodes,
        Set<String> redMasterNodes,
        Set<String> yellowMasterNodes,
        Set<String> redNonDataNonMasterNodes,
        Set<String> yellowNonDataNonMasterNodes
    ) {
        List<Diagnosis> diagnosisList = new ArrayList<>();
        if (indicesWithBlock.isEmpty() == false
            || nodesWithBlockedIndices.isEmpty() == false
            || indicesOnRedNodes.isEmpty() == false
            || redDataNodes.isEmpty() == false
            || indicesOnYellowNodes.isEmpty() == false
            || yellowDataNodes.isEmpty() == false) {
            Set<String> impactedIndices = Stream.concat(
                Stream.concat(indicesWithBlock.stream(), indicesOnRedNodes.stream()),
                indicesOnYellowNodes.stream()
            ).collect(Collectors.toSet());
            Set<String> unhealthyNodes = Stream.concat(
                Stream.concat(nodesWithBlockedIndices.stream(), redDataNodes.stream()),
                yellowDataNodes.stream()
            ).collect(Collectors.toSet());
            diagnosisList.add(
                new Diagnosis(
                    new Diagnosis.Definition(
                        "free-disk-space-or-add-capacity-data-nodes",
                        String.format(
                            Locale.ROOT,
                            "%d %s reside%s on nodes that have run out of space and writing has been blocked by the system.",
                            impactedIndices.size(),
                            impactedIndices.size() == 1 ? "index" : "indices",
                            impactedIndices.size() == 1 ? "s" : ""
                        ),
                        "Enable autoscaling (if applicable), add disk capacity or free up disk space to resolve "
                            + "this. If you have already taken action please wait for the rebalancing to complete.",
                        "https://ela.st/free-disk-space-or-add-capacity-data-nodes"
                    ),
                    unhealthyNodes.stream().sorted().toList()
                )
            );
        }
        if (redMasterNodes.isEmpty() == false || yellowMasterNodes.isEmpty() == false) {
            diagnosisList.add(
                new Diagnosis(
                    new Diagnosis.Definition(
                        "free-disk-space-or-add-capacity-master-nodes",
                        "Disk is almost full.",
                        "Please add capacity to the current nodes, or replace them with ones with higher capacity.",
                        "https://ela.st/free-disk-space-or-add-capacity-master-nodes"
                    ),
                    Stream.concat(redMasterNodes.stream(), yellowMasterNodes.stream()).sorted().toList()
                )
            );
        }
        if (redNonDataNonMasterNodes.isEmpty() == false || yellowNonDataNonMasterNodes.isEmpty() == false) {
            diagnosisList.add(
                new Diagnosis(
                    new Diagnosis.Definition(
                        "free-disk-space-or-add-capacity-other-nodes",
                        "Disk is almost full.",
                        "Please add capacity to the current nodes, or replace them with ones with higher capacity.",
                        "https://ela.st/free-disk-space-or-add-capacity-other-nodes"
                    ),
                    Stream.concat(redNonDataNonMasterNodes.stream(), yellowNonDataNonMasterNodes.stream()).sorted().toList()
                )
            );
        }
        return diagnosisList;
    }

    // Non-private for unit testing
    static Set<String> getNodeIdsReportingStatus(Map<String, DiskHealthInfo> diskHealthInfoMap, HealthStatus status) {
        return diskHealthInfoMap.entrySet()
            .stream()
            .filter(entry -> status.equals(entry.getValue().healthStatus()))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<DiscoveryNodeRole> getRolesOnNodes(Set<String> nodeIds, ClusterState clusterState) {
        return clusterState.nodes()
            .getNodes()
            .values()
            .stream()
            .filter(node -> nodeIds.contains(node.getId()))
            .map(DiscoveryNode::getRoles)
            .flatMap(Collection::stream)
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<String> getNodesWithDataRole(Set<String> nodeIds, ClusterState clusterState) {
        return clusterState.nodes()
            .getNodes()
            .values()
            .stream()
            .filter(node -> nodeIds.contains(node.getId()))
            .filter(node -> node.getRoles().stream().anyMatch(DiscoveryNodeRole::canContainData))
            .map(DiscoveryNode::getId)
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<String> getNodesWithMasterRole(Set<String> nodeIds, ClusterState clusterState) {
        return clusterState.nodes()
            .getNodes()
            .values()
            .stream()
            .filter(node -> nodeIds.contains(node.getId()))
            .filter(node -> node.getRoles().contains(DiscoveryNodeRole.MASTER_ROLE))
            .map(DiscoveryNode::getId)
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<String> getNodesWithNonDataNonMasterRoles(Set<String> nodeIds, ClusterState clusterState) {
        return clusterState.nodes()
            .getNodes()
            .values()
            .stream()
            .filter(node -> nodeIds.contains(node.getId()))
            .filter(
                node -> node.getRoles()
                    .stream()
                    .anyMatch(role -> (role.equals(DiscoveryNodeRole.MASTER_ROLE) || role.canContainData()) == false)
            )
            .map(DiscoveryNode::getId)
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<String> getIndicesForNodes(Set<String> nodes, ClusterState clusterState) {
        return clusterState.routingTable()
            .allShards()
            .stream()
            .filter(routing -> nodes.contains(routing.currentNodeId()))
            .map(routing -> routing.index().getName())
            .collect(Collectors.toSet());
    }

    // Non-private for unit testing
    static Set<String> getNodeIdsForIndices(Set<String> indices, ClusterState clusterState) {
        return clusterState.routingTable()
            .allShards()
            .stream()
            .filter(routing -> indices.contains(routing.index().getName()))
            .map(ShardRouting::currentNodeId)
            .collect(Collectors.toSet());
    }

    /**
     * This method logs if any nodes in the cluster state do not have health info results reported. This is logged at debug level and is
     * not ordinarily important, but could be useful in tracking down problems where nodes have stopped reporting health node information.
     * @param diskHealthInfoMap A map of nodeId to DiskHealthInfo
     * @param clusterState The current cluster state, used to determine the full set of nodes expected to report
     */
    private void logMissingHealthInfoData(Map<String, DiskHealthInfo> diskHealthInfoMap, ClusterState clusterState) {
        if (logger.isDebugEnabled()) {
            Set<DiscoveryNode> nodesInClusterState = new HashSet<>(clusterState.nodes());
            Set<String> nodeIdsInClusterState = nodesInClusterState.stream().map(DiscoveryNode::getId).collect(Collectors.toSet());
            Set<String> nodeIdsInHealthInfo = diskHealthInfoMap.keySet();
            if (nodeIdsInHealthInfo.containsAll(nodeIdsInClusterState) == false) {
                String nodesWithMissingData = nodesInClusterState.stream()
                    .filter(node -> nodeIdsInHealthInfo.contains(node.getId()) == false)
                    .map(node -> String.format(Locale.ROOT, "{%s / %s}", node.getId(), node.getName()))
                    .collect(Collectors.joining(", "));
                logger.debug("The following nodes are in the cluster state but not reporting health data: [{}]", nodesWithMissingData);
            }
        }
    }

    private HealthIndicatorDetails getDetails(boolean explain, Map<String, DiskHealthInfo> diskHealthInfoMap, ClusterState clusterState) {
        if (explain == false) {
            return HealthIndicatorDetails.EMPTY;
        }
        return (builder, params) -> {
            builder.startObject();
            builder.array("nodes", arrayXContentBuilder -> {
                for (Map.Entry<String, DiskHealthInfo> entry : diskHealthInfoMap.entrySet()) {
                    builder.startObject();
                    String nodeId = entry.getKey();
                    builder.field("node_id", nodeId);
                    String nodeName = getNameForNodeId(nodeId, clusterState);
                    if (nodeName != null) {
                        builder.field("name", nodeName);
                    }
                    builder.field("status", entry.getValue().healthStatus());
                    DiskHealthInfo.Cause cause = entry.getValue().cause();
                    if (cause != null) {
                        builder.field("cause", entry.getValue().cause());
                    }
                    builder.endObject();
                }
            });
            return builder.endObject();
        };
    }

    /**
     * Returns the name of the node with the given nodeId, as seen in the cluster state at this moment. The name of a node is optional,
     * so if the node does not have a name (or the node with the given nodeId is no longer in the cluster state), null is returned.
     * @param nodeId The id of the node whose name is to be returned
     * @return The current name of the node, or null if the node is not in the cluster state or does not have a name
     */
    @Nullable
    private String getNameForNodeId(String nodeId, ClusterState clusterState) {
        DiscoveryNode node = clusterState.nodes().get(nodeId);
        return node == null ? null : node.getName();
    }
}
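The overall status in calculate(...) above hinges on two steps: any index write block forces RED, and otherwise HealthStatus.merge collapses the per-node disk statuses into a single value. A standalone sketch of that second step (not code from this commit; as the unit tests that follow exercise, the merged value is the most severe per-node status):

import java.util.stream.Stream;

import org.elasticsearch.health.HealthStatus;

class MergeSketch {
    public static void main(String[] args) {
        // One YELLOW report among GREEN ones makes the node-derived status YELLOW.
        HealthStatus overall = HealthStatus.merge(Stream.of(HealthStatus.GREEN, HealthStatus.YELLOW, HealthStatus.GREEN));
        System.out.println(overall); // YELLOW
    }
}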
@@ -105,6 +105,7 @@ import org.elasticsearch.gateway.PersistedClusterStateService;
import org.elasticsearch.health.HealthIndicatorService;
import org.elasticsearch.health.HealthService;
import org.elasticsearch.health.metadata.HealthMetadataService;
import org.elasticsearch.health.node.DiskHealthIndicatorService;
import org.elasticsearch.health.node.HealthInfoCache;
import org.elasticsearch.health.node.LocalHealthMonitor;
import org.elasticsearch.health.node.selection.HealthNode;

@@ -1164,10 +1165,15 @@ public class Node implements Closeable {
        List<HealthIndicatorService> preflightHealthIndicatorServices = Collections.singletonList(
            new StableMasterHealthIndicatorService(coordinationDiagnosticsService, clusterService)
        );
        var serverHealthIndicatorServices = List.of(
            new RepositoryIntegrityHealthIndicatorService(clusterService),
            new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
        var serverHealthIndicatorServices = new ArrayList<>(
            List.of(
                new RepositoryIntegrityHealthIndicatorService(clusterService),
                new ShardsAvailabilityHealthIndicatorService(clusterService, clusterModule.getAllocationService())
            )
        );
        if (HealthNode.isEnabled()) {
            serverHealthIndicatorServices.add(new DiskHealthIndicatorService(clusterService));
        }
        var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
            .stream()
            .flatMap(plugin -> plugin.getHealthIndicatorServices().stream())
@ -0,0 +1,957 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||
* Side Public License, v 1.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.health.node;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||
import org.elasticsearch.health.Diagnosis;
|
||||
import org.elasticsearch.health.HealthIndicatorImpact;
|
||||
import org.elasticsearch.health.HealthIndicatorResult;
|
||||
import org.elasticsearch.health.HealthStatus;
|
||||
import org.elasticsearch.health.ImpactArea;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.xcontent.ToXContent;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xcontent.XContentFactory;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class DiskHealthIndicatorServiceTests extends ESTestCase {
|
||||
public void testServiceBasics() {
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
{
|
||||
HealthStatus expectedStatus = HealthStatus.GREEN;
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
}
|
||||
{
|
||||
HealthStatus expectedStatus = HealthStatus.YELLOW;
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
}
|
||||
{
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testGreen() throws IOException {
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthStatus expectedStatus = HealthStatus.GREEN;
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(result.symptom(), equalTo("The cluster has enough available disk space."));
|
||||
assertThat(result.impacts().size(), equalTo(0));
|
||||
assertThat(result.diagnosisList().size(), equalTo(0));
|
||||
Map<String, Object> detailsMap = xContentToMap(result.details());
|
||||
assertThat(detailsMap.size(), equalTo(1));
|
||||
List<Map<String, String>> nodeDetails = (List<Map<String, String>>) detailsMap.get("nodes");
|
||||
assertThat(nodeDetails.size(), equalTo(discoveryNodes.size()));
|
||||
Map<String, String> nodeIdToName = discoveryNodes.stream().collect(Collectors.toMap(DiscoveryNode::getId, DiscoveryNode::getName));
|
||||
for (Map<String, String> nodeDetail : nodeDetails) {
|
||||
assertThat(nodeDetail.size(), greaterThanOrEqualTo(3));
|
||||
assertThat(nodeDetail.size(), lessThanOrEqualTo(4)); // Could have a cause
|
||||
String nodeId = nodeDetail.get("node_id");
|
||||
assertThat(nodeDetail.get("name"), equalTo(nodeIdToName.get(nodeId)));
|
||||
assertThat(nodeDetail.get("status"), equalTo("GREEN"));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testRedNoBlocksNoIndices() throws IOException {
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(result.symptom(), containsString("1 node with roles: [data"));
|
||||
assertThat(result.symptom(), containsString("] is out of disk or running low on disk space."));
|
||||
assertThat(result.impacts().size(), equalTo(3));
|
||||
HealthIndicatorImpact impact = result.impacts().get(0);
|
||||
assertNotNull(impact);
|
||||
List<ImpactArea> impactAreas = impact.impactAreas();
|
||||
assertThat(impactAreas.size(), equalTo(1));
|
||||
assertThat(impactAreas.get(0), equalTo(ImpactArea.INGEST));
|
||||
assertThat(impact.severity(), equalTo(1));
|
||||
assertThat(impact.impactDescription(), equalTo("Cannot insert or update documents in the affected indices."));
|
||||
assertThat(result.diagnosisList().size(), equalTo(3));
|
||||
Diagnosis diagnosis = result.diagnosisList().get(0);
|
||||
List<String> affectedResources = diagnosis.affectedResources();
|
||||
assertThat(affectedResources.size(), equalTo(1));
|
||||
String expectedRedNodeId = healthInfo.diskInfoByNode()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> expectedStatus.equals(entry.getValue().healthStatus()))
|
||||
.map(Map.Entry::getKey)
|
||||
.findAny()
|
||||
.orElseThrow();
|
||||
assertThat(affectedResources.get(0), equalTo(expectedRedNodeId));
|
||||
Map<String, Object> detailsMap = xContentToMap(result.details());
|
||||
assertThat(detailsMap.size(), equalTo(1));
|
||||
List<Map<String, String>> nodeDetails = (List<Map<String, String>>) detailsMap.get("nodes");
|
||||
assertThat(nodeDetails.size(), equalTo(discoveryNodes.size()));
|
||||
Map<String, String> nodeIdToName = discoveryNodes.stream().collect(Collectors.toMap(DiscoveryNode::getId, DiscoveryNode::getName));
|
||||
for (Map<String, String> nodeDetail : nodeDetails) {
|
||||
assertThat(nodeDetail.size(), greaterThanOrEqualTo(3));
|
||||
assertThat(nodeDetail.size(), lessThanOrEqualTo(4)); // Could have a cause
|
||||
String nodeId = nodeDetail.get("node_id");
|
||||
assertThat(nodeDetail.get("name"), equalTo(nodeIdToName.get(nodeId)));
|
||||
if (nodeId.equals(expectedRedNodeId)) {
|
||||
assertThat(nodeDetail.get("status"), equalTo("RED"));
|
||||
} else {
|
||||
assertThat(nodeDetail.get("status"), equalTo("GREEN"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testRedNoBlocksWithIndices() throws IOException {
|
||||
/*
|
||||
* This method tests that we get the expected behavior when there are nodes with indices that report RED status and there are no
|
||||
* blocks in the cluster state.
|
||||
*/
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
int numberOfRedNodes = randomIntBetween(1, discoveryNodes.size());
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, numberOfRedNodes, discoveryNodes);
|
||||
Set<String> redNodeIds = healthInfo.diskInfoByNode()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue().healthStatus().equals(expectedStatus))
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> nonRedNodeIds = healthInfo.diskInfoByNode()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue().healthStatus().equals(expectedStatus) == false)
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, Set<String>> indexNameToNodeIdsMap = new HashMap<>();
|
||||
int numberOfIndices = randomIntBetween(1, 1000);
|
||||
int numberOfRedIndices = randomIntBetween(1, numberOfIndices);
|
||||
for (int i = 0; i < numberOfIndices; i++) {
|
||||
String indexName = randomAlphaOfLength(20);
|
||||
/*
|
||||
* The following is artificial but useful for making sure the test has the right counts. The first numberOfRedIndices indices
|
||||
* are always placed on all of the red nodes. All other indices are placed on all of the non red nodes.
|
||||
*/
|
||||
if (i < numberOfRedIndices) {
|
||||
indexNameToNodeIdsMap.put(indexName, redNodeIds);
|
||||
} else {
|
||||
indexNameToNodeIdsMap.put(indexName, nonRedNodeIds);
|
||||
}
|
||||
}
|
||||
ClusterService clusterService = createClusterService(Set.of(), discoveryNodes, indexNameToNodeIdsMap);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(
|
||||
result.symptom(),
|
||||
containsString(numberOfRedNodes + " node" + (numberOfRedNodes == 1 ? "" : "s") + " with roles: [data")
|
||||
);
|
||||
assertThat(result.symptom(), containsString(" out of disk or running low on disk space."));
|
||||
assertThat(result.impacts().size(), equalTo(3));
|
||||
HealthIndicatorImpact impact = result.impacts().get(0);
|
||||
assertNotNull(impact);
|
||||
List<ImpactArea> impactAreas = impact.impactAreas();
|
||||
assertThat(impactAreas.size(), equalTo(1));
|
||||
assertThat(impactAreas.get(0), equalTo(ImpactArea.INGEST));
|
||||
assertThat(impact.severity(), equalTo(1));
|
||||
assertThat(impact.impactDescription(), equalTo("Cannot insert or update documents in the affected indices."));
|
||||
assertThat(result.diagnosisList().size(), equalTo(3));
|
||||
Diagnosis diagnosis = result.diagnosisList().get(0);
|
||||
List<String> affectedResources = diagnosis.affectedResources();
|
||||
assertThat(affectedResources.size(), equalTo(numberOfRedNodes));
|
||||
assertTrue(affectedResources.containsAll(redNodeIds));
|
||||
Map<String, Object> detailsMap = xContentToMap(result.details());
|
||||
assertThat(detailsMap.size(), equalTo(1));
|
||||
List<Map<String, String>> nodeDetails = (List<Map<String, String>>) detailsMap.get("nodes");
|
||||
assertThat(nodeDetails.size(), equalTo(discoveryNodes.size()));
|
||||
Map<String, String> nodeIdToName = discoveryNodes.stream().collect(Collectors.toMap(DiscoveryNode::getId, DiscoveryNode::getName));
|
||||
for (Map<String, String> nodeDetail : nodeDetails) {
|
||||
assertThat(nodeDetail.size(), greaterThanOrEqualTo(3));
|
||||
assertThat(nodeDetail.size(), lessThanOrEqualTo(4)); // Could have a cause
|
||||
String nodeId = nodeDetail.get("node_id");
|
||||
assertThat(nodeDetail.get("name"), equalTo(nodeIdToName.get(nodeId)));
|
||||
if (redNodeIds.contains(nodeId)) {
|
||||
assertThat(nodeDetail.get("status"), equalTo("RED"));
|
||||
} else {
|
||||
assertThat(nodeDetail.get("status"), equalTo("GREEN"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testHasBlockButOtherwiseGreen() {
|
||||
/*
|
||||
* Tests when there is an index that has a block on it but the nodes report green (so the lock is probably about to be released).
|
||||
*/
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
ClusterService clusterService = createClusterService(true, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
{
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.GREEN, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(result.symptom(), equalTo("1 index is blocked and cannot be updated but 0 nodes are currently out of space."));
|
||||
}
|
||||
}
|
||||
|
||||
public void testHasBlockButOtherwiseYellow() {
|
||||
/*
|
||||
* Tests when there is an index that has a block on it but the nodes report yellow.
|
||||
*/
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
ClusterService clusterService = createClusterService(true, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
int numberOfYellowNodes = randomIntBetween(1, discoveryNodes.size());
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.YELLOW, numberOfYellowNodes, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(result.symptom(), containsString(" out of disk or running low on disk space."));
|
||||
}
|
||||
|
||||
public void testHasBlockButOtherwiseRed() {
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
HealthStatus expectedStatus = HealthStatus.RED;
|
||||
int numberOfRedNodes = randomIntBetween(1, discoveryNodes.size());
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.RED, numberOfRedNodes, discoveryNodes);
|
||||
Set<String> redNodeIds = healthInfo.diskInfoByNode()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue().healthStatus().equals(expectedStatus))
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> nonRedNodeIds = healthInfo.diskInfoByNode()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> entry.getValue().healthStatus().equals(expectedStatus) == false)
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toSet());
|
||||
Map<String, Set<String>> indexNameToNodeIdsMap = new HashMap<>();
|
||||
int numberOfIndices = randomIntBetween(1, 1000);
|
||||
Set<String> blockedIndices = new HashSet<>();
|
||||
int numberOfRedIndices = randomIntBetween(1, numberOfIndices);
|
||||
Set<String> redIndices = new HashSet<>();
|
||||
Set<String> allUnhealthyNodes = new HashSet<>();
|
||||
allUnhealthyNodes.addAll(redNodeIds);
|
||||
for (int i = 0; i < numberOfIndices; i++) {
|
||||
String indexName = randomAlphaOfLength(20);
|
||||
/*
|
||||
* The following is artificial but useful for making sure the test has the right counts. The first numberOfRedIndices indices
|
||||
* are always placed on all of the red nodes. All other indices are placed on all of the non red nodes.
|
||||
*/
|
||||
if (i < numberOfRedIndices) {
|
||||
indexNameToNodeIdsMap.put(indexName, redNodeIds);
|
||||
redIndices.add(indexName);
|
||||
} else {
|
||||
indexNameToNodeIdsMap.put(indexName, nonRedNodeIds);
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
blockedIndices.add(indexName);
|
||||
}
|
||||
}
|
||||
ClusterService clusterService = createClusterService(blockedIndices, discoveryNodes, indexNameToNodeIdsMap);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(result.symptom(), containsString(" out of disk or running low on disk space."));
|
||||
for (String index : blockedIndices) {
|
||||
allUnhealthyNodes.addAll(indexNameToNodeIdsMap.get(index));
|
||||
}
|
||||
assertThat(
|
||||
result.symptom(),
|
||||
containsString(allUnhealthyNodes.size() + " node" + (allUnhealthyNodes.size() == 1 ? "" : "s") + " with roles: [data")
|
||||
);
|
||||
}
|
||||
|
||||
public void testMissingHealthInfo() {
|
||||
Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
|
||||
Set<DiscoveryNode> discoveryNodesInClusterState = new HashSet<>(discoveryNodes);
|
||||
discoveryNodesInClusterState.add(
|
||||
new DiscoveryNode(
|
||||
randomAlphaOfLength(30),
|
||||
UUID.randomUUID().toString(),
|
||||
buildNewFakeTransportAddress(),
|
||||
Collections.emptyMap(),
|
||||
DiscoveryNodeRole.roles(),
|
||||
Version.CURRENT
|
||||
)
|
||||
);
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodesInClusterState);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
{
|
||||
HealthInfo healthInfo = HealthInfo.EMPTY_HEALTH_INFO;
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(HealthStatus.UNKNOWN));
|
||||
}
|
||||
{
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.GREEN, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(HealthStatus.GREEN));
|
||||
}
|
||||
{
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.YELLOW, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(HealthStatus.YELLOW));
|
||||
}
|
||||
{
|
||||
HealthInfo healthInfo = createHealthInfo(HealthStatus.RED, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(HealthStatus.RED));
|
||||
}
|
||||
}
|
||||
|
||||
public void testMasterNodeProblems() {
|
||||
Set<DiscoveryNodeRole> roles = Set.of(
|
||||
DiscoveryNodeRole.MASTER_ROLE,
|
||||
randomFrom(
|
||||
DiscoveryNodeRole.ML_ROLE,
|
||||
DiscoveryNodeRole.INGEST_ROLE,
|
||||
DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
|
||||
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
|
||||
DiscoveryNodeRole.TRANSFORM_ROLE
|
||||
)
|
||||
);
|
||||
Set<DiscoveryNode> discoveryNodes = createNodes(roles);
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthStatus expectedStatus = randomFrom(HealthStatus.RED, HealthStatus.YELLOW);
|
||||
int numberOfProblemNodes = randomIntBetween(1, discoveryNodes.size());
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, numberOfProblemNodes, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(
|
||||
result.symptom(),
|
||||
equalTo(
|
||||
numberOfProblemNodes
|
||||
+ " node"
|
||||
+ (numberOfProblemNodes == 1 ? "" : "s")
|
||||
+ " with roles: ["
|
||||
+ roles.stream().map(DiscoveryNodeRole::roleName).sorted().collect(Collectors.joining(", "))
|
||||
+ "] "
|
||||
+ (numberOfProblemNodes == 1 ? "is" : "are")
|
||||
+ " out of disk or running low on disk space."
|
||||
)
|
||||
);
|
||||
List<HealthIndicatorImpact> impacts = result.impacts();
|
||||
assertThat(impacts.size(), equalTo(2));
|
||||
assertThat(impacts.get(0).impactDescription(), equalTo("Cluster stability might be impaired."));
|
||||
assertThat(impacts.get(0).severity(), equalTo(2));
|
||||
assertThat(impacts.get(0).impactAreas(), equalTo(List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));
|
||||
assertThat(impacts.get(1).impactDescription(), equalTo("Some cluster functionality might be unavailable."));
|
||||
assertThat(impacts.get(1).severity(), equalTo(2));
|
||||
assertThat(impacts.get(1).impactAreas(), equalTo(List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));
|
||||
|
||||
List<Diagnosis> diagnosisList = result.diagnosisList();
|
||||
assertThat(diagnosisList.size(), equalTo(2));
|
||||
Diagnosis diagnosis = diagnosisList.get(0);
|
||||
List<String> affectedResources = diagnosis.affectedResources();
|
||||
assertThat(affectedResources.size(), equalTo(numberOfProblemNodes));
|
||||
Diagnosis.Definition diagnosisDefinition = diagnosis.definition();
|
||||
assertThat(diagnosisDefinition.cause(), equalTo("Disk is almost full."));
|
||||
assertThat(
|
||||
diagnosisDefinition.action(),
|
||||
equalTo("Please add capacity to the current nodes, or replace them with ones with higher capacity.")
|
||||
);
|
||||
}
|
||||
|
||||
public void testNonDataNonMasterNodeProblems() {
|
||||
Set<DiscoveryNodeRole> nonMasterNonDataRoles = Set.of(
|
||||
DiscoveryNodeRole.ML_ROLE,
|
||||
DiscoveryNodeRole.INGEST_ROLE,
|
||||
DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
|
||||
DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
|
||||
DiscoveryNodeRole.TRANSFORM_ROLE
|
||||
);
|
||||
Set<DiscoveryNodeRole> roles = new HashSet<>(
|
||||
randomSubsetOf(randomIntBetween(1, nonMasterNonDataRoles.size()), nonMasterNonDataRoles)
|
||||
);
|
||||
Set<DiscoveryNode> discoveryNodes = createNodes(roles);
|
||||
ClusterService clusterService = createClusterService(false, discoveryNodes);
|
||||
DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
|
||||
HealthStatus expectedStatus = randomFrom(HealthStatus.RED, HealthStatus.YELLOW);
|
||||
int numberOfProblemNodes = randomIntBetween(1, discoveryNodes.size());
|
||||
HealthInfo healthInfo = createHealthInfo(expectedStatus, numberOfProblemNodes, discoveryNodes);
|
||||
HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
|
||||
assertThat(result.status(), equalTo(expectedStatus));
|
||||
assertThat(
|
||||
result.symptom(),
|
||||
equalTo(
|
||||
numberOfProblemNodes
|
||||
+ " node"
|
||||
+ (numberOfProblemNodes == 1 ? "" : "s")
|
||||
+ " with roles: ["
|
||||
+ roles.stream().map(DiscoveryNodeRole::roleName).sorted().collect(Collectors.joining(", "))
|
||||
+ "] "
|
||||
+ (numberOfProblemNodes == 1 ? "is" : "are")
|
||||
+ " out of disk or running low on disk space."
|
||||
)
|
||||
);
|
||||
List<HealthIndicatorImpact> impacts = result.impacts();
|
||||
assertThat(impacts.size(), equalTo(1));
|
||||
assertThat(impacts.get(0).impactDescription(), equalTo("Some cluster functionality might be unavailable."));
|
||||
assertThat(impacts.get(0).severity(), equalTo(2));
|
||||
assertThat(impacts.get(0).impactAreas(), equalTo(List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));
|
||||
List<Diagnosis> diagnosisList = result.diagnosisList();
|
||||
assertThat(diagnosisList.size(), equalTo(1));
|
||||
Diagnosis diagnosis = diagnosisList.get(0);
|
||||
List<String> affectedResources = diagnosis.affectedResources();
|
||||
assertThat(affectedResources.size(), equalTo(numberOfProblemNodes));
|
||||
Diagnosis.Definition diagnosisDefinition = diagnosis.definition();
|
||||
assertThat(diagnosisDefinition.cause(), equalTo("Disk is almost full."));
|
||||
assertThat(
|
||||
diagnosisDefinition.action(),
|
||||
equalTo("Please add capacity to the current nodes, or replace them with ones with higher capacity.")
|
||||
);
|
||||
}
|
||||
|
||||
    public void testBlockedIndexWithRedNonDataNodesAndYellowDataNodes() {
        /*
         * In this test, there are indices with blocks on them, master nodes that report RED, non-data nodes that report RED, and data
         * nodes that report YELLOW. We expect the overall status will be RED, with 3 impacts and 3 diagnoses (for the 3 different node
         * types experiencing problems).
         */
        Set<DiscoveryNodeRole> allNonDataNonMasterRoles = Set.of(
            DiscoveryNodeRole.ML_ROLE,
            DiscoveryNodeRole.INGEST_ROLE,
            DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
            DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
            DiscoveryNodeRole.TRANSFORM_ROLE
        );
        Set<DiscoveryNodeRole> nonDataNonMasterRoles = new HashSet<>(
            randomSubsetOf(randomIntBetween(1, allNonDataNonMasterRoles.size()), allNonDataNonMasterRoles)
        );
        Set<DiscoveryNodeRole> allDataRoles = Set.of(
            DiscoveryNodeRole.DATA_ROLE,
            DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE,
            DiscoveryNodeRole.DATA_COLD_NODE_ROLE,
            DiscoveryNodeRole.DATA_HOT_NODE_ROLE,
            DiscoveryNodeRole.DATA_WARM_NODE_ROLE
        );
        Set<DiscoveryNodeRole> dataRoles = new HashSet<>(randomSubsetOf(randomIntBetween(1, allDataRoles.size()), allDataRoles));
        Set<DiscoveryNode> masterDiscoveryNodes = createNodes(Set.of(DiscoveryNodeRole.MASTER_ROLE));
        Set<DiscoveryNode> nonDataNonMasterDiscoveryNodes = createNodes(nonDataNonMasterRoles);
        Set<DiscoveryNode> dataDiscoveryNodes = createNodes(dataRoles);
        ClusterService clusterService = createClusterService(
            true,
            Stream.concat(
                masterDiscoveryNodes.stream(),
                (Stream.concat(nonDataNonMasterDiscoveryNodes.stream(), dataDiscoveryNodes.stream()))
            ).collect(Collectors.toSet())
        );
        DiskHealthIndicatorService diskHealthIndicatorService = new DiskHealthIndicatorService(clusterService);
        int numberOfRedMasterNodes = randomIntBetween(1, masterDiscoveryNodes.size());
        int numberOfRedNonDataNonMasterNodes = randomIntBetween(1, nonDataNonMasterDiscoveryNodes.size());
        int numberOfYellowDataNodes = randomIntBetween(1, dataDiscoveryNodes.size());
        HealthInfo healthInfo = createHealthInfo(
            HealthStatus.RED,
            numberOfRedMasterNodes,
            masterDiscoveryNodes,
            HealthStatus.RED,
            numberOfRedNonDataNonMasterNodes,
            nonDataNonMasterDiscoveryNodes,
            HealthStatus.YELLOW,
            numberOfYellowDataNodes,
            dataDiscoveryNodes
        );
        HealthIndicatorResult result = diskHealthIndicatorService.calculate(true, healthInfo);
        assertThat(result.status(), equalTo(HealthStatus.RED));
        assertThat(result.symptom(), containsString(" out of disk or running low on disk space."));
        List<HealthIndicatorImpact> impacts = result.impacts();
        assertThat(impacts.size(), equalTo(3));
        assertThat(impacts.get(0).impactDescription(), equalTo("Cannot insert or update documents in the affected indices."));
        assertThat(impacts.get(0).severity(), equalTo(1));
        assertThat(impacts.get(0).impactAreas(), equalTo(List.of(ImpactArea.INGEST)));
        assertThat(impacts.get(1).impactDescription(), equalTo("Cluster stability might be impaired."));
        assertThat(impacts.get(1).severity(), equalTo(2));
        assertThat(impacts.get(1).impactAreas(), equalTo(List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));
        assertThat(impacts.get(2).impactDescription(), equalTo("Some cluster functionality might be unavailable."));
        assertThat(impacts.get(2).severity(), equalTo(2));
        assertThat(impacts.get(2).impactAreas(), equalTo(List.of(ImpactArea.DEPLOYMENT_MANAGEMENT)));

        List<Diagnosis> diagnosisList = result.diagnosisList();
        assertThat(diagnosisList.size(), equalTo(3));
        Diagnosis dataDiagnosis = diagnosisList.get(0);
        List<String> dataAffectedResources = dataDiagnosis.affectedResources();
        assertThat(dataAffectedResources.size(), equalTo(numberOfYellowDataNodes));
        Diagnosis.Definition dataDiagnosisDefinition = dataDiagnosis.definition();
        assertThat(
            dataDiagnosisDefinition.cause(),
            equalTo("1 index resides on nodes that have run out of space and writing has been blocked by the system.")
        );
        assertThat(
            dataDiagnosisDefinition.action(),
            equalTo(
                "Enable autoscaling (if applicable), add disk capacity or free up disk space to resolve this. If you have already "
                    + "taken action please wait for the rebalancing to complete."
            )
        );

        Diagnosis masterDiagnosis = diagnosisList.get(1);
        List<String> masterAffectedResources = masterDiagnosis.affectedResources();
        assertThat(masterAffectedResources.size(), equalTo(numberOfRedMasterNodes));
        Diagnosis.Definition masterDiagnosisDefinition = masterDiagnosis.definition();
        assertThat(masterDiagnosisDefinition.cause(), equalTo("Disk is almost full."));
        assertThat(
            masterDiagnosisDefinition.action(),
            equalTo("Please add capacity to the current nodes, or replace them with ones with higher capacity.")
        );

        Diagnosis nonDataNonMasterDiagnosis = diagnosisList.get(2);
        List<String> nonDataNonMasterAffectedResources = nonDataNonMasterDiagnosis.affectedResources();
        assertThat(nonDataNonMasterAffectedResources.size(), equalTo(numberOfRedNonDataNonMasterNodes));
        Diagnosis.Definition nonDataNonMasterDiagnosisDefinition = nonDataNonMasterDiagnosis.definition();
        assertThat(nonDataNonMasterDiagnosisDefinition.cause(), equalTo("Disk is almost full."));
        assertThat(
            nonDataNonMasterDiagnosisDefinition.action(),
            equalTo("Please add capacity to the current nodes, or replace them with ones with higher capacity.")
        );
    }

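    /*
     * Sketches the expectation for getNodeIdsReportingStatus: it should return exactly the ids of the nodes whose DiskHealthInfo
     * carries the requested status, and an empty set for a status that no node reported. For example (illustrative ids only), if
     * node-1 and node-3 report RED while node-2 reports GREEN, looking up RED is expected to yield {node-1, node-3}.
     */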
    public void testGetNodeIdsReportingStatus() {
        Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
        Map<String, DiskHealthInfo> diskInfoByNode = new HashMap<>(discoveryNodes.size());
        Map<HealthStatus, Set<String>> statusToNodeIdMap = new HashMap<>(HealthStatus.values().length);
        for (DiscoveryNode node : discoveryNodes) {
            HealthStatus status = randomFrom(HealthStatus.values());
            final DiskHealthInfo diskHealthInfo = randomBoolean()
                ? new DiskHealthInfo(status)
                : new DiskHealthInfo(status, randomFrom(DiskHealthInfo.Cause.values()));
            Set<String> nodeIdsForStatus = statusToNodeIdMap.computeIfAbsent(status, k -> new HashSet<>());
            nodeIdsForStatus.add(node.getId());
            diskInfoByNode.put(node.getId(), diskHealthInfo);
        }
        for (HealthStatus status : HealthStatus.values()) {
            assertThat(
                DiskHealthIndicatorService.getNodeIdsReportingStatus(diskInfoByNode, status),
                equalTo(statusToNodeIdMap.get(status) == null ? Set.of() : statusToNodeIdMap.get(status))
            );
        }
    }

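    // Verifies that getRolesOnNodes resolves the given node ids against the cluster state and returns the roles held by those nodes.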
    public void testGetRolesOnNodes() {
        Set<DiscoveryNodeRole> roles = new HashSet<>(
            randomSubsetOf(randomIntBetween(1, DiscoveryNodeRole.roles().size()), DiscoveryNodeRole.roles())
        );
        Set<DiscoveryNode> discoveryNodes = createNodes(roles);
        ClusterService clusterService = createClusterService(false, discoveryNodes);
        Set<DiscoveryNodeRole> result = DiskHealthIndicatorService.getRolesOnNodes(
            discoveryNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
            clusterService.state()
        );
        assertThat(result, equalTo(roles));
    }

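    // Verifies that getNodesWithDataRole keeps only the node ids carrying at least one data role, dropping nodes whose roles are
    // all non-data roles.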
    public void testGetNodesWithDataRole() {
        Set<DiscoveryNodeRole> nonDataRoles = Set.of(
            DiscoveryNodeRole.MASTER_ROLE,
            DiscoveryNodeRole.ML_ROLE,
            DiscoveryNodeRole.INGEST_ROLE,
            DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
            DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
            DiscoveryNodeRole.TRANSFORM_ROLE
        );
        Set<DiscoveryNodeRole> dataRoles = Set.of(
            DiscoveryNodeRole.DATA_ROLE,
            DiscoveryNodeRole.DATA_COLD_NODE_ROLE,
            DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE,
            DiscoveryNodeRole.DATA_HOT_NODE_ROLE,
            DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE,
            DiscoveryNodeRole.DATA_WARM_NODE_ROLE
        );
        Set<DiscoveryNode> nonDataNodes = createNodes(
            new HashSet<>(randomSubsetOf(randomIntBetween(1, nonDataRoles.size()), nonDataRoles))
        );
        Set<DiscoveryNode> pureDataNodes = createNodes(new HashSet<>(randomSubsetOf(randomIntBetween(1, dataRoles.size()), dataRoles)));
        Set<DiscoveryNode> mixedNodes = createNodes(
            Stream.concat(
                randomSubsetOf(randomIntBetween(1, nonDataRoles.size()), nonDataRoles).stream(),
                randomSubsetOf(randomIntBetween(1, dataRoles.size()), dataRoles).stream()
            ).collect(Collectors.toSet())
        );
        Set<DiscoveryNode> allNodes = Stream.concat(Stream.concat(nonDataNodes.stream(), pureDataNodes.stream()), mixedNodes.stream())
            .collect(Collectors.toSet());
        ClusterService clusterService = createClusterService(false, allNodes);
        assertThat(
            DiskHealthIndicatorService.getNodesWithDataRole(
                allNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
                clusterService.state()
            ),
            equalTo(Stream.concat(pureDataNodes.stream(), mixedNodes.stream()).map(DiscoveryNode::getId).collect(Collectors.toSet()))
        );
    }

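    // Verifies that getNodesWithMasterRole keeps only the node ids carrying the master role, whether dedicated masters or
    // mixed-role nodes.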
    public void testGetNodesWithMasterRole() {
        Set<DiscoveryNodeRole> nonDataRoles = Set.of(
            DiscoveryNodeRole.ML_ROLE,
            DiscoveryNodeRole.INGEST_ROLE,
            DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
            DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
            DiscoveryNodeRole.TRANSFORM_ROLE,
            DiscoveryNodeRole.DATA_ROLE,
            DiscoveryNodeRole.DATA_COLD_NODE_ROLE,
            DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE,
            DiscoveryNodeRole.DATA_HOT_NODE_ROLE,
            DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE,
            DiscoveryNodeRole.DATA_WARM_NODE_ROLE
        );
        Set<DiscoveryNode> nonMasterNodes = createNodes(
            new HashSet<>(randomSubsetOf(randomIntBetween(1, nonDataRoles.size()), nonDataRoles))
        );
        Set<DiscoveryNode> pureMasterNodes = createNodes(Set.of(DiscoveryNodeRole.MASTER_ROLE));
        Set<DiscoveryNode> mixedNodes = createNodes(
            Stream.concat(
                randomSubsetOf(randomIntBetween(1, nonDataRoles.size()), nonDataRoles).stream(),
                Stream.of(DiscoveryNodeRole.MASTER_ROLE)
            ).collect(Collectors.toSet())
        );
        Set<DiscoveryNode> allNodes = Stream.concat(Stream.concat(nonMasterNodes.stream(), pureMasterNodes.stream()), mixedNodes.stream())
            .collect(Collectors.toSet());
        ClusterService clusterService = createClusterService(false, allNodes);
        assertThat(
            DiskHealthIndicatorService.getNodesWithMasterRole(
                allNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
                clusterService.state()
            ),
            equalTo(Stream.concat(pureMasterNodes.stream(), mixedNodes.stream()).map(DiscoveryNode::getId).collect(Collectors.toSet()))
        );
    }

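    // Verifies that getNodesWithNonDataNonMasterRoles keeps only the node ids holding at least one role that is neither a data role
    // nor the master role.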
    public void testGetNodesWithNonDataNonMasterRoles() {
        Set<DiscoveryNodeRole> dataAndMasterRoles = Set.of(
            DiscoveryNodeRole.MASTER_ROLE,
            DiscoveryNodeRole.DATA_ROLE,
            DiscoveryNodeRole.DATA_COLD_NODE_ROLE,
            DiscoveryNodeRole.DATA_FROZEN_NODE_ROLE,
            DiscoveryNodeRole.DATA_HOT_NODE_ROLE,
            DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE,
            DiscoveryNodeRole.DATA_WARM_NODE_ROLE
        );
        Set<DiscoveryNodeRole> nonDataNonMasterRoles = Set.of(
            DiscoveryNodeRole.ML_ROLE,
            DiscoveryNodeRole.INGEST_ROLE,
            DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE,
            DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE,
            DiscoveryNodeRole.TRANSFORM_ROLE
        );
        Set<DiscoveryNode> dataAndMasterNodes = createNodes(
            new HashSet<>(randomSubsetOf(randomIntBetween(1, dataAndMasterRoles.size()), dataAndMasterRoles))
        );
        Set<DiscoveryNode> pureNonDataNonMasterNodes = createNodes(
            new HashSet<>(randomSubsetOf(randomIntBetween(1, nonDataNonMasterRoles.size()), nonDataNonMasterRoles))
        );
        Set<DiscoveryNode> mixedNodes = createNodes(
            Stream.concat(
                randomSubsetOf(randomIntBetween(1, dataAndMasterRoles.size()), dataAndMasterRoles).stream(),
                randomSubsetOf(randomIntBetween(1, nonDataNonMasterRoles.size()), nonDataNonMasterRoles).stream()
            ).collect(Collectors.toSet())
        );
        Set<DiscoveryNode> allNodes = Stream.concat(
            Stream.concat(dataAndMasterNodes.stream(), pureNonDataNonMasterNodes.stream()),
            mixedNodes.stream()
        ).collect(Collectors.toSet());
        ClusterService clusterService = createClusterService(false, allNodes);
        assertThat(
            DiskHealthIndicatorService.getNodesWithNonDataNonMasterRoles(
                allNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()),
                clusterService.state()
            ),
            equalTo(
                Stream.concat(pureNonDataNonMasterNodes.stream(), mixedNodes.stream()).map(DiscoveryNode::getId).collect(Collectors.toSet())
            )
        );
    }

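    // Verifies that getIndicesForNodes maps a set of node ids to the indices whose shards are routed to those nodes in the mocked
    // cluster state.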
    public void testGetIndicesForNodes() {
        Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
        HealthStatus expectedStatus = HealthStatus.RED;
        int numberOfRedNodes = randomIntBetween(1, discoveryNodes.size());
        HealthInfo healthInfo = createHealthInfo(expectedStatus, numberOfRedNodes, discoveryNodes);
        Set<String> redNodeIds = healthInfo.diskInfoByNode()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().healthStatus().equals(expectedStatus))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        Set<String> nonRedNodeIds = healthInfo.diskInfoByNode()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().healthStatus().equals(expectedStatus) == false)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        Map<String, Set<String>> indexNameToNodeIdsMap = new HashMap<>();
        int numberOfIndices = randomIntBetween(1, 1000);
        Set<String> redNodeIndices = new HashSet<>();
        Set<String> nonRedNodeIndices = new HashSet<>();
        for (int i = 0; i < numberOfIndices; i++) {
            String indexName = randomAlphaOfLength(20);
            if (randomBoolean()) {
                indexNameToNodeIdsMap.put(indexName, redNodeIds);
                redNodeIndices.add(indexName);
            } else {
                indexNameToNodeIdsMap.put(indexName, nonRedNodeIds);
                nonRedNodeIndices.add(indexName);
            }
        }
        ClusterService clusterService = createClusterService(Set.of(), discoveryNodes, indexNameToNodeIdsMap);
        assertThat(DiskHealthIndicatorService.getIndicesForNodes(redNodeIds, clusterService.state()), equalTo(redNodeIndices));
        assertThat(DiskHealthIndicatorService.getIndicesForNodes(nonRedNodeIds, clusterService.state()), equalTo(nonRedNodeIndices));
    }

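    // Verifies the inverse lookup: getNodeIdsForIndices maps a set of index names back to the ids of the nodes hosting their shards.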
    public void testGetNodeIdsForIndices() {
        Set<DiscoveryNode> discoveryNodes = createNodesWithAllRoles();
        HealthStatus expectedStatus = HealthStatus.RED;
        int numberOfRedNodes = randomIntBetween(1, discoveryNodes.size());
        HealthInfo healthInfo = createHealthInfo(expectedStatus, numberOfRedNodes, discoveryNodes);
        Set<String> redNodeIds = healthInfo.diskInfoByNode()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().healthStatus().equals(expectedStatus))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        Set<String> nonRedNodeIds = healthInfo.diskInfoByNode()
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().healthStatus().equals(expectedStatus) == false)
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        Map<String, Set<String>> indexNameToNodeIdsMap = new HashMap<>();
        int numberOfIndices = randomIntBetween(1, 1000);
        Set<String> redNodeIndices = new HashSet<>();
        Set<String> nonRedNodeIndices = new HashSet<>();
        for (int i = 0; i < numberOfIndices; i++) {
            String indexName = randomAlphaOfLength(20);
            if (randomBoolean()) {
                indexNameToNodeIdsMap.put(indexName, redNodeIds);
                redNodeIndices.add(indexName);
            } else {
                indexNameToNodeIdsMap.put(indexName, nonRedNodeIds);
                nonRedNodeIndices.add(indexName);
            }
        }
        ClusterService clusterService = createClusterService(Set.of(), discoveryNodes, indexNameToNodeIdsMap);
        assertThat(DiskHealthIndicatorService.getNodeIdsForIndices(redNodeIndices, clusterService.state()), equalTo(redNodeIds));
        assertThat(DiskHealthIndicatorService.getNodeIdsForIndices(nonRedNodeIndices, clusterService.state()), equalTo(nonRedNodeIds));
    }

    private Set<DiscoveryNode> createNodesWithAllRoles() {
        return createNodes(DiscoveryNodeRole.roles());
    }

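    // Creates between 1 and 200 DiscoveryNodes, each with a random name, a random UUID id, a fake transport address, and the given
    // roles.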
    private Set<DiscoveryNode> createNodes(Set<DiscoveryNodeRole> roles) {
        int numberOfNodes = randomIntBetween(1, 200);
        Set<DiscoveryNode> discoveryNodes = new HashSet<>();
        for (int i = 0; i < numberOfNodes; i++) {
            discoveryNodes.add(
                new DiscoveryNode(
                    randomAlphaOfLength(30),
                    UUID.randomUUID().toString(),
                    buildNewFakeTransportAddress(),
                    Collections.emptyMap(),
                    roles,
                    Version.CURRENT
                )
            );
        }
        return discoveryNodes;
    }

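    // The createHealthInfo overloads below build a HealthInfo in which the requested number of nodes report the given status and
    // every other node reports GREEN (with or without a random Cause).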
    private HealthInfo createHealthInfo(HealthStatus expectedStatus, Set<DiscoveryNode> nodes) {
        return createHealthInfo(expectedStatus, 1, nodes);
    }

    private HealthInfo createHealthInfo(HealthStatus expectedStatus, int numberOfNodesWithExpectedStatus, Set<DiscoveryNode> nodes) {
        assert numberOfNodesWithExpectedStatus <= nodes.size();
        Map<String, DiskHealthInfo> diskInfoByNode = new HashMap<>(nodes.size());
        createHealthInfoForNodes(diskInfoByNode, expectedStatus, numberOfNodesWithExpectedStatus, nodes);
        return new HealthInfo(diskInfoByNode);
    }

    /*
     * This version of the method is similar to the one above, except it applies three different statuses to three different sets of
     * nodes.
     */
    private HealthInfo createHealthInfo(
        HealthStatus expectedStatus1,
        int numberOfNodesWithExpectedStatus1,
        Set<DiscoveryNode> nodes1,
        HealthStatus expectedStatus2,
        int numberOfNodesWithExpectedStatus2,
        Set<DiscoveryNode> nodes2,
        HealthStatus expectedStatus3,
        int numberOfNodesWithExpectedStatus3,
        Set<DiscoveryNode> nodes3
    ) {
        assert numberOfNodesWithExpectedStatus1 <= nodes1.size();
        assert numberOfNodesWithExpectedStatus2 <= nodes2.size();
        assert numberOfNodesWithExpectedStatus3 <= nodes3.size();
        Map<String, DiskHealthInfo> diskInfoByNode = new HashMap<>();
        createHealthInfoForNodes(diskInfoByNode, expectedStatus1, numberOfNodesWithExpectedStatus1, nodes1);
        createHealthInfoForNodes(diskInfoByNode, expectedStatus2, numberOfNodesWithExpectedStatus2, nodes2);
        createHealthInfoForNodes(diskInfoByNode, expectedStatus3, numberOfNodesWithExpectedStatus3, nodes3);
        return new HealthInfo(diskInfoByNode);
    }

    private void createHealthInfoForNodes(
        Map<String, DiskHealthInfo> diskInfoByNode,
        HealthStatus expectedStatus,
        int numberOfNodesWithExpectedStatus,
        Set<DiscoveryNode> nodes
    ) {
        int numberWithNonGreenStatus3 = 0;
        for (DiscoveryNode node : nodes) {
            final DiskHealthInfo diskHealthInfo;
            if (numberWithNonGreenStatus3 < numberOfNodesWithExpectedStatus) {
                diskHealthInfo = randomBoolean()
                    ? new DiskHealthInfo(expectedStatus)
                    : new DiskHealthInfo(expectedStatus, randomFrom(DiskHealthInfo.Cause.values()));
                numberWithNonGreenStatus3++;
            } else {
                diskHealthInfo = randomBoolean()
                    ? new DiskHealthInfo(HealthStatus.GREEN)
                    : new DiskHealthInfo(HealthStatus.GREEN, randomFrom(DiskHealthInfo.Cause.values()));
            }
            diskInfoByNode.put(node.getId(), diskHealthInfo);
        }
    }

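    /*
     * The createClusterService overloads below build a mocked ClusterService. The boolean variant creates a single index that is
     * optionally write-blocked; the int variant creates numberOfIndices indices, blocks the first numberOfIndicesToBlock of them,
     * and assigns none of them to any node.
     */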
    private static ClusterService createClusterService(boolean blockIndex, Set<DiscoveryNode> nodes) {
        return createClusterService(1, blockIndex ? 1 : 0, nodes);
    }

    private static ClusterService createClusterService(int numberOfIndices, int numberOfIndicesToBlock, Set<DiscoveryNode> nodes) {
        Map<String, Set<String>> indexNameToNodeIdsMap = new HashMap<>();
        Set<String> blockedIndices = new HashSet<>(numberOfIndicesToBlock);
        for (int i = 0; i < numberOfIndices; i++) {
            String indexName = randomAlphaOfLength(20);
            /*
             * The following effectively makes it so that the index does not exist on any node. That's not realistic, but works out for
             * tests where we want there to be no indices on red/yellow nodes.
             */
            indexNameToNodeIdsMap.put(indexName, Set.of());
            if (i < numberOfIndicesToBlock) {
                blockedIndices.add(indexName);
            }
        }
        return createClusterService(blockedIndices, nodes, indexNameToNodeIdsMap);
    }

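    /*
     * Builds a mocked ClusterService whose state contains one IndexMetadata per entry of indexNameToNodeIdsMap, a started shard
     * routing on each listed node id, and a read_only_allow_delete block for every index named in blockedIndices. For example
     * (illustrative names only), an entry "index-a" -> {"node-1", "node-2"} produces two started shard routings for index-a.
     */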
    private static ClusterService createClusterService(
        Set<String> blockedIndices,
        Set<DiscoveryNode> nodes,
        Map<String, Set<String>> indexNameToNodeIdsMap
    ) {
        RoutingTable routingTable = mock(RoutingTable.class);
        List<ShardRouting> shardRoutings = new ArrayList<>();
        when(routingTable.allShards()).thenReturn(shardRoutings);

        ClusterBlocks.Builder clusterBlocksBuilder = new ClusterBlocks.Builder();
        Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
        List<ClusterBlocks> clusterBlocksList = new ArrayList<>();
        for (String indexName : indexNameToNodeIdsMap.keySet()) {
            boolean blockIndex = blockedIndices.contains(indexName);
            IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings(
                Settings.builder()
                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
                    .put(IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), blockIndex)
                    .build()
            ).build();
            indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata);
            if (blockIndex) {
                ClusterBlocks clusterBlocks = clusterBlocksBuilder.addBlocks(indexMetadata).build();
                clusterBlocksList.add(clusterBlocks);
            }
            for (String nodeId : indexNameToNodeIdsMap.get(indexName)) {
                ShardRouting shardRouting = TestShardRouting.newShardRouting(
                    indexMetadata.getIndex().getName(),
                    randomIntBetween(1, 5),
                    nodeId,
                    randomBoolean(),
                    ShardRoutingState.STARTED
                );
                shardRoutings.add(shardRouting);
            }
        }
        Metadata.Builder metadataBuilder = Metadata.builder();
        metadataBuilder.indices(indexMetadataMap);
        DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder();
        for (DiscoveryNode node : nodes) {
            nodesBuilder.add(node);
        }
        ClusterState.Builder clusterStateBuilder = ClusterState.builder(new ClusterName("test-cluster"))
            .routingTable(routingTable)
            .metadata(metadataBuilder.build())
            .nodes(nodesBuilder);
        for (ClusterBlocks clusterBlocks : clusterBlocksList) {
            clusterStateBuilder.blocks(clusterBlocks);
        }
        clusterStateBuilder.nodes(nodesBuilder);
        ClusterState clusterState = clusterStateBuilder.build();
        var clusterService = mock(ClusterService.class);
        when(clusterService.state()).thenReturn(clusterState);
        return clusterService;
    }

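    // Renders a ToXContent object to JSON and parses it back into a generic map so its serialized structure can be inspected.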
    private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        xcontent.toXContent(builder, ToXContent.EMPTY_PARAMS);
        XContentParser parser = XContentType.JSON.xContent()
            .createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, BytesReference.bytes(builder).streamInput());
        return parser.map();
    }
}