remove _primary and _replica shard preferences (#26791)

The shard preference _primary, _replica and its variants were useful
for the asynchronous replication. However, with the current impl, they
are no longer useful and should be removed.

Closes #26335
This commit is contained in:
Nhat 2017-10-08 11:03:06 -04:00 committed by GitHub
parent 9db21cd23f
commit bf4c3642b2
33 changed files with 111 additions and 302 deletions

View file

@ -142,8 +142,8 @@ public class NoopSearchRequestBuilder extends ActionRequestBuilder<SearchRequest
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public NoopSearchRequestBuilder setPreference(String preference) { public NoopSearchRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -146,8 +146,8 @@ public class ClusterSearchShardsRequest extends MasterNodeReadRequest<ClusterSea
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public ClusterSearchShardsRequest preference(String preference) { public ClusterSearchShardsRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -55,8 +55,8 @@ public class ClusterSearchShardsRequestBuilder extends MasterNodeReadOperationRe
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public ClusterSearchShardsRequestBuilder setPreference(String preference) { public ClusterSearchShardsRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -152,8 +152,8 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public GetRequest preference(String preference) { public GetRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -76,8 +76,8 @@ public class GetRequestBuilder extends SingleShardOperationRequestBuilder<GetReq
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public GetRequestBuilder setPreference(String preference) { public GetRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -284,8 +284,8 @@ public class MultiGetRequest extends ActionRequest implements Iterable<MultiGetR
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public MultiGetRequest preference(String preference) { public MultiGetRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -58,8 +58,8 @@ public class MultiGetRequestBuilder extends ActionRequestBuilder<MultiGetRequest
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public MultiGetRequestBuilder setPreference(String preference) { public MultiGetRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -64,8 +64,8 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public MultiGetShardRequest preference(String preference) { public MultiGetShardRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -241,8 +241,8 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public SearchRequest preference(String preference) { public SearchRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -144,8 +144,8 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public SearchRequestBuilder setPreference(String preference) { public SearchRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -59,8 +59,8 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public MultiTermVectorsShardRequest preference(String preference) { public MultiTermVectorsShardRequest preference(String preference) {
this.preference = preference; this.preference = preference;

View file

@ -294,8 +294,7 @@ public class TermVectorsRequest extends SingleShardRequest<TermVectorsRequest> i
/** /**
* Sets the preference to execute the search. Defaults to randomize across * Sets the preference to execute the search. Defaults to randomize across
* shards. Can be set to <tt>_local</tt> to prefer local shards, * shards. Can be set to <tt>_local</tt> to prefer local shards or a custom value,
* <tt>_primary</tt> to execute only on primary shards, or a custom value,
* which guarantees that the same order will be used across different * which guarantees that the same order will be used across different
* requests. * requests.
*/ */

View file

@ -99,8 +99,8 @@ public class TermVectorsRequestBuilder extends ActionRequestBuilder<TermVectorsR
/** /**
* Sets the preference to execute the search. Defaults to randomize across shards. Can be set to * Sets the preference to execute the search. Defaults to randomize across shards. Can be set to
* <tt>_local</tt> to prefer local shards, <tt>_primary</tt> to execute only on primary shards, or * <tt>_local</tt> to prefer local shards or a custom value, which guarantees that the same order
* a custom value, which guarantees that the same order will be used across different requests. * will be used across different requests.
*/ */
public TermVectorsRequestBuilder setPreference(String preference) { public TermVectorsRequestBuilder setPreference(String preference) {
request.preference(preference); request.preference(preference);

View file

@ -441,74 +441,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, primaryAsList); return new PlainShardIterator(shardId, primaryAsList);
} }
public ShardIterator primaryActiveInitializingShardIt() {
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
return primaryShardIt();
}
public ShardIterator primaryFirstActiveInitializingShardsIt() {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion
for (ShardRouting shardRouting : shuffler.shuffle(activeShards)) {
ordered.add(shardRouting);
if (shardRouting.primary()) {
// switch, its the matching node id
ordered.set(ordered.size() - 1, ordered.get(0));
ordered.set(0, shardRouting);
}
}
// no need to worry about primary first here..., its temporal
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator replicaActiveInitializingShardIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
LinkedList<ShardRouting> ordered = new LinkedList<>();
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.addFirst(replica);
} else if (replica.initializing()) {
ordered.addLast(replica);
}
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator replicaFirstActiveInitializingShardsIt() {
// If the primaries are unassigned, return an empty list (there aren't
// any replicas to query anyway)
if (noPrimariesActive()) {
return new PlainShardIterator(shardId, NO_SHARDS);
}
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
// fill it in a randomized fashion with the active replicas
for (ShardRouting replica : shuffler.shuffle(replicas)) {
if (replica.active()) {
ordered.add(replica);
}
}
// Add the primary shard
ordered.add(primary);
// Add initializing shards last
if (!allInitializingShards.isEmpty()) {
ordered.addAll(allInitializingShards);
}
return new PlainShardIterator(shardId, ordered);
}
public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) { public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size()); ArrayList<ShardRouting> ordered = new ArrayList<>(activeShards.size() + allInitializingShards.size());
int seed = shuffler.nextSeed(); int seed = shuffler.nextSeed();

View file

@ -198,14 +198,6 @@ public class OperationRouting extends AbstractComponent {
return indexShard.preferNodeActiveInitializingShardsIt(nodesIds); return indexShard.preferNodeActiveInitializingShardsIt(nodesIds);
case LOCAL: case LOCAL:
return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId)); return indexShard.preferNodeActiveInitializingShardsIt(Collections.singleton(localNodeId));
case PRIMARY:
return indexShard.primaryActiveInitializingShardIt();
case REPLICA:
return indexShard.replicaActiveInitializingShardIt();
case PRIMARY_FIRST:
return indexShard.primaryFirstActiveInitializingShardsIt();
case REPLICA_FIRST:
return indexShard.replicaFirstActiveInitializingShardsIt();
case ONLY_LOCAL: case ONLY_LOCAL:
return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId);
case ONLY_NODES: case ONLY_NODES:

View file

@ -39,26 +39,6 @@ public enum Preference {
*/ */
LOCAL("_local"), LOCAL("_local"),
/**
* Route to primary shards
*/
PRIMARY("_primary"),
/**
* Route to replica shards
*/
REPLICA("_replica"),
/**
* Route to primary shards first
*/
PRIMARY_FIRST("_primary_first"),
/**
* Route to replica shards first
*/
REPLICA_FIRST("_replica_first"),
/** /**
* Route to the local shard only * Route to the local shard only
*/ */
@ -97,16 +77,6 @@ public enum Preference {
return PREFER_NODES; return PREFER_NODES;
case "_local": case "_local":
return LOCAL; return LOCAL;
case "_primary":
return PRIMARY;
case "_replica":
return REPLICA;
case "_primary_first":
case "_primaryFirst":
return PRIMARY_FIRST;
case "_replica_first":
case "_replicaFirst":
return REPLICA_FIRST;
case "_only_local": case "_only_local":
case "_onlyLocal": case "_onlyLocal":
return ONLY_LOCAL; return ONLY_LOCAL;

View file

@ -50,6 +50,7 @@ import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
@ -415,10 +416,6 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
} }
public void testReplicaShardPreferenceIters() throws Exception { public void testReplicaShardPreferenceIters() throws Exception {
AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10)
.build());
OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, OperationRouting operationRouting = new OperationRouting(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
@ -430,69 +427,22 @@ public class RoutingIteratorTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test")) .addAsNew(metaData.index("test"))
.build(); .build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metaData(metaData).routingTable(routingTable).build(); final ClusterState clusterState = ClusterState
.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder()
.add(newNode("node1"))
.add(newNode("node2"))
.add(newNode("node3"))
.localNodeId("node1"))
.build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() String[] removedPreferences = {"_primary", "_primary_first", "_replica", "_replica_first"};
.add(newNode("node1")) for (String pref : removedPreferences) {
.add(newNode("node2")) expectThrows(IllegalArgumentException.class,
.add(newNode("node3")) () -> operationRouting.searchShards(clusterState, new String[]{"test"}, null, pref));
.localNodeId("node1") }
).build();
clusterState = strategy.reroute(clusterState, "reroute");
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
// When replicas haven't initialized, it comes back with the primary first, then initializing replicas
GroupShardsIterator<ShardIterator> shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
ShardIterator iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
ShardRouting routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary()); // replicas haven't initialized yet, so primary is first
assertTrue(routing.started());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
assertTrue(routing.initializing());
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
clusterState = strategy.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING));
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(2)); // two potential replicas for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
shardIterators = operationRouting.searchShards(clusterState, new String[]{"test"}, null, "_replica_first");
assertThat(shardIterators.size(), equalTo(2)); // two potential shards
iter = shardIterators.iterator().next();
assertThat(iter.size(), equalTo(3)); // three potential candidates for the shard
routing = iter.nextOrNull();
assertNotNull(routing);
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertFalse(routing.primary());
// finally the primary
routing = iter.nextOrNull();
assertThat(routing.shardId().id(), anyOf(equalTo(0), equalTo(1)));
assertTrue(routing.primary());
} }
} }

View file

@ -33,6 +33,7 @@ import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.cli.MockTerminal; import org.elasticsearch.cli.MockTerminal;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.cluster.routing.GroupShardsIterator;
@ -210,7 +211,10 @@ public class TruncateTranslogIT extends ESIntegTestCase {
logger.info("--> starting the replica node to test recovery"); logger.info("--> starting the replica node to test recovery");
internalCluster().startNode(); internalCluster().startNode();
ensureGreen("test"); ensureGreen("test");
assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), numDocsToKeep); for (String node : internalCluster().nodesInclude("test")) {
SearchRequestBuilder q = client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery());
assertHitCount(q.get(), numDocsToKeep);
}
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()
.filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get(); .filter(recoveryState -> recoveryState.getPrimary() == false).findFirst().get();
@ -308,7 +312,9 @@ public class TruncateTranslogIT extends ESIntegTestCase {
logger.info("--> starting the replica node to test recovery"); logger.info("--> starting the replica node to test recovery");
internalCluster().startNode(); internalCluster().startNode();
ensureGreen("test"); ensureGreen("test");
assertHitCount(client().prepareSearch("test").setPreference("_replica").setQuery(matchAllQuery()).get(), totalDocs); for (String node : internalCluster().nodesInclude("test")) {
assertHitCount(client().prepareSearch("test").setPreference("_only_nodes:" + node).setQuery(matchAllQuery()).get(), totalDocs);
}
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get(); final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setActiveOnly(false).get();
final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream() final RecoveryState replicaRecoveryState = recoveryResponse.shardRecoveryStates().get("test").stream()

View file

@ -406,8 +406,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
} }
}); });
// Wait for document to be indexed on primary assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").get().isExists()));
assertBusy(() -> assertTrue(client().prepareGet("index", "type", "1").setPreference("_primary").get().isExists()));
// The mappings have not been propagated to the replica yet as a consequence the document count not be indexed // The mappings have not been propagated to the replica yet as a consequence the document count not be indexed
// We wait on purpose to make sure that the document is not indexed because the shard operation is stalled // We wait on purpose to make sure that the document is not indexed because the shard operation is stalled

