Add a capabilities API to check node and cluster capabilities (#106820)

This adds a /_capabilities rest endpoint for checking the capabilities of a cluster - what endpoints, parameters, and endpoint capabilities the cluster supports
This commit is contained in:
Simon Cooper 2024-05-08 14:44:26 +01:00 committed by GitHub
parent 8864058f83
commit e7350dce29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 504 additions and 0 deletions

View file

@ -0,0 +1,5 @@
pr: 106820
summary: Add a capabilities API to check node and cluster capabilities
area: Infra/REST API
type: feature
issues: []

View file

@ -61,4 +61,15 @@ public enum RestApiVersion {
};
}
public static RestApiVersion forMajor(int major) {
switch (major) {
case 7 -> {
return V_7;
}
case 8 -> {
return V_8;
}
default -> throw new IllegalArgumentException("Unknown REST API version " + major);
}
}
}

View file

@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.nodescapabilities;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesResponse;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SimpleNodesCapabilitiesIT extends ESIntegTestCase {
public void testNodesCapabilities() throws IOException {
internalCluster().startNodes(2);
ClusterHealthResponse clusterHealth = clusterAdmin().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").get();
logger.info("--> done cluster_health, status {}", clusterHealth.getStatus());
// check we support the capabilities API itself. Which we do.
NodesCapabilitiesResponse response = clusterAdmin().nodesCapabilities(new NodesCapabilitiesRequest().path("_capabilities"))
.actionGet();
assertThat(response.getNodes(), hasSize(2));
assertThat(response.isSupported(), is(true));
// check we support some parameters of the capabilities API
response = clusterAdmin().nodesCapabilities(new NodesCapabilitiesRequest().path("_capabilities").parameters("method", "path"))
.actionGet();
assertThat(response.getNodes(), hasSize(2));
assertThat(response.isSupported(), is(true));
// check we don't support some other parameters of the capabilities API
response = clusterAdmin().nodesCapabilities(new NodesCapabilitiesRequest().path("_capabilities").parameters("method", "invalid"))
.actionGet();
assertThat(response.getNodes(), hasSize(2));
assertThat(response.isSupported(), is(false));
// check we don't support a random invalid api
// TODO this is not working yet - see https://github.com/elastic/elasticsearch/issues/107425
/*response = clusterAdmin().nodesCapabilities(new NodesCapabilitiesRequest().path("_invalid"))
.actionGet();
assertThat(response.getNodes(), hasSize(2));
assertThat(response.isSupported(), is(false));*/
}
}

View file

@ -65,6 +65,7 @@ module org.elasticsearch.server {
exports org.elasticsearch.action.admin.cluster.desirednodes;
exports org.elasticsearch.action.admin.cluster.health;
exports org.elasticsearch.action.admin.cluster.migration;
exports org.elasticsearch.action.admin.cluster.node.capabilities;
exports org.elasticsearch.action.admin.cluster.node.hotthreads;
exports org.elasticsearch.action.admin.cluster.node.info;
exports org.elasticsearch.action.admin.cluster.node.reload;

View file

@ -29,6 +29,7 @@ import org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusA
import org.elasticsearch.action.admin.cluster.migration.PostFeatureUpgradeAction;
import org.elasticsearch.action.admin.cluster.migration.TransportGetFeatureUpgradeStatusAction;
import org.elasticsearch.action.admin.cluster.migration.TransportPostFeatureUpgradeAction;
import org.elasticsearch.action.admin.cluster.node.capabilities.TransportNodesCapabilitiesAction;
import org.elasticsearch.action.admin.cluster.node.hotthreads.TransportNodesHotThreadsAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction;
import org.elasticsearch.action.admin.cluster.node.reload.TransportNodesReloadSecureSettingsAction;
@ -284,6 +285,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestGetSnapshotsAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetStoredScriptAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetTaskAction;
import org.elasticsearch.rest.action.admin.cluster.RestListTasksAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesCapabilitiesAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesHotThreadsAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.RestNodesStatsAction;
@ -616,6 +618,7 @@ public class ActionModule extends AbstractModule {
actions.register(TransportNodesInfoAction.TYPE, TransportNodesInfoAction.class);
actions.register(TransportRemoteInfoAction.TYPE, TransportRemoteInfoAction.class);
actions.register(TransportNodesCapabilitiesAction.TYPE, TransportNodesCapabilitiesAction.class);
actions.register(RemoteClusterNodesAction.TYPE, RemoteClusterNodesAction.TransportAction.class);
actions.register(TransportNodesStatsAction.TYPE, TransportNodesStatsAction.class);
actions.register(TransportNodesUsageAction.TYPE, TransportNodesUsageAction.class);
@ -833,6 +836,7 @@ public class ActionModule extends AbstractModule {
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesCapabilitiesAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
registerHandler.accept(new RestNodesHotThreadsAction());
@ -1029,6 +1033,7 @@ public class ActionModule extends AbstractModule {
@Override
protected void configure() {
bind(RestController.class).toInstance(restController);
bind(ActionFilters.class).toInstance(actionFilters);
bind(DestructiveOperations.class).toInstance(destructiveOperations);
bind(new TypeLiteral<RequestValidators<PutMappingRequest>>() {

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.admin.cluster.node.capabilities;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
public class NodeCapability extends BaseNodeResponse {
private final boolean supported;
public NodeCapability(StreamInput in) throws IOException {
super(in);
supported = in.readBoolean();
}
public NodeCapability(boolean supported, DiscoveryNode node) {
super(node);
this.supported = supported;
}
public boolean isSupported() {
return supported;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(supported);
}
}

View file

@ -0,0 +1,75 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.admin.cluster.node.capabilities;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.RestRequest;
import java.util.Set;
public class NodesCapabilitiesRequest extends BaseNodesRequest<NodesCapabilitiesRequest> {
private RestRequest.Method method = RestRequest.Method.GET;
private String path = "/";
private Set<String> parameters = Set.of();
private Set<String> capabilities = Set.of();
private RestApiVersion restApiVersion = RestApiVersion.current();
public NodesCapabilitiesRequest() {
// always send to all nodes
super(Strings.EMPTY_ARRAY);
}
public NodesCapabilitiesRequest path(String path) {
this.path = path;
return this;
}
public String path() {
return path;
}
public NodesCapabilitiesRequest method(RestRequest.Method method) {
this.method = method;
return this;
}
public RestRequest.Method method() {
return method;
}
public NodesCapabilitiesRequest parameters(String... parameters) {
this.parameters = Set.of(parameters);
return this;
}
public Set<String> parameters() {
return parameters;
}
public NodesCapabilitiesRequest capabilities(String... capabilities) {
this.capabilities = Set.of(capabilities);
return this;
}
public Set<String> capabilities() {
return capabilities;
}
public NodesCapabilitiesRequest restApiVersion(RestApiVersion restApiVersion) {
this.restApiVersion = restApiVersion;
return this;
}
public RestApiVersion restApiVersion() {
return restApiVersion;
}
}

View file

@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.admin.cluster.node.capabilities;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
public class NodesCapabilitiesResponse extends BaseNodesResponse<NodeCapability> implements ToXContentFragment {
protected NodesCapabilitiesResponse(ClusterName clusterName, List<NodeCapability> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}
@Override
protected List<NodeCapability> readNodesFrom(StreamInput in) throws IOException {
return TransportAction.localOnly();
}
@Override
protected void writeNodesTo(StreamOutput out, List<NodeCapability> nodes) throws IOException {
TransportAction.localOnly();
}
public boolean isSupported() {
return getNodes().isEmpty() == false && getNodes().stream().allMatch(NodeCapability::isSupported);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field("supported", isSupported());
}
}

View file

@ -0,0 +1,140 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.admin.cluster.node.capabilities;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class TransportNodesCapabilitiesAction extends TransportNodesAction<
NodesCapabilitiesRequest,
NodesCapabilitiesResponse,
TransportNodesCapabilitiesAction.NodeCapabilitiesRequest,
NodeCapability> {
public static final ActionType<NodesCapabilitiesResponse> TYPE = new ActionType<>("cluster:monitor/nodes/capabilities");
private final RestController restController;
@Inject
public TransportNodesCapabilitiesAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
RestController restController
) {
super(
TYPE.name(),
clusterService,
transportService,
actionFilters,
NodeCapabilitiesRequest::new,
threadPool.executor(ThreadPool.Names.MANAGEMENT)
);
this.restController = restController;
}
@Override
protected NodesCapabilitiesResponse newResponse(
NodesCapabilitiesRequest request,
List<NodeCapability> responses,
List<FailedNodeException> failures
) {
return new NodesCapabilitiesResponse(clusterService.getClusterName(), responses, failures);
}
@Override
protected NodeCapabilitiesRequest newNodeRequest(NodesCapabilitiesRequest request) {
return new NodeCapabilitiesRequest(
request.method(),
request.path(),
request.parameters(),
request.capabilities(),
request.restApiVersion()
);
}
@Override
protected NodeCapability newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeCapability(in);
}
@Override
protected NodeCapability nodeOperation(NodeCapabilitiesRequest request, Task task) {
boolean supported = restController.checkSupported(
request.method,
request.path,
request.parameters,
request.capabilities,
request.restApiVersion
);
return new NodeCapability(supported, transportService.getLocalNode());
}
public static class NodeCapabilitiesRequest extends TransportRequest {
private final RestRequest.Method method;
private final String path;
private final Set<String> parameters;
private final Set<String> capabilities;
private final RestApiVersion restApiVersion;
public NodeCapabilitiesRequest(StreamInput in) throws IOException {
super(in);
method = in.readEnum(RestRequest.Method.class);
path = in.readString();
parameters = in.readCollectionAsImmutableSet(StreamInput::readString);
capabilities = in.readCollectionAsImmutableSet(StreamInput::readString);
restApiVersion = RestApiVersion.forMajor(in.readVInt());
}
public NodeCapabilitiesRequest(
RestRequest.Method method,
String path,
Set<String> parameters,
Set<String> capabilities,
RestApiVersion restApiVersion
) {
this.method = method;
this.path = path;
this.parameters = Set.copyOf(parameters);
this.capabilities = Set.copyOf(capabilities);
this.restApiVersion = restApiVersion;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeEnum(method);
out.writeString(path);
out.writeCollection(parameters, StreamOutput::writeString);
out.writeCollection(capabilities, StreamOutput::writeString);
out.writeVInt(restApiVersion.major);
}
}
}

View file

@ -21,6 +21,9 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesResponse;
import org.elasticsearch.action.admin.cluster.node.capabilities.TransportNodesCapabilitiesAction;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
@ -248,6 +251,14 @@ public class ClusterAdminClient implements ElasticsearchClient {
return new NodesStatsRequestBuilder(this).setNodesIds(nodesIds);
}
public ActionFuture<NodesCapabilitiesResponse> nodesCapabilities(final NodesCapabilitiesRequest request) {
return execute(TransportNodesCapabilitiesAction.TYPE, request);
}
public void nodesCapabilities(final NodesCapabilitiesRequest request, final ActionListener<NodesCapabilitiesResponse> listener) {
execute(TransportNodesCapabilitiesAction.TYPE, request, listener);
}
public void nodesUsage(final NodesUsageRequest request, final ActionListener<NodesUsageResponse> listener) {
execute(TransportNodesUsageAction.TYPE, request, listener);
}

View file

@ -12,6 +12,7 @@ import org.apache.lucene.search.spell.LevenshteinDistance;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
@ -77,6 +78,13 @@ public abstract class BaseRestHandler implements RestHandler {
@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// check if the query has any parameters that are not in the supported set (if declared)
Set<String> supported = supportedQueryParameters();
if (supported != null && supported.containsAll(request.params().keySet()) == false) {
Set<String> unsupported = Sets.difference(request.params().keySet(), supported);
throw new IllegalArgumentException(unrecognized(request, unsupported, supported, "parameter"));
}
// prepare the request for execution; has the side effect of touching the request parameters
try (var action = prepareRequest(request, client)) {

View file

@ -365,6 +365,32 @@ public class RestController implements HttpServerTransport.Dispatcher {
}
}
public boolean checkSupported(
RestRequest.Method method,
String path,
Set<String> parameters,
Set<String> capabilities,
RestApiVersion restApiVersion
) {
Iterator<MethodHandlers> allHandlers = getAllHandlers(null, path);
while (allHandlers.hasNext()) {
RestHandler handler;
MethodHandlers handlers = allHandlers.next();
if (handlers == null) {
handler = null;
} else {
handler = handlers.getHandler(method, restApiVersion);
}
if (handler != null) {
var supportedParams = handler.supportedQueryParameters();
return (supportedParams == null || supportedParams.containsAll(parameters))
&& handler.supportedCapabilities().containsAll(capabilities);
}
}
return false;
}
@Override
public Map<String, HttpRouteStats> getStats() {
final Iterator<MethodHandlers> methodHandlersIterator = handlers.allNodeValues();

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xcontent.XContent;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
* Handler for REST requests
@ -85,6 +86,22 @@ public interface RestHandler {
return Collections.emptyList();
}
/**
* The set of query parameters accepted by this rest handler,
* {@code null} if query parameters should not be checked nor validated.
* TODO - make this not nullable when all handlers have been updated
*/
default @Nullable Set<String> supportedQueryParameters() {
return null;
}
/**
* The set of capabilities this rest handler supports.
*/
default Set<String> supportedCapabilities() {
return Set.of();
}
/**
* Controls whether requests handled by this class are allowed to to access system indices by default.
* @return {@code true} if requests handled by this class should be allowed to access system indices.

View file

@ -0,0 +1,60 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
@ServerlessScope(Scope.INTERNAL)
public class RestNodesCapabilitiesAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return List.of(new Route(RestRequest.Method.GET, "/_capabilities"));
}
@Override
public Set<String> supportedQueryParameters() {
return Set.of("timeout", "method", "path", "parameters", "capabilities");
}
@Override
public String getName() {
return "nodes_capabilities_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
NodesCapabilitiesRequest r = new NodesCapabilitiesRequest().timeout(request.paramAsTime("timeout", null))
.method(RestRequest.Method.valueOf(request.param("method", "GET")))
.path(URLDecoder.decode(request.param("path"), StandardCharsets.UTF_8))
.parameters(request.paramAsStringArray("parameters", Strings.EMPTY_ARRAY))
.capabilities(request.paramAsStringArray("capabilities", Strings.EMPTY_ARRAY))
.restApiVersion(request.getRestApiVersion());
return channel -> client.admin().cluster().nodesCapabilities(r, new NodesResponseRestListener<>(channel));
}
@Override
public boolean canTripCircuitBreaker() {
return false;
}
}

View file

@ -341,6 +341,7 @@ public class Constants {
"cluster:monitor/update/health/info",
"cluster:monitor/ingest/geoip/stats",
"cluster:monitor/main",
"cluster:monitor/nodes/capabilities",
"cluster:monitor/nodes/data_tier_usage",
"cluster:monitor/nodes/hot_threads",
"cluster:monitor/nodes/info",