Cluster health API should return RED status (on applicable levels) when an index has not recovered from the gateway, closes #507.

This commit is contained in:
kimchy 2010-11-09 21:25:22 +02:00
parent 4695f93276
commit 4c6af6afa5
7 changed files with 30 additions and 14 deletions

View file

@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RoutingTableValidation;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportService;
@ -166,9 +167,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
response.numberOfNodes = clusterState.nodes().size();
response.numberOfDataNodes = clusterState.nodes().dataNodes().size();
request.indices(clusterState.metaData().concreteIndices(request.indices()));
for (String index : request.indices()) {
for (String index : clusterState.metaData().concreteIndices(request.indices())) {
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
IndexMetaData indexMetaData = clusterState.metaData().index(index);
if (indexRoutingTable == null) {
@ -219,6 +219,10 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
indexHealth.status = ClusterHealthStatus.GREEN;
if (!indexHealth.validationFailures().isEmpty()) {
indexHealth.status = ClusterHealthStatus.RED;
} else if (clusterState.blocks().hasIndexBlock(indexHealth.index(), GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
indexHealth.status = ClusterHealthStatus.RED;
} else if (indexHealth.shards().isEmpty()) { // might be since none has been created yet (two phase index creation)
indexHealth.status = ClusterHealthStatus.RED;
} else {
for (ClusterShardHealth shardHealth : indexHealth) {
if (shardHealth.status() == ClusterHealthStatus.RED) {
@ -245,6 +249,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
response.status = ClusterHealthStatus.GREEN;
if (!response.validationFailures().isEmpty()) {
response.status = ClusterHealthStatus.RED;
} else if (clusterState.blocks().hasGlobalBlock(GatewayService.NOT_RECOVERED_FROM_GATEWAY_BLOCK)) {
response.status = ClusterHealthStatus.RED;
} else {
for (ClusterIndexHealth indexHealth : response) {
if (indexHealth.status() == ClusterHealthStatus.RED) {

View file

@ -224,6 +224,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
@Override public void clusterStateProcessed(ClusterState clusterState) {
if (request.state == State.CLOSE) { // no need to do shard allocated when closed...
listener.onResponse(new Response(true, clusterState.metaData().index(request.index)));
return;
}
clusterService.submitStateUpdateTask("reroute after index [" + request.index + "] creation", new ProcessedClusterStateUpdateTask() {

View file

@ -112,6 +112,9 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
updateClusterStateBlockedOnNotRecovered();
logger.debug("not recovering from gateway, recover_after_time [{}]", recoverAfterTime);
} else {
// first update the state that its blocked for not recovered, and then let recovery take its place
// that way, we can wait till it is resolved
updateClusterStateBlockedOnNotRecovered();
performStateRecovery(initialStateTimeout);
}
}
@ -179,11 +182,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
clusterService.submitStateUpdateTask("remove-index-block (all primary shards active for [" + index + "])", new ClusterStateUpdateTask() {
@Override public ClusterState execute(ClusterState currentState) {
// check if the block was removed...
if (!currentState.blocks().indices().containsKey(index)) {
return currentState;
}
// check if the block was removed...
if (!currentState.blocks().indices().get(index).contains(GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
if (!currentState.blocks().hasIndexBlock(index, GatewayService.INDEX_NOT_RECOVERED_BLOCK)) {
return currentState;
}
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());

View file

@ -325,6 +325,9 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem
try {
snapshot("scheduled");
} catch (Exception e) {
if (indexShard.state() == IndexShardState.CLOSED) {
return;
}
logger.warn("failed to snapshot (scheduled)", e);
}
}

View file

@ -20,6 +20,7 @@
package org.elasticsearch.test.integration.gateway.local;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -87,6 +88,11 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
assertThat(stateResponse.state().metaData().index("test").state(), equalTo(IndexMetaData.State.CLOSE));
assertThat(stateResponse.state().routingTable().index("test"), nullValue());
logger.info("--> verifying that the state is green");
health = client("node1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
assertThat(health.status(), equalTo(ClusterHealthStatus.GREEN));
logger.info("--> trying to index into a closed index ...");
try {
client("node1").prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();
@ -103,8 +109,8 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
startNode("node1", settingsBuilder().put("gateway.type", "local").build());
startNode("node2", settingsBuilder().put("gateway.type", "local").build());
logger.info("--> waiting for green status");
health = client("node1").admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForNodes("2").execute().actionGet();
logger.info("--> waiting for two nodes");
health = client("node1").admin().cluster().prepareHealth().setWaitForNodes("2").execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
stateResponse = client("node1").admin().cluster().prepareState().execute().actionGet();

View file

@ -71,17 +71,14 @@ public class ManyIndicesStressTest {
logger.info("starting node...");
node = NodeBuilder.nodeBuilder().settings(settings).node();
Thread.sleep(5000);
ClusterHealthResponse health = node.client().admin().cluster().prepareHealth().setTimeout("5m").setWaitForYellowStatus().execute().actionGet();
logger.info("health: " + health.status());
logger.info("active shards: " + health.activeShards());
logger.info("active primary shards: " + health.activePrimaryShards());
if (health.timedOut()) {
logger.error("Timed out on health...");
}
Thread.sleep(30000);
ClusterState clusterState = node.client().admin().cluster().prepareState().execute().actionGet().state();
for (int i = 0; i < numberOfIndices; i++) {
if (clusterState.blocks().indices().containsKey("index_" + i)) {

View file

@ -83,7 +83,11 @@ public class HdfsGatewayTests {
assertThat(createIndexResponse.acknowledged(), equalTo(true));
node.close();
node = buildNode().start();
Thread.sleep(500);
logger.info("--> waiting for green status");
ClusterHealthResponse health = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
assertThat(health.timedOut(), equalTo(false));
try {
node.client().admin().indices().create(createIndexRequest("test")).actionGet();
assert false : "index should exists";