View file

@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
@ -73,13 +74,21 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
logger.info("using preference {}", preference); logger.info("using preference {}", preference);
// we want to make sure that while recovery happens, and a replica gets recovered, its properly refreshed // we want to make sure that while recovery happens, and a replica gets recovered, its properly refreshed
ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();; ClusterHealthStatus status = client().admin().cluster().prepareHealth("test").get().getStatus();
while (status != ClusterHealthStatus.GREEN) { while (status != ClusterHealthStatus.GREEN) {
// first, verify that search on the primary search works // first, verify that search on the primary search works
SearchResponse searchResponse = client().prepareSearch("test").setPreference("_primary").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); for (IndexShardRoutingTable shardRoutingTable : clusterService().state().routingTable().index("test")) {
assertHitCount(searchResponse, 1); String primaryNode = shardRoutingTable.primaryShard().currentNodeId();
SearchResponse searchResponse = client().prepareSearch("test")
.setPreference("_only_nodes:" + primaryNode)
.setQuery(QueryBuilders.termQuery("field", "test"))
.execute().actionGet();
assertHitCount(searchResponse, 1);
break;
}
Client client = client(); Client client = client();
searchResponse = client.prepareSearch("test").setPreference(preference + Integer.toString(counter++)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); SearchResponse searchResponse = client.prepareSearch("test").setPreference(preference + Integer.toString(counter++)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
if (searchResponse.getHits().getTotalHits() != 1) { if (searchResponse.getHits().getTotalHits() != 1) {
refresh(); refresh();
SearchResponse searchResponseAfterRefresh = client.prepareSearch("test").setPreference(preference).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); SearchResponse searchResponseAfterRefresh = client.prepareSearch("test").setPreference(preference).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
@ -93,8 +102,13 @@ public class SearchWhileCreatingIndexIT extends ESIntegTestCase {
status = client().admin().cluster().prepareHealth("test").get().getStatus(); status = client().admin().cluster().prepareHealth("test").get().getStatus();
internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1); internalCluster().ensureAtLeastNumDataNodes(numberOfReplicas + 1);
} }
SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
assertHitCount(searchResponse, 1); for (String node : internalCluster().nodesInclude("test")) {
SearchResponse searchResponse = client().prepareSearch("test")
.setPreference("_prefer_nodes:" + node)
.setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
assertHitCount(searchResponse, 1);
}
cluster().wipeIndices("test"); cluster().wipeIndices("test");
} }
} }

