Reuse JoinTaskExecutor (#85325)

Currently, we are not reusing the JoinTaskExecutor that could result in
a long-running processing of outdated join or election requests. This
change keeps the executor permanently and drops all but latest requests.
This commit is contained in:
Ievgen Degtiarenko 2022-03-28 18:14:23 +02:00 committed by GitHub
parent 3dec58dabe
commit e4888dd808
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 126 additions and 68 deletions

View file

@ -0,0 +1,5 @@
pr: 85325
summary: Reuse `JoinTaskExecutor`
area: Cluster Coordination
type: enhancement
issues: []

View file

@ -70,7 +70,7 @@ public class JoinHelper {
private final AllocationService allocationService; private final AllocationService allocationService;
private final MasterService masterService; private final MasterService masterService;
private final TransportService transportService; private final TransportService transportService;
private volatile JoinTaskExecutor joinTaskExecutor; private final JoinTaskExecutor joinTaskExecutor;
private final LongSupplier currentTermSupplier; private final LongSupplier currentTermSupplier;
private final RerouteService rerouteService; private final RerouteService rerouteService;
private final NodeHealthService nodeHealthService; private final NodeHealthService nodeHealthService;
@ -97,6 +97,7 @@ public class JoinHelper {
this.allocationService = allocationService; this.allocationService = allocationService;
this.masterService = masterService; this.masterService = masterService;
this.transportService = transportService; this.transportService = transportService;
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
this.currentTermSupplier = currentTermSupplier; this.currentTermSupplier = currentTermSupplier;
this.rerouteService = rerouteService; this.rerouteService = rerouteService;
this.nodeHealthService = nodeHealthService; this.nodeHealthService = nodeHealthService;
@ -343,8 +344,12 @@ public class JoinHelper {
class LeaderJoinAccumulator implements JoinAccumulator { class LeaderJoinAccumulator implements JoinAccumulator {
@Override @Override
public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinListener) { public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinListener) {
final JoinTask task = JoinTask.singleNode(sender, joinReasonService.getJoinReason(sender, Mode.LEADER), joinListener); final JoinTask task = JoinTask.singleNode(
assert joinTaskExecutor != null; sender,
joinReasonService.getJoinReason(sender, Mode.LEADER),
joinListener,
currentTermSupplier.getAsLong()
);
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
} }
@ -406,18 +411,16 @@ public class JoinHelper {
joinReasonService.getJoinReason(discoveryNode, Mode.CANDIDATE), joinReasonService.getJoinReason(discoveryNode, Mode.CANDIDATE),
listener listener
); );
})); }), currentTermSupplier.getAsLong());
joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, currentTermSupplier.getAsLong());
masterService.submitStateUpdateTask( masterService.submitStateUpdateTask(
"elected-as-master ([" + joinTask.nodeCount() + "] nodes joined)", "elected-as-master ([" + joinTask.nodeCount() + "] nodes joined)",
joinTask, joinTask,
ClusterStateTaskConfig.build(Priority.URGENT), ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor joinTaskExecutor
); );
} else { } else {
assert newMode == Mode.FOLLOWER : newMode; assert newMode == Mode.FOLLOWER : newMode;
joinTaskExecutor = null;
joinRequestAccumulator.values() joinRequestAccumulator.values()
.forEach(joinCallback -> joinCallback.onFailure(new CoordinationStateRejectedException("became follower"))); .forEach(joinCallback -> joinCallback.onFailure(new CoordinationStateRejectedException("became follower")));
} }

View file

@ -13,24 +13,18 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Stream; import java.util.stream.Stream;
public record JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) implements ClusterStateTaskListener { public record JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster, long term) implements ClusterStateTaskListener {
public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener<Void> listener) { public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener<Void> listener, long term) {
return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false); return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false, term);
} }
public static JoinTask completingElection(Stream<NodeJoinTask> nodeJoinTaskStream) { public static JoinTask completingElection(Stream<NodeJoinTask> nodeJoinTaskStream, long term) {
return new JoinTask(nodeJoinTaskStream.toList(), true); return new JoinTask(nodeJoinTaskStream.toList(), true, term);
}
public JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) {
this.nodeJoinTasks = Collections.unmodifiableList(nodeJoinTasks);
this.isBecomingMaster = isBecomingMaster;
} }
public int nodeCount() { public int nodeCount() {

View file

@ -41,12 +41,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTask> {
private final AllocationService allocationService; private final AllocationService allocationService;
private final RerouteService rerouteService; private final RerouteService rerouteService;
private final long term;
public JoinTaskExecutor(AllocationService allocationService, RerouteService rerouteService, long term) { public JoinTaskExecutor(AllocationService allocationService, RerouteService rerouteService) {
this.allocationService = allocationService; this.allocationService = allocationService;
this.rerouteService = rerouteService; this.rerouteService = rerouteService;
this.term = term;
} }
@Override @Override
@ -54,6 +52,19 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTask> {
// The current state that MasterService uses might have been updated by a (different) master in a higher term already. If so, stop // The current state that MasterService uses might have been updated by a (different) master in a higher term already. If so, stop
// processing the current cluster state update, there's no point in continuing to compute it as it will later be rejected by // processing the current cluster state update, there's no point in continuing to compute it as it will later be rejected by
// Coordinator#publish anyhow. // Coordinator#publish anyhow.
assert joinTaskContexts.isEmpty() == false : "Expected to have non empty join tasks list";
var term = joinTaskContexts.stream().mapToLong(t -> t.getTask().term()).max().getAsLong();
var split = joinTaskContexts.stream().collect(Collectors.partitioningBy(t -> t.getTask().term() == term));
for (TaskContext<JoinTask> outdated : split.get(false)) {
outdated.onFailure(
new NotMasterException("Higher term encountered (encountered: " + term + " > used: " + outdated.getTask().term() + ")")
);
}
joinTaskContexts = split.get(true);
if (currentState.term() > term) { if (currentState.term() > term) {
logger.trace("encountered higher term {} than current {}, there is a newer master", currentState.term(), term); logger.trace("encountered higher term {} than current {}, there is a newer master", currentState.term(), term);
throw new NotMasterException( throw new NotMasterException(
@ -71,7 +82,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTask> {
// use these joins to try and become the master. // use these joins to try and become the master.
// Note that we don't have to do any validation of the amount of joining nodes - the commit // Note that we don't have to do any validation of the amount of joining nodes - the commit
// during the cluster state publishing guarantees that we have enough // during the cluster state publishing guarantees that we have enough
newState = becomeMasterAndTrimConflictingNodes(currentState, joinTaskContexts); newState = becomeMasterAndTrimConflictingNodes(currentState, joinTaskContexts, term);
nodesChanged = true; nodesChanged = true;
} else if (currentNodes.isLocalNodeElectedMaster()) { } else if (currentNodes.isLocalNodeElectedMaster()) {
assert currentState.term() == term : "term should be stable for the same master"; assert currentState.term() == term : "term should be stable for the same master";
@ -186,7 +197,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTask> {
protected ClusterState.Builder becomeMasterAndTrimConflictingNodes( protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(
ClusterState currentState, ClusterState currentState,
List<TaskContext<JoinTask>> taskContexts List<TaskContext<JoinTask>> taskContexts,
long term
) { ) {
assert currentState.nodes().getMasterNodeId() == null : currentState; assert currentState.nodes().getMasterNodeId() == null : currentState;
assert currentState.term() < term : term + " vs " + currentState; assert currentState.term() < term : term + " vs " + currentState;

View file

@ -39,6 +39,7 @@ import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -160,11 +161,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
// in the cluster but with a different set of roles: the node didn't change roles, but the cluster state came via an older master. // in the cluster but with a different set of roles: the node didn't change roles, but the cluster state came via an older master.
// In this case we must properly process its join to ensure that the roles are correct. // In this case we must properly process its join to ensure that the roles are correct.
final AllocationService allocationService = mock(AllocationService.class); final AllocationService allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, 0L); final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
@ -187,25 +188,18 @@ public class JoinTaskExecutorTests extends ESTestCase {
final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
List.of( List.of(JoinTask.singleNode(actualNode, "test", NOT_COMPLETED_LISTENER, 0L))
JoinTask.singleNode(
actualNode,
"test",
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
)
)
); );
assertThat(resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles())); assertThat(resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles()));
} }
public void testRejectsStatesWithStaleTerm() { public void testRejectsStatesWithStaleTerm() {
final var allocationService = mock(AllocationService.class); final var allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long executorTerm = randomLongBetween(0L, Long.MAX_VALUE - 1); final long executorTerm = randomLongBetween(0L, Long.MAX_VALUE - 1);
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final var clusterState = ClusterState.builder(ClusterName.DEFAULT) final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
@ -224,9 +218,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
randomBoolean() randomBoolean()
? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)) ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
: List.of( : List.of(
JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER))) JoinTask.completingElection(
Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
executorTerm
)
), ),
t -> fail("should not succeed"), t -> fail("should not succeed"),
(t, e) -> assertThat(e, instanceOf(NotMasterException.class)) (t, e) -> assertThat(e, instanceOf(NotMasterException.class))
@ -237,12 +234,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
} }
public void testRejectsStatesWithOtherMaster() { public void testRejectsStatesWithOtherMaster() {
final var allocationService = mock(AllocationService.class); final var allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long executorTerm = randomNonNegativeLong(); final long executorTerm = randomNonNegativeLong();
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final var localNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var localNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
@ -269,9 +265,12 @@ public class JoinTaskExecutorTests extends ESTestCase {
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
randomBoolean() randomBoolean()
? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)) ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
: List.of( : List.of(
JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER))) JoinTask.completingElection(
Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
executorTerm
)
), ),
t -> fail("should not succeed"), t -> fail("should not succeed"),
(t, e) -> assertThat(e, instanceOf(NotMasterException.class)) (t, e) -> assertThat(e, instanceOf(NotMasterException.class))
@ -282,12 +281,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
} }
public void testRejectsStatesWithNoMasterIfNotBecomingMaster() { public void testRejectsStatesWithNoMasterIfNotBecomingMaster() {
final var allocationService = mock(AllocationService.class); final var allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long executorTerm = randomNonNegativeLong(); final long executorTerm = randomNonNegativeLong();
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); final var masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
final var clusterState = ClusterState.builder(ClusterName.DEFAULT) final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
@ -305,7 +303,7 @@ public class JoinTaskExecutorTests extends ESTestCase {
() -> ClusterStateTaskExecutorUtils.executeHandlingResults( () -> ClusterStateTaskExecutorUtils.executeHandlingResults(
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)), List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm)),
t -> fail("should not succeed"), t -> fail("should not succeed"),
(t, e) -> assertThat(e, instanceOf(NotMasterException.class)) (t, e) -> assertThat(e, instanceOf(NotMasterException.class))
) )
@ -315,15 +313,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
} }
public void testRemovesOlderNodeInstancesWhenBecomingMaster() throws Exception { public void testRemovesOlderNodeInstancesWhenBecomingMaster() throws Exception {
final var allocationService = mock(AllocationService.class); final var allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then(
invocationOnMock -> invocationOnMock.getArguments()[0]
);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long executorTerm = randomLongBetween(1, Long.MAX_VALUE); final long executorTerm = randomLongBetween(1, Long.MAX_VALUE);
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final var otherNodeOld = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var otherNodeOld = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
@ -355,7 +349,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
Stream.of( Stream.of(
new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER), new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER),
new JoinTask.NodeJoinTask(otherNodeNew, "test", NOT_COMPLETED_LISTENER) new JoinTask.NodeJoinTask(otherNodeNew, "test", NOT_COMPLETED_LISTENER)
) ),
executorTerm
) )
) )
); );
@ -374,8 +369,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
afterElectionClusterState, afterElectionClusterState,
joinTaskExecutor, joinTaskExecutor,
List.of( List.of(
JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER), JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm),
JoinTask.singleNode(otherNodeOld, "test", NOT_COMPLETED_LISTENER) JoinTask.singleNode(otherNodeOld, "test", NOT_COMPLETED_LISTENER, executorTerm)
) )
).nodes().get(otherNodeNew.getId()).getEphemeralId(), ).nodes().get(otherNodeNew.getId()).getEphemeralId(),
equalTo(otherNodeNew.getEphemeralId()) equalTo(otherNodeNew.getEphemeralId())
@ -383,15 +378,11 @@ public class JoinTaskExecutorTests extends ESTestCase {
} }
public void testUpdatesVotingConfigExclusionsIfNeeded() throws Exception { public void testUpdatesVotingConfigExclusionsIfNeeded() throws Exception {
final var allocationService = mock(AllocationService.class); final var allocationService = createAllocationService();
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then(
invocationOnMock -> invocationOnMock.getArguments()[0]
);
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long executorTerm = randomLongBetween(1, Long.MAX_VALUE); final long executorTerm = randomLongBetween(1, Long.MAX_VALUE);
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final var otherNode = new DiscoveryNode( final var otherNode = new DiscoveryNode(
@ -435,7 +426,8 @@ public class JoinTaskExecutorTests extends ESTestCase {
Stream.of( Stream.of(
new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER), new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER),
new JoinTask.NodeJoinTask(otherNode, "test", NOT_COMPLETED_LISTENER) new JoinTask.NodeJoinTask(otherNode, "test", NOT_COMPLETED_LISTENER)
) ),
executorTerm
) )
) )
); );
@ -443,12 +435,17 @@ public class JoinTaskExecutorTests extends ESTestCase {
clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
List.of(JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)))) List.of(
JoinTask.completingElection(
Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)),
executorTerm
)
)
); );
clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful(
clusterState, clusterState,
joinTaskExecutor, joinTaskExecutor,
List.of(JoinTask.singleNode(otherNode, "test", NOT_COMPLETED_LISTENER)) List.of(JoinTask.singleNode(otherNode, "test", NOT_COMPLETED_LISTENER, executorTerm))
); );
} }
@ -460,4 +457,49 @@ public class JoinTaskExecutorTests extends ESTestCase {
); );
} }
public void testIgnoresOlderTerms() throws Exception {
final var allocationService = createAllocationService();
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
final long currentTerm = randomLongBetween(100, 1000);
final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService);
final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT);
final var clusterState = ClusterState.builder(ClusterName.DEFAULT)
.nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).build())
.metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build()).build())
.build();
var tasks = Stream.concat(
Stream.generate(() -> createRandomTask(masterNode, "outdated", randomLongBetween(0, currentTerm - 1)))
.limit(randomLongBetween(1, 10)),
Stream.of(createRandomTask(masterNode, "current", currentTerm))
).toList();
ClusterStateTaskExecutorUtils.executeHandlingResults(
clusterState,
joinTaskExecutor,
tasks,
t -> assertThat(t.term(), equalTo(currentTerm)),
(t, e) -> {
assertThat(t.term(), lessThan(currentTerm));
assertThat(e, instanceOf(NotMasterException.class));
}
);
}
private static JoinTask createRandomTask(DiscoveryNode node, String reason, long term) {
return randomBoolean()
? JoinTask.singleNode(node, reason, NOT_COMPLETED_LISTENER, term)
: JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(node, reason, NOT_COMPLETED_LISTENER)), term);
}
private static AllocationService createAllocationService() {
final var allocationService = mock(AllocationService.class);
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then(
invocationOnMock -> invocationOnMock.getArguments()[0]
);
return allocationService;
}
} }

View file

@ -349,13 +349,14 @@ public class ClusterStateChanges {
public ClusterState addNode(ClusterState clusterState, DiscoveryNode discoveryNode) { public ClusterState addNode(ClusterState clusterState, DiscoveryNode discoveryNode) {
return runTasks( return runTasks(
new JoinTaskExecutor(allocationService, (s, p, r) -> {}, clusterState.term()), new JoinTaskExecutor(allocationService, (s, p, r) -> {}),
clusterState, clusterState,
List.of( List.of(
JoinTask.singleNode( JoinTask.singleNode(
discoveryNode, discoveryNode,
"dummy reason", "dummy reason",
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }),
clusterState.term()
) )
) )
); );
@ -363,7 +364,7 @@ public class ClusterStateChanges {
public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) { public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<DiscoveryNode> nodes) {
return runTasks( return runTasks(
new JoinTaskExecutor(allocationService, (s, p, r) -> {}, clusterState.term() + between(1, 10)), new JoinTaskExecutor(allocationService, (s, p, r) -> {}),
clusterState, clusterState,
List.of( List.of(
JoinTask.completingElection( JoinTask.completingElection(
@ -374,7 +375,8 @@ public class ClusterStateChanges {
"dummy reason", "dummy reason",
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
) )
) ),
clusterState.term() + between(1, 10)
) )
) )
); );