first cut at catchup from primary

make flush to a refresh
factor our ShadowIndexShard to have IndexShard be idential to the master and least intrusive

cleanup abstractions
This commit is contained in:
Simon Willnauer 2015-02-12 12:41:32 +01:00
parent 4a367c0750
commit f2297199b7
14 changed files with 377 additions and 50 deletions

View file

@ -789,7 +789,12 @@ public class IndexMetaData {
// NOCOMMIT find a good place for this and document it
public static boolean usesSharedFilesystem(Settings settings) {
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false));
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
}
// NOCOMMIT find a good place for this and document it
public static boolean isIndexUsingShadowReplicas(Settings settings) {
return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
}
}

View file

@ -54,8 +54,7 @@ public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasab
}
/**
* Releases the current snapshot, returning <code>true</code> if it was
* actually released.
* Releases the current snapshot.
*/
public void close() {
deletionPolicy.close(getGeneration());

View file

@ -118,6 +118,7 @@ public class ShadowEngine extends Engine {
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.debug("cowardly refusing to FLUSH");
// reread the last committed segment infos
refresh("flush");
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {

View file

@ -34,8 +34,10 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -46,17 +48,16 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
@ -104,7 +105,10 @@ import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.InternalIndicesLifecycle;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import org.elasticsearch.threadpool.ThreadPool;
@ -114,6 +118,7 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -151,21 +156,19 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexService indexService;
private final ShardSuggestService shardSuggestService;
private final ShardBitsetFilterCache shardBitsetFilterCache;
private final MappingUpdatedAction mappingUpdatedAction;
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private final EngineConfig config;
private final EngineFactory engineFactory;
private long checkIndexTook = 0;
private volatile EngineFactory engineFactory;
private volatile IndexShardState state;
private TimeValue refreshInterval;
private volatile ScheduledFuture refreshScheduledFuture;
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;
protected volatile ShardRouting shardRouting;
@Nullable
private RecoveryState recoveryState;
@ -190,9 +193,8 @@ public class IndexShard extends AbstractIndexShardComponent {
CodecService codecService, ShardTermVectorsService termVectorsService,
IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, AnalysisService analysisService,
SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
MappingUpdatedAction mappingUpdatedAction) {
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy,
SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
super(shardId, indexSettings);
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
@ -223,7 +225,6 @@ public class IndexShard extends AbstractIndexShardComponent {
this.codecService = codecService;
this.shardSuggestService = shardSuggestService;
this.shardBitsetFilterCache = shardBitsetFilterCache;
this.mappingUpdatedAction = mappingUpdatedAction;
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
indexSettingsService.addListener(applyRefreshSettings);
@ -323,17 +324,6 @@ public class IndexShard extends AbstractIndexShardComponent {
if (currentRouting.equals(newRouting)) {
return this;
}
// check for a shadow replica that now needs to be transformed into
// a normal primary
if (currentRouting.primary() == false && // currently a replica
newRouting.primary() == true && // becoming a primary
indexSettings.getAsBoolean(IndexMetaData.SETTING_SHADOW_REPLICAS, false)) {
this.shardRouting = newRouting; // this is important otherwise we will not fail the shard right-away
failShard("can't promote shadow replica to primary",
new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
return this;
}
}
if (state == IndexShardState.POST_RECOVERY) {
@ -891,7 +881,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
@ -905,7 +895,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private void verifyStarted() throws IllegalIndexShardStateException {
protected final void verifyStarted() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);

View file

@ -60,7 +60,12 @@ public class IndexShardModule extends AbstractModule {
@Override
protected void configure() {
bind(ShardId.class).toInstance(shardId);
if (useShadowEngine()) {
bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
} else {
bind(IndexShard.class).asEagerSingleton();
}
Class<? extends EngineFactory> engineFactory = DEFAULT_ENGINE_FACTORY_CLASS;
String factorySetting = ENGINE_FACTORY;
if (useShadowEngine()) {

View file

@ -0,0 +1,152 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.ShadowEngine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.recovery.RecoveryFailedException;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.CountDownLatch;
/**
*/ //nocommit document this!
public class ShadowIndexShard extends IndexShard {
private final RecoveryTarget recoveryTarget;
private final ClusterService clusterService;
@Inject
public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory, RecoveryTarget recoveryTarget, ClusterService clusterService) {
super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler, translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indexingService, getService, searchService, shardWarmerService, shardFilterCache, shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService, termVectorsService, indexFieldDataService, indexService, shardSuggestService, shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService, mergePolicyProvider, factory);
this.recoveryTarget = recoveryTarget;
this.clusterService = clusterService;
}
public void flush(FlushRequest request) throws ElasticsearchException {
if (state() == IndexShardState.STARTED) {
syncFilesFromPrimary();
}
super.flush(request);
}
private void syncFilesFromPrimary() {
final ShardRouting shardRouting = routingEntry();
if (IndexMetaData.usesSharedFilesystem(indexSettings()) == false && shardRouting.primary() == false) {
// nocommit - we are running a full recovery here I wonder if we should do this only do this if request.waitIfOngoing() == true? Or if we need a new parameter?
// I also wonder if we want to have an infrastructure for this instead that communicates with the primary etc?
ClusterState state = clusterService.state();
final CountDownLatch latch = new CountDownLatch(1);
DiscoveryNode sourceNode = IndicesClusterStateService.findSourceNodeForPeerRecovery(state.routingTable(), state.nodes(), shardRouting, logger);
if (sourceNode != null) {
assert engine() instanceof ShadowEngine;
recoveryTarget.startFileSync(this, sourceNode, new RecoveryTarget.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
latch.countDown();
logger.info("shadow replica catchup done {}", state);
// nocommit
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
latch.countDown();
logger.warn(" failed to catch up shadow replica can't find source node", e);
//nocommit
}
});
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
} else {
logger.warn(" failed to catch up shadow replica can't find source node", shardId);
}
}
}
@Override
public IndexShard routingEntry(ShardRouting newRouting) {
ShardRouting shardRouting = this.routingEntry();
super.routingEntry(newRouting);
// check for a shadow replica that now needs to be transformed into
// a normal primary today we simply fail it to force reallocation
if (shardRouting != null && shardRouting.primary() == false && // currently a replica
newRouting.primary() == true) {// becoming a primary
failShard("can't promote shadow replica to primary",
new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
}
return this;
}
@Override
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
if (recoveryState.getType() == RecoveryState.Type.FILE_SYNC) {
logger.debug("skipping recovery finalization file sync runs on a started engine");
} else {
super.performRecoveryFinalization(withFlush, recoveryState);
}
}
}

View file

@ -44,6 +44,7 @@ import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressedString;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -98,7 +99,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// a list of shards that failed during recovery
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
private final NodeEnvironment nodeEnvironment;
static class FailedShard {
public final long version;
@ -114,15 +114,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
private final boolean sendRefreshMapping;
private final AtomicLong recoveryIdGenerator = new AtomicLong();
@Inject
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget,
ShardStateAction shardStateAction,
NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingRefreshAction nodeMappingRefreshAction,
NodeEnvironment nodeEnvironment) {
NodeMappingRefreshAction nodeMappingRefreshAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
@ -133,7 +131,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true);
this.nodeEnvironment = nodeEnvironment;
}
@Override
@ -573,7 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
// we have an ongoing recovery, find the source based on current routing and compare them
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting, logger);
if (!recoveryState.getSourceNode().equals(sourceNode)) {
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])",
shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
@ -669,7 +666,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
// if we're in peer recovery, try to find out the source node now so in case it fails, we will not create the index shard
DiscoveryNode sourceNode = null;
if (isPeerRecovery(shardRouting)) {
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting, logger);
if (sourceNode == null) {
logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
return;
@ -767,7 +764,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
* routing to *require* peer recovery, use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to
* check if its needed or not.
*/
private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
public static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting, ESLogger logger) {
//nocommit factor this out somewhere useful
DiscoveryNode sourceNode = null;
if (!shardRouting.primary()) {
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());

View file

@ -0,0 +1,56 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
/**
* A recovery handler that skips phase 2 and 3 of the recovery
*/
public class FileSyncRecoveryHandler extends ShardRecoveryHandler {
private IndexShard shard;
private final StartRecoveryRequest request;
public FileSyncRecoveryHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
this.shard = shard;
this.request = request;
}
@Override
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
logger.trace("{} recovery [phase2] to {}: skipping translog for file sync", request.shardId(), request.targetNode());
}
@Override
protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
logger.trace("{} recovery [phase3] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
return 0;
}
}

View file

@ -29,9 +29,12 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesLifecycle;
@ -117,15 +120,21 @@ public class RecoverySource extends AbstractComponent {
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode());
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
if (!targetShardRouting.initializing()) {
if (request.recoveryType() != RecoveryState.Type.FILE_SYNC && !targetShardRouting.initializing()) {
logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. known shards state is [{}]",
request.shardId(), request.targetNode(), targetShardRouting.state());
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
}
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
final ShardRecoveryHandler handler;
if (request.recoveryType() == RecoveryState.Type.FILE_SYNC) {
logger.trace("{} taking snapshot before file sync recovery", shard.shardId());
Releasables.close(shard.snapshotIndex()); // we have to take a snapshot here and close it right away so we can catchup from the latest docs committed
handler = new FileSyncRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
} else {
handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
}
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);

View file

@ -83,7 +83,8 @@ public class RecoveryState implements ToXContent, Streamable {
GATEWAY((byte) 0),
SNAPSHOT((byte) 1),
REPLICA((byte) 2),
RELOCATION((byte) 3);
RELOCATION((byte) 3),
FILE_SYNC((byte) 4);
private static final Type[] TYPES = new Type[Type.values().length];

View file

@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -134,6 +135,21 @@ public class RecoveryTarget extends AbstractComponent {
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
return;
}
startRecoveryInternal(indexShard, recoveryType, sourceNode, listener);
}
/**
* Runs a full recovery but skips phase 2 and 3 and only syncs the files that have been committed.
*/
public void startFileSync(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
if (indexShard.state() != IndexShardState.STARTED) {
throw new ElasticsearchIllegalStateException("Shard is not started can't sync files -state: " + indexShard.state());
}
logger.trace("{} starting file sync with {}", indexShard.shardId(), sourceNode);
startRecoveryInternal(indexShard, RecoveryState.Type.FILE_SYNC, sourceNode, listener);
}
private void startRecoveryInternal(IndexShard indexShard, RecoveryState.Type recoveryType, DiscoveryNode sourceNode, RecoveryListener listener) {
// create a new recovery status, and process...
RecoveryState recoveryState = new RecoveryState(indexShard.shardId());
recoveryState.setType(recoveryType);
@ -142,7 +158,6 @@ public class RecoveryTarget extends AbstractComponent {
recoveryState.setPrimary(indexShard.routingEntry().primary());
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {

View file

@ -71,9 +71,9 @@ import java.util.concurrent.atomic.AtomicReference;
* everything relating to copying the segment files as well as sending translog
* operations across the wire once the segments have been copied.
*/
public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
public class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final ESLogger logger;
protected final ESLogger logger;
// Shard that is going to be recovered (the "source")
private final IndexShard shard;
private final String indexName;
@ -375,6 +375,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
*/
@Override
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
if (request.recoveryType() == RecoveryState.Type.FILE_SYNC) {
logger.trace("{} recovery [phase2] to {}: skipping translog for file sync", request.shardId(), request.targetNode());
return;
}
if (shard.state() == IndexShardState.CLOSED) {
throw new IndexShardClosedException(request.shardId());
}
@ -430,11 +434,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
StopWatch stopWatch = new StopWatch().start();
final int totalOperations;
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
// Send the translog operations to the target node
int totalOperations = sendSnapshot(snapshot);
totalOperations = sendSnapshot(snapshot);
cancellableThreads.execute(new Interruptable() {
@Override
@ -538,7 +543,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
*
* @return the total number of translog operations that were sent
*/
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
int ops = 0;
long size = 0;
int totalOperations = 0;

View file

@ -23,15 +23,15 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.junit.Ignore;
import org.junit.Test;
import java.nio.file.Path;
@ -262,4 +262,92 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
assertPathHasBeenCleared(dataPath);
}
public void testSimpleNonSharedFS() throws Exception {
internalCluster().startNodesAsync(3).get();
final String IDX = "test";
final Path dataPath = newTempDirPath();
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 2))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, false)
.build();
prepareCreate(IDX).setSettings(idxSettings).get();
ensureGreen(IDX);
IndexRequestBuilder[] builders = new IndexRequestBuilder[between(10, 20)];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(IDX, "doc", Integer.toString(i)).setSource("foo", "bar");
}
indexRandom(false, builders);
flush(IDX);
logger.info("--> performing query");
for (int i = 0; i < 10; i++) {
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, builders.length);
}
logger.info("--> restarting all nodes");
if (randomBoolean()) {
logger.info("--> rolling restart");
internalCluster().rollingRestart();
} else {
logger.info("--> full restart");
internalCluster().fullRestart();
}
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ensureGreen(IDX);
logger.info("--> performing query");
for (int i = 0; i < 10; i++) {
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, builders.length);
for (SearchHit hit : resp.getHits().getHits()) {
assertEquals(hit.sourceAsMap().get("foo"), "bar");
}
}
// we reindex and check if the new updated docs are all available on the primary
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(IDX, "doc", Integer.toString(i)).setSource("foo", "foobar");
}
indexRandom(false, builders);
// only refresh no flush
refresh();
ClusterState state = client().admin().cluster().prepareState().get().getState();
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{IDX}, false);
ShardRouting primary = shardIterators.iterator().next().nextOrNull();
for (int i = 0; i < 10; i++) {
SearchResponse resp = client().prepareSearch(IDX).setExplain(true).setQuery(matchAllQuery()).get();
assertHitCount(resp, builders.length);
for (SearchHit hit : resp.getHits().getHits()) {
if (hit.shard().getNodeId().equals(primary.currentNodeId())) {
// this comes from the primary so we have the latest
assertEquals(hit.sourceAsMap().get("foo"), "foobar");
} else {
// this comes from the replica not caught up yet
assertEquals(hit.sourceAsMap().get("foo"), "bar");
}
}
}
flush(IDX);
logger.info("--> performing query");
for (int i = 0; i < 10; i++) {
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, builders.length);
for (SearchHit hit : resp.getHits().getHits()) {
assertEquals(hit.sourceAsMap().get("foo"), "foobar");
}
}
logger.info("--> deleting index");
assertAcked(client().admin().indices().prepareDelete(IDX));
}
}

View file

@ -26,9 +26,12 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
@ -87,7 +90,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
// When the the internal engine closes we do a rollback, which removes uncommitted segments
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
// so that even in tests where don't flush we can check the integrity of the Lucene index
indexShard.engine().snapshotIndex(); // Keep translog for tests that rely on replaying it
Releasables.close(indexShard.engine().snapshotIndex()); // Keep translog for tests that rely on replaying it
logger.info("flush finished in beforeIndexShardClosed");
}
}