mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-30 10:23:41 -04:00
first cut at catchup from primary
make flush to a refresh factor our ShadowIndexShard to have IndexShard be idential to the master and least intrusive cleanup abstractions
This commit is contained in:
parent
4a367c0750
commit
f2297199b7
14 changed files with 377 additions and 50 deletions
|
@ -789,7 +789,12 @@ public class IndexMetaData {
|
||||||
|
|
||||||
// NOCOMMIT find a good place for this and document it
|
// NOCOMMIT find a good place for this and document it
|
||||||
public static boolean usesSharedFilesystem(Settings settings) {
|
public static boolean usesSharedFilesystem(Settings settings) {
|
||||||
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false));
|
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOCOMMIT find a good place for this and document it
|
||||||
|
public static boolean isIndexUsingShadowReplicas(Settings settings) {
|
||||||
|
return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,8 +54,7 @@ public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasab
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Releases the current snapshot, returning <code>true</code> if it was
|
* Releases the current snapshot.
|
||||||
* actually released.
|
|
||||||
*/
|
*/
|
||||||
public void close() {
|
public void close() {
|
||||||
deletionPolicy.close(getGeneration());
|
deletionPolicy.close(getGeneration());
|
||||||
|
|
|
@ -118,6 +118,7 @@ public class ShadowEngine extends Engine {
|
||||||
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|
||||||
logger.debug("cowardly refusing to FLUSH");
|
logger.debug("cowardly refusing to FLUSH");
|
||||||
// reread the last committed segment infos
|
// reread the last committed segment infos
|
||||||
|
refresh("flush");
|
||||||
try {
|
try {
|
||||||
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
|
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
|
|
@ -34,8 +34,10 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||||
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
|
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
|
||||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.common.Booleans;
|
import org.elasticsearch.common.Booleans;
|
||||||
|
@ -46,17 +48,16 @@ import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.metrics.MeanMetric;
|
import org.elasticsearch.common.metrics.MeanMetric;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.CancellableThreads;
|
|
||||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.VersionType;
|
import org.elasticsearch.index.VersionType;
|
||||||
import org.elasticsearch.index.aliases.IndexAliasesService;
|
import org.elasticsearch.index.aliases.IndexAliasesService;
|
||||||
import org.elasticsearch.index.analysis.AnalysisService;
|
|
||||||
import org.elasticsearch.index.cache.IndexCache;
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
|
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
|
||||||
import org.elasticsearch.index.cache.filter.FilterCacheStats;
|
import org.elasticsearch.index.cache.filter.FilterCacheStats;
|
||||||
|
@ -104,7 +105,10 @@ import org.elasticsearch.index.warmer.WarmerStats;
|
||||||
import org.elasticsearch.indices.IndicesLifecycle;
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
import org.elasticsearch.indices.IndicesWarmer;
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
import org.elasticsearch.indices.InternalIndicesLifecycle;
|
||||||
|
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||||
|
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
|
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||||
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
|
import org.elasticsearch.search.suggest.completion.Completion090PostingsFormat;
|
||||||
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
import org.elasticsearch.search.suggest.completion.CompletionStats;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
@ -114,6 +118,7 @@ import java.io.PrintStream;
|
||||||
import java.nio.channels.ClosedByInterruptException;
|
import java.nio.channels.ClosedByInterruptException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
@ -151,21 +156,19 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
private final IndexService indexService;
|
private final IndexService indexService;
|
||||||
private final ShardSuggestService shardSuggestService;
|
private final ShardSuggestService shardSuggestService;
|
||||||
private final ShardBitsetFilterCache shardBitsetFilterCache;
|
private final ShardBitsetFilterCache shardBitsetFilterCache;
|
||||||
private final MappingUpdatedAction mappingUpdatedAction;
|
|
||||||
private final CancellableThreads cancellableThreads = new CancellableThreads();
|
|
||||||
|
|
||||||
private final Object mutex = new Object();
|
private final Object mutex = new Object();
|
||||||
private final String checkIndexOnStartup;
|
private final String checkIndexOnStartup;
|
||||||
private final EngineConfig config;
|
private final EngineConfig config;
|
||||||
|
private final EngineFactory engineFactory;
|
||||||
private long checkIndexTook = 0;
|
private long checkIndexTook = 0;
|
||||||
private volatile EngineFactory engineFactory;
|
|
||||||
private volatile IndexShardState state;
|
private volatile IndexShardState state;
|
||||||
|
|
||||||
private TimeValue refreshInterval;
|
private TimeValue refreshInterval;
|
||||||
|
|
||||||
private volatile ScheduledFuture refreshScheduledFuture;
|
private volatile ScheduledFuture refreshScheduledFuture;
|
||||||
private volatile ScheduledFuture mergeScheduleFuture;
|
private volatile ScheduledFuture mergeScheduleFuture;
|
||||||
private volatile ShardRouting shardRouting;
|
protected volatile ShardRouting shardRouting;
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
private RecoveryState recoveryState;
|
private RecoveryState recoveryState;
|
||||||
|
@ -190,9 +193,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
CodecService codecService, ShardTermVectorsService termVectorsService,
|
CodecService codecService, ShardTermVectorsService termVectorsService,
|
||||||
IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
|
IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService,
|
||||||
ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
|
ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
|
||||||
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, AnalysisService analysisService,
|
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy,
|
||||||
SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory,
|
SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
|
||||||
MappingUpdatedAction mappingUpdatedAction) {
|
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
|
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
|
||||||
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
|
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
|
||||||
|
@ -223,7 +225,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
this.codecService = codecService;
|
this.codecService = codecService;
|
||||||
this.shardSuggestService = shardSuggestService;
|
this.shardSuggestService = shardSuggestService;
|
||||||
this.shardBitsetFilterCache = shardBitsetFilterCache;
|
this.shardBitsetFilterCache = shardBitsetFilterCache;
|
||||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
|
||||||
state = IndexShardState.CREATED;
|
state = IndexShardState.CREATED;
|
||||||
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
|
this.refreshInterval = indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, EngineConfig.DEFAULT_REFRESH_INTERVAL);
|
||||||
indexSettingsService.addListener(applyRefreshSettings);
|
indexSettingsService.addListener(applyRefreshSettings);
|
||||||
|
@ -323,17 +324,6 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
if (currentRouting.equals(newRouting)) {
|
if (currentRouting.equals(newRouting)) {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check for a shadow replica that now needs to be transformed into
|
|
||||||
// a normal primary
|
|
||||||
if (currentRouting.primary() == false && // currently a replica
|
|
||||||
newRouting.primary() == true && // becoming a primary
|
|
||||||
indexSettings.getAsBoolean(IndexMetaData.SETTING_SHADOW_REPLICAS, false)) {
|
|
||||||
this.shardRouting = newRouting; // this is important otherwise we will not fail the shard right-away
|
|
||||||
failShard("can't promote shadow replica to primary",
|
|
||||||
new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state == IndexShardState.POST_RECOVERY) {
|
if (state == IndexShardState.POST_RECOVERY) {
|
||||||
|
@ -891,7 +881,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
|
protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
|
||||||
IndexShardState state = this.state; // one time volatile read
|
IndexShardState state = this.state; // one time volatile read
|
||||||
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
|
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
|
||||||
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
|
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
|
||||||
|
@ -905,7 +895,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyStarted() throws IllegalIndexShardStateException {
|
protected final void verifyStarted() throws IllegalIndexShardStateException {
|
||||||
IndexShardState state = this.state; // one time volatile read
|
IndexShardState state = this.state; // one time volatile read
|
||||||
if (state != IndexShardState.STARTED) {
|
if (state != IndexShardState.STARTED) {
|
||||||
throw new IndexShardNotStartedException(shardId, state);
|
throw new IndexShardNotStartedException(shardId, state);
|
||||||
|
|
|
@ -60,7 +60,12 @@ public class IndexShardModule extends AbstractModule {
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bind(ShardId.class).toInstance(shardId);
|
bind(ShardId.class).toInstance(shardId);
|
||||||
|
if (useShadowEngine()) {
|
||||||
|
bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
|
||||||
|
} else {
|
||||||
bind(IndexShard.class).asEagerSingleton();
|
bind(IndexShard.class).asEagerSingleton();
|
||||||
|
}
|
||||||
|
|
||||||
Class<? extends EngineFactory> engineFactory = DEFAULT_ENGINE_FACTORY_CLASS;
|
Class<? extends EngineFactory> engineFactory = DEFAULT_ENGINE_FACTORY_CLASS;
|
||||||
String factorySetting = ENGINE_FACTORY;
|
String factorySetting = ENGINE_FACTORY;
|
||||||
if (useShadowEngine()) {
|
if (useShadowEngine()) {
|
||||||
|
|
|
@ -0,0 +1,152 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.index.shard;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||||
|
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||||
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
|
import org.elasticsearch.common.Nullable;
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.IndexService;
|
||||||
|
import org.elasticsearch.index.aliases.IndexAliasesService;
|
||||||
|
import org.elasticsearch.index.cache.IndexCache;
|
||||||
|
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
|
||||||
|
import org.elasticsearch.index.cache.filter.ShardFilterCache;
|
||||||
|
import org.elasticsearch.index.cache.query.ShardQueryCache;
|
||||||
|
import org.elasticsearch.index.codec.CodecService;
|
||||||
|
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
|
||||||
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
import org.elasticsearch.index.engine.EngineException;
|
||||||
|
import org.elasticsearch.index.engine.EngineFactory;
|
||||||
|
import org.elasticsearch.index.engine.ShadowEngine;
|
||||||
|
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||||
|
import org.elasticsearch.index.fielddata.ShardFieldData;
|
||||||
|
import org.elasticsearch.index.get.ShardGetService;
|
||||||
|
import org.elasticsearch.index.indexing.ShardIndexingService;
|
||||||
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
|
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
|
||||||
|
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||||
|
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
||||||
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
|
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||||
|
import org.elasticsearch.index.search.stats.ShardSearchService;
|
||||||
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
|
import org.elasticsearch.index.settings.IndexSettingsService;
|
||||||
|
import org.elasticsearch.index.similarity.SimilarityService;
|
||||||
|
import org.elasticsearch.index.store.Store;
|
||||||
|
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
|
||||||
|
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
||||||
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
|
import org.elasticsearch.indices.IndicesWarmer;
|
||||||
|
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||||
|
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||||
|
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||||
|
import org.elasticsearch.indices.recovery.RecoveryTarget;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/ //nocommit document this!
|
||||||
|
public class ShadowIndexShard extends IndexShard {
|
||||||
|
private final RecoveryTarget recoveryTarget;
|
||||||
|
private final ClusterService clusterService;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService, ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory, RecoveryTarget recoveryTarget, ClusterService clusterService) {
|
||||||
|
super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler, translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService, indexingService, getService, searchService, shardWarmerService, shardFilterCache, shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService, termVectorsService, indexFieldDataService, indexService, shardSuggestService, shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService, mergePolicyProvider, factory);
|
||||||
|
this.recoveryTarget = recoveryTarget;
|
||||||
|
this.clusterService = clusterService;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void flush(FlushRequest request) throws ElasticsearchException {
|
||||||
|
if (state() == IndexShardState.STARTED) {
|
||||||
|
syncFilesFromPrimary();
|
||||||
|
}
|
||||||
|
super.flush(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncFilesFromPrimary() {
|
||||||
|
final ShardRouting shardRouting = routingEntry();
|
||||||
|
if (IndexMetaData.usesSharedFilesystem(indexSettings()) == false && shardRouting.primary() == false) {
|
||||||
|
// nocommit - we are running a full recovery here I wonder if we should do this only do this if request.waitIfOngoing() == true? Or if we need a new parameter?
|
||||||
|
// I also wonder if we want to have an infrastructure for this instead that communicates with the primary etc?
|
||||||
|
ClusterState state = clusterService.state();
|
||||||
|
final CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
DiscoveryNode sourceNode = IndicesClusterStateService.findSourceNodeForPeerRecovery(state.routingTable(), state.nodes(), shardRouting, logger);
|
||||||
|
if (sourceNode != null) {
|
||||||
|
assert engine() instanceof ShadowEngine;
|
||||||
|
recoveryTarget.startFileSync(this, sourceNode, new RecoveryTarget.RecoveryListener() {
|
||||||
|
@Override
|
||||||
|
public void onRecoveryDone(RecoveryState state) {
|
||||||
|
latch.countDown();
|
||||||
|
logger.info("shadow replica catchup done {}", state);
|
||||||
|
// nocommit
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
|
||||||
|
latch.countDown();
|
||||||
|
logger.warn(" failed to catch up shadow replica can't find source node", e);
|
||||||
|
//nocommit
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.warn(" failed to catch up shadow replica can't find source node", shardId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public IndexShard routingEntry(ShardRouting newRouting) {
|
||||||
|
ShardRouting shardRouting = this.routingEntry();
|
||||||
|
super.routingEntry(newRouting);
|
||||||
|
// check for a shadow replica that now needs to be transformed into
|
||||||
|
// a normal primary today we simply fail it to force reallocation
|
||||||
|
if (shardRouting != null && shardRouting.primary() == false && // currently a replica
|
||||||
|
newRouting.primary() == true) {// becoming a primary
|
||||||
|
failShard("can't promote shadow replica to primary",
|
||||||
|
new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void performRecoveryFinalization(boolean withFlush, RecoveryState recoveryState) throws ElasticsearchException {
|
||||||
|
if (recoveryState.getType() == RecoveryState.Type.FILE_SYNC) {
|
||||||
|
logger.debug("skipping recovery finalization file sync runs on a started engine");
|
||||||
|
} else {
|
||||||
|
super.performRecoveryFinalization(withFlush, recoveryState);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -44,6 +44,7 @@ import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||||
import org.elasticsearch.common.compress.CompressedString;
|
import org.elasticsearch.common.compress.CompressedString;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||||
|
@ -98,7 +99,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
// a list of shards that failed during recovery
|
// a list of shards that failed during recovery
|
||||||
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
|
// we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
|
||||||
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
|
private final ConcurrentMap<ShardId, FailedShard> failedShards = ConcurrentCollections.newConcurrentMap();
|
||||||
private final NodeEnvironment nodeEnvironment;
|
|
||||||
|
|
||||||
static class FailedShard {
|
static class FailedShard {
|
||||||
public final long version;
|
public final long version;
|
||||||
|
@ -114,15 +114,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
|
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
|
||||||
|
|
||||||
private final boolean sendRefreshMapping;
|
private final boolean sendRefreshMapping;
|
||||||
private final AtomicLong recoveryIdGenerator = new AtomicLong();
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
|
public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
|
||||||
ThreadPool threadPool, RecoveryTarget recoveryTarget,
|
ThreadPool threadPool, RecoveryTarget recoveryTarget,
|
||||||
ShardStateAction shardStateAction,
|
ShardStateAction shardStateAction,
|
||||||
NodeIndexDeletedAction nodeIndexDeletedAction,
|
NodeIndexDeletedAction nodeIndexDeletedAction,
|
||||||
NodeMappingRefreshAction nodeMappingRefreshAction,
|
NodeMappingRefreshAction nodeMappingRefreshAction) {
|
||||||
NodeEnvironment nodeEnvironment) {
|
|
||||||
super(settings);
|
super(settings);
|
||||||
this.indicesService = indicesService;
|
this.indicesService = indicesService;
|
||||||
this.clusterService = clusterService;
|
this.clusterService = clusterService;
|
||||||
|
@ -133,7 +131,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
|
this.nodeMappingRefreshAction = nodeMappingRefreshAction;
|
||||||
|
|
||||||
this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true);
|
this.sendRefreshMapping = componentSettings.getAsBoolean("send_refresh_mapping", true);
|
||||||
this.nodeEnvironment = nodeEnvironment;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -573,7 +570,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
|
RecoveryState recoveryState = recoveryTarget.recoveryState(indexShard);
|
||||||
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
|
if (recoveryState != null && recoveryState.getStage() != RecoveryState.Stage.DONE) {
|
||||||
// we have an ongoing recovery, find the source based on current routing and compare them
|
// we have an ongoing recovery, find the source based on current routing and compare them
|
||||||
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
|
DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting, logger);
|
||||||
if (!recoveryState.getSourceNode().equals(sourceNode)) {
|
if (!recoveryState.getSourceNode().equals(sourceNode)) {
|
||||||
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])",
|
logger.debug("[{}][{}] removing shard (recovery source changed), current [{}], global [{}])",
|
||||||
shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
|
||||||
|
@ -669,7 +666,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
// if we're in peer recovery, try to find out the source node now so in case it fails, we will not create the index shard
|
// if we're in peer recovery, try to find out the source node now so in case it fails, we will not create the index shard
|
||||||
DiscoveryNode sourceNode = null;
|
DiscoveryNode sourceNode = null;
|
||||||
if (isPeerRecovery(shardRouting)) {
|
if (isPeerRecovery(shardRouting)) {
|
||||||
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
|
sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting, logger);
|
||||||
if (sourceNode == null) {
|
if (sourceNode == null) {
|
||||||
logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
|
logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());
|
||||||
return;
|
return;
|
||||||
|
@ -767,7 +764,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
||||||
* routing to *require* peer recovery, use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to
|
* routing to *require* peer recovery, use {@link #isPeerRecovery(org.elasticsearch.cluster.routing.ShardRouting)} to
|
||||||
* check if its needed or not.
|
* check if its needed or not.
|
||||||
*/
|
*/
|
||||||
private DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting) {
|
public static DiscoveryNode findSourceNodeForPeerRecovery(RoutingTable routingTable, DiscoveryNodes nodes, ShardRouting shardRouting, ESLogger logger) {
|
||||||
|
//nocommit factor this out somewhere useful
|
||||||
DiscoveryNode sourceNode = null;
|
DiscoveryNode sourceNode = null;
|
||||||
if (!shardRouting.primary()) {
|
if (!shardRouting.primary()) {
|
||||||
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
|
IndexShardRoutingTable shardRoutingTable = routingTable.index(shardRouting.index()).shard(shardRouting.id());
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.indices.recovery;
|
||||||
|
|
||||||
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
|
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||||
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A recovery handler that skips phase 2 and 3 of the recovery
|
||||||
|
*/
|
||||||
|
public class FileSyncRecoveryHandler extends ShardRecoveryHandler {
|
||||||
|
|
||||||
|
private IndexShard shard;
|
||||||
|
private final StartRecoveryRequest request;
|
||||||
|
|
||||||
|
public FileSyncRecoveryHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
|
||||||
|
super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
|
||||||
|
this.shard = shard;
|
||||||
|
this.request = request;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
|
||||||
|
logger.trace("{} recovery [phase2] to {}: skipping translog for file sync", request.shardId(), request.targetNode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
|
||||||
|
logger.trace("{} recovery [phase3] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
|
@ -29,9 +29,12 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexService;
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.settings.IndexSettings;
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
|
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.indices.IndicesLifecycle;
|
import org.elasticsearch.indices.IndicesLifecycle;
|
||||||
|
@ -117,15 +120,21 @@ public class RecoverySource extends AbstractComponent {
|
||||||
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode());
|
logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(), request.targetNode());
|
||||||
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
|
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
|
||||||
}
|
}
|
||||||
if (!targetShardRouting.initializing()) {
|
if (request.recoveryType() != RecoveryState.Type.FILE_SYNC && !targetShardRouting.initializing()) {
|
||||||
logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. known shards state is [{}]",
|
logger.debug("delaying recovery of {} as it is not listed as initializing on the target node {}. known shards state is [{}]",
|
||||||
request.shardId(), request.targetNode(), targetShardRouting.state());
|
request.shardId(), request.targetNode(), targetShardRouting.state());
|
||||||
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
|
throw new DelayRecoveryException("source node has the state of the target shard to be [" + targetShardRouting.state() + "], expecting to be [initializing]");
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
|
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
|
||||||
|
final ShardRecoveryHandler handler;
|
||||||
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
|
if (request.recoveryType() == RecoveryState.Type.FILE_SYNC) {
|
||||||
|
logger.trace("{} taking snapshot before file sync recovery", shard.shardId());
|
||||||
|
Releasables.close(shard.snapshotIndex()); // we have to take a snapshot here and close it right away so we can catchup from the latest docs committed
|
||||||
|
handler = new FileSyncRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
|
||||||
|
} else {
|
||||||
|
handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
|
||||||
|
}
|
||||||
ongoingRecoveries.add(shard, handler);
|
ongoingRecoveries.add(shard, handler);
|
||||||
try {
|
try {
|
||||||
shard.recover(handler);
|
shard.recover(handler);
|
||||||
|
|
|
@ -83,7 +83,8 @@ public class RecoveryState implements ToXContent, Streamable {
|
||||||
GATEWAY((byte) 0),
|
GATEWAY((byte) 0),
|
||||||
SNAPSHOT((byte) 1),
|
SNAPSHOT((byte) 1),
|
||||||
REPLICA((byte) 2),
|
REPLICA((byte) 2),
|
||||||
RELOCATION((byte) 3);
|
RELOCATION((byte) 3),
|
||||||
|
FILE_SYNC((byte) 4);
|
||||||
|
|
||||||
private static final Type[] TYPES = new Type[Type.values().length];
|
private static final Type[] TYPES = new Type[Type.values().length];
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
|
||||||
import org.apache.lucene.store.AlreadyClosedException;
|
import org.apache.lucene.store.AlreadyClosedException;
|
||||||
import org.apache.lucene.store.IndexOutput;
|
import org.apache.lucene.store.IndexOutput;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||||
import org.elasticsearch.ExceptionsHelper;
|
import org.elasticsearch.ExceptionsHelper;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
@ -134,6 +135,21 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
|
logger.debug("{} ignore recovery. already in recovering process, {}", indexShard.shardId(), e.getMessage());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
startRecoveryInternal(indexShard, recoveryType, sourceNode, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a full recovery but skips phase 2 and 3 and only syncs the files that have been committed.
|
||||||
|
*/
|
||||||
|
public void startFileSync(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
|
||||||
|
if (indexShard.state() != IndexShardState.STARTED) {
|
||||||
|
throw new ElasticsearchIllegalStateException("Shard is not started can't sync files -state: " + indexShard.state());
|
||||||
|
}
|
||||||
|
logger.trace("{} starting file sync with {}", indexShard.shardId(), sourceNode);
|
||||||
|
startRecoveryInternal(indexShard, RecoveryState.Type.FILE_SYNC, sourceNode, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startRecoveryInternal(IndexShard indexShard, RecoveryState.Type recoveryType, DiscoveryNode sourceNode, RecoveryListener listener) {
|
||||||
// create a new recovery status, and process...
|
// create a new recovery status, and process...
|
||||||
RecoveryState recoveryState = new RecoveryState(indexShard.shardId());
|
RecoveryState recoveryState = new RecoveryState(indexShard.shardId());
|
||||||
recoveryState.setType(recoveryType);
|
recoveryState.setType(recoveryType);
|
||||||
|
@ -142,7 +158,6 @@ public class RecoveryTarget extends AbstractComponent {
|
||||||
recoveryState.setPrimary(indexShard.routingEntry().primary());
|
recoveryState.setPrimary(indexShard.routingEntry().primary());
|
||||||
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
|
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, recoveryState, listener, recoverySettings.activityTimeout());
|
||||||
threadPool.generic().execute(new RecoveryRunner(recoveryId));
|
threadPool.generic().execute(new RecoveryRunner(recoveryId));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
|
protected void retryRecovery(final RecoveryStatus recoveryStatus, final String reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) {
|
||||||
|
|
|
@ -71,9 +71,9 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
* everything relating to copying the segment files as well as sending translog
|
* everything relating to copying the segment files as well as sending translog
|
||||||
* operations across the wire once the segments have been copied.
|
* operations across the wire once the segments have been copied.
|
||||||
*/
|
*/
|
||||||
public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
public class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
||||||
|
|
||||||
private final ESLogger logger;
|
protected final ESLogger logger;
|
||||||
// Shard that is going to be recovered (the "source")
|
// Shard that is going to be recovered (the "source")
|
||||||
private final IndexShard shard;
|
private final IndexShard shard;
|
||||||
private final String indexName;
|
private final String indexName;
|
||||||
|
@ -375,6 +375,10 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
|
public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
|
||||||
|
if (request.recoveryType() == RecoveryState.Type.FILE_SYNC) {
|
||||||
|
logger.trace("{} recovery [phase2] to {}: skipping translog for file sync", request.shardId(), request.targetNode());
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (shard.state() == IndexShardState.CLOSED) {
|
if (shard.state() == IndexShardState.CLOSED) {
|
||||||
throw new IndexShardClosedException(request.shardId());
|
throw new IndexShardClosedException(request.shardId());
|
||||||
}
|
}
|
||||||
|
@ -430,11 +434,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
||||||
throw new IndexShardClosedException(request.shardId());
|
throw new IndexShardClosedException(request.shardId());
|
||||||
}
|
}
|
||||||
cancellableThreads.checkForCancel();
|
cancellableThreads.checkForCancel();
|
||||||
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
|
|
||||||
StopWatch stopWatch = new StopWatch().start();
|
StopWatch stopWatch = new StopWatch().start();
|
||||||
|
final int totalOperations;
|
||||||
|
logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
|
||||||
|
|
||||||
// Send the translog operations to the target node
|
// Send the translog operations to the target node
|
||||||
int totalOperations = sendSnapshot(snapshot);
|
totalOperations = sendSnapshot(snapshot);
|
||||||
|
|
||||||
cancellableThreads.execute(new Interruptable() {
|
cancellableThreads.execute(new Interruptable() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -538,7 +543,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
|
||||||
*
|
*
|
||||||
* @return the total number of translog operations that were sent
|
* @return the total number of translog operations that were sent
|
||||||
*/
|
*/
|
||||||
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
|
protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
|
||||||
int ops = 0;
|
int ops = 0;
|
||||||
long size = 0;
|
long size = 0;
|
||||||
int totalOperations = 0;
|
int totalOperations = 0;
|
||||||
|
|
|
@ -23,15 +23,15 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
import org.elasticsearch.cluster.routing.*;
|
||||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.search.SearchHit;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.elasticsearch.test.InternalTestCluster;
|
import org.elasticsearch.test.InternalTestCluster;
|
||||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||||
import org.junit.Ignore;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -262,4 +262,92 @@ public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
|
||||||
|
|
||||||
assertPathHasBeenCleared(dataPath);
|
assertPathHasBeenCleared(dataPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void testSimpleNonSharedFS() throws Exception {
|
||||||
|
internalCluster().startNodesAsync(3).get();
|
||||||
|
final String IDX = "test";
|
||||||
|
final Path dataPath = newTempDirPath();
|
||||||
|
|
||||||
|
Settings idxSettings = ImmutableSettings.builder()
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, 2))
|
||||||
|
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
|
||||||
|
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
|
||||||
|
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
prepareCreate(IDX).setSettings(idxSettings).get();
|
||||||
|
ensureGreen(IDX);
|
||||||
|
|
||||||
|
IndexRequestBuilder[] builders = new IndexRequestBuilder[between(10, 20)];
|
||||||
|
for (int i = 0; i < builders.length; i++) {
|
||||||
|
builders[i] = client().prepareIndex(IDX, "doc", Integer.toString(i)).setSource("foo", "bar");
|
||||||
|
}
|
||||||
|
indexRandom(false, builders);
|
||||||
|
flush(IDX);
|
||||||
|
logger.info("--> performing query");
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
||||||
|
assertHitCount(resp, builders.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("--> restarting all nodes");
|
||||||
|
if (randomBoolean()) {
|
||||||
|
logger.info("--> rolling restart");
|
||||||
|
internalCluster().rollingRestart();
|
||||||
|
} else {
|
||||||
|
logger.info("--> full restart");
|
||||||
|
internalCluster().fullRestart();
|
||||||
|
}
|
||||||
|
|
||||||
|
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
|
||||||
|
ensureGreen(IDX);
|
||||||
|
|
||||||
|
logger.info("--> performing query");
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
||||||
|
assertHitCount(resp, builders.length);
|
||||||
|
for (SearchHit hit : resp.getHits().getHits()) {
|
||||||
|
assertEquals(hit.sourceAsMap().get("foo"), "bar");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we reindex and check if the new updated docs are all available on the primary
|
||||||
|
for (int i = 0; i < builders.length; i++) {
|
||||||
|
builders[i] = client().prepareIndex(IDX, "doc", Integer.toString(i)).setSource("foo", "foobar");
|
||||||
|
}
|
||||||
|
indexRandom(false, builders);
|
||||||
|
// only refresh no flush
|
||||||
|
refresh();
|
||||||
|
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||||
|
GroupShardsIterator shardIterators = state.getRoutingNodes().getRoutingTable().activePrimaryShardsGrouped(new String[]{IDX}, false);
|
||||||
|
ShardRouting primary = shardIterators.iterator().next().nextOrNull();
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SearchResponse resp = client().prepareSearch(IDX).setExplain(true).setQuery(matchAllQuery()).get();
|
||||||
|
assertHitCount(resp, builders.length);
|
||||||
|
for (SearchHit hit : resp.getHits().getHits()) {
|
||||||
|
if (hit.shard().getNodeId().equals(primary.currentNodeId())) {
|
||||||
|
// this comes from the primary so we have the latest
|
||||||
|
assertEquals(hit.sourceAsMap().get("foo"), "foobar");
|
||||||
|
} else {
|
||||||
|
// this comes from the replica not caught up yet
|
||||||
|
assertEquals(hit.sourceAsMap().get("foo"), "bar");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
flush(IDX);
|
||||||
|
logger.info("--> performing query");
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
|
||||||
|
assertHitCount(resp, builders.length);
|
||||||
|
for (SearchHit hit : resp.getHits().getHits()) {
|
||||||
|
assertEquals(hit.sourceAsMap().get("foo"), "foobar");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("--> deleting index");
|
||||||
|
assertAcked(client().admin().indices().prepareDelete(IDX));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,9 +26,12 @@ import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.LockFactory;
|
import org.apache.lucene.store.LockFactory;
|
||||||
import org.apache.lucene.store.StoreRateLimiting;
|
import org.apache.lucene.store.StoreRateLimiting;
|
||||||
import org.apache.lucene.util.AbstractRandomizedTest;
|
import org.apache.lucene.util.AbstractRandomizedTest;
|
||||||
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.common.lease.Releasable;
|
||||||
|
import org.elasticsearch.common.lease.Releasables;
|
||||||
import org.elasticsearch.common.lucene.Lucene;
|
import org.elasticsearch.common.lucene.Lucene;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.engine.Engine;
|
import org.elasticsearch.index.engine.Engine;
|
||||||
|
@ -87,7 +90,7 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
||||||
// When the the internal engine closes we do a rollback, which removes uncommitted segments
|
// When the the internal engine closes we do a rollback, which removes uncommitted segments
|
||||||
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
|
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
|
||||||
// so that even in tests where don't flush we can check the integrity of the Lucene index
|
// so that even in tests where don't flush we can check the integrity of the Lucene index
|
||||||
indexShard.engine().snapshotIndex(); // Keep translog for tests that rely on replaying it
|
Releasables.close(indexShard.engine().snapshotIndex()); // Keep translog for tests that rely on replaying it
|
||||||
logger.info("flush finished in beforeIndexShardClosed");
|
logger.info("flush finished in beforeIndexShardClosed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue