mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-30 10:23:41 -04:00
[ENGINE] Move more methods into abstract Engine
This commit is contained in:
parent
1b8d8da648
commit
2e3c6a9118
3 changed files with 60 additions and 60 deletions
|
@ -59,6 +59,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -72,6 +73,10 @@ public abstract class Engine implements Closeable {
|
||||||
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
|
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
|
||||||
protected final FailedEngineListener failedEngineListener;
|
protected final FailedEngineListener failedEngineListener;
|
||||||
protected final SnapshotDeletionPolicy deletionPolicy;
|
protected final SnapshotDeletionPolicy deletionPolicy;
|
||||||
|
protected final ReentrantLock failEngineLock = new ReentrantLock();
|
||||||
|
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
||||||
|
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
|
||||||
|
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
|
||||||
|
|
||||||
protected volatile Throwable failedEngine = null;
|
protected volatile Throwable failedEngine = null;
|
||||||
|
|
||||||
|
@ -416,7 +421,45 @@ public abstract class Engine implements Closeable {
|
||||||
public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
|
public abstract void recover(RecoveryHandler recoveryHandler) throws EngineException;
|
||||||
|
|
||||||
/** fail engine due to some error. the engine will also be closed. */
|
/** fail engine due to some error. the engine will also be closed. */
|
||||||
public abstract void failEngine(String reason, Throwable failure);
|
public void failEngine(String reason, Throwable failure) {
|
||||||
|
assert failure != null;
|
||||||
|
if (failEngineLock.tryLock()) {
|
||||||
|
store.incRef();
|
||||||
|
try {
|
||||||
|
try {
|
||||||
|
// we just go and close this engine - no way to recover
|
||||||
|
closeNoLock("engine failed on: [" + reason + "]");
|
||||||
|
// we first mark the store as corrupted before we notify any listeners
|
||||||
|
// this must happen first otherwise we might try to reallocate so quickly
|
||||||
|
// on the same node that we don't see the corrupted marker file when
|
||||||
|
// the shard is initializing
|
||||||
|
if (Lucene.isCorruptionException(failure)) {
|
||||||
|
try {
|
||||||
|
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("Couldn't marks store corrupted", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (failedEngine != null) {
|
||||||
|
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.warn("failed engine [{}]", failure, reason);
|
||||||
|
// we must set a failure exception, generate one if not supplied
|
||||||
|
failedEngine = failure;
|
||||||
|
failedEngineListener.onFailedEngine(shardId, reason, failure);
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
// don't bubble up these exceptions up
|
||||||
|
logger.warn("failEngine threw exception", t);
|
||||||
|
} finally {
|
||||||
|
store.decRef();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/** Check whether the engine should be failed */
|
/** Check whether the engine should be failed */
|
||||||
protected boolean maybeFailEngine(String source, Throwable t) {
|
protected boolean maybeFailEngine(String source, Throwable t) {
|
||||||
|
@ -963,4 +1006,17 @@ public abstract class Engine implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract SearcherManager getSearcherManager();
|
protected abstract SearcherManager getSearcherManager();
|
||||||
|
|
||||||
|
protected abstract void closeNoLock(String reason) throws ElasticsearchException;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
|
||||||
|
logger.debug("close now acquiring writeLock");
|
||||||
|
try (ReleasableLock _ = writeLock.acquire()) {
|
||||||
|
logger.debug("close acquired writeLock");
|
||||||
|
closeNoLock("api");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,10 +80,6 @@ public class InternalEngine extends Engine {
|
||||||
private final MergePolicyProvider mergePolicyProvider;
|
private final MergePolicyProvider mergePolicyProvider;
|
||||||
private final MergeSchedulerProvider mergeScheduler;
|
private final MergeSchedulerProvider mergeScheduler;
|
||||||
|
|
||||||
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
|
|
||||||
private final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
|
|
||||||
private final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
|
|
||||||
|
|
||||||
private final IndexWriter indexWriter;
|
private final IndexWriter indexWriter;
|
||||||
|
|
||||||
private final SearcherFactory searcherFactory;
|
private final SearcherFactory searcherFactory;
|
||||||
|
@ -101,7 +97,6 @@ public class InternalEngine extends Engine {
|
||||||
private final LiveVersionMap versionMap;
|
private final LiveVersionMap versionMap;
|
||||||
|
|
||||||
private final Object[] dirtyLocks;
|
private final Object[] dirtyLocks;
|
||||||
private final ReentrantLock failEngineLock = new ReentrantLock();
|
|
||||||
|
|
||||||
private final AtomicLong translogIdGenerator = new AtomicLong();
|
private final AtomicLong translogIdGenerator = new AtomicLong();
|
||||||
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
|
private final AtomicBoolean versionMapRefreshPending = new AtomicBoolean();
|
||||||
|
@ -892,23 +887,12 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws ElasticsearchException {
|
|
||||||
if (isClosed.get() == false) { // don't acquire the write lock if we are already closed
|
|
||||||
logger.trace("close now acquire writeLock");
|
|
||||||
try (ReleasableLock _ = writeLock.acquire()) {
|
|
||||||
logger.trace("close now acquired writeLock");
|
|
||||||
closeNoLock("api");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the engine without acquiring the write lock. This should only be
|
* Closes the engine without acquiring the write lock. This should only be
|
||||||
* called while the write lock is hold or in a disaster condition ie. if the engine
|
* called while the write lock is hold or in a disaster condition ie. if the engine
|
||||||
* is failed.
|
* is failed.
|
||||||
*/
|
*/
|
||||||
private void closeNoLock(String reason) throws ElasticsearchException {
|
protected final void closeNoLock(String reason) throws ElasticsearchException {
|
||||||
if (isClosed.compareAndSet(false, true)) {
|
if (isClosed.compareAndSet(false, true)) {
|
||||||
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
|
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself";
|
||||||
try {
|
try {
|
||||||
|
@ -938,47 +922,6 @@ public class InternalEngine extends Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void failEngine(String reason, Throwable failure) {
|
|
||||||
assert failure != null;
|
|
||||||
if (failEngineLock.tryLock()) {
|
|
||||||
store.incRef();
|
|
||||||
try {
|
|
||||||
try {
|
|
||||||
// we just go and close this engine - no way to recover
|
|
||||||
closeNoLock("engine failed on: [" + reason + "]");
|
|
||||||
// we first mark the store as corrupted before we notify any listeners
|
|
||||||
// this must happen first otherwise we might try to reallocate so quickly
|
|
||||||
// on the same node that we don't see the corrupted marker file when
|
|
||||||
// the shard is initializing
|
|
||||||
if (Lucene.isCorruptionException(failure)) {
|
|
||||||
try {
|
|
||||||
store.markStoreCorrupted(ExceptionsHelper.unwrapCorruption(failure));
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn("Couldn't marks store corrupted", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (failedEngine != null) {
|
|
||||||
logger.debug("tried to fail engine but engine is already failed. ignoring. [{}]", reason, failure);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
logger.warn("failed engine [{}]", failure, reason);
|
|
||||||
// we must set a failure exception, generate one if not supplied
|
|
||||||
failedEngine = failure;
|
|
||||||
failedEngineListener.onFailedEngine(shardId, reason, failure);
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
// don't bubble up these exceptions up
|
|
||||||
logger.warn("failEngine threw exception", t);
|
|
||||||
} finally {
|
|
||||||
store.decRef();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logger.debug("tried to fail engine but could not acquire lock - engine should be failed by now [{}]", reason, failure);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected SearcherManager getSearcherManager() {
|
protected SearcherManager getSearcherManager() {
|
||||||
return searcherManager;
|
return searcherManager;
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.InternalEngine;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -76,7 +77,7 @@ public class MockInternalEngine extends InternalEngine {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() throws IOException {
|
||||||
try {
|
try {
|
||||||
super.close();
|
super.close();
|
||||||
} finally {
|
} finally {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue