mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Remove checking of sync commit ids (#114246)
A Lucene commit doesn't contain sync ids `SegmentInfos` anymore, so we can't rely on them during recovery. The fields was marked as deprecated in #102343.
This commit is contained in:
parent
d5931941ae
commit
d93d333141
18 changed files with 41 additions and 261 deletions
|
@ -9,13 +9,13 @@
|
|||
====
|
||||
cat APIs are only intended for human consumption using the command line or {kib}
|
||||
console.
|
||||
They are _not_ intended for use by applications. For application
|
||||
They are _not_ intended for use by applications. For application
|
||||
consumption, use the <<cluster-state,cluster state API>>.
|
||||
====
|
||||
|
||||
The `shards` command is the detailed view of all nodes' shard <<shard-allocation-relocation-recovery,allocation>>.
|
||||
It will tell you if the shard is a primary or replica, the number of docs, the
|
||||
bytes it takes on disk, the node where it's located, and if the shard is
|
||||
It will tell you if the shard is a primary or replica, the number of docs, the
|
||||
bytes it takes on disk, the node where it's located, and if the shard is
|
||||
currently <<shard-allocation-relocation-recovery,recovering>>.
|
||||
|
||||
For <<data-streams,data streams>>, the API returns information about the stream's backing indices.
|
||||
|
@ -258,9 +258,6 @@ Time spent in suggest, such as `0`.
|
|||
`suggest.total`, `suto`, `suggestTotal`::
|
||||
Number of suggest operations, such as `0`.
|
||||
|
||||
`sync_id`::
|
||||
Sync ID of the shard.
|
||||
|
||||
`unassigned.at`, `ua`::
|
||||
Time at which the shard became unassigned in
|
||||
{wikipedia}/List_of_UTC_time_offsets[Coordinated Universal Time (UTC)].
|
||||
|
|
|
@ -65,4 +65,5 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task ->
|
|||
task.skipTest("indices.create/20_synthetic_source/nested object with unmapped fields", "temporary until backported")
|
||||
task.skipTest("indices.create/21_synthetic_source_stored/object param - nested object with stored array", "temporary until backported")
|
||||
task.skipTest("cat.aliases/10_basic/Deprecated local parameter", "CAT APIs not covered by compatibility policy")
|
||||
task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0")
|
||||
})
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
ip .+ \n
|
||||
id .+ \n
|
||||
node .+ \n
|
||||
sync_id .+ \n
|
||||
unassigned.reason .+ \n
|
||||
unassigned.at .+ \n
|
||||
unassigned.for .+ \n
|
||||
|
|
|
@ -14,8 +14,6 @@ import org.elasticsearch.action.admin.cluster.configuration.ClearVotingConfigExc
|
|||
import org.elasticsearch.action.admin.cluster.configuration.TransportAddVotingConfigExclusionsAction;
|
||||
import org.elasticsearch.action.admin.cluster.configuration.TransportClearVotingConfigExclusionsAction;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndexStats;
|
||||
import org.elasticsearch.action.admin.indices.stats.ShardStats;
|
||||
import org.elasticsearch.action.support.ActionTestUtils;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.coordination.ElectionSchedulerFactory;
|
||||
|
@ -30,7 +28,6 @@ import org.elasticsearch.index.Index;
|
|||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.MergePolicyConfig;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
|
@ -577,13 +574,6 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void assertSyncIdsNotNull() {
|
||||
IndexStats indexStats = indicesAdmin().prepareStats("test").get().getIndex("test");
|
||||
for (ShardStats shardStats : indexStats.getShards()) {
|
||||
assertNotNull(shardStats.getCommitStats().getUserData().get(Engine.SYNC_COMMIT_ID));
|
||||
}
|
||||
}
|
||||
|
||||
public void testStartedShardFoundIfStateNotYetProcessed() throws Exception {
|
||||
// nodes may need to report the shards they processed the initial recovered cluster state from the master
|
||||
final String nodeName = internalCluster().startNode();
|
||||
|
|
|
@ -210,19 +210,8 @@ public class NodeAllocationResult implements ToXContentObject, Writeable, Compar
|
|||
return allocationId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@code true} if the shard copy has a matching sync id with the primary shard.
|
||||
* Returns {@code false} if the shard copy does not have a matching sync id with the primary
|
||||
* shard, or this explanation pertains to the allocation of a primary shard, in which case
|
||||
* matching sync ids are irrelevant.
|
||||
*/
|
||||
public boolean hasMatchingSyncId() {
|
||||
return matchingBytes == Long.MAX_VALUE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the number of matching bytes the shard copy has with the primary shard.
|
||||
* Returns {@code Long.MAX_VALUE} if {@link #hasMatchingSyncId()} returns {@code true}.
|
||||
* Returns -1 if not applicable (this value only applies to assigning replica shards).
|
||||
*/
|
||||
public long getMatchingBytes() {
|
||||
|
@ -263,11 +252,7 @@ public class NodeAllocationResult implements ToXContentObject, Writeable, Compar
|
|||
builder.field("allocation_id", allocationId);
|
||||
}
|
||||
if (matchingBytes >= 0) {
|
||||
if (hasMatchingSyncId()) {
|
||||
builder.field("matching_sync_id", true);
|
||||
} else {
|
||||
builder.humanReadableField("matching_size_in_bytes", "matching_size", ByteSizeValue.ofBytes(matchingBytes));
|
||||
}
|
||||
builder.humanReadableField("matching_size_in_bytes", "matching_size", ByteSizeValue.ofBytes(matchingBytes));
|
||||
}
|
||||
if (storeException != null) {
|
||||
builder.startObject("store_exception");
|
||||
|
|
|
@ -439,14 +439,6 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
return sizeMatched;
|
||||
}
|
||||
|
||||
private static boolean hasMatchingSyncId(
|
||||
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
|
||||
TransportNodesListShardStoreMetadata.StoreFilesMetadata replicaStore
|
||||
) {
|
||||
String primarySyncId = primaryStore.syncId();
|
||||
return primarySyncId != null && primarySyncId.equals(replicaStore.syncId());
|
||||
}
|
||||
|
||||
private static MatchingNode computeMatchingNode(
|
||||
DiscoveryNode primaryNode,
|
||||
TransportNodesListShardStoreMetadata.StoreFilesMetadata primaryStore,
|
||||
|
@ -455,8 +447,7 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
) {
|
||||
final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode);
|
||||
final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode);
|
||||
final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0)
|
||||
|| hasMatchingSyncId(primaryStore, replicaStore);
|
||||
final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0);
|
||||
final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore);
|
||||
return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
|
||||
}
|
||||
|
@ -470,9 +461,6 @@ public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
|
|||
if (targetNodeStore == null || targetNodeStore.storeFilesMetadata().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (hasMatchingSyncId(primaryStore, targetNodeStore.storeFilesMetadata())) {
|
||||
return true;
|
||||
}
|
||||
return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,6 @@ import org.elasticsearch.core.RefCounted;
|
|||
import org.elasticsearch.core.Releasable;
|
||||
import org.elasticsearch.core.Releasables;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.core.UpdateForV9;
|
||||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.mapper.DocumentParser;
|
||||
|
@ -117,8 +116,6 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
|
|||
|
||||
public abstract class Engine implements Closeable {
|
||||
|
||||
@UpdateForV9(owner = UpdateForV9.Owner.DISTRIBUTED_INDEXING) // TODO: Remove sync_id in 9.0
|
||||
public static final String SYNC_COMMIT_ID = "sync_id";
|
||||
public static final String HISTORY_UUID_KEY = "history_uuid";
|
||||
public static final String FORCE_MERGE_UUID_KEY = "force_merge_uuid";
|
||||
public static final String MIN_RETAINED_SEQNO = "min_retained_seq_no";
|
||||
|
|
|
@ -1150,14 +1150,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
|
|||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the sync id of the commit point that this MetadataSnapshot represents.
|
||||
*
|
||||
* @return sync id if exists, else null
|
||||
*/
|
||||
public String getSyncId() {
|
||||
return commitUserData.get(Engine.SYNC_COMMIT_ID);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -534,56 +534,30 @@ public class RecoverySourceHandler {
|
|||
);
|
||||
}
|
||||
}
|
||||
// When sync ids were used we could use them to check if two shard copies were equivalent,
|
||||
// if that's the case we can skip sending files from the source shard to the target shard.
|
||||
// If the shard uses the current replication mechanism, we have to compute the recovery plan,
|
||||
// and it is still possible to skip the sending files from the source shard to the target shard
|
||||
// using a different mechanism to determine it.
|
||||
// TODO: is this still relevant today?
|
||||
if (hasSameLegacySyncId(recoverySourceMetadata, request.metadataSnapshot()) == false) {
|
||||
cancellableThreads.checkForCancel();
|
||||
SubscribableListener
|
||||
// compute the plan
|
||||
.<ShardRecoveryPlan>newForked(
|
||||
l -> recoveryPlannerService.computeRecoveryPlan(
|
||||
shard.shardId(),
|
||||
shardStateIdentifier,
|
||||
recoverySourceMetadata,
|
||||
request.metadataSnapshot(),
|
||||
startingSeqNo,
|
||||
translogOps.getAsInt(),
|
||||
getRequest().targetNode().getMaxIndexVersion(),
|
||||
canUseSnapshots(),
|
||||
request.isPrimaryRelocation(),
|
||||
l
|
||||
)
|
||||
cancellableThreads.checkForCancel();
|
||||
SubscribableListener
|
||||
// compute the plan
|
||||
.<ShardRecoveryPlan>newForked(
|
||||
l -> recoveryPlannerService.computeRecoveryPlan(
|
||||
shard.shardId(),
|
||||
shardStateIdentifier,
|
||||
recoverySourceMetadata,
|
||||
request.metadataSnapshot(),
|
||||
startingSeqNo,
|
||||
translogOps.getAsInt(),
|
||||
getRequest().targetNode().getMaxIndexVersion(),
|
||||
canUseSnapshots(),
|
||||
request.isPrimaryRelocation(),
|
||||
l
|
||||
)
|
||||
// perform the file recovery
|
||||
.<SendFileResult>andThen((l, plan) -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, l))
|
||||
// and respond
|
||||
.addListener(listener);
|
||||
} else {
|
||||
logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());
|
||||
SubscribableListener
|
||||
// but we must still create a retention lease
|
||||
.<RetentionLease>newForked(leaseListener -> createRetentionLease(startingSeqNo, leaseListener))
|
||||
// and then compute the result of sending no files
|
||||
.andThenApply(ignored -> {
|
||||
final TimeValue took = stopWatch.totalTime();
|
||||
logger.trace("recovery [phase1]: took [{}]", took);
|
||||
return new SendFileResult(
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(),
|
||||
0L,
|
||||
Collections.emptyList(),
|
||||
Collections.emptyList(),
|
||||
0L,
|
||||
took
|
||||
);
|
||||
})
|
||||
// and finally respond
|
||||
.addListener(listener);
|
||||
}
|
||||
)
|
||||
// perform the file recovery
|
||||
.<SendFileResult>andThen((l, plan) -> recoverFilesFromSourceAndSnapshot(plan, store, stopWatch, l))
|
||||
// and respond
|
||||
.addListener(listener);
|
||||
} catch (Exception e) {
|
||||
throw new RecoverFilesRecoveryException(request.shardId(), 0, ByteSizeValue.ZERO, e);
|
||||
}
|
||||
|
@ -1030,43 +1004,6 @@ public class RecoverySourceHandler {
|
|||
return new ThreadedActionListener<>(shard.getThreadPool().generic(), listener).map(ignored -> null);
|
||||
}
|
||||
|
||||
boolean hasSameLegacySyncId(Store.MetadataSnapshot source, Store.MetadataSnapshot target) {
|
||||
if (source.getSyncId() == null || source.getSyncId().equals(target.getSyncId()) == false) {
|
||||
return false;
|
||||
}
|
||||
if (source.numDocs() != target.numDocs()) {
|
||||
throw new IllegalStateException(
|
||||
"try to recover "
|
||||
+ request.shardId()
|
||||
+ " from primary shard with sync id but number "
|
||||
+ "of docs differ: "
|
||||
+ source.numDocs()
|
||||
+ " ("
|
||||
+ request.sourceNode().getName()
|
||||
+ ", primary) vs "
|
||||
+ target.numDocs()
|
||||
+ "("
|
||||
+ request.targetNode().getName()
|
||||
+ ")"
|
||||
);
|
||||
}
|
||||
SequenceNumbers.CommitInfo sourceSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(source.commitUserData().entrySet());
|
||||
SequenceNumbers.CommitInfo targetSeqNos = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(target.commitUserData().entrySet());
|
||||
if (sourceSeqNos.localCheckpoint() != targetSeqNos.localCheckpoint() || targetSeqNos.maxSeqNo() != sourceSeqNos.maxSeqNo()) {
|
||||
final String message = "try to recover "
|
||||
+ request.shardId()
|
||||
+ " with sync id but "
|
||||
+ "seq_no stats are mismatched: ["
|
||||
+ source.commitUserData()
|
||||
+ "] vs ["
|
||||
+ target.commitUserData()
|
||||
+ "]";
|
||||
assert false : message;
|
||||
throw new IllegalStateException(message);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void prepareTargetForTranslog(int totalTranslogOps, ActionListener<TimeValue> listener) {
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
final ActionListener<Void> wrappedListener = ActionListener.wrap(nullVal -> {
|
||||
|
|
|
@ -259,22 +259,9 @@ public class TransportNodesListShardStoreMetadata extends TransportNodesAction<
|
|||
.orElse(-1L);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return commit sync id if exists, else null
|
||||
*/
|
||||
public String syncId() {
|
||||
return metadataSnapshot.getSyncId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StoreFilesMetadata{"
|
||||
+ ", metadataSnapshot{size="
|
||||
+ metadataSnapshot.size()
|
||||
+ ", syncId="
|
||||
+ metadataSnapshot.getSyncId()
|
||||
+ "}"
|
||||
+ '}';
|
||||
return "StoreFilesMetadata{" + ", metadataSnapshot{size=" + metadataSnapshot.size() + "}" + '}';
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -23,11 +23,11 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.Table;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ListenableFuture;
|
||||
import org.elasticsearch.core.RestApiVersion;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.index.bulk.stats.BulkStats;
|
||||
import org.elasticsearch.index.cache.query.QueryCacheStats;
|
||||
import org.elasticsearch.index.engine.CommitStats;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.SegmentsStats;
|
||||
import org.elasticsearch.index.fielddata.FieldDataStats;
|
||||
import org.elasticsearch.index.flush.FlushStats;
|
||||
|
@ -123,7 +123,9 @@ public class RestShardsAction extends AbstractCatAction {
|
|||
.addCell("id", "default:false;desc:unique id of node where it lives")
|
||||
.addCell("node", "default:true;alias:n;desc:name of node where it lives");
|
||||
|
||||
table.addCell("sync_id", "alias:sync_id;default:false;desc:sync id");
|
||||
if (request.getRestApiVersion() == RestApiVersion.V_8) {
|
||||
table.addCell("sync_id", "alias:sync_id;default:false;desc:sync id");
|
||||
}
|
||||
|
||||
table.addCell("unassigned.reason", "alias:ur;default:false;desc:reason shard became unassigned");
|
||||
table.addCell("unassigned.at", "alias:ua;default:false;desc:time shard became unassigned (UTC)");
|
||||
|
@ -320,7 +322,9 @@ public class RestShardsAction extends AbstractCatAction {
|
|||
table.addCell(null);
|
||||
}
|
||||
|
||||
table.addCell(commitStats == null ? null : commitStats.getUserData().get(Engine.SYNC_COMMIT_ID));
|
||||
if (request.getRestApiVersion() == RestApiVersion.V_8) {
|
||||
table.addCell(null);
|
||||
}
|
||||
|
||||
if (shard.unassignedInfo() != null) {
|
||||
table.addCell(shard.unassignedInfo().reason());
|
||||
|
|
|
@ -48,7 +48,6 @@ public class NodeAllocationResultTests extends ESTestCase {
|
|||
assertEquals(matchingBytes, explanation.getShardStoreInfo().getMatchingBytes());
|
||||
assertNull(explanation.getShardStoreInfo().getAllocationId());
|
||||
assertFalse(explanation.getShardStoreInfo().isInSync());
|
||||
assertFalse(explanation.getShardStoreInfo().hasMatchingSyncId());
|
||||
|
||||
String allocId = randomAlphaOfLength(5);
|
||||
boolean inSync = randomBoolean();
|
||||
|
@ -60,7 +59,6 @@ public class NodeAllocationResultTests extends ESTestCase {
|
|||
assertNodeExplanationEquals(explanation, readExplanation);
|
||||
assertEquals(inSync, explanation.getShardStoreInfo().isInSync());
|
||||
assertEquals(-1, explanation.getShardStoreInfo().getMatchingBytes());
|
||||
assertFalse(explanation.getShardStoreInfo().hasMatchingSyncId());
|
||||
assertEquals(allocId, explanation.getShardStoreInfo().getAllocationId());
|
||||
}
|
||||
|
||||
|
@ -72,7 +70,6 @@ public class NodeAllocationResultTests extends ESTestCase {
|
|||
assertEquals(expl1.getShardStoreInfo().isInSync(), expl2.getShardStoreInfo().isInSync());
|
||||
assertEquals(expl1.getShardStoreInfo().getAllocationId(), expl2.getShardStoreInfo().getAllocationId());
|
||||
assertEquals(expl1.getShardStoreInfo().getMatchingBytes(), expl2.getShardStoreInfo().getMatchingBytes());
|
||||
assertEquals(expl1.getShardStoreInfo().hasMatchingSyncId(), expl2.getShardStoreInfo().hasMatchingSyncId());
|
||||
} else {
|
||||
assertNull(expl2.getShardStoreInfo());
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.elasticsearch.common.util.set.Sets;
|
|||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.index.IndexVersion;
|
||||
import org.elasticsearch.index.IndexVersions;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.seqno.ReplicationTracker;
|
||||
import org.elasticsearch.index.seqno.RetentionLease;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -149,22 +148,6 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that when there is a sync id match but no files match, we allocate it to matching node.
|
||||
*/
|
||||
public void testSyncIdMatch() {
|
||||
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders());
|
||||
DiscoveryNode nodeToMatch = randomBoolean() ? node2 : node3;
|
||||
testAllocator.addData(node1, "MATCH", new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
|
||||
.addData(nodeToMatch, "MATCH", new StoreFileMetadata("file1", 10, "NO_MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
|
||||
allocateAllUnassigned(allocation);
|
||||
assertThat(shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING).size(), equalTo(1));
|
||||
assertThat(
|
||||
shardsWithState(allocation.routingNodes(), ShardRoutingState.INITIALIZING).get(0).currentNodeId(),
|
||||
equalTo(nodeToMatch.getId())
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies that when there is no sync id match but files match, we allocate it to matching node.
|
||||
*/
|
||||
|
@ -439,17 +422,6 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
public void testCancelRecoveryBetterSyncId() {
|
||||
RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders());
|
||||
testAllocator.addData(node1, "MATCH", new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
|
||||
.addData(node2, "NO_MATCH", new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION))
|
||||
.addData(node3, "MATCH", new StoreFileMetadata("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION));
|
||||
testAllocator.processExistingRecoveries(allocation, shardRouting -> true);
|
||||
assertThat(allocation.routingNodesChanged(), equalTo(true));
|
||||
assertThat(shardsWithState(allocation.routingNodes(), ShardRoutingState.UNASSIGNED).size(), equalTo(1));
|
||||
assertThat(shardsWithState(allocation.routingNodes(), ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId));
|
||||
}
|
||||
|
||||
public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() {
|
||||
final UnassignedInfo unassignedInfo;
|
||||
if (randomBoolean()) {
|
||||
|
@ -688,9 +660,6 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
|
|||
filesAsMap.put(file.name(), file);
|
||||
}
|
||||
Map<String, String> commitData = new HashMap<>();
|
||||
if (syncId != null) {
|
||||
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
|
||||
}
|
||||
data.put(
|
||||
node,
|
||||
new TransportNodesListShardStoreMetadata.StoreFilesMetadata(
|
||||
|
|
|
@ -1282,13 +1282,11 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
null,
|
||||
globalCheckpoint::get
|
||||
);
|
||||
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
|
||||
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
|
||||
engine.index(indexForDoc(doc));
|
||||
globalCheckpoint.set(0L);
|
||||
engine.flush();
|
||||
syncFlush(indexWriterHolder.get(), engine, syncId);
|
||||
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
|
||||
syncFlush(indexWriterHolder.get(), engine);
|
||||
EngineConfig config = engine.config();
|
||||
if (randomBoolean()) {
|
||||
engine.close();
|
||||
|
@ -1306,7 +1304,6 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
}
|
||||
engine = new InternalEngine(config);
|
||||
recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE);
|
||||
assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
|
||||
}
|
||||
|
||||
public void testSyncedFlushVanishesOnReplay() throws IOException {
|
||||
|
@ -1327,31 +1324,21 @@ public class InternalEngineTests extends EngineTestCase {
|
|||
null,
|
||||
globalCheckpoint::get
|
||||
);
|
||||
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
|
||||
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), new BytesArray("{}"), null);
|
||||
globalCheckpoint.set(engine.getProcessedLocalCheckpoint());
|
||||
engine.index(indexForDoc(doc));
|
||||
engine.flush();
|
||||
syncFlush(indexWriterHolder.get(), engine, syncId);
|
||||
assertEquals(store.readLastCommittedSegmentsInfo().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
|
||||
syncFlush(indexWriterHolder.get(), engine);
|
||||
doc = testParsedDocument("2", null, testDocumentWithTextField(), new BytesArray("{}"), null);
|
||||
engine.index(indexForDoc(doc));
|
||||
EngineConfig config = engine.config();
|
||||
engine.close();
|
||||
engine = new InternalEngine(config);
|
||||
recoverFromTranslog(engine, translogHandler, Long.MAX_VALUE);
|
||||
assertNull(
|
||||
"Sync ID must be gone since we have a document to replay",
|
||||
engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID)
|
||||
);
|
||||
}
|
||||
|
||||
void syncFlush(IndexWriter writer, InternalEngine engine, String syncId) throws IOException {
|
||||
void syncFlush(IndexWriter writer, InternalEngine engine) throws IOException {
|
||||
try (var ignored = engine.acquireEnsureOpenRef()) {
|
||||
Map<String, String> userData = new HashMap<>();
|
||||
writer.getLiveCommitData().forEach(e -> userData.put(e.getKey(), e.getValue()));
|
||||
userData.put(Engine.SYNC_COMMIT_ID, syncId);
|
||||
writer.setLiveCommitData(userData.entrySet());
|
||||
writer.commit();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
|
|||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.lucene.store.FilterIndexOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.Maps;
|
||||
import org.elasticsearch.core.IOUtils;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
|
@ -996,17 +995,12 @@ public class StoreTests extends ESTestCase {
|
|||
Document doc = new Document();
|
||||
doc.add(new TextField("id", "1", Field.Store.NO));
|
||||
writer.addDocument(doc);
|
||||
Map<String, String> commitData = Maps.newMapWithExpectedSize(2);
|
||||
String syncId = "a sync id";
|
||||
commitData.put(Engine.SYNC_COMMIT_ID, syncId);
|
||||
writer.setLiveCommitData(commitData.entrySet());
|
||||
writer.commit();
|
||||
writer.close();
|
||||
Store.MetadataSnapshot metadata;
|
||||
metadata = store.getMetadata(randomBoolean() ? null : deletionPolicy.snapshot());
|
||||
assertFalse(metadata.fileMetadataMap().isEmpty());
|
||||
// do not check for correct files, we have enough tests for that above
|
||||
assertThat(metadata.commitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
|
||||
TestUtil.checkIndex(store.directory());
|
||||
assertDeleteContent(store, store.directory());
|
||||
IOUtils.close(store);
|
||||
|
@ -1041,7 +1035,6 @@ public class StoreTests extends ESTestCase {
|
|||
for (StoreFileMetadata inFile : inStoreFileMetadata) {
|
||||
assertThat(inFile.name(), equalTo(outFiles.next().name()));
|
||||
}
|
||||
assertThat(outStoreFileMetadata.syncId(), equalTo(inStoreFileMetadata.syncId()));
|
||||
assertThat(outStoreFileMetadata.peerRecoveryRetentionLeases(), equalTo(peerRecoveryRetentionLeases));
|
||||
}
|
||||
|
||||
|
|
|
@ -1079,50 +1079,6 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
|
|||
store.close();
|
||||
}
|
||||
|
||||
public void testVerifySeqNoStatsWhenRecoverWithSyncId() throws Exception {
|
||||
IndexShard shard = mock(IndexShard.class);
|
||||
when(shard.state()).thenReturn(IndexShardState.STARTED);
|
||||
RecoverySourceHandler handler = new RecoverySourceHandler(
|
||||
shard,
|
||||
new TestRecoveryTargetHandler(),
|
||||
threadPool,
|
||||
getStartRecoveryRequest(),
|
||||
between(1, 16),
|
||||
between(1, 4),
|
||||
between(1, 4),
|
||||
between(1, 4),
|
||||
false,
|
||||
recoveryPlannerService
|
||||
);
|
||||
|
||||
String syncId = UUIDs.randomBase64UUID();
|
||||
int numDocs = between(0, 1000);
|
||||
long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
assertTrue(
|
||||
handler.hasSameLegacySyncId(
|
||||
newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs),
|
||||
newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs)
|
||||
)
|
||||
);
|
||||
|
||||
AssertionError error = expectThrows(AssertionError.class, () -> {
|
||||
long localCheckpointOnTarget = randomValueOtherThan(
|
||||
localCheckpoint,
|
||||
() -> randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE)
|
||||
);
|
||||
long maxSeqNoOnTarget = randomValueOtherThan(
|
||||
maxSeqNo,
|
||||
() -> randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE)
|
||||
);
|
||||
handler.hasSameLegacySyncId(
|
||||
newMetadataSnapshot(syncId, Long.toString(localCheckpoint), Long.toString(maxSeqNo), numDocs),
|
||||
newMetadataSnapshot(syncId, Long.toString(localCheckpointOnTarget), Long.toString(maxSeqNoOnTarget), numDocs)
|
||||
);
|
||||
});
|
||||
assertThat(error.getMessage(), containsString("try to recover [index][1] with sync id but seq_no stats are mismatched:"));
|
||||
}
|
||||
|
||||
public void testRecoveryPlannerServiceIsUsed() throws Exception {
|
||||
try (Store store = newStore(createTempDir("source"), false)) {
|
||||
IndexShard shard = mock(IndexShard.class);
|
||||
|
@ -1784,7 +1740,6 @@ public class RecoverySourceHandlerTests extends MapperServiceTestCase {
|
|||
|
||||
private Store.MetadataSnapshot newMetadataSnapshot(String syncId, String localCheckpoint, String maxSeqNo, int numDocs) {
|
||||
Map<String, String> userData = new HashMap<>();
|
||||
userData.put(Engine.SYNC_COMMIT_ID, syncId);
|
||||
if (localCheckpoint != null) {
|
||||
userData.put(SequenceNumbers.LOCAL_CHECKPOINT_KEY, localCheckpoint);
|
||||
}
|
||||
|
|
|
@ -100,6 +100,7 @@ public class RestShardsActionTests extends ESTestCase {
|
|||
assertThat(headers.get(7).value, equalTo("ip"));
|
||||
assertThat(headers.get(8).value, equalTo("id"));
|
||||
assertThat(headers.get(9).value, equalTo("node"));
|
||||
assertThat(headers.get(10).value, equalTo("unassigned.reason"));
|
||||
|
||||
final List<List<Table.Cell>> rows = table.getRows();
|
||||
assertThat(rows.size(), equalTo(numShards));
|
||||
|
@ -114,8 +115,8 @@ public class RestShardsActionTests extends ESTestCase {
|
|||
assertThat(row.get(3).value, equalTo(shardRouting.state()));
|
||||
assertThat(row.get(7).value, equalTo(localNode.getHostAddress()));
|
||||
assertThat(row.get(8).value, equalTo(localNode.getId()));
|
||||
assertThat(row.get(70).value, equalTo(shardStats.getDataPath()));
|
||||
assertThat(row.get(71).value, equalTo(shardStats.getStatePath()));
|
||||
assertThat(row.get(69).value, equalTo(shardStats.getDataPath()));
|
||||
assertThat(row.get(70).value, equalTo(shardStats.getStatePath()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,4 +20,5 @@ testClusters.configureEach {
|
|||
|
||||
tasks.named("yamlRestCompatTestTransform").configure({ task ->
|
||||
task.skipTest("stack/10_basic/Test kibana reporting index auto creation", "warning does not exist for compatibility")
|
||||
task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0")
|
||||
})
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue