Add multi-project support for health indicator shards_availability (#125512)

This commit is contained in:
Sam Xiao 2025-03-31 11:12:52 -04:00 committed by GitHub
parent fd2492f935
commit bddc14c232
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 695 additions and 201 deletions

View file

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.DefaultProjectResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
@ -178,7 +179,12 @@ public class ShardsAvailabilityHealthIndicatorBenchmark {
new TaskManager(Settings.EMPTY, threadPool, Collections.emptySet())
);
clusterService.getClusterApplierService().setInitialState(initialClusterState);
indicatorService = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService, new SystemIndices(List.of()));
indicatorService = new ShardsAvailabilityHealthIndicatorService(
clusterService,
allocationService,
new SystemIndices(List.of()),
DefaultProjectResolver.INSTANCE
);
}
private int toInt(String v) {

View file

@ -27,7 +27,12 @@ public class ShardsAvailabilityPlugin extends Plugin implements HealthPlugin {
@Override
public Collection<?> createComponents(PluginServices services) {
this.shardHealthService.set(
new ShardsAvailabilityHealthIndicatorService(services.clusterService(), services.allocationService(), services.systemIndices())
new ShardsAvailabilityHealthIndicatorService(
services.clusterService(),
services.allocationService(),
services.systemIndices(),
services.projectResolver()
)
);
return Set.of(this.shardHealthService.get());
}

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.shrink.ResizeType;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
@ -133,8 +134,9 @@ public class ShardsAvailabilityHealthIndicatorServiceIT extends ESIntegTestCase
var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
var allocationService = internalCluster().getCurrentMasterNodeInstance(AllocationService.class);
var systemIndices = internalCluster().getCurrentMasterNodeInstance(SystemIndices.class);
var projectResolver = internalCluster().getCurrentMasterNodeInstance(ProjectResolver.class);
var service = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService, systemIndices);
var service = new ShardsAvailabilityHealthIndicatorService(clusterService, allocationService, systemIndices, projectResolver);
var states = new ArrayList<RoutingNodesAndHealth>();
var listener = new ClusterStateListener() {
@Override

View file

@ -18,13 +18,16 @@ import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
@ -55,6 +58,7 @@ import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.health.ImpactArea;
import org.elasticsearch.health.SimpleHealthIndicatorDetails;
import org.elasticsearch.health.node.HealthInfo;
import org.elasticsearch.health.node.ProjectIndexName;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
@ -72,7 +76,6 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toMap;
@ -90,8 +93,8 @@ import static org.elasticsearch.health.Diagnosis.Resource.Type.INDEX;
import static org.elasticsearch.health.HealthStatus.GREEN;
import static org.elasticsearch.health.HealthStatus.RED;
import static org.elasticsearch.health.HealthStatus.YELLOW;
import static org.elasticsearch.health.node.HealthIndicatorDisplayValues.getTruncatedIndices;
import static org.elasticsearch.health.node.HealthIndicatorDisplayValues.indicesComparatorByPriorityAndName;
import static org.elasticsearch.health.node.HealthIndicatorDisplayValues.getTruncatedProjectIndices;
import static org.elasticsearch.health.node.HealthIndicatorDisplayValues.indicesComparatorByPriorityAndProjectIndex;
/**
* This indicator reports health for shards.
@ -132,19 +135,22 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
private final AllocationService allocationService;
private final SystemIndices systemIndices;
protected final ProjectResolver projectResolver;
private volatile TimeValue replicaUnassignedBufferTime;
public ShardsAvailabilityHealthIndicatorService(
ClusterService clusterService,
AllocationService allocationService,
SystemIndices systemIndices
SystemIndices systemIndices,
ProjectResolver projectResolver
) {
this.clusterService = clusterService;
this.allocationService = allocationService;
this.systemIndices = systemIndices;
this.replicaUnassignedBufferTime = REPLICA_UNASSIGNED_BUFFER_TIME.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(REPLICA_UNASSIGNED_BUFFER_TIME, this::setReplicaUnassignedBufferTime);
this.projectResolver = projectResolver;
}
private void setReplicaUnassignedBufferTime(TimeValue replicaUnassignedBufferTime) {
@ -189,12 +195,17 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
boolean verbose,
TimeValue replicaUnassignedBufferTime
) {
for (IndexRoutingTable indexShardRouting : state.globalRoutingTable().indexRouting()) {
for (int i = 0; i < indexShardRouting.size(); i++) {
IndexShardRoutingTable shardRouting = indexShardRouting.shard(i);
status.addPrimary(shardRouting.primaryShard(), state, shutdown, verbose);
for (ShardRouting replicaShard : shardRouting.replicaShards()) {
status.addReplica(replicaShard, state, shutdown, verbose, replicaUnassignedBufferTime);
for (Map.Entry<ProjectId, RoutingTable> entries : state.globalRoutingTable().routingTables().entrySet()) {
ProjectId projectId = entries.getKey();
RoutingTable projectRoutingTable = entries.getValue();
for (IndexRoutingTable indexShardRouting : projectRoutingTable.indicesRouting().values()) {
for (int i = 0; i < indexShardRouting.size(); i++) {
IndexShardRoutingTable shardRouting = indexShardRouting.shard(i);
status.addPrimary(projectId, shardRouting.primaryShard(), state, shutdown, verbose);
for (ShardRouting replicaShard : shardRouting.replicaShards()) {
status.addReplica(projectId, replicaShard, state, shutdown, verbose, replicaUnassignedBufferTime);
}
}
}
}
@ -460,35 +471,37 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
int initializing = 0;
int started = 0;
int relocating = 0;
public final Set<String> indicesWithUnavailableShards = new HashSet<>();
public final Set<String> indicesWithAllShardsUnavailable = new HashSet<>();
public final Set<ProjectIndexName> indicesWithUnavailableShards = new HashSet<>();
public final Set<ProjectIndexName> indicesWithAllShardsUnavailable = new HashSet<>();
// We keep the searchable snapshots separately as long as the original index is still available
// This is checked during the post-processing
public SearchableSnapshotsState searchableSnapshotsState = new SearchableSnapshotsState();
final Map<Diagnosis.Definition, Set<String>> diagnosisDefinitions = new HashMap<>();
final Map<Diagnosis.Definition, Set<ProjectIndexName>> diagnosisDefinitions = new HashMap<>();
public void increment(
ProjectId projectId,
ShardRouting routing,
ClusterState state,
NodesShutdownMetadata shutdowns,
boolean verbose,
TimeValue replicaUnassignedBufferTime
) {
boolean isNew = isUnassignedDueToNewInitialization(routing, state);
boolean isNew = isUnassignedDueToNewInitialization(projectId, routing, state);
boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns);
long replicaUnassignedCutoffTime = Instant.now().toEpochMilli() - replicaUnassignedBufferTime.millis();
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(routing, state)
&& isNewlyCreatedAndInitializingReplica(routing, state, replicaUnassignedCutoffTime) == false;
boolean allUnavailable = areAllShardsOfThisTypeUnavailable(projectId, routing, state)
&& isNewlyCreatedAndInitializingReplica(projectId, routing, state, replicaUnassignedCutoffTime) == false;
ProjectIndexName projectIndex = new ProjectIndexName(projectId, routing.getIndexName());
if (allUnavailable) {
indicesWithAllShardsUnavailable.add(routing.getIndexName());
indicesWithAllShardsUnavailable.add(projectIndex);
}
if ((routing.active() || isRestarting || isNew) == false) {
String indexName = routing.getIndexName();
Settings indexSettings = state.metadata().indexMetadata(routing.index()).getSettings();
Settings indexSettings = state.metadata().getProject(projectId).index(routing.index()).getSettings();
if (SearchableSnapshotsSettings.isSearchableSnapshotStore(indexSettings)) {
searchableSnapshotsState.addSearchableSnapshotWithUnavailableShard(indexName);
searchableSnapshotsState.addSearchableSnapshotWithUnavailableShard(projectIndex);
} else {
indicesWithUnavailableShards.add(indexName);
indicesWithUnavailableShards.add(projectIndex);
}
}
@ -501,16 +514,14 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
} else {
unassigned++;
if (verbose) {
diagnoseUnassignedShardRouting(routing, state).forEach(
definition -> addDefinition(definition, routing.getIndexName())
);
diagnoseUnassignedShardRouting(routing, state).forEach(definition -> addDefinition(definition, projectIndex));
}
}
}
case INITIALIZING -> {
initializing++;
if (verbose) {
addDefinition(DIAGNOSIS_WAIT_FOR_INITIALIZATION, routing.getIndexName());
addDefinition(DIAGNOSIS_WAIT_FOR_INITIALIZATION, projectIndex);
}
}
case STARTED -> started++;
@ -526,8 +537,8 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
return indicesWithAllShardsUnavailable.isEmpty() == false;
}
private void addDefinition(Diagnosis.Definition diagnosisDefinition, String indexName) {
diagnosisDefinitions.computeIfAbsent(diagnosisDefinition, (k) -> new HashSet<>()).add(indexName);
private void addDefinition(Diagnosis.Definition diagnosisDefinition, ProjectIndexName projectIndexName) {
diagnosisDefinitions.computeIfAbsent(diagnosisDefinition, (k) -> new HashSet<>()).add(projectIndexName);
}
}
@ -536,11 +547,10 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
* example: if a replica is passed then this will return true if ALL replicas are unassigned,
* but if at least one is assigned, it will return false.
*/
boolean areAllShardsOfThisTypeUnavailable(ShardRouting routing, ClusterState state) {
return StreamSupport.stream(
state.routingTable().allActiveShardsGrouped(new String[] { routing.getIndexName() }, true).spliterator(),
false
)
boolean areAllShardsOfThisTypeUnavailable(ProjectId projectId, ShardRouting routing, ClusterState state) {
return state.routingTable(projectId)
.allActiveShardsGrouped(new String[] { routing.getIndexName() }, true)
.stream()
.flatMap(shardIter -> shardIter.getShardRoutings().stream())
.filter(sr -> sr.shardId().equals(routing.shardId()))
.filter(sr -> sr.primary() == routing.primary())
@ -551,19 +561,23 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
* Returns true if the given shard is a replica that is only unassigned due to its primary being
* newly created. See {@link ClusterShardHealth#getInactivePrimaryHealth(ShardRouting)} for more
* information.
*
* We use this information when considering whether a cluster should turn red. For some cases
* (a newly created index having unassigned replicas for example), we don't want the cluster
* to turn "unhealthy" for the tiny amount of time before the shards are allocated.
*/
static boolean isNewlyCreatedAndInitializingReplica(ShardRouting routing, ClusterState state, long replicaUnassignedCutoffTime) {
static boolean isNewlyCreatedAndInitializingReplica(
ProjectId projectId,
ShardRouting routing,
ClusterState state,
long replicaUnassignedCutoffTime
) {
if (routing.active()) {
return false;
}
if (routing.primary()) {
return false;
}
ShardRouting primary = state.routingTable().shardRoutingTable(routing.shardId()).primaryShard();
ShardRouting primary = state.routingTable(projectId).shardRoutingTable(routing.shardId()).primaryShard();
if (primary.active() == false) {
return ClusterShardHealth.getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
}
@ -589,13 +603,15 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
return now - restartingAllocationDelayExpiration <= 0;
}
private static boolean isUnassignedDueToNewInitialization(ShardRouting routing, ClusterState state) {
private static boolean isUnassignedDueToNewInitialization(ProjectId projectId, ShardRouting routing, ClusterState state) {
if (routing.active()) {
return false;
}
// If the primary is inactive for unexceptional events in the cluster lifecycle, both the primary and the
// replica are considered new initializations.
ShardRouting primary = routing.primary() ? routing : state.routingTable().shardRoutingTable(routing.shardId()).primaryShard();
ShardRouting primary = routing.primary()
? routing
: state.routingTable(projectId).shardRoutingTable(routing.shardId()).primaryShard();
return primary.active() == false && getInactivePrimaryHealth(primary) == ClusterHealthStatus.YELLOW;
}
@ -950,18 +966,19 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
this.clusterMetadata = clusterMetadata;
}
void addPrimary(ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
primaries.increment(routing, state, shutdowns, verbose, TimeValue.MINUS_ONE);
void addPrimary(ProjectId projectId, ShardRouting routing, ClusterState state, NodesShutdownMetadata shutdowns, boolean verbose) {
primaries.increment(projectId, routing, state, shutdowns, verbose, TimeValue.MINUS_ONE);
}
void addReplica(
ProjectId projectId,
ShardRouting routing,
ClusterState state,
NodesShutdownMetadata shutdowns,
boolean verbose,
TimeValue replicaUnassignedBufferTime
) {
replicas.increment(routing, state, shutdowns, verbose, replicaUnassignedBufferTime);
replicas.increment(projectId, routing, state, shutdowns, verbose, replicaUnassignedBufferTime);
}
void updateSearchableSnapshotsOfAvailableIndices() {
@ -1068,7 +1085,11 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
"Cannot add data to %d %s [%s]. Searches might return incomplete results.",
primaries.indicesWithUnavailableShards.size(),
primaries.indicesWithUnavailableShards.size() == 1 ? "index" : "indices",
getTruncatedIndices(primaries.indicesWithUnavailableShards, clusterMetadata)
getTruncatedProjectIndices(
primaries.indicesWithUnavailableShards,
clusterMetadata,
projectResolver.supportsMultipleProjects()
)
);
impacts.add(
new HealthIndicatorImpact(
@ -1080,14 +1101,18 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
)
);
}
Set<String> readOnlyIndicesWithUnavailableShards = primaries.searchableSnapshotsState.getRedSearchableSnapshots();
Set<ProjectIndexName> readOnlyIndicesWithUnavailableShards = primaries.searchableSnapshotsState.getRedSearchableSnapshots();
if (readOnlyIndicesWithUnavailableShards.isEmpty() == false) {
String impactDescription = String.format(
Locale.ROOT,
"Searching %d %s [%s] might return incomplete results.",
readOnlyIndicesWithUnavailableShards.size(),
readOnlyIndicesWithUnavailableShards.size() == 1 ? "index" : "indices",
getTruncatedIndices(readOnlyIndicesWithUnavailableShards, clusterMetadata)
getTruncatedProjectIndices(
readOnlyIndicesWithUnavailableShards,
clusterMetadata,
projectResolver.supportsMultipleProjects()
)
);
impacts.add(
new HealthIndicatorImpact(
@ -1104,7 +1129,7 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
* that is reported as unavailable. That replica is likely being promoted to primary. The only impact that matters at this
* point is the one above, which has already been reported for this index.
*/
Set<String> indicesWithUnavailableReplicasOnly = new HashSet<>(replicas.indicesWithUnavailableShards);
Set<ProjectIndexName> indicesWithUnavailableReplicasOnly = new HashSet<>(replicas.indicesWithUnavailableShards);
indicesWithUnavailableReplicasOnly.removeAll(primaries.indicesWithUnavailableShards);
if (indicesWithUnavailableReplicasOnly.isEmpty() == false) {
String impactDescription = String.format(
@ -1112,7 +1137,11 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
"Searches might be slower than usual. Fewer redundant copies of the data exist on %d %s [%s].",
indicesWithUnavailableReplicasOnly.size(),
indicesWithUnavailableReplicasOnly.size() == 1 ? "index" : "indices",
getTruncatedIndices(indicesWithUnavailableReplicasOnly, clusterMetadata)
getTruncatedProjectIndices(
indicesWithUnavailableReplicasOnly,
clusterMetadata,
projectResolver.supportsMultipleProjects()
)
);
impacts.add(
new HealthIndicatorImpact(NAME, REPLICA_UNASSIGNED_IMPACT_ID, 2, impactDescription, List.of(ImpactArea.SEARCH))
@ -1129,9 +1158,9 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
*/
public List<Diagnosis> getDiagnosis(boolean verbose, int maxAffectedResourcesCount) {
if (verbose) {
Map<Diagnosis.Definition, Set<String>> diagnosisToAffectedIndices = new HashMap<>(primaries.diagnosisDefinitions);
Map<Diagnosis.Definition, Set<ProjectIndexName>> diagnosisToAffectedIndices = new HashMap<>(primaries.diagnosisDefinitions);
replicas.diagnosisDefinitions.forEach((diagnosisDef, indicesWithReplicasUnassigned) -> {
Set<String> indicesWithPrimariesUnassigned = diagnosisToAffectedIndices.get(diagnosisDef);
Set<ProjectIndexName> indicesWithPrimariesUnassigned = diagnosisToAffectedIndices.get(diagnosisDef);
if (indicesWithPrimariesUnassigned == null) {
diagnosisToAffectedIndices.put(diagnosisDef, indicesWithReplicasUnassigned);
} else {
@ -1145,13 +1174,14 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
return diagnosisToAffectedIndices.entrySet().stream().map(e -> {
List<Diagnosis.Resource> affectedResources = new ArrayList<>(1);
if (e.getKey().equals(ACTION_RESTORE_FROM_SNAPSHOT)) {
Set<String> restoreFromSnapshotIndices = e.getValue();
Set<ProjectIndexName> restoreFromSnapshotIndices = e.getValue();
if (restoreFromSnapshotIndices != null && restoreFromSnapshotIndices.isEmpty() == false) {
affectedResources = getRestoreFromSnapshotAffectedResources(
clusterMetadata,
systemIndices,
restoreFromSnapshotIndices,
maxAffectedResourcesCount
maxAffectedResourcesCount,
projectResolver.supportsMultipleProjects()
);
}
} else {
@ -1160,7 +1190,13 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
INDEX,
e.getValue()
.stream()
.sorted(indicesComparatorByPriorityAndName(clusterMetadata))
.sorted(
indicesComparatorByPriorityAndProjectIndex(
clusterMetadata,
projectResolver.supportsMultipleProjects()
)
)
.map(projectIndex -> projectIndex.toString(projectResolver.supportsMultipleProjects()))
.limit(Math.min(e.getValue().size(), maxAffectedResourcesCount))
.collect(Collectors.toList())
)
@ -1183,27 +1219,23 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
static List<Diagnosis.Resource> getRestoreFromSnapshotAffectedResources(
Metadata metadata,
SystemIndices systemIndices,
Set<String> restoreFromSnapshotIndices,
int maxAffectedResourcesCount
Set<ProjectIndexName> restoreFromSnapshotIndices,
int maxAffectedResourcesCount,
boolean supportsMultipleProjects
) {
List<Diagnosis.Resource> affectedResources = new ArrayList<>(2);
Set<String> affectedIndices = new HashSet<>(restoreFromSnapshotIndices);
Set<ProjectId> affectedProjects = restoreFromSnapshotIndices.stream().map(ProjectIndexName::projectId).collect(toSet());
Set<ProjectIndexName> affectedIndices = new HashSet<>(restoreFromSnapshotIndices);
Set<String> affectedFeatureStates = new HashSet<>();
Map<String, Set<String>> featureToSystemIndices = systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getIndexDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getMatchingIndices(metadata.getProject()).stream())
.collect(toSet())
)
);
for (Map.Entry<String, Set<String>> featureToIndices : featureToSystemIndices.entrySet()) {
for (String featureIndex : featureToIndices.getValue()) {
Map<String, Set<ProjectIndexName>> featureToSystemIndices = getSystemIndicesForProjects(
systemIndices,
affectedProjects,
metadata
);
for (Map.Entry<String, Set<ProjectIndexName>> featureToIndices : featureToSystemIndices.entrySet()) {
for (ProjectIndexName featureIndex : featureToIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToIndices.getKey());
affectedIndices.remove(featureIndex);
@ -1211,22 +1243,16 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
}
}
Map<String, Set<String>> featureToDsBackingIndices = systemIndices.getFeatures()
.stream()
.collect(
toMap(
SystemIndices.Feature::getName,
feature -> feature.getDataStreamDescriptors()
.stream()
.flatMap(descriptor -> descriptor.getBackingIndexNames(metadata).stream())
.collect(toSet())
)
);
Map<String, Set<ProjectIndexName>> featureToDsBackingIndices = getSystemDsBackingIndicesForProjects(
systemIndices,
affectedProjects,
metadata
);
// the shards_availability indicator works with indices so let's remove the feature states data streams backing indices from
// the list of affected indices (the feature state will cover the restore of these indices too)
for (Map.Entry<String, Set<String>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
for (String featureIndex : featureToBackingIndices.getValue()) {
for (Map.Entry<String, Set<ProjectIndexName>> featureToBackingIndices : featureToDsBackingIndices.entrySet()) {
for (ProjectIndexName featureIndex : featureToBackingIndices.getValue()) {
if (restoreFromSnapshotIndices.contains(featureIndex)) {
affectedFeatureStates.add(featureToBackingIndices.getKey());
affectedIndices.remove(featureIndex);
@ -1235,7 +1261,16 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
}
if (affectedIndices.isEmpty() == false) {
affectedResources.add(new Diagnosis.Resource(INDEX, affectedIndices.stream().limit(maxAffectedResourcesCount).toList()));
affectedResources.add(
new Diagnosis.Resource(
INDEX,
affectedIndices.stream()
.sorted(indicesComparatorByPriorityAndProjectIndex(metadata, supportsMultipleProjects))
.map(index -> index.toString(supportsMultipleProjects))
.limit(maxAffectedResourcesCount)
.toList()
)
);
}
if (affectedFeatureStates.isEmpty() == false) {
affectedResources.add(
@ -1244,35 +1279,97 @@ public class ShardsAvailabilityHealthIndicatorService implements HealthIndicator
}
return affectedResources;
}
/**
 * Groups, by feature name, the system indices of the given projects that match one of the feature's index descriptors.
 *
 * @param systemIndices provides the features and their index descriptors
 * @param projects      the projects whose metadata is searched for matching indices
 * @param metadata      cluster metadata used to look up each project
 * @return a map from feature name to the matching indices, each paired with the project it was found in
 */
private static Map<String, Set<ProjectIndexName>> getSystemIndicesForProjects(
    SystemIndices systemIndices,
    Set<ProjectId> projects,
    Metadata metadata
) {
    return systemIndices.getFeatures().stream().collect(toMap(SystemIndices.Feature::getName, feature -> {
        Set<ProjectIndexName> matchingIndices = new HashSet<>();
        for (var descriptor : feature.getIndexDescriptors()) {
            for (ProjectId projectId : projects) {
                // Every descriptor is evaluated against every requested project.
                for (String index : descriptor.getMatchingIndices(metadata.getProject(projectId))) {
                    matchingIndices.add(new ProjectIndexName(projectId, index));
                }
            }
        }
        return matchingIndices;
    }));
}
/**
 * Groups, by feature name, the backing indices of the system data streams described by each feature, looked up in the given
 * projects.
 *
 * @param systemIndices provides the features and their data stream descriptors
 * @param projects      the projects whose metadata is searched for backing indices
 * @param metadata      cluster metadata used to look up each project
 * @return a map from feature name to the backing indices, each paired with the project it was found in
 */
private static Map<String, Set<ProjectIndexName>> getSystemDsBackingIndicesForProjects(
    SystemIndices systemIndices,
    Set<ProjectId> projects,
    Metadata metadata
) {
    return systemIndices.getFeatures().stream().collect(toMap(SystemIndices.Feature::getName, feature -> {
        Set<ProjectIndexName> backingIndices = new HashSet<>();
        for (var descriptor : feature.getDataStreamDescriptors()) {
            for (ProjectId projectId : projects) {
                // Every descriptor is evaluated against every requested project.
                for (String index : descriptor.getBackingIndexNames(metadata.getProject(projectId))) {
                    backingIndices.add(new ProjectIndexName(projectId, index));
                }
            }
        }
        return backingIndices;
    }));
}
}
public static class SearchableSnapshotsState {
private final Set<String> searchableSnapshotWithUnavailableShard = new HashSet<>();
private final Set<String> searchableSnapshotWithOriginalIndexAvailable = new HashSet<>();
private final Set<ProjectIndexName> searchableSnapshotWithUnavailableShard = new HashSet<>();
private final Set<ProjectIndexName> searchableSnapshotWithOriginalIndexAvailable = new HashSet<>();
void addSearchableSnapshotWithUnavailableShard(String indexName) {
void addSearchableSnapshotWithUnavailableShard(ProjectIndexName indexName) {
searchableSnapshotWithUnavailableShard.add(indexName);
}
void addSearchableSnapshotWithOriginalIndexAvailable(String indexName) {
void addSearchableSnapshotWithOriginalIndexAvailable(ProjectIndexName indexName) {
searchableSnapshotWithOriginalIndexAvailable.add(indexName);
}
public Set<String> getRedSearchableSnapshots() {
public Set<ProjectIndexName> getRedSearchableSnapshots() {
return Sets.difference(searchableSnapshotWithUnavailableShard, searchableSnapshotWithOriginalIndexAvailable);
}
// If the original index of a searchable snapshot with unavailable shards is available then we remove the searchable snapshot
// from the list of the unavailable searchable snapshots because the data is available via the original index.
void updateSearchableSnapshotWithAvailableIndices(Metadata clusterMetadata, Set<String> indicesWithUnavailableShards) {
for (String index : searchableSnapshotWithUnavailableShard) {
assert clusterMetadata.getProject().index(index) != null : "Index metadata of index '" + index + "' should not be null";
Settings indexSettings = clusterMetadata.getProject().index(index).getSettings();
void updateSearchableSnapshotWithAvailableIndices(Metadata clusterMetadata, Set<ProjectIndexName> indicesWithUnavailableShards) {
for (ProjectIndexName projectIndex : searchableSnapshotWithUnavailableShard) {
ProjectId projectId = projectIndex.projectId();
String index = projectIndex.indexName();
assert clusterMetadata.getProject(projectId).index(index) != null
: "Index metadata of index '" + index + "' should not be null";
Settings indexSettings = clusterMetadata.getProject(projectId).index(index).getSettings();
String originalIndex = indexSettings.get(SearchableSnapshotsSettings.SEARCHABLE_SNAPSHOT_INDEX_NAME_SETTING_KEY);
ProjectIndexName originalProjectIndex = new ProjectIndexName(projectId, originalIndex);
if (originalIndex != null
&& clusterMetadata.getProject().indices().containsKey(originalIndex) != false
&& indicesWithUnavailableShards.contains(originalIndex) == false) {
addSearchableSnapshotWithOriginalIndexAvailable(index);
&& clusterMetadata.getProject(projectId).indices().containsKey(originalIndex) != false
&& indicesWithUnavailableShards.contains(originalProjectIndex) == false) {
addSearchableSnapshotWithOriginalIndexAvailable(projectIndex);
}
}
}

View file

@ -10,6 +10,7 @@
package org.elasticsearch.health;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
@ -19,6 +20,7 @@ import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.GET;
@ -29,6 +31,8 @@ public class RestGetHealthAction extends BaseRestHandler {
private static final String SIZE_PARAM = "size";
private static final String CAPABILITY_MULTI_PROJECT_SHARDS_AVAILABILITY = "multi_project_shards_availability";
@Override
public String getName() {
// TODO: Existing - "cluster_health_action", "cat_health_action"
@ -57,4 +61,9 @@ public class RestGetHealthAction extends BaseRestHandler {
public boolean canTripCircuitBreaker() {
return false;
}
@Override
public Set<String> supportedCapabilities() {
    // Add the multi-project shards_availability capability to whatever the parent handler already reports.
    Set<String> inheritedCapabilities = super.supportedCapabilities();
    return Sets.union(Set.of(CAPABILITY_MULTI_PROJECT_SHARDS_AVAILABILITY), inheritedCapabilities);
}
}

View file

@ -11,6 +11,7 @@ package org.elasticsearch.health.node;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.Collection;
@ -47,6 +48,7 @@ public class HealthIndicatorDisplayValues {
* logging or user messages. The indices are sorted by priority and then by name to ensure a
* deterministic message. If there are more indices than 10, it adds the '...' suffix.
*/
@Deprecated
public static String getTruncatedIndices(Set<String> indices, Metadata clusterMetadata) {
final int maxIndices = 10;
String truncatedIndicesString = indices.stream()
@ -59,6 +61,28 @@ public class HealthIndicatorDisplayValues {
return truncatedIndicesString;
}
/**
 * Builds a comma-separated string of at most 10 indices from the given set, for use as examples in logging or user messages.
 * The indices are ordered with {@link #indicesComparatorByPriorityAndProjectIndex(Metadata, boolean)} so the message is
 * deterministic. If the set holds more than 10 indices, {@code ", ..."} is appended.
 *
 * @param indices                  the project-scoped indices to display
 * @param clusterMetadata          used to look up index priority when sorting
 * @param supportsMultipleProjects passed to the comparator and to {@code ProjectIndexName#toString(boolean)} when rendering
 * @return the truncated, comma-separated display string
 */
public static String getTruncatedProjectIndices(
    Set<ProjectIndexName> indices,
    Metadata clusterMetadata,
    boolean supportsMultipleProjects
) {
    final int displayLimit = 10;
    Comparator<ProjectIndexName> displayOrder = indicesComparatorByPriorityAndProjectIndex(clusterMetadata, supportsMultipleProjects);
    String displayed = indices.stream()
        .sorted(displayOrder)
        .limit(displayLimit)
        .map(projectIndexName -> projectIndexName.toString(supportsMultipleProjects))
        .collect(joining(", "));
    return indices.size() > displayLimit ? displayed + ", ..." : displayed;
}
/**
* Creates a string that displays all the values that fulfilled the predicate sorted in the natural order.
* @param values, the values to be displayed
@ -119,6 +143,7 @@ public class HealthIndicatorDisplayValues {
* @param clusterMetadata Used to look up index priority.
* @return Comparator instance
*/
@Deprecated
public static Comparator<String> indicesComparatorByPriorityAndName(Metadata clusterMetadata) {
// We want to show indices with a numerically higher index.priority first (since lower priority ones might get truncated):
return Comparator.comparingInt((String indexName) -> {
@ -126,4 +151,23 @@ public class HealthIndicatorDisplayValues {
return indexMetadata == null ? -1 : indexMetadata.priority();
}).reversed().thenComparing(Comparator.naturalOrder());
}
/**
 * Returns a comparator that orders project-scoped index names by descending index priority and,
 * for equal priorities, by the string produced by {@link ProjectIndexName#toString(boolean)}.
 * An index that is not present in its project's metadata is given priority -1.
 *
 * @param clusterMetadata          used to resolve the project and then the index to read its priority
 * @param supportsMultipleProjects passed to {@link ProjectIndexName#toString(boolean)} for the tie-break key
 * @return Comparator instance
 */
public static Comparator<ProjectIndexName> indicesComparatorByPriorityAndProjectIndex(
    Metadata clusterMetadata,
    boolean supportsMultipleProjects
) {
    final Comparator<ProjectIndexName> lowestPriorityFirst = Comparator.comparingInt((ProjectIndexName candidate) -> {
        // NOTE(review): the project lookup result is dereferenced without a null check — confirm
        // getProject never returns null for an unknown project id.
        ProjectMetadata owningProject = clusterMetadata.getProject(candidate.projectId());
        IndexMetadata resolvedIndex = owningProject.index(candidate.indexName());
        if (resolvedIndex == null) {
            return -1;
        }
        return resolvedIndex.priority();
    });
    // Numerically higher index.priority goes first, since lower priority entries might get truncated.
    final Comparator<ProjectIndexName> highestPriorityFirst = lowestPriorityFirst.reversed();
    return highestPriorityFirst.thenComparing(candidate -> candidate.toString(supportsMultipleProjects));
}
}

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.health.node;
import org.elasticsearch.cluster.metadata.ProjectId;
/**
 * An index name paired with the id of the project it belongs to.
 * Natural ordering is the lexicographic order of the project-qualified string form.
 *
 * @param projectId the id of the project
 * @param indexName the name of the index
 */
public record ProjectIndexName(ProjectId projectId, String indexName) implements Comparable<ProjectIndexName> {
    // VisibleForTesting
    public static final String DELIMITER = "/";

    /**
     * @return the project-qualified form, identical to {@code toString(true)}
     */
    @Override
    public String toString() {
        return toString(true);
    }

    /**
     * Renders this name either with or without the project id.
     *
     * @param withProjectId when {@code true} the result is the project id, {@link #DELIMITER} and the index name;
     *                      when {@code false} it is the bare index name
     * @return the display string
     */
    public String toString(boolean withProjectId) {
        return withProjectId ? projectId.id() + DELIMITER + indexName : indexName;
    }

    /**
     * Compares the project-qualified string forms of the two names.
     */
    @Override
    public int compareTo(ProjectIndexName other) {
        final String mine = this.toString();
        final String theirs = other.toString();
        return mine.compareTo(theirs);
    }
}

View file

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.index.Index;
import java.util.Collections;
@ -106,8 +107,18 @@ public class SystemDataStreamDescriptor {
* @param metadata Metadata in which to look for indices
* @return List of names of backing indices
*/
@Deprecated
public List<String> getBackingIndexNames(Metadata metadata) {
    // Deprecated entry point: resolve the project from the cluster metadata and delegate to the
    // ProjectMetadata overload, which performs the data stream lookup itself. The former local
    // DataStream lookup here was never read, so it has been dropped.
    return getBackingIndexNames(metadata.getProject());
}
/**
* Retrieve backing indices for this system data stream
* @param projectMetadata Project metadata in which to look for indices
* @return List of names of backing indices
*/
public List<String> getBackingIndexNames(ProjectMetadata projectMetadata) {
DataStream dataStream = projectMetadata.dataStreams().get(dataStreamName);
if (dataStream == null) {
return Collections.emptyList();
}

View file

@ -9,6 +9,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.allocation.shards.ShardsAvailabilityHealthIndicatorService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
@ -46,7 +47,12 @@ public class ShardsAvailabilityActionGuideTests extends ESTestCase {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(ClusterSettings.createBuiltInClusterSettings());
when(clusterService.getSettings()).thenReturn(Settings.EMPTY);
service = new ShardsAvailabilityHealthIndicatorService(clusterService, mock(AllocationService.class), mock(SystemIndices.class));
service = new ShardsAvailabilityHealthIndicatorService(
clusterService,
mock(AllocationService.class),
mock(SystemIndices.class),
mock(ProjectResolver.class)
);
}
public void testRestoreFromSnapshotAction() {

View file

@ -0,0 +1,40 @@
---
# REST test: checks the shards_availability health indicator report on a multi-project enabled cluster.
"Health indicator shards_availability for multi-project enabled cluster":
# Requires the multi_project_shards_availability capability on GET /_health_report.
- requires:
test_runner_features: capabilities
capabilities:
- method: GET
path: /_health_report
capabilities: [ multi_project_shards_availability ]
reason: Capability required to run test
# First request only the master_is_stable indicator and expect it to be green.
- do:
health_report:
feature: master_is_stable
- is_true: cluster_name
- match: { indicators.master_is_stable.status: "green" }
# Create an index with one primary, no replicas, and index-level allocation set to "none".
# NOTE(review): presumably this keeps the primary unassigned, which the red status below relies on; confirm.
- do:
indices.create:
index: red_index
master_timeout: 1s
timeout: 1s
body:
settings:
number_of_shards: 1
number_of_replicas: 0
index.routing.allocation.enable: none
# Request the shards_availability indicator and assert status, symptom and exactly one diagnosis entry.
- do:
health_report:
feature: shards_availability
- is_true: cluster_name
- match: { indicators.shards_availability.status: "red" }
- match: { indicators.shards_availability.symptom: "This cluster has 1 unavailable primary shard." }
- is_true: indicators.shards_availability.diagnosis
- length: { indicators.shards_availability.diagnosis: 1 }
- is_true: indicators.shards_availability.diagnosis.0.affected_resources
- length: { indicators.shards_availability.diagnosis.0.affected_resources: 1 }
# Regex match on the affected index: any prefix followed by "/red_index".
# NOTE(review): the prefix is presumably the project id; confirm against the indicator's output format.
- match: { indicators.shards_availability.diagnosis.0.affected_resources.indices.0: "/.*\\/red_index/" }