add test for failing shadow engine / remove nocommit

Simon Willnauer 2015-02-17 10:50:59 +01:00
parent d77414c5e7
commit 48a700d23c
7 changed files with 116 additions and 132 deletions

View file: Engine.java

@@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  *
@@ -72,6 +73,10 @@ public abstract class Engine implements Closeable {
     protected final AtomicBoolean isClosed = new AtomicBoolean(false);
     protected final FailedEngineListener failedEngineListener;
     protected final SnapshotDeletionPolicy deletionPolicy;
+    protected final ReentrantLock failEngineLock = new ReentrantLock();
+    protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
+    protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
+    protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
     protected volatile Throwable failedEngine = null;
@@ -415,8 +420,45 @@ public abstract class Engine implements Closeable {
     public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
 
     /** fail engine due to some error. the engine will also be closed. */
-    public abstract void failEngine(String reason, Throwable failure);
+    public void failEngine(String reason, Throwable failure) {
+        assert failure != null;
+        if (failEngineLock.tryLock()) {
+            store.incRef();
+            try {
+                try {
+                    // we just go and close this engine - no way to recover
+                    closeNoLock("engine failed on: [" + reason + "]");
+                    // we first mark the store as corrupted before we notify any listeners
+                    // this must happen first, otherwise we might try to reallocate so quickly
+                    // on the same node that we don't see the corrupted marker file when
+                    // the shard is initializing
+                    if (Lucene.isCorruptionException(failure)) {
+                        try {
+                            store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
+                        } catch (IOException e) {
+                            logger.warn("Couldn't mark store corrupted", e);
+                        }
+                    }
+                } finally {
+                    if (failedEngine != null) {
+                        logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
+                        return;
+                    }
+                    logger.warn("failed engine [{}]", failure, reason);
+                    // we must set a failure exception, generate one if not supplied
+                    failedEngine = failure;
+                    failedEngineListener.onFailedEngine(shardId, reason, failure);
+                }
+            } catch (Throwable t) {
+                // don't bubble these exceptions up
+                logger.warn("failEngine threw exception", t);
+            } finally {
+                store.decRef();
+            }
+        } else {
+            logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
+        }
+    }
 
     /** Check whether the engine should be failed */
     protected boolean maybeFailEngine(String source, Throwable t) {
@@ -963,4 +1005,17 @@ public abstract class Engine implements Closeable {
     }
 
     protected abstract SearcherManager getSearcherManager();
+
+    protected abstract void closeNoLock(String reason) throws ElasticsearchException;
+
+    @Override
+    public void close() throws IOException {
+        if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
+            logger.debug("close now acquiring writeLock");
+            try (ReleasableLock _ = writeLock.acquire()) {
+                logger.debug("close acquired writeLock");
+                closeNoLock("api");
+            }
+        }
+    }
 }
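
Note: failEngine() is now a concrete base-class method guarded by ReentrantLock.tryLock(), so the first thread to report a failure closes the engine, marks the store, and notifies listeners, while any concurrent caller just logs and backs off. A minimal self-contained sketch of that "fail once" idiom (class and member names here are illustrative, not the Elasticsearch API):

    import java.util.concurrent.locks.ReentrantLock;

    abstract class FailOnce {
        private final ReentrantLock failLock = new ReentrantLock();
        private volatile Throwable failedReason;

        void fail(String reason, Throwable failure) {
            if (failLock.tryLock()) {
                // like the engine above, we never unlock: a failed engine stays failed
                if (failedReason != null) {
                    return; // someone already failed us earlier
                }
                failedReason = failure;
                closeQuietly("failed on: [" + reason + "]");
            }
            // else: another thread is failing the engine right now - nothing to do
        }

        abstract void closeQuietly(String reason);
    }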

View file: InternalEngine.java

@@ -80,10 +80,6 @@ public class InternalEngine extends Engine {
     private final MergePolicyProvider mergePolicyProvider;
     private final MergeSchedulerProvider mergeScheduler;
 
-    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
-    private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
-    private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
-
     private final IndexWriter indexWriter;
     private final SearcherFactory searcherFactory;
@@ -101,7 +97,6 @@ public class InternalEngine extends Engine {
     private final LiveVersionMap versionMap;
     private final Object[] dirtyLocks;
 
-    private final ReentrantLock failEngineLock = new ReentrantLock();
     private final AtomicLong translogIdGenerator = new AtomicLong();
     private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
@@ -892,23 +887,12 @@ public class InternalEngine extends Engine {
         }
     }
 
-    @Override
-    public void close() throws ElasticsearchException {
-        if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
-            logger.trace("close now acquire writeLock");
-            try (ReleasableLock _ = writeLock.acquire()) {
-                logger.trace("close now acquired writeLock");
-                closeNoLock("api");
-            }
-        }
-    }
-
     /**
      * Closes the engine without acquiring the write lock. This should only be
      * called while the write lock is held or in a disaster condition, i.e. if the engine
      * is failed.
      */
-    private void closeNoLock(String reason) throws ElasticsearchException {
+    protected final void closeNoLock(String reason) throws ElasticsearchException {
         if (isClosed.compareAndSet(false, true)) {
             assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must currently be failing itself";
             try {
@@ -938,47 +922,6 @@ public class InternalEngine extends Engine {
         }
     }
 
-    @Override
-    public void failEngine(String reason, Throwable failure) {
-        assert failure != null;
-        if (failEngineLock.tryLock()) {
-            store.incRef();
-            try {
-                try {
-                    // we just go and close this engine - no way to recover
-                    closeNoLock("engine failed on: [" + reason + "]");
-                    // we first mark the store as corrupted before we notify any listeners
-                    // this must happen first, otherwise we might try to reallocate so quickly
-                    // on the same node that we don't see the corrupted marker file when
-                    // the shard is initializing
-                    if (Lucene.isCorruptionException(failure)) {
-                        try {
-                            store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
-                        } catch (IOException e) {
-                            logger.warn("Couldn't mark store corrupted", e);
-                        }
-                    }
-                } finally {
-                    if (failedEngine != null) {
-                        logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
-                        return;
-                    }
-                    logger.warn("failed engine [{}]", failure, reason);
-                    // we must set a failure exception, generate one if not supplied
-                    failedEngine = failure;
-                    failedEngineListener.onFailedEngine(shardId, reason, failure);
-                }
-            } catch (Throwable t) {
-                // don't bubble these exceptions up
-                logger.warn("failEngine threw exception", t);
-            } finally {
-                store.decRef();
-            }
-        } else {
-            logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
-        }
-    }
-
     @Override
     protected SearcherManager getSearcherManager() {
         return searcherManager;
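
Note: the close()/closeNoLock() split that replaces the removed override is a template method: the base class owns the read/write lock and the isClosed flag, and each engine implements only the unguarded teardown. A compact sketch of the shape (simplified, hypothetical names; the real base class is Engine):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    abstract class CloseTemplate implements java.io.Closeable {
        protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        protected final AtomicBoolean isClosed = new AtomicBoolean(false);

        // subclasses release their resources here; locking is handled by the caller
        protected abstract void closeNoLock(String reason);

        @Override
        public void close() {
            if (isClosed.get() == false) { // skip the lock if we are already closed
                rwl.writeLock().lock();
                try {
                    closeNoLock("api");
                } finally {
                    rwl.writeLock().unlock();
                }
            }
        }
    }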

View file: ShadowEngine.java

@@ -27,6 +27,7 @@ import org.apache.lucene.search.SearcherFactory;
 import org.apache.lucene.search.SearcherManager;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.lucene.Lucene;
@@ -61,13 +62,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  */
 public class ShadowEngine extends Engine {
 
-    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
-    private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
-    private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
-    private final Lock failReleasableLock = new ReentrantLock();
-
     private final RecoveryCounter onGoingRecoveries;
-    private volatile boolean closedOrFailed = false;
     private volatile SearcherManager searcherManager;
 
     private SegmentInfos lastCommittedSegmentInfos;
@@ -230,9 +226,7 @@ public class ShadowEngine extends Engine {
         // take a write lock here so it won't happen while a flush is in progress
         // this means that next commits will not be allowed once the lock is released
         try (ReleasableLock _ = writeLock.acquire()) {
-            if (closedOrFailed) {
-                throw new EngineClosedException(shardId, failedEngine);
-            }
+            ensureOpen();
             onGoingRecoveries.startRecovery();
         }
@@ -261,64 +255,22 @@ public class ShadowEngine extends Engine {
         // engine, there is no phase2 and phase3 of recovery
     }
 
-    @Override
-    public void failEngine(String reason, Throwable failure) {
-        // Note, there is no IndexWriter, so nothing to rollback here
-        assert failure != null;
-        if (failReleasableLock.tryLock()) {
-            try {
-                try {
-                    // we first mark the store as corrupted before we notify any listeners
-                    // this must happen first, otherwise we might try to reallocate so quickly
-                    // on the same node that we don't see the corrupted marker file when
-                    // the shard is initializing
-                    if (Lucene.isCorruptionException(failure)) {
-                        try {
-                            store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
-                        } catch (IOException e) {
-                            logger.warn("Couldn't mark store corrupted", e);
-                        }
-                    }
-                } finally {
-                    if (failedEngine != null) {
-                        logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
-                        return;
-                    }
-                    logger.warn("failed engine [{}]", failure, reason);
-                    // we must set a failure exception, generate one if not supplied
-                    failedEngine = failure;
-                    failedEngineListener.onFailedEngine(shardId, reason, failure);
-                }
-            } catch (Throwable t) {
-                // don't bubble these exceptions up
-                logger.warn("failEngine threw exception", t);
-            } finally {
-                closedOrFailed = true;
-            }
-        } else {
-            logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
-        }
-    }
-
     @Override
     protected SearcherManager getSearcherManager() {
         return searcherManager;
     }
 
     @Override
-    public void close() throws IOException {
-        logger.debug("shadow replica close now acquiring writeLock");
-        try (ReleasableLock _ = writeLock.acquire()) {
-            logger.debug("shadow replica close acquired writeLock");
-            if (isClosed.compareAndSet(false, true)) {
-                try {
-                    logger.debug("shadow replica close searcher manager refCount: {}", store.refCount());
-                    IOUtils.close(searcherManager);
-                } catch (Throwable t) {
-                    logger.warn("shadow replica failed to close searcher manager", t);
-                } finally {
-                    store.decRef();
-                }
-            }
-        }
-    }
+    protected void closeNoLock(String reason) throws ElasticsearchException {
+        if (isClosed.compareAndSet(false, true)) {
+            try {
+                logger.debug("shadow replica close searcher manager refCount: {}", store.refCount());
+                IOUtils.close(searcherManager);
+            } catch (Throwable t) {
+                logger.warn("shadow replica failed to close searcher manager", t);
+            } finally {
+                store.decRef();
+            }
+        }
+    }
 }
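
Note: with the bespoke closedOrFailed flag and failReleasableLock gone, the shadow engine reuses the base class's isClosed/failedEngine state, and the recovery path now checks liveness through the shared ensureOpen() guard. That guard is roughly of this shape (a sketch; the real method lives in Engine and throws EngineClosedException(shardId, failedEngine)):

    protected void ensureOpen() {
        if (isClosed.get()) {
            // stand-in for EngineClosedException, keeping the original failure as cause
            throw new IllegalStateException("engine is closed", failedEngine);
        }
    }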

View file: DirectoryUtils.java

@@ -31,10 +31,13 @@ public final class DirectoryUtils {
 
     private DirectoryUtils() {} // no instance
 
-    static final Directory getLeafDirectory(FilterDirectory dir) {
+    static final <T extends Directory> Directory getLeafDirectory(FilterDirectory dir, Class<T> targetClass) {
         Directory current = dir.getDelegate();
         while (true) {
             if ((current instanceof FilterDirectory)) {
+                if (targetClass != null && targetClass.isAssignableFrom(current.getClass())) {
+                    break;
+                }
                 current = ((FilterDirectory) current).getDelegate();
             } else {
                 break;
@@ -59,7 +62,7 @@ public final class DirectoryUtils {
     public static <T extends Directory> T getLeaf(Directory dir, Class<T> targetClass, T defaultValue) {
         Directory d = dir;
         if (dir instanceof FilterDirectory) {
-            d = getLeafDirectory((FilterDirectory) dir);
+            d = getLeafDirectory((FilterDirectory) dir, targetClass);
         }
         if (d instanceof FileSwitchDirectory) {
             T leaf = getLeaf(((FileSwitchDirectory) d).getPrimaryDir(), targetClass);
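
Note: the extra Class parameter turns getLeafDirectory() from "unwrap all the way down" into "unwrap until the requested type is on top", which is what lets getLeaf() return an intermediate wrapper such as MockDirectoryWrapper instead of the innermost FSDirectory. A hedged usage sketch (the directory setup is hypothetical):

    import java.util.Random;
    import org.apache.lucene.store.*;
    import org.elasticsearch.index.store.DirectoryUtils;

    // fragment, e.g. inside a test method:
    Directory ram = new RAMDirectory();                               // innermost directory
    MockDirectoryWrapper mock = new MockDirectoryWrapper(new Random(), ram);
    Directory outer = new FilterDirectory(mock) {};                   // anonymous outer wrapper

    MockDirectoryWrapper found = DirectoryUtils.getLeaf(outer, MockDirectoryWrapper.class, null);
    assert found == mock; // unwrapping now stops at the requested wrapper type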

View file: ShadowEngineTests.java

@@ -24,12 +24,10 @@ import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.NumericDocValuesField;
 import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.LiveIndexWriterConfig;
-import org.apache.lucene.index.Term;
+import org.apache.lucene.index.*;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.MockDirectoryWrapper;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -42,6 +40,7 @@ import org.elasticsearch.index.Index;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
+import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.indexing.ShardIndexingService;
 import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -56,6 +55,7 @@ import org.elasticsearch.index.settings.IndexSettingsService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
 import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
 import org.elasticsearch.index.translog.Translog;
@@ -825,7 +825,37 @@ public class ShadowEngineTests extends ElasticsearchLuceneTestCase {
 
     @Test
     public void testFailEngineOnCorruption() {
-        // nocommit - figure out how to implement me for shadow replica
+        ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
+        primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
+        primaryEngine.flush();
+        final boolean failEngine = replicaEngine.config().isFailEngineOnCorruption();
+        MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class);
+        leaf.setRandomIOExceptionRate(1.0);
+        leaf.setRandomIOExceptionRateOnOpen(1.0);
+        try {
+            replicaEngine.refresh("foo");
+            fail("exception expected");
+        } catch (Exception ex) {
+        }
+        try {
+            Engine.Searcher searchResult = replicaEngine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
+            searchResult.close();
+
+            ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
+            primaryEngine.create(new Engine.Create(null, newUid("2"), doc2));
+            primaryEngine.refresh("foo");
+
+            searchResult = replicaEngine.acquireSearcher("test");
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 2));
+            MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(2));
+            searchResult.close();
+            assertThat(failEngine, is(false));
+        } catch (EngineClosedException ex) {
+            assertThat(failEngine, is(true));
+        }
     }
 
     @Test
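
Note: the new test leans on Lucene's MockDirectoryWrapper for fault injection; with both exception rates at 1.0, essentially every file access or open on the replica's directory throws an IOException, so the refresh is guaranteed to look like corruption. Whether that must fail the engine depends on a config flag, hence the two-sided assertion. The pattern, sketched with a hypothetical helper:

    boolean shouldFail = replicaEngine.config().isFailEngineOnCorruption();
    try {
        searchReplicaAfterCorruption();    // hypothetical helper: the searches shown above
        assertThat(shouldFail, is(false)); // survived, so the engine was configured not to fail
    } catch (EngineClosedException e) {
        assertThat(shouldFail, is(true));  // failed, which is exactly what the flag promised
    }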

View file: DirectoryUtilsTest.java

@@ -43,7 +43,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(dir) {}, FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
@@ -51,7 +51,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(dir, FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
@@ -60,7 +60,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean()), FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }
@@ -69,7 +69,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
         BaseDirectoryWrapper dir = newFSDirectory(file);
         FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean())) {}, FSDirectory.class, null);
         assertThat(directory, notNullValue());
-        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
+        assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
         dir.close();
     }

View file: MockInternalEngine.java

@@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ElasticsearchIntegrationTest;
 
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.util.Map;
 import java.util.Random;
@@ -76,7 +77,7 @@ public class MockInternalEngine extends InternalEngine {
     }
 
     @Override
-    public void close() {
+    public void close() throws IOException {
         try {
             super.close();
         } finally {
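
Note: this signature change is knock-on from Engine now implementing Closeable with a close() that declares IOException (first file above): an override that delegates to super.close() must redeclare (or handle) that checked exception. Sketch of the constraint (hypothetical classes):

    import java.io.Closeable;
    import java.io.IOException;

    class Base implements Closeable {
        @Override
        public void close() throws IOException { /* release resources */ }
    }

    class Sub extends Base {
        @Override
        public void close() throws IOException { // must redeclare or handle IOException
            try {
                super.close();
            } finally {
                // extra bookkeeping goes here
            }
        }
    }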