mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 07:37:19 -04:00
[7.x] Use query param instead of a system property for opting in for new cluster health response code (#79397)
Backport #79351 to 7.x: The original change was implemented in #78940, but we have decided to move from a system property to a request parameter, so Cloud users/clients have an easier way to opt-in for the new status code. Relates #70849
This commit is contained in:
parent
55185fcd18
commit
fbe49d15b0
14 changed files with 115 additions and 61 deletions
|
@ -37,13 +37,13 @@ import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDeci
|
||||||
import org.elasticsearch.common.compress.CompressedXContent;
|
import org.elasticsearch.common.compress.CompressedXContent;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.core.TimeValue;
|
|
||||||
import org.elasticsearch.xcontent.XContentType;
|
|
||||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.transport.RemoteClusterService;
|
import org.elasticsearch.transport.RemoteClusterService;
|
||||||
import org.elasticsearch.transport.SniffConnectionStrategy;
|
import org.elasticsearch.transport.SniffConnectionStrategy;
|
||||||
|
import org.elasticsearch.xcontent.XContentType;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -317,7 +317,7 @@ public class ClusterClientIT extends ESRestHighLevelClientTestCase {
|
||||||
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
|
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
|
||||||
assertNoIndices(response);
|
assertNoIndices(response);
|
||||||
assertWarnings("The HTTP status code for a cluster health timeout will be changed from 408 to 200 in a " +
|
assertWarnings("The HTTP status code for a cluster health timeout will be changed from 408 to 200 in a " +
|
||||||
"future version. Set the [es.cluster_health.request_timeout_200] system property to [true] to suppress this message and " +
|
"future version. Set the [return_200_for_cluster_health_timeout] query parameter to [true] to suppress this message and " +
|
||||||
"opt in to the future behaviour now.");
|
"opt in to the future behaviour now.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,6 +97,11 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
|
||||||
provided or better, i.e. `green` > `yellow` > `red`. By default, will not
|
provided or better, i.e. `green` > `yellow` > `red`. By default, will not
|
||||||
wait for any status.
|
wait for any status.
|
||||||
|
|
||||||
|
`return_200_for_cluster_health_timeout`::
|
||||||
|
(Optional, Boolean) A boolean value which controls whether to return HTTP 200
|
||||||
|
status code instead of HTTP 408 in case of a cluster health timeout from
|
||||||
|
the server side. Defaults to false.
|
||||||
|
|
||||||
[[cluster-health-api-response-body]]
|
[[cluster-health-api-response-body]]
|
||||||
==== {api-response-body-title}
|
==== {api-response-body-title}
|
||||||
|
|
||||||
|
|
|
@ -102,6 +102,10 @@
|
||||||
"red"
|
"red"
|
||||||
],
|
],
|
||||||
"description":"Wait until cluster is in a specific state"
|
"description":"Wait until cluster is in a specific state"
|
||||||
|
},
|
||||||
|
"return_200_for_cluster_health_timeout":{
|
||||||
|
"type":"boolean",
|
||||||
|
"description":"Whether to return HTTP 200 instead of 408 in case of a cluster health timeout from the server side"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,3 +35,25 @@
|
||||||
- match: { initializing_shards: 0 }
|
- match: { initializing_shards: 0 }
|
||||||
- match: { unassigned_shards: 0 }
|
- match: { unassigned_shards: 0 }
|
||||||
- gte: { number_of_pending_tasks: 0 }
|
- gte: { number_of_pending_tasks: 0 }
|
||||||
|
|
||||||
|
---
|
||||||
|
"cluster health request timeout with 200 response code":
|
||||||
|
- skip:
|
||||||
|
version: " - 7.15.99"
|
||||||
|
reason: "return_200_for_cluster_health_timeout was added in 7.16"
|
||||||
|
- do:
|
||||||
|
cluster.health:
|
||||||
|
timeout: 1ms
|
||||||
|
wait_for_active_shards: 5
|
||||||
|
return_200_for_cluster_health_timeout: true
|
||||||
|
|
||||||
|
- is_true: cluster_name
|
||||||
|
- is_true: timed_out
|
||||||
|
- gte: { number_of_nodes: 1 }
|
||||||
|
- gte: { number_of_data_nodes: 1 }
|
||||||
|
- match: { active_primary_shards: 0 }
|
||||||
|
- match: { active_shards: 0 }
|
||||||
|
- match: { relocating_shards: 0 }
|
||||||
|
- match: { initializing_shards: 0 }
|
||||||
|
- match: { unassigned_shards: 0 }
|
||||||
|
- gte: { number_of_pending_tasks: 0 }
|
||||||
|
|
|
@ -35,6 +35,8 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
|
private ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
|
||||||
private String waitForNodes = "";
|
private String waitForNodes = "";
|
||||||
private Priority waitForEvents = null;
|
private Priority waitForEvents = null;
|
||||||
|
private boolean return200ForClusterHealthTimeout;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Only used by the high-level REST Client. Controls the details level of the health information returned.
|
* Only used by the high-level REST Client. Controls the details level of the health information returned.
|
||||||
* The default value is 'cluster'.
|
* The default value is 'cluster'.
|
||||||
|
@ -69,6 +71,9 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
} else {
|
} else {
|
||||||
indicesOptions = IndicesOptions.lenientExpandOpen();
|
indicesOptions = IndicesOptions.lenientExpandOpen();
|
||||||
}
|
}
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
|
||||||
|
return200ForClusterHealthTimeout = in.readBoolean();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -101,6 +106,11 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
if (out.getVersion().onOrAfter(Version.V_7_2_0)) {
|
if (out.getVersion().onOrAfter(Version.V_7_2_0)) {
|
||||||
indicesOptions.writeIndicesOptions(out);
|
indicesOptions.writeIndicesOptions(out);
|
||||||
}
|
}
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
|
||||||
|
out.writeBoolean(return200ForClusterHealthTimeout);
|
||||||
|
} else if (return200ForClusterHealthTimeout) {
|
||||||
|
throw new IllegalArgumentException("Can't fix response code in a cluster involving nodes with version " + out.getVersion());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -244,6 +254,18 @@ public class ClusterHealthRequest extends MasterNodeReadRequest<ClusterHealthReq
|
||||||
return this.waitForEvents;
|
return this.waitForEvents;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean doesReturn200ForClusterHealthTimeout() {
|
||||||
|
return return200ForClusterHealthTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets whether to return HTTP 200 status code instead of HTTP 408 in case of a
|
||||||
|
* cluster health timeout from the server side.
|
||||||
|
*/
|
||||||
|
public void return200ForClusterHealthTimeout(boolean return200ForClusterHealthTimeout) {
|
||||||
|
this.return200ForClusterHealthTimeout = return200ForClusterHealthTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the level of detail for the health information to be returned.
|
* Set the level of detail for the health information to be returned.
|
||||||
* Only used by the high-level REST Client.
|
* Only used by the high-level REST Client.
|
||||||
|
|
|
@ -8,25 +8,26 @@
|
||||||
|
|
||||||
package org.elasticsearch.action.admin.cluster.health;
|
package org.elasticsearch.action.admin.cluster.health;
|
||||||
|
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||||
import org.elasticsearch.cluster.health.ClusterIndexHealth;
|
import org.elasticsearch.cluster.health.ClusterIndexHealth;
|
||||||
import org.elasticsearch.cluster.health.ClusterStateHealth;
|
import org.elasticsearch.cluster.health.ClusterStateHealth;
|
||||||
import org.elasticsearch.common.logging.DeprecationCategory;
|
|
||||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
|
||||||
import org.elasticsearch.xcontent.ParseField;
|
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.common.logging.DeprecationCategory;
|
||||||
import org.elasticsearch.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||||
import org.elasticsearch.xcontent.ObjectParser;
|
|
||||||
import org.elasticsearch.common.xcontent.StatusToXContentObject;
|
import org.elasticsearch.common.xcontent.StatusToXContentObject;
|
||||||
import org.elasticsearch.xcontent.XContentBuilder;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.xcontent.XContentParser;
|
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.rest.action.search.RestSearchAction;
|
import org.elasticsearch.rest.action.search.RestSearchAction;
|
||||||
|
import org.elasticsearch.xcontent.ConstructingObjectParser;
|
||||||
|
import org.elasticsearch.xcontent.ObjectParser;
|
||||||
|
import org.elasticsearch.xcontent.ParseField;
|
||||||
|
import org.elasticsearch.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.xcontent.XContentParser;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -102,10 +103,10 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
|
|
||||||
private static final ObjectParser.NamedObjectParser<ClusterIndexHealth, Void> INDEX_PARSER =
|
private static final ObjectParser.NamedObjectParser<ClusterIndexHealth, Void> INDEX_PARSER =
|
||||||
(XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index);
|
(XContentParser parser, Void context, String index) -> ClusterIndexHealth.innerFromXContent(parser, index);
|
||||||
private static final String ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY = "es.cluster_health.request_timeout_200";
|
static final String ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY = "return_200_for_cluster_health_timeout";
|
||||||
static final String CLUSTER_HEALTH_REQUEST_TIMEOUT_DEPRECATION_MSG = "The HTTP status code for a cluster health timeout " +
|
static final String CLUSTER_HEALTH_REQUEST_TIMEOUT_DEPRECATION_MSG = "The HTTP status code for a cluster health timeout " +
|
||||||
"will be changed from 408 to 200 in a future version. Set the [" + ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY + "] " +
|
"will be changed from 408 to 200 in a future version. Set the [" + ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY + "] " +
|
||||||
"system property to [true] to suppress this message and opt in to the future behaviour now.";
|
"query parameter to [true] to suppress this message and opt in to the future behaviour now.";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
// ClusterStateHealth fields
|
// ClusterStateHealth fields
|
||||||
|
@ -138,15 +139,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
private boolean timedOut = false;
|
private boolean timedOut = false;
|
||||||
private ClusterStateHealth clusterStateHealth;
|
private ClusterStateHealth clusterStateHealth;
|
||||||
private ClusterHealthStatus clusterHealthStatus;
|
private ClusterHealthStatus clusterHealthStatus;
|
||||||
private boolean esClusterHealthRequestTimeout200 = readEsClusterHealthRequestTimeout200FromProperty();
|
private boolean return200ForClusterHealthTimeout;
|
||||||
|
|
||||||
public ClusterHealthResponse() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/** For the testing of opting in for the 200 status code without setting a system property */
|
|
||||||
ClusterHealthResponse(boolean esClusterHealthRequestTimeout200) {
|
|
||||||
this.esClusterHealthRequestTimeout200 = esClusterHealthRequestTimeout200;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ClusterHealthResponse(StreamInput in) throws IOException {
|
public ClusterHealthResponse(StreamInput in) throws IOException {
|
||||||
super(in);
|
super(in);
|
||||||
|
@ -158,15 +151,21 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
numberOfInFlightFetch = in.readInt();
|
numberOfInFlightFetch = in.readInt();
|
||||||
delayedUnassignedShards= in.readInt();
|
delayedUnassignedShards= in.readInt();
|
||||||
taskMaxWaitingTime = in.readTimeValue();
|
taskMaxWaitingTime = in.readTimeValue();
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
|
||||||
|
return200ForClusterHealthTimeout = in.readBoolean();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** needed for plugins BWC */
|
/** needed for plugins BWC */
|
||||||
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState) {
|
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState,
|
||||||
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0));
|
boolean return200ForServerTimeout) {
|
||||||
|
this(clusterName, concreteIndices, clusterState, -1, -1, -1, TimeValue.timeValueHours(0),
|
||||||
|
return200ForServerTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
|
public ClusterHealthResponse(String clusterName, String[] concreteIndices, ClusterState clusterState, int numberOfPendingTasks,
|
||||||
int numberOfInFlightFetch, int delayedUnassignedShards, TimeValue taskMaxWaitingTime) {
|
int numberOfInFlightFetch, int delayedUnassignedShards, TimeValue taskMaxWaitingTime,
|
||||||
|
boolean return200ForServerTimeout) {
|
||||||
this.clusterName = clusterName;
|
this.clusterName = clusterName;
|
||||||
this.numberOfPendingTasks = numberOfPendingTasks;
|
this.numberOfPendingTasks = numberOfPendingTasks;
|
||||||
this.numberOfInFlightFetch = numberOfInFlightFetch;
|
this.numberOfInFlightFetch = numberOfInFlightFetch;
|
||||||
|
@ -174,6 +173,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
this.taskMaxWaitingTime = taskMaxWaitingTime;
|
this.taskMaxWaitingTime = taskMaxWaitingTime;
|
||||||
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
|
this.clusterStateHealth = new ClusterStateHealth(clusterState, concreteIndices);
|
||||||
this.clusterHealthStatus = clusterStateHealth.getStatus();
|
this.clusterHealthStatus = clusterStateHealth.getStatus();
|
||||||
|
this.return200ForClusterHealthTimeout = return200ForServerTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -305,6 +305,11 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
out.writeInt(numberOfInFlightFetch);
|
out.writeInt(numberOfInFlightFetch);
|
||||||
out.writeInt(delayedUnassignedShards);
|
out.writeInt(delayedUnassignedShards);
|
||||||
out.writeTimeValue(taskMaxWaitingTime);
|
out.writeTimeValue(taskMaxWaitingTime);
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
|
||||||
|
out.writeBoolean(return200ForClusterHealthTimeout);
|
||||||
|
} else if (return200ForClusterHealthTimeout) {
|
||||||
|
throw new IllegalArgumentException("Can't fix response code in a cluster involving nodes with version " + out.getVersion());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -317,7 +322,7 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
if (isTimedOut() == false) {
|
if (isTimedOut() == false) {
|
||||||
return RestStatus.OK;
|
return RestStatus.OK;
|
||||||
}
|
}
|
||||||
if (esClusterHealthRequestTimeout200) {
|
if (return200ForClusterHealthTimeout) {
|
||||||
return RestStatus.OK;
|
return RestStatus.OK;
|
||||||
} else {
|
} else {
|
||||||
deprecationLogger.critical(DeprecationCategory.API,"cluster_health_request_timeout",
|
deprecationLogger.critical(DeprecationCategory.API,"cluster_health_request_timeout",
|
||||||
|
@ -383,17 +388,4 @@ public class ClusterHealthResponse extends ActionResponse implements StatusToXCo
|
||||||
return Objects.hash(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTime,
|
return Objects.hash(clusterName, numberOfPendingTasks, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTime,
|
||||||
timedOut, clusterStateHealth, clusterHealthStatus);
|
timedOut, clusterStateHealth, clusterHealthStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean readEsClusterHealthRequestTimeout200FromProperty() {
|
|
||||||
String property = System.getProperty(ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY);
|
|
||||||
if (property == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (Boolean.parseBoolean(property)) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
throw new IllegalArgumentException(ES_CLUSTER_HEALTH_REQUEST_TIMEOUT_200_KEY + " can only be unset or [true] but was ["
|
|
||||||
+ property + "]");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,8 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.core.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.CollectionUtils;
|
import org.elasticsearch.common.util.CollectionUtils;
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.node.NodeClosedException;
|
import org.elasticsearch.node.NodeClosedException;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
@ -232,7 +232,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
||||||
|
|
||||||
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
|
private ClusterHealthResponse getResponse(final ClusterHealthRequest request, ClusterState clusterState,
|
||||||
final int waitFor, TimeoutState timeoutState) {
|
final int waitFor, TimeoutState timeoutState) {
|
||||||
ClusterHealthResponse response = clusterHealth(request, clusterState, clusterService.getMasterService().numberOfPendingTasks(),
|
ClusterHealthResponse response = clusterHealth(request, clusterState,
|
||||||
|
clusterService.getMasterService().numberOfPendingTasks(),
|
||||||
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
|
allocationService.getNumberOfInFlightFetches(), clusterService.getMasterService().getMaxTaskWaitTime());
|
||||||
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
|
int readyCounter = prepareResponse(request, response, clusterState, indexNameExpressionResolver);
|
||||||
boolean valid = (readyCounter == waitFor);
|
boolean valid = (readyCounter == waitFor);
|
||||||
|
@ -331,8 +332,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState, int numberOfPendingTasks,
|
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState,
|
||||||
int numberOfInFlightFetch, TimeValue pendingTaskTimeInQueue) {
|
int numberOfPendingTasks, int numberOfInFlightFetch, TimeValue pendingTaskTimeInQueue) {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("Calculating health based on state version [{}]", clusterState.version());
|
logger.trace("Calculating health based on state version [{}]", clusterState.version());
|
||||||
}
|
}
|
||||||
|
@ -344,12 +345,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
|
||||||
// one of the specified indices is not there - treat it as RED.
|
// one of the specified indices is not there - treat it as RED.
|
||||||
ClusterHealthResponse response = new ClusterHealthResponse(clusterState.getClusterName().value(), Strings.EMPTY_ARRAY,
|
ClusterHealthResponse response = new ClusterHealthResponse(clusterState.getClusterName().value(), Strings.EMPTY_ARRAY,
|
||||||
clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
|
clusterState, numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
|
||||||
pendingTaskTimeInQueue);
|
pendingTaskTimeInQueue, request.doesReturn200ForClusterHealthTimeout());
|
||||||
response.setStatus(ClusterHealthStatus.RED);
|
response.setStatus(ClusterHealthStatus.RED);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new ClusterHealthResponse(clusterState.getClusterName().value(), concreteIndices, clusterState, numberOfPendingTasks,
|
return new ClusterHealthResponse(clusterState.getClusterName().value(), concreteIndices, clusterState,
|
||||||
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue);
|
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue,
|
||||||
|
request.doesReturn200ForClusterHealthTimeout());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,6 +83,9 @@ public class RestClusterHealthAction extends BaseRestHandler {
|
||||||
if (request.param("wait_for_events") != null) {
|
if (request.param("wait_for_events") != null) {
|
||||||
clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
|
clusterHealthRequest.waitForEvents(Priority.valueOf(request.param("wait_for_events").toUpperCase(Locale.ROOT)));
|
||||||
}
|
}
|
||||||
|
clusterHealthRequest.return200ForClusterHealthTimeout(request.paramAsBoolean(
|
||||||
|
"return_200_for_cluster_health_timeout",
|
||||||
|
clusterHealthRequest.doesReturn200ForClusterHealthTimeout()));
|
||||||
return clusterHealthRequest;
|
return clusterHealthRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,7 @@ public class ClusterHealthRequestTests extends ESTestCase {
|
||||||
assertThat(cloneRequest.waitForEvents(), equalTo(originalRequest.waitForEvents()));
|
assertThat(cloneRequest.waitForEvents(), equalTo(originalRequest.waitForEvents()));
|
||||||
assertIndicesEquals(cloneRequest.indices(), originalRequest.indices());
|
assertIndicesEquals(cloneRequest.indices(), originalRequest.indices());
|
||||||
assertThat(cloneRequest.indicesOptions(), equalTo(originalRequest.indicesOptions()));
|
assertThat(cloneRequest.indicesOptions(), equalTo(originalRequest.indicesOptions()));
|
||||||
|
assertThat(cloneRequest.doesReturn200ForClusterHealthTimeout(), equalTo(originalRequest.doesReturn200ForClusterHealthTimeout()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRequestReturnsHiddenIndicesByDefault() {
|
public void testRequestReturnsHiddenIndicesByDefault() {
|
||||||
|
|
|
@ -20,10 +20,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.core.TimeValue;
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.xcontent.ToXContent;
|
|
||||||
import org.elasticsearch.xcontent.XContentParser;
|
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||||
|
import org.elasticsearch.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.xcontent.XContentParser;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -43,7 +43,7 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
|
||||||
private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values());
|
private final ClusterHealthRequest.Level level = randomFrom(ClusterHealthRequest.Level.values());
|
||||||
|
|
||||||
public void testIsTimeout() {
|
public void testIsTimeout() {
|
||||||
ClusterHealthResponse res = new ClusterHealthResponse();
|
ClusterHealthResponse res = new ClusterHealthResponse("", new String[]{}, ClusterState.EMPTY_STATE, false);
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
res.setTimedOut(randomBoolean());
|
res.setTimedOut(randomBoolean());
|
||||||
if (res.isTimedOut()) {
|
if (res.isTimedOut()) {
|
||||||
|
@ -56,7 +56,7 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testTimeoutReturns200IfOptedIn() {
|
public void testTimeoutReturns200IfOptedIn() {
|
||||||
ClusterHealthResponse res = new ClusterHealthResponse(true);
|
ClusterHealthResponse res = new ClusterHealthResponse("", new String[]{}, ClusterState.EMPTY_STATE, true);
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
res.setTimedOut(randomBoolean());
|
res.setTimedOut(randomBoolean());
|
||||||
assertEquals(RestStatus.OK, res.status());
|
assertEquals(RestStatus.OK, res.status());
|
||||||
|
@ -70,7 +70,7 @@ public class ClusterHealthResponsesTests extends AbstractSerializingTestCase<Clu
|
||||||
int delayedUnassigned = randomIntBetween(0, 200);
|
int delayedUnassigned = randomIntBetween(0, 200);
|
||||||
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
|
TimeValue pendingTaskInQueueTime = TimeValue.timeValueMillis(randomIntBetween(1000, 100000));
|
||||||
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", new String[] {Metadata.ALL},
|
ClusterHealthResponse clusterHealth = new ClusterHealthResponse("bla", new String[] {Metadata.ALL},
|
||||||
clusterState, pendingTasks, inFlight, delayedUnassigned, pendingTaskInQueueTime);
|
clusterState, pendingTasks, inFlight, delayedUnassigned, pendingTaskInQueueTime, false);
|
||||||
clusterHealth = maybeSerialize(clusterHealth);
|
clusterHealth = maybeSerialize(clusterHealth);
|
||||||
assertClusterHealth(clusterHealth);
|
assertClusterHealth(clusterHealth);
|
||||||
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
|
assertThat(clusterHealth.getNumberOfPendingTasks(), Matchers.equalTo(pendingTasks));
|
||||||
|
|
|
@ -37,17 +37,17 @@ public class TransportClusterHealthActionTests extends ESTestCase {
|
||||||
final ClusterHealthRequest request = new ClusterHealthRequest();
|
final ClusterHealthRequest request = new ClusterHealthRequest();
|
||||||
request.waitForNoInitializingShards(true);
|
request.waitForNoInitializingShards(true);
|
||||||
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
|
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 0);
|
||||||
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState);
|
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState, false);
|
||||||
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
|
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
|
||||||
|
|
||||||
request.waitForNoInitializingShards(true);
|
request.waitForNoInitializingShards(true);
|
||||||
clusterState = randomClusterStateWithInitializingShards("test", between(1, 10));
|
clusterState = randomClusterStateWithInitializingShards("test", between(1, 10));
|
||||||
response = new ClusterHealthResponse("", indices, clusterState);
|
response = new ClusterHealthResponse("", indices, clusterState, false);
|
||||||
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
||||||
|
|
||||||
request.waitForNoInitializingShards(false);
|
request.waitForNoInitializingShards(false);
|
||||||
clusterState = randomClusterStateWithInitializingShards("test", randomInt(20));
|
clusterState = randomClusterStateWithInitializingShards("test", randomInt(20));
|
||||||
response = new ClusterHealthResponse("", indices, clusterState);
|
response = new ClusterHealthResponse("", indices, clusterState, false);
|
||||||
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,11 +57,11 @@ public class TransportClusterHealthActionTests extends ESTestCase {
|
||||||
request.waitForActiveShards(ActiveShardCount.ALL);
|
request.waitForActiveShards(ActiveShardCount.ALL);
|
||||||
|
|
||||||
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 1);
|
ClusterState clusterState = randomClusterStateWithInitializingShards("test", 1);
|
||||||
ClusterHealthResponse response = new ClusterHealthResponse("", indices, clusterState);
|
ClusterHealthResponse response = new ClusterHealthResponse( "", indices, clusterState, false);
|
||||||
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(0));
|
||||||
|
|
||||||
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
|
clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();
|
||||||
response = new ClusterHealthResponse("", indices, clusterState);
|
response = new ClusterHealthResponse("", indices, clusterState, false);
|
||||||
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
|
assertThat(TransportClusterHealthAction.prepareResponse(request, response, clusterState, null), equalTo(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,8 @@ public class RestClusterHealthActionTests extends ESTestCase {
|
||||||
params.put("wait_for_active_shards", String.valueOf(waitForActiveShards));
|
params.put("wait_for_active_shards", String.valueOf(waitForActiveShards));
|
||||||
params.put("wait_for_nodes", waitForNodes);
|
params.put("wait_for_nodes", waitForNodes);
|
||||||
params.put("wait_for_events", waitForEvents.name());
|
params.put("wait_for_events", waitForEvents.name());
|
||||||
|
boolean requestTimeout200 = randomBoolean();
|
||||||
|
params.put("return_200_for_cluster_health_timeout", String.valueOf(requestTimeout200));
|
||||||
|
|
||||||
FakeRestRequest restRequest = buildRestRequest(params);
|
FakeRestRequest restRequest = buildRestRequest(params);
|
||||||
ClusterHealthRequest clusterHealthRequest = RestClusterHealthAction.fromRequest(restRequest);
|
ClusterHealthRequest clusterHealthRequest = RestClusterHealthAction.fromRequest(restRequest);
|
||||||
|
@ -65,7 +67,7 @@ public class RestClusterHealthActionTests extends ESTestCase {
|
||||||
assertThat(clusterHealthRequest.waitForActiveShards(), equalTo(ActiveShardCount.parseString(String.valueOf(waitForActiveShards))));
|
assertThat(clusterHealthRequest.waitForActiveShards(), equalTo(ActiveShardCount.parseString(String.valueOf(waitForActiveShards))));
|
||||||
assertThat(clusterHealthRequest.waitForNodes(), equalTo(waitForNodes));
|
assertThat(clusterHealthRequest.waitForNodes(), equalTo(waitForNodes));
|
||||||
assertThat(clusterHealthRequest.waitForEvents(), equalTo(waitForEvents));
|
assertThat(clusterHealthRequest.waitForEvents(), equalTo(waitForEvents));
|
||||||
|
assertThat(clusterHealthRequest.doesReturn200ForClusterHealthTimeout(), equalTo(requestTimeout200));
|
||||||
}
|
}
|
||||||
|
|
||||||
private FakeRestRequest buildRestRequest(Map<String, String> params) {
|
private FakeRestRequest buildRestRequest(Map<String, String> params) {
|
||||||
|
|
|
@ -35,11 +35,12 @@ import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
|
||||||
import org.elasticsearch.cluster.metadata.Metadata;
|
import org.elasticsearch.cluster.metadata.Metadata;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.core.TimeValue;
|
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.core.TimeValue;
|
||||||
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
|
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -109,7 +110,7 @@ public class MlIndexAndAliasTests extends ESTestCase {
|
||||||
clusterAdminClient = mock(ClusterAdminClient.class);
|
clusterAdminClient = mock(ClusterAdminClient.class);
|
||||||
doAnswer(invocationOnMock -> {
|
doAnswer(invocationOnMock -> {
|
||||||
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
||||||
listener.onResponse(new ClusterHealthResponse());
|
listener.onResponse(new ClusterHealthResponse("", Strings.EMPTY_ARRAY, ClusterState.EMPTY_STATE, false));
|
||||||
return null;
|
return null;
|
||||||
}).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class));
|
}).when(clusterAdminClient).health(any(ClusterHealthRequest.class), any(ActionListener.class));
|
||||||
|
|
||||||
|
|
|
@ -190,7 +190,7 @@ public class TransformInternalIndexTests extends ESTestCase {
|
||||||
doAnswer(invocationOnMock -> {
|
doAnswer(invocationOnMock -> {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
||||||
listener.onResponse(new ClusterHealthResponse());
|
listener.onResponse(new ClusterHealthResponse("", new String[]{}, ClusterState.EMPTY_STATE, false));
|
||||||
return null;
|
return null;
|
||||||
}).when(clusterClient).health(any(), any());
|
}).when(clusterClient).health(any(), any());
|
||||||
|
|
||||||
|
@ -274,7 +274,7 @@ public class TransformInternalIndexTests extends ESTestCase {
|
||||||
doAnswer(invocationOnMock -> {
|
doAnswer(invocationOnMock -> {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
ActionListener<ClusterHealthResponse> listener = (ActionListener<ClusterHealthResponse>) invocationOnMock.getArguments()[1];
|
||||||
listener.onResponse(new ClusterHealthResponse());
|
listener.onResponse(new ClusterHealthResponse("", new String[]{}, ClusterState.EMPTY_STATE, false));
|
||||||
return null;
|
return null;
|
||||||
}).when(clusterClient).health(any(), any());
|
}).when(clusterClient).health(any(), any());
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue