From 73bfdc4066be080dc4cad1f0521bf6ea14cded93 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 29 Apr 2019 20:57:31 -0400 Subject: [PATCH] Simplify initialization of max_seq_no of updates (#41161) Today we choose to initialize max_seq_no_of_updates on primaries only so we can deal with a situation where a primary is on an old node (before 6.5) which does not have MUS while replicas on new nodes (6.5+). However, this strategy is quite complex and can lead to bugs (for example #40249) since we have to assign a correct value (not too low) to MSU in all possible situations (before recovering from translog, restoring history on promotion, and handing off relocation). Fortunately, we don't have to deal with this BWC in 7.0+ since all nodes in the cluster should have MSU. This change simplifies the initialization of MSU by always assigning it a correct value in the constructor of Engine regardless of whether it's a replica or primary. Relates #33842 --- .../elasticsearch/index/engine/Engine.java | 27 ++------------ .../index/engine/InternalEngine.java | 36 ++++++++++--------- .../index/engine/ReadOnlyEngine.java | 16 ++++++--- .../elasticsearch/index/shard/IndexShard.java | 23 +----------- .../index/engine/InternalEngineTests.java | 27 +------------- .../index/engine/ReadOnlyEngineTests.java | 2 -- .../index/shard/IndexShardTests.java | 14 ++------ .../index/shard/RefreshListenersTests.java | 1 - .../index/engine/EngineTestCase.java | 1 - .../ccr/index/engine/FollowingEngine.java | 3 +- .../index/engine/FollowingEngineTests.java | 2 -- 11 files changed, 40 insertions(+), 112 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index 9bed93c37169..63659126f843 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -98,7 +98,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -142,16 +141,6 @@ public abstract class Engine implements Closeable { */ protected volatile long lastWriteNanos = System.nanoTime(); - /* - * This marker tracks the max seq_no of either update operations or delete operations have been processed in this engine. - * An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. - * This marker is started uninitialized (-2), and the optimization using seq_no will be disabled if this marker is uninitialized. - * The value of this marker never goes backwards, and is updated/changed differently on primary and replica: - * 1. A primary initializes this marker once using the max_seq_no from its history, then advances when processing an update or delete. - * 2. A replica never advances this marker by itself but only inherits from its primary (via advanceMaxSeqNoOfUpdatesOrDeletes). - */ - private final AtomicLong maxSeqNoOfUpdatesOrDeletes = new AtomicLong(UNASSIGNED_SEQ_NO); - protected Engine(EngineConfig engineConfig) { Objects.requireNonNull(engineConfig.getStore(), "Store must be provided to the engine"); @@ -1961,25 +1950,13 @@ public abstract class Engine implements Closeable { * Moreover, operations that are optimized using the MSU optimization must not be processed twice as this will create duplicates * in Lucene. To avoid this we check the local checkpoint tracker to see if an operation was already processed. * - * @see #reinitializeMaxSeqNoOfUpdatesOrDeletes() * @see #advanceMaxSeqNoOfUpdatesOrDeletes(long) */ - public final long getMaxSeqNoOfUpdatesOrDeletes() { - return maxSeqNoOfUpdatesOrDeletes.get(); - } - - /** - * A primary shard calls this method to re-initialize the max_seq_no_of_updates marker using the - * max_seq_no from Lucene index and translog before replaying the local translog in its local recovery. - */ - public abstract void reinitializeMaxSeqNoOfUpdatesOrDeletes(); + public abstract long getMaxSeqNoOfUpdatesOrDeletes(); /** * A replica shard receives a new max_seq_no_of_updates from its primary shard, then calls this method * to advance this marker to at least the given sequence number. */ - public final void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { - maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, seqNo)); - assert maxSeqNoOfUpdatesOrDeletes.get() >= seqNo : maxSeqNoOfUpdatesOrDeletes.get() + " < " + seqNo; - } + public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 654d31d22671..bb301bc4addb 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -47,7 +47,6 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; import org.elasticsearch.Assertions; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.Version; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.SuppressForbidden; @@ -146,6 +145,10 @@ public class InternalEngine extends Engine { private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeenAutoIdTimestamp = new AtomicLong(-1); private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + // max_seq_no_of_updates_or_deletes tracks the max seq_no of update or delete operations that have been processed in this engine. + // An index request is considered as an update if it overwrites existing documents with the same docId in the Lucene index. + // The value of this marker never goes backwards, and is tracked/updated differently on primary and replica. + private final AtomicLong maxSeqNoOfUpdatesOrDeletes; private final CounterMetric numVersionLookups = new CounterMetric(); private final CounterMetric numIndexVersionsLookups = new CounterMetric(); // Lucene operations since this engine was opened - not include operations from existing segments. @@ -228,6 +231,7 @@ public class InternalEngine extends Engine { () -> acquireSearcher("create_local_checkpoint_tracker", SearcherScope.INTERNAL), localCheckpointTrackerSupplier); this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint()); this.internalSearcherManager.addListener(lastRefreshedCheckpointListener); + maxSeqNoOfUpdatesOrDeletes = new AtomicLong(SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo())); success = true; } finally { if (success == false) { @@ -405,7 +409,6 @@ public class InternalEngine extends Engine { flushLock.lock(); try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is uninitialized"; if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -874,7 +877,7 @@ public class InternalEngine extends Engine { final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletes(index.seqNo()); + advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); } } else { markSeqNoAsSeen(index.seqNo()); @@ -981,7 +984,6 @@ public class InternalEngine extends Engine { protected final IndexingStrategy planIndexingAsPrimary(Index index) throws IOException { assert index.origin() == Operation.Origin.PRIMARY : "planing as primary but origin isn't. got " + index.origin(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; final IndexingStrategy plan; // resolve an external operation into an internal one which is safe to replay if (canOptimizeAddDocument(index)) { @@ -1322,7 +1324,6 @@ public class InternalEngine extends Engine { protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException { assert delete.origin() == Operation.Origin.PRIMARY : "planing as primary but got " + delete.origin(); - assert getMaxSeqNoOfUpdatesOrDeletes() != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized"; // resolve operation from external to internal final VersionValue versionValue = resolveDocVersion(delete, delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO); assert incrementVersionLookup(); @@ -2718,13 +2719,22 @@ public class InternalEngine extends Engine { assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get(); } + @Override + public long getMaxSeqNoOfUpdatesOrDeletes() { + return maxSeqNoOfUpdatesOrDeletes.get(); + } + + @Override + public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { + if (maxSeqNoOfUpdatesOnPrimary == SequenceNumbers.UNASSIGNED_SEQ_NO) { + assert false : "max_seq_no_of_updates on primary is unassigned"; + throw new IllegalArgumentException("max_seq_no_of_updates on primary is unassigned"); + } + this.maxSeqNoOfUpdatesOrDeletes.updateAndGet(curr -> Math.max(curr, maxSeqNoOfUpdatesOnPrimary)); + } + private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) { final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); - // If the primary is on an old version which does not replicate msu, we need to relax this assertion for that. - if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) { - assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_5_0); - return true; - } // We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument). if (allowDeleted) { final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes()); @@ -2742,12 +2752,6 @@ public class InternalEngine extends Engine { return true; } - @Override - public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { - final long maxSeqNo = SequenceNumbers.max(localCheckpointTracker.getMaxSeqNo(), translog.getMaxSeqNo()); - advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNo); - } - private static void trimUnsafeCommits(EngineConfig engineConfig) throws IOException { final Store store = engineConfig.getStore(); final String translogUUID = store.readLastCommittedSegmentsInfo().getUserData().get(Translog.TRANSLOG_UUID_KEY); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 777aff88e9db..7b47d60437fe 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -456,11 +456,6 @@ public class ReadOnlyEngine extends Engine { } - @Override - public void reinitializeMaxSeqNoOfUpdatesOrDeletes() { - advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats.getMaxSeqNo()); - } - protected void processReaders(IndexReader reader, IndexReader previousReader) { searcherFactory.processReaders(reader, previousReader); } @@ -487,4 +482,15 @@ public class ReadOnlyEngine extends Engine { } }; } + + @Override + public long getMaxSeqNoOfUpdatesOrDeletes() { + return seqNoStats.getMaxSeqNo(); + } + + @Override + public void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary) { + assert maxSeqNoOfUpdatesOnPrimary <= getMaxSeqNoOfUpdatesOrDeletes() : + maxSeqNoOfUpdatesOnPrimary + ">" + getMaxSeqNoOfUpdatesOrDeletes(); + } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7d6faa73a941..ee67597efe31 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -532,14 +532,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * the reverted operations on this shard by replaying the translog to avoid losing acknowledged writes. */ final Engine engine = getEngine(); - if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { - // If the old primary was on an old version that did not replicate the msu, - // we need to bootstrap it manually from its local history. - assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); - engine.advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); - } - // in case we previously reset engine, we need to forward MSU before replaying translog. - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.restoreLocalHistoryFromTranslog((resettingEngine, snapshot) -> runTranslogRecovery(resettingEngine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {})); /* Rolling the translog generation is not strictly needed here (as we will never have collisions between @@ -1411,9 +1403,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl translogRecoveryStats::incrementRecoveredOperations); }; innerOpenEngineAndTranslog(); - final Engine engine = getEngine(); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); - engine.recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); + getEngine().recoverFromTranslog(translogRecoveryRunner, Long.MAX_VALUE); } /** @@ -2206,12 +2196,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { replicationTracker.activateWithPrimaryContext(primaryContext); // make changes to primaryMode flag only under mutex - if (getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO) { - // If the old primary was on an old version that did not replicate the msu, - // we need to bootstrap it manually from its local history. - assert indexSettings.getIndexVersionCreated().before(Version.V_6_5_0); - getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNoStats().getMaxSeqNo()); - } } } @@ -3138,7 +3122,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig())); onNewEngine(newEngineReference.get()); } - newEngineReference.get().advanceMaxSeqNoOfUpdatesOrDeletes(globalCheckpoint); final Engine.TranslogRecoveryRunner translogRunner = (engine, snapshot) -> runTranslogRecovery( engine, snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> { // TODO: add a dedicate recovery stats for the reset translog @@ -3185,11 +3168,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, long, ActionListener) */ public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) { - assert seqNo != UNASSIGNED_SEQ_NO - || getMaxSeqNoOfUpdatesOrDeletes() == UNASSIGNED_SEQ_NO : - "replica has max_seq_no_of_updates=" + getMaxSeqNoOfUpdatesOrDeletes() + " but primary does not"; getEngine().advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); - assert seqNo <= getMaxSeqNoOfUpdatesOrDeletes() : getMaxSeqNoOfUpdatesOrDeletes() + " < " + seqNo; } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 23763426b374..ae11500e54e5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -683,7 +683,6 @@ public class InternalEngineTests extends EngineTestCase { engine = new InternalEngine(engine.config()); assertTrue(engine.isRecovering()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test")); assertThat(counter.get(), equalTo(2)); @@ -700,7 +699,6 @@ public class InternalEngineTests extends EngineTestCase { engine = new InternalEngine(engine.config()); expectThrows(IllegalStateException.class, () -> engine.flush(true, true)); assertTrue(engine.isRecovering()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertFalse(engine.isRecovering()); doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null); @@ -732,7 +730,6 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(engine); } try (Engine recoveringEngine = new InternalEngine(engine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { final TotalHitCountCollector collector = new TotalHitCountCollector(); @@ -768,7 +765,6 @@ public class InternalEngineTests extends EngineTestCase { } }; assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs)); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertTrue(committed.get()); } finally { @@ -802,7 +798,6 @@ public class InternalEngineTests extends EngineTestCase { } initialEngine.close(); recoveringEngine = new InternalEngine(initialEngine.config()); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs); @@ -837,14 +832,12 @@ public class InternalEngineTests extends EngineTestCase { engine.syncTranslog(); } try (InternalEngine engine = new InternalEngine(config)) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo)); } try (InternalEngine engine = new InternalEngine(config)) { long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, upToSeqNo); assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo)); assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo)); @@ -1260,7 +1253,6 @@ public class InternalEngineTests extends EngineTestCase { store.associateIndexWithNewTranslog(translogUUID); } engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId); } @@ -1280,7 +1272,6 @@ public class InternalEngineTests extends EngineTestCase { EngineConfig config = engine.config(); engine.close(); engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)); @@ -2378,7 +2369,6 @@ public class InternalEngineTests extends EngineTestCase { } try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); @@ -2733,7 +2723,6 @@ public class InternalEngineTests extends EngineTestCase { assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); } assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY)); @@ -2751,7 +2740,6 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(2, engine.getTranslog().currentFileGeneration()); assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations()); @@ -2765,7 +2753,6 @@ public class InternalEngineTests extends EngineTestCase { Map userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY)); assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); userData = engine.getLastCommittedSegmentInfos().getUserData(); assertEquals("no changes - nothing to commit", "1", @@ -2872,7 +2859,6 @@ public class InternalEngineTests extends EngineTestCase { } } }) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null); @@ -2885,7 +2871,6 @@ public class InternalEngineTests extends EngineTestCase { try (InternalEngine engine = new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null, globalCheckpointSupplier))) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, 1); final long committedGen = Long.valueOf( @@ -2954,7 +2939,6 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); // we need to reuse the engine config unless the parser.mappingModified won't work engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertVisibleCount(engine, numDocs, false); @@ -3718,7 +3702,6 @@ public class InternalEngineTests extends EngineTestCase { InternalEngine engine = new InternalEngine(configSupplier.apply(store))) { assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(timestamp1, engine.segmentsStats(false, false).getMaxUnsafeAutoIdTimestamp()); final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), @@ -4085,7 +4068,6 @@ public class InternalEngineTests extends EngineTestCase { IOUtils.close(initialEngine); } try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); recoveringEngine.fillSeqNoGaps(2); assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1))); @@ -4197,7 +4179,6 @@ public class InternalEngineTests extends EngineTestCase { throw new UnsupportedOperationException(); } }; - noOpEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get()); final String reason = "filling gaps"; @@ -4433,7 +4414,6 @@ public class InternalEngineTests extends EngineTestCase { totalTranslogOps = engine.getTranslog().totalOperations(); } try (InternalEngine engine = new InternalEngine(engineConfig)) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, globalCheckpoint.get()); engine.restoreLocalHistoryFromTranslog(translogHandler); assertThat(getDocIds(engine, true), equalTo(prevDocs)); @@ -4480,7 +4460,6 @@ public class InternalEngineTests extends EngineTestCase { assertEquals(checkpointOnReplica, replicaEngine.getLocalCheckpoint()); recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get)); assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations()); - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4516,7 +4495,6 @@ public class InternalEngineTests extends EngineTestCase { if (flushed) { assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0)); } - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo()); assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint()); @@ -4711,7 +4689,6 @@ public class InternalEngineTests extends EngineTestCase { super.commitIndexWriter(writer, translog, syncId); } }) { - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); int numDocs = scaledRandomIntBetween(10, 100); for (int docId = 0; docId < numDocs; docId++) { @@ -5485,8 +5462,7 @@ public class InternalEngineTests extends EngineTestCase { engine.close(); Set liveDocIds = new HashSet<>(); engine = new InternalEngine(engine.config()); - assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-2L)); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); + assertThat(engine.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(-1L)); int numOps = between(1, 500); for (int i = 0; i < numOps; i++) { long currentMaxSeqNoOfUpdates = engine.getMaxSeqNoOfUpdatesOrDeletes(); @@ -5556,7 +5532,6 @@ public class InternalEngineTests extends EngineTestCase { "seq_no=" + op.seqNo() + " max_seq_no=" + tracker.getMaxSeqNo() + "checkpoint=" + tracker.getCheckpoint(), tracker.contains(op.seqNo()), equalTo(seqNosInSafeCommit.contains(op.seqNo()))); } - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); assertThat(getDocIds(engine, true), equalTo(docs)); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java index b689400601dc..f9437ac9251b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java @@ -92,7 +92,6 @@ public class ReadOnlyEngineTests extends EngineTestCase { } // Close and reopen the main engine try (InternalEngine recoveringEngine = new InternalEngine(config)) { - recoveringEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); // the locked down engine should still point to the previous commit assertThat(readOnlyEngine.getLocalCheckpoint(), equalTo(lastSeqNoStats.getLocalCheckpoint())); @@ -224,7 +223,6 @@ public class ReadOnlyEngineTests extends EngineTestCase { } try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null , null, true, Function.identity())) { final TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - readOnlyEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); readOnlyEngine.recoverFromTranslog(translogHandler, randomNonNegativeLong()); assertThat(translogHandler.appliedOperations(), equalTo(0L)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index bf1bfa668829..b34f364bbed2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -1110,11 +1110,9 @@ public class IndexShardTests extends IndexShardTestCase { indexShard.updateGlobalCheckpointOnReplica(globalCheckpointOnReplica, "test"); final long globalCheckpoint = randomLongBetween(UNASSIGNED_SEQ_NO, indexShard.getLocalCheckpoint()); - final long currentMaxSeqNoOfUpdates = indexShard.getMaxSeqNoOfUpdatesOrDeletes(); final long maxSeqNoOfUpdatesOrDeletes = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo); final Set docsBeforeRollback = getShardDocUIDs(indexShard); final CountDownLatch latch = new CountDownLatch(1); - final boolean shouldRollback = Math.max(globalCheckpointOnReplica, globalCheckpoint) < maxSeqNo; randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + 1, globalCheckpoint, @@ -1133,13 +1131,7 @@ public class IndexShardTests extends IndexShardTestCase { }, ""); latch.await(); - if (shouldRollback) { - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Collections.max( - Arrays.asList(maxSeqNoOfUpdatesOrDeletes, globalCheckpoint, globalCheckpointOnReplica)) - )); - } else { - assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(maxSeqNoOfUpdatesOrDeletes, currentMaxSeqNoOfUpdates))); - } + assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); final ShardRouting newRouting = indexShard.routingEntry().moveActiveReplicaToPrimary(); final CountDownLatch resyncLatch = new CountDownLatch(1); indexShard.updateShardState( @@ -1154,7 +1146,6 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo)); assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo)); assertThat(getShardDocUIDs(indexShard), equalTo(docsBeforeRollback)); - // we conservatively roll MSU forward to maxSeqNo during restoreLocalHistory, ideally it should become just currentMaxSeqNoOfUpdates assertThat(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNo)); closeShard(indexShard, false); } @@ -3653,6 +3644,7 @@ public class IndexShardTests extends IndexShardTestCase { public void testResetEngine() throws Exception { IndexShard shard = newStartedShard(false); indexOnReplicaWithGaps(shard, between(0, 1000), Math.toIntExact(shard.getLocalCheckpoint())); + long maxSeqNoBeforeRollback = shard.seqNoStats().getMaxSeqNo(); final long globalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint(), shard.getLocalCheckpoint()); shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); Set docBelowGlobalCheckpoint = getShardDocUIDs(shard).stream() @@ -3694,7 +3686,7 @@ public class IndexShardTests extends IndexShardTestCase { assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); - assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(globalCheckpoint)); + assertThat(shard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(maxSeqNoBeforeRollback)); done.set(true); thread.join(); closeShard(shard, false); diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 0216756e65a8..e264d33ffed6 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -152,7 +152,6 @@ public class RefreshListenersTests extends ESTestCase { () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); - engine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2a5b11079562..afa319af7e1c 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -515,7 +515,6 @@ public abstract class EngineTestCase extends ESTestCase { } InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config); - internalEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return internalEngine; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index bbb0689a8a7e..619e0a04baf9 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -133,7 +133,8 @@ public final class FollowingEngine extends InternalEngine { @Override protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { - // ignore, this is not really a primary + assert getMaxSeqNoOfUpdatesOrDeletes() >= seqNo : seqNo + " < " + getMaxSeqNoOfUpdatesOrDeletes(); + super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index e3d997886334..4a56d6370eb9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -289,7 +289,6 @@ public class FollowingEngineTests extends ESTestCase { store.associateIndexWithNewTranslog(translogUuid); FollowingEngine followingEngine = new FollowingEngine(config); TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings()); - followingEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); return followingEngine; } @@ -495,7 +494,6 @@ public class FollowingEngineTests extends ESTestCase { leaderStore.associateIndexWithNewTranslog(Translog.createEmptyTranslog( leaderConfig.getTranslogConfig().getTranslogPath(), SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L)); try (InternalEngine leaderEngine = new InternalEngine(leaderConfig)) { - leaderEngine.reinitializeMaxSeqNoOfUpdatesOrDeletes(); leaderEngine.skipTranslogRecovery(); Settings followerSettings = Settings.builder() .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)