Don't replicate document request when using shadow replicas

This commit is contained in:
Lee Hinman 2015-02-13 11:38:09 -07:00
parent 213292e067
commit e5bc047d7c
2 changed files with 25 additions and 10 deletions

View file

@@ -45,11 +45,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException; import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
@@ -631,9 +631,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} }
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest()); final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// nocommit - this doesn't replicate mappings changes, so can fail if mappings are not predefined
// It was successful on the replica, although we never
// actually executed
state.onReplicaSuccess();
return;
}
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) { if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId); final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) { transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override @Override
public void handleResponse(TransportResponse.Empty vResponse) { public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess(); state.onReplicaSuccess();

View file

@@ -25,13 +25,15 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Test; import org.junit.Test;
import java.nio.file.Path; import java.nio.file.Path;
@@ -70,7 +72,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build(); .build();
prepareCreate(IDX).setSettings(idxSettings).get(); prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX); ensureGreen(IDX);
// So basically, the primary should fail and the replica will need to // So basically, the primary should fail and the replica will need to
@@ -181,7 +183,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build(); .build();
prepareCreate(IDX).setSettings(idxSettings).get(); prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX); ensureGreen(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get(); client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get(); client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
@@ -225,13 +227,13 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build(); .build();
prepareCreate(IDX).setSettings(idxSettings).get(); prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX); ensureGreen(IDX);
int docCount = randomIntBetween(10, 100); int docCount = randomIntBetween(10, 100);
List<IndexRequestBuilder> builders = newArrayList(); List<IndexRequestBuilder> builders = newArrayList();
for (int i = 0; i < docCount; i++) { for (int i = 0; i < docCount; i++) {
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("body", "foo")); builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
} }
indexRandom(true, true, true, builders); indexRandom(true, true, true, builders);
flushAndRefresh(IDX); flushAndRefresh(IDX);
@@ -277,7 +279,7 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, false) .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, false)
.build(); .build();
prepareCreate(IDX).setSettings(idxSettings).get(); prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX); ensureGreen(IDX);
IndexRequestBuilder[] builders = new IndexRequestBuilder[between(10, 20)]; IndexRequestBuilder[] builders = new IndexRequestBuilder[between(10, 20)];