View file

@ -301,7 +301,6 @@ public class MatchedQueriesIT extends ESIntegTestCase {
.should(queryStringQuery("dolor").queryName("dolor")) .should(queryStringQuery("dolor").queryName("dolor"))
.should(queryStringQuery("elit").queryName("elit")) .should(queryStringQuery("elit").queryName("elit"))
) )
.setPreference("_primary")
.get(); .get();
assertHitCount(searchResponse, 2L); assertHitCount(searchResponse, 2L);

View file

@ -107,7 +107,7 @@ public class RandomScoreFunctionIT extends ESIntegTestCase {
for (int o = 0; o < outerIters; o++) { for (int o = 0; o < outerIters; o++) {
final int seed = randomInt(); final int seed = randomInt();
String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!! String preference = randomRealisticUnicodeOfLengthBetween(1, 10); // at least one char!!
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary) // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
while (preference.startsWith("_")) { while (preference.startsWith("_")) {
preference = randomRealisticUnicodeOfLengthBetween(1, 10); preference = randomRealisticUnicodeOfLengthBetween(1, 10);
} }

View file

@ -44,7 +44,6 @@ import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@ -67,7 +66,7 @@ public class SearchPreferenceIT extends ESIntegTestCase {
refresh(); refresh();
internalCluster().stopRandomDataNode(); internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).execute().actionGet(); client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).execute().actionGet();
String[] preferences = new String[] {"_primary", "_local", "_primary_first", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2"}; String[] preferences = new String[]{"_local", "_prefer_nodes:somenode", "_prefer_nodes:server2", "_prefer_nodes:somenode,server2"};
for (String pref : preferences) { for (String pref : preferences) {
logger.info("--> Testing out preference={}", pref); logger.info("--> Testing out preference={}", pref);
SearchResponse searchResponse = client().prepareSearch().setSize(0).setPreference(pref).execute().actionGet(); SearchResponse searchResponse = client().prepareSearch().setSize(0).setPreference(pref).execute().actionGet();
@ -113,54 +112,14 @@ public class SearchPreferenceIT extends ESIntegTestCase {
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet(); client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
refresh(); refresh();
SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L)); assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet(); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_local").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L)); assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_primary").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet(); searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L)); assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
searchResponse = client().prepareSearch().setQuery(matchAllQuery()).setPreference("1234").execute().actionGet();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(1L));
}
public void testReplicaPreference() throws Exception {
client().admin().indices().prepareCreate("test").setSettings("{\"number_of_replicas\": 0}", XContentType.JSON).get();
ensureGreen();
client().prepareIndex("test", "type1").setSource("field1", "value1").execute().actionGet();
refresh();
try {
client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
fail("should have failed because there are no replicas");
} catch (Exception e) {
// pass
}
SearchResponse resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica_first").execute().actionGet();
assertThat(resp.getHits().getTotalHits(), equalTo(1L));
client().admin().indices().prepareUpdateSettings("test").setSettings("{\"number_of_replicas\": 1}", XContentType.JSON).get();
ensureGreen("test");
resp = client().prepareSearch().setQuery(matchAllQuery()).setPreference("_replica").execute().actionGet();
assertThat(resp.getHits().getTotalHits(), equalTo(1L));
} }
public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception { public void testThatSpecifyingNonExistingNodesReturnsUsefulError() throws Exception {

View file

@ -134,14 +134,12 @@ public class QueryProfilerIT extends ESIntegTestCase {
.setQuery(q) .setQuery(q)
.setProfile(false) .setProfile(false)
.addSort("_id", SortOrder.ASC) .addSort("_id", SortOrder.ASC)
.setPreference("_primary")
.setSearchType(SearchType.QUERY_THEN_FETCH); .setSearchType(SearchType.QUERY_THEN_FETCH);
SearchRequestBuilder profile = client().prepareSearch("test") SearchRequestBuilder profile = client().prepareSearch("test")
.setQuery(q) .setQuery(q)
.setProfile(true) .setProfile(true)
.addSort("_id", SortOrder.ASC) .addSort("_id", SortOrder.ASC)
.setPreference("_primary")
.setSearchType(SearchType.QUERY_THEN_FETCH); .setSearchType(SearchType.QUERY_THEN_FETCH);
MultiSearchResponse.Item[] responses = client().prepareMultiSearch() MultiSearchResponse.Item[] responses = client().prepareMultiSearch()

View file

@ -79,7 +79,7 @@ public class SimpleSearchIT extends ESIntegTestCase {
int iters = scaledRandomIntBetween(10, 20); int iters = scaledRandomIntBetween(10, 20);
for (int i = 0; i < iters; i++) { for (int i = 0; i < iters; i++) {
String randomPreference = randomUnicodeOfLengthBetween(0, 4); String randomPreference = randomUnicodeOfLengthBetween(0, 4);
// randomPreference should not start with '_' (reserved for known preference types (e.g. _shards, _primary) // randomPreference should not start with '_' (reserved for known preference types (e.g. _shards)
while (randomPreference.startsWith("_")) { while (randomPreference.startsWith("_")) {
randomPreference = randomUnicodeOfLengthBetween(0, 4); randomPreference = randomUnicodeOfLengthBetween(0, 4);
} }

View file

@ -275,10 +275,6 @@ replicas.
The `preference` can be set to: The `preference` can be set to:
`_primary`::
The operation will go and be executed only on the primary
shards.
`_local`:: `_local`::
The operation will prefer to be executed on a local The operation will prefer to be executed on a local
allocated shard if possible. allocated shard if possible.

View file

@ -91,8 +91,7 @@ will control the version of the document the operation is intended to be
executed against. A good example of a use case for versioning is executed against. A good example of a use case for versioning is
performing a transactional read-then-update. Specifying a `version` from performing a transactional read-then-update. Specifying a `version` from
the document initially read ensures no changes have happened in the the document initially read ensures no changes have happened in the
meantime (when reading in order to update, it is recommended to set meantime. For example:
`preference` to `_primary`). For example:
[source,js] [source,js]
-------------------------------------------------- --------------------------------------------------
@ -242,7 +241,7 @@ The result of the above index operation is:
[[index-routing]] [[index-routing]]
=== Routing === Routing
By default, shard placement — or `routing` — is controlled by using a By default, shard placement ? or `routing` ? is controlled by using a
hash of the document's id value. For more explicit control, the value hash of the document's id value. For more explicit control, the value
fed into the hash function used by the router can be directly specified fed into the hash function used by the router can be directly specified
on a per-operation basis using the `routing` parameter. For example: on a per-operation basis using the `routing` parameter. For example:

View file

@ -6,8 +6,11 @@
Due to cross-cluster search using `:` to separate a cluster and index name, Due to cross-cluster search using `:` to separate a cluster and index name,
cluster names may no longer contain `:`. cluster names may no longer contain `:`.
==== new default for `wait_for_active_shards` parameter of the open index command ==== New default for `wait_for_active_shards` parameter of the open index command
The default value for the `wait_for_active_shards` parameter of the open index API The default value for the `wait_for_active_shards` parameter of the open index API
is changed from 0 to 1, which means that the command will now by default wait for all is changed from 0 to 1, which means that the command will now by default wait for all
primary shards of the opened index to be allocated. primary shards of the opened index to be allocated.
==== Shard preferences `_primary`, `_primary_first`, `_replica`, and `_replica_first` are removed
These shard preferences are removed in favour of the `_prefer_nodes` and `_only_nodes` preferences.

View file

@ -7,21 +7,6 @@ search. By default, the operation is randomized among the available shard copies
The `preference` is a query string parameter which can be set to: The `preference` is a query string parameter which can be set to:
[horizontal] [horizontal]
`_primary`::
The operation will go and be executed only on the primary
shards.
`_primary_first`::
The operation will go and be executed on the primary
shard, and if not available (failover), will execute on other shards.
`_replica`::
The operation will go and be executed only on a replica shard.
`_replica_first`::
The operation will go and be executed only on a replica shard, and if
not available (failover), will execute on other shards.
`_local`:: `_local`::
The operation will prefer to be executed on a local The operation will prefer to be executed on a local
allocated shard if possible. allocated shard if possible.
@ -33,7 +18,7 @@ The `preference` is a query string parameter which can be set to:
`_shards:2,3`:: `_shards:2,3`::
Restricts the operation to the specified shards. (`2` Restricts the operation to the specified shards. (`2`
and `3` in this case). This preference can be combined with other and `3` in this case). This preference can be combined with other
preferences but it has to appear first: `_shards:2,3|_primary` preferences but it has to appear first: `_shards:2,3|_local`
`_only_nodes`:: `_only_nodes`::
Restricts the operation to nodes specified in <<cluster,node specification>> Restricts the operation to nodes specified in <<cluster,node specification>>

View file

@ -25,6 +25,7 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -37,12 +38,15 @@ import org.elasticsearch.test.rest.yaml.ObjectPath;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Base64; import java.util.Base64;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -227,17 +231,15 @@ public class FullClusterRestartIT extends ESRestTestCase {
Map<String, Object> recoverRsp = toMap(client().performRequest("GET", "/" + index + "/_recovery")); Map<String, Object> recoverRsp = toMap(client().performRequest("GET", "/" + index + "/_recovery"));
logger.debug("--> recovery status:\n{}", recoverRsp); logger.debug("--> recovery status:\n{}", recoverRsp);
Map<String, Object> responseBody = toMap(client().performRequest("GET", "/" + index + "/_search", Set<Integer> counts = new HashSet<>();
Collections.singletonMap("preference", "_primary"))); for (String node : dataNodes(index, client())) {
assertNoFailures(responseBody); Map<String, Object> responseBody = toMap(client().performRequest("GET", "/" + index + "/_search",
int foundHits1 = (int) XContentMapValues.extractValue("hits.total", responseBody); Collections.singletonMap("preference", "_only_nodes:" + node)));
assertNoFailures(responseBody);
responseBody = toMap(client().performRequest("GET", "/" + index + "/_search", int hits = (int) XContentMapValues.extractValue("hits.total", responseBody);
Collections.singletonMap("preference", "_replica"))); counts.add(hits);
assertNoFailures(responseBody); }
int foundHits2 = (int) XContentMapValues.extractValue("hits.total", responseBody); assertEquals("All nodes should have a consistent number of documents", 1, counts.size());
assertEquals(foundHits1, foundHits2);
// TODO: do something more with the replicas! index?
} }
} }
@ -940,4 +942,15 @@ public class FullClusterRestartIT extends ESRestTestCase {
logger.debug("Refreshing [{}]", index); logger.debug("Refreshing [{}]", index);
client().performRequest("POST", "/" + index + "/_refresh"); client().performRequest("POST", "/" + index + "/_refresh");
} }
private List<String> dataNodes(String index, RestClient client) throws IOException {
Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards"));
List<String> nodes = new ArrayList<>();
List<Object> shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0");
for (Object shard : shardStats) {
final String nodeId = ObjectPath.evaluate(shard, "routing.node");
nodes.add(nodeId);
}
return nodes;
}
} }

View file

@ -171,9 +171,6 @@ public class IndexingIT extends ESRestTestCase {
assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5); assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5);
assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5);
} }
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
assertCount(index, "_primary", 5);
assertCount(index, "_replica", 5);
} }
} }
@ -232,9 +229,10 @@ public class IndexingIT extends ESRestTestCase {
updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1)); updateIndexSetting(index, Settings.builder().put("index.number_of_replicas", 1));
ensureGreen(); ensureGreen();
assertOK(client().performRequest("POST", index + "/_refresh")); assertOK(client().performRequest("POST", index + "/_refresh"));
// the number of documents on the primary and on the recovered replica should match the number of indexed documents
assertCount(index, "_primary", numDocs); for (Shard shard : buildShards(index, nodes, newNodeClient)) {
assertCount(index, "_replica", numDocs); assertCount(index, "_only_nodes:" + shard.node.nodeName, numDocs);
}
assertSeqNoOnShards(index, nodes, numDocs, newNodeClient); assertSeqNoOnShards(index, nodes, numDocs, newNodeClient);
} }
} }

View file

@ -11,8 +11,6 @@
- do: - do:
count: count:
# we count through the primary in case there is a replica that has not yet fully recovered
preference: _primary
index: test_index index: test_index
- match: {count: 2} - match: {count: 2}

View file

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random; import java.util.Random;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -52,7 +51,7 @@ public class RandomizingClient extends FilterClient {
SearchType.DFS_QUERY_THEN_FETCH, SearchType.DFS_QUERY_THEN_FETCH,
SearchType.QUERY_THEN_FETCH)); SearchType.QUERY_THEN_FETCH));
if (random.nextInt(10) == 0) { if (random.nextInt(10) == 0) {
defaultPreference = RandomPicks.randomFrom(random, EnumSet.of(Preference.PRIMARY_FIRST, Preference.LOCAL)).type(); defaultPreference = Preference.LOCAL.type();
} else if (random.nextInt(10) == 0) { } else if (random.nextInt(10) == 0) {
String s = TestUtil.randomRealisticUnicodeString(random, 1, 10); String s = TestUtil.randomRealisticUnicodeString(random, 1, 10);
defaultPreference = s.startsWith("_") ? null : s; // '_' is a reserved character defaultPreference = s.startsWith("_") ? null : s; // '_' is a reserved character