From e4888dd80814a75d45c1f0dad4b1314a3419cd7c Mon Sep 17 00:00:00 2001 From: Ievgen Degtiarenko Date: Mon, 28 Mar 2022 18:14:23 +0200 Subject: [PATCH] Reuse JoinTaskExecutor (#85325) Currently, we do not reuse the JoinTaskExecutor, which can result in long-running processing of outdated join or election requests. This change keeps the executor permanently and drops all but the latest requests. --- docs/changelog/85325.yaml | 5 + .../cluster/coordination/JoinHelper.java | 17 ++- .../cluster/coordination/JoinTask.java | 16 +-- .../coordination/JoinTaskExecutor.java | 22 +++- .../coordination/JoinTaskExecutorTests.java | 124 ++++++++++++------ .../indices/cluster/ClusterStateChanges.java | 10 +- 6 files changed, 126 insertions(+), 68 deletions(-) create mode 100644 docs/changelog/85325.yaml diff --git a/docs/changelog/85325.yaml b/docs/changelog/85325.yaml new file mode 100644 index 000000000000..bd9d07566ba2 --- /dev/null +++ b/docs/changelog/85325.yaml @@ -0,0 +1,5 @@ +pr: 85325 +summary: Reuse `JoinTaskExecutor` +area: Cluster Coordination +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index 958638e6f4d9..2e37b0726cfb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -70,7 +70,7 @@ public class JoinHelper { private final AllocationService allocationService; private final MasterService masterService; private final TransportService transportService; - private volatile JoinTaskExecutor joinTaskExecutor; + private final JoinTaskExecutor joinTaskExecutor; private final LongSupplier currentTermSupplier; private final RerouteService rerouteService; private final NodeHealthService nodeHealthService; @@ -97,6 +97,7 @@ public class JoinHelper { this.allocationService = allocationService; 
this.masterService = masterService; this.transportService = transportService; + this.joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); this.currentTermSupplier = currentTermSupplier; this.rerouteService = rerouteService; this.nodeHealthService = nodeHealthService; @@ -343,8 +344,12 @@ public class JoinHelper { class LeaderJoinAccumulator implements JoinAccumulator { @Override public void handleJoinRequest(DiscoveryNode sender, ActionListener joinListener) { - final JoinTask task = JoinTask.singleNode(sender, joinReasonService.getJoinReason(sender, Mode.LEADER), joinListener); - assert joinTaskExecutor != null; + final JoinTask task = JoinTask.singleNode( + sender, + joinReasonService.getJoinReason(sender, Mode.LEADER), + joinListener, + currentTermSupplier.getAsLong() + ); masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor); } @@ -406,18 +411,16 @@ public class JoinHelper { joinReasonService.getJoinReason(discoveryNode, Mode.CANDIDATE), listener ); - })); - - joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, currentTermSupplier.getAsLong()); + }), currentTermSupplier.getAsLong()); masterService.submitStateUpdateTask( "elected-as-master ([" + joinTask.nodeCount() + "] nodes joined)", joinTask, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor + ); } else { assert newMode == Mode.FOLLOWER : newMode; - joinTaskExecutor = null; joinRequestAccumulator.values() .forEach(joinCallback -> joinCallback.onFailure(new CoordinationStateRejectedException("became follower"))); } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java index 142823d87844..61cc9196c2a9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java @@ -13,24 
+13,18 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.node.DiscoveryNode; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.stream.Stream; -public record JoinTask(List nodeJoinTasks, boolean isBecomingMaster) implements ClusterStateTaskListener { +public record JoinTask(List nodeJoinTasks, boolean isBecomingMaster, long term) implements ClusterStateTaskListener { - public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener listener) { - return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false); + public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener listener, long term) { + return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false, term); } - public static JoinTask completingElection(Stream nodeJoinTaskStream) { - return new JoinTask(nodeJoinTaskStream.toList(), true); - } - - public JoinTask(List nodeJoinTasks, boolean isBecomingMaster) { - this.nodeJoinTasks = Collections.unmodifiableList(nodeJoinTasks); - this.isBecomingMaster = isBecomingMaster; + public static JoinTask completingElection(Stream nodeJoinTaskStream, long term) { + return new JoinTask(nodeJoinTaskStream.toList(), true, term); } public int nodeCount() { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index 51978f69194e..ef8e47b9cab6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -41,12 +41,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { private final AllocationService allocationService; private final RerouteService rerouteService; - private final long term; - public 
JoinTaskExecutor(AllocationService allocationService, RerouteService rerouteService, long term) { + public JoinTaskExecutor(AllocationService allocationService, RerouteService rerouteService) { this.allocationService = allocationService; this.rerouteService = rerouteService; - this.term = term; } @Override @@ -54,6 +52,19 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { // The current state that MasterService uses might have been updated by a (different) master in a higher term already. If so, stop // processing the current cluster state update, there's no point in continuing to compute it as it will later be rejected by // Coordinator#publish anyhow. + assert joinTaskContexts.isEmpty() == false : "Expected to have non empty join tasks list"; + + var term = joinTaskContexts.stream().mapToLong(t -> t.getTask().term()).max().getAsLong(); + + var split = joinTaskContexts.stream().collect(Collectors.partitioningBy(t -> t.getTask().term() == term)); + for (TaskContext outdated : split.get(false)) { + outdated.onFailure( + new NotMasterException("Higher term encountered (encountered: " + term + " > used: " + outdated.getTask().term() + ")") + ); + } + + joinTaskContexts = split.get(true); + if (currentState.term() > term) { logger.trace("encountered higher term {} than current {}, there is a newer master", currentState.term(), term); throw new NotMasterException( @@ -71,7 +82,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { // use these joins to try and become the master. 
// Note that we don't have to do any validation of the amount of joining nodes - the commit // during the cluster state publishing guarantees that we have enough - newState = becomeMasterAndTrimConflictingNodes(currentState, joinTaskContexts); + newState = becomeMasterAndTrimConflictingNodes(currentState, joinTaskContexts, term); nodesChanged = true; } else if (currentNodes.isLocalNodeElectedMaster()) { assert currentState.term() == term : "term should be stable for the same master"; @@ -186,7 +197,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor { protected ClusterState.Builder becomeMasterAndTrimConflictingNodes( ClusterState currentState, - List> taskContexts + List> taskContexts, + long term ) { assert currentState.nodes().getMasterNodeId() == null : currentState; assert currentState.term() < term : term + " vs " + currentState; diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java index eb46c6dbd7d7..2b5ceb77262a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java @@ -39,6 +39,7 @@ import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThan; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; @@ -160,11 +161,11 @@ public class JoinTaskExecutorTests extends ESTestCase { // in the cluster but with a different set of roles: the node didn't change roles, but the cluster state came via an older master. 
// In this case we must properly process its join to ensure that the roles are correct. - final AllocationService allocationService = mock(AllocationService.class); + final AllocationService allocationService = createAllocationService(); when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); - final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, 0L); + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); @@ -187,25 +188,18 @@ public class JoinTaskExecutorTests extends ESTestCase { final var resultingState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState, joinTaskExecutor, - List.of( - JoinTask.singleNode( - actualNode, - "test", - ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) - ) - ) + List.of(JoinTask.singleNode(actualNode, "test", NOT_COMPLETED_LISTENER, 0L)) ); assertThat(resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles())); } public void testRejectsStatesWithStaleTerm() { - final var allocationService = mock(AllocationService.class); - when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final var allocationService = createAllocationService(); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final long executorTerm = randomLongBetween(0L, Long.MAX_VALUE - 1); - final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final var masterNode = new 
DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var clusterState = ClusterState.builder(ClusterName.DEFAULT) @@ -224,9 +218,12 @@ public class JoinTaskExecutorTests extends ESTestCase { clusterState, joinTaskExecutor, randomBoolean() - ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)) + ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm)) : List.of( - JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER))) + JoinTask.completingElection( + Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)), + executorTerm + ) ), t -> fail("should not succeed"), (t, e) -> assertThat(e, instanceOf(NotMasterException.class)) @@ -237,12 +234,11 @@ public class JoinTaskExecutorTests extends ESTestCase { } public void testRejectsStatesWithOtherMaster() { - final var allocationService = mock(AllocationService.class); - when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final var allocationService = createAllocationService(); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final long executorTerm = randomNonNegativeLong(); - final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var localNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); @@ -269,9 +265,12 @@ public class JoinTaskExecutorTests extends ESTestCase { clusterState, joinTaskExecutor, randomBoolean() - ? List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)) + ? 
List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm)) : List.of( - JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER))) + JoinTask.completingElection( + Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)), + executorTerm + ) ), t -> fail("should not succeed"), (t, e) -> assertThat(e, instanceOf(NotMasterException.class)) @@ -282,12 +281,11 @@ public class JoinTaskExecutorTests extends ESTestCase { } public void testRejectsStatesWithNoMasterIfNotBecomingMaster() { - final var allocationService = mock(AllocationService.class); - when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final var allocationService = createAllocationService(); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final long executorTerm = randomNonNegativeLong(); - final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final var masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); final var clusterState = ClusterState.builder(ClusterName.DEFAULT) @@ -305,7 +303,7 @@ public class JoinTaskExecutorTests extends ESTestCase { () -> ClusterStateTaskExecutorUtils.executeHandlingResults( clusterState, joinTaskExecutor, - List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER)), + List.of(JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm)), t -> fail("should not succeed"), (t, e) -> assertThat(e, instanceOf(NotMasterException.class)) ) @@ -315,15 +313,11 @@ public class JoinTaskExecutorTests extends ESTestCase { } public void testRemovesOlderNodeInstancesWhenBecomingMaster() throws Exception { - final var allocationService = mock(AllocationService.class); 
- when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); - when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then( - invocationOnMock -> invocationOnMock.getArguments()[0] - ); + final var allocationService = createAllocationService(); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final long executorTerm = randomLongBetween(1, Long.MAX_VALUE); - final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var otherNodeOld = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); @@ -355,7 +349,8 @@ public class JoinTaskExecutorTests extends ESTestCase { Stream.of( new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER), new JoinTask.NodeJoinTask(otherNodeNew, "test", NOT_COMPLETED_LISTENER) - ) + ), + executorTerm ) ) ); @@ -374,8 +369,8 @@ public class JoinTaskExecutorTests extends ESTestCase { afterElectionClusterState, joinTaskExecutor, List.of( - JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER), - JoinTask.singleNode(otherNodeOld, "test", NOT_COMPLETED_LISTENER) + JoinTask.singleNode(masterNode, "test", NOT_COMPLETED_LISTENER, executorTerm), + JoinTask.singleNode(otherNodeOld, "test", NOT_COMPLETED_LISTENER, executorTerm) ) ).nodes().get(otherNodeNew.getId()).getEphemeralId(), equalTo(otherNodeNew.getEphemeralId()) @@ -383,15 +378,11 @@ public class JoinTaskExecutorTests extends ESTestCase { } public void testUpdatesVotingConfigExclusionsIfNeeded() throws Exception { - final var allocationService = mock(AllocationService.class); - 
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); - when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then( - invocationOnMock -> invocationOnMock.getArguments()[0] - ); + final var allocationService = createAllocationService(); final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); final long executorTerm = randomLongBetween(1, Long.MAX_VALUE); - final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService, executorTerm); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); final var otherNode = new DiscoveryNode( @@ -435,7 +426,8 @@ public class JoinTaskExecutorTests extends ESTestCase { Stream.of( new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER), new JoinTask.NodeJoinTask(otherNode, "test", NOT_COMPLETED_LISTENER) - ) + ), + executorTerm ) ) ); @@ -443,12 +435,17 @@ public class JoinTaskExecutorTests extends ESTestCase { clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState, joinTaskExecutor, - List.of(JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)))) + List.of( + JoinTask.completingElection( + Stream.of(new JoinTask.NodeJoinTask(masterNode, "test", NOT_COMPLETED_LISTENER)), + executorTerm + ) + ) ); clusterState = ClusterStateTaskExecutorUtils.executeAndAssertSuccessful( clusterState, joinTaskExecutor, - List.of(JoinTask.singleNode(otherNode, "test", NOT_COMPLETED_LISTENER)) + List.of(JoinTask.singleNode(otherNode, "test", NOT_COMPLETED_LISTENER, executorTerm)) ); } @@ -460,4 +457,49 @@ public class JoinTaskExecutorTests extends ESTestCase { ); } + public void testIgnoresOlderTerms() throws Exception { + final var allocationService = 
createAllocationService(); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + + final long currentTerm = randomLongBetween(100, 1000); + final var joinTaskExecutor = new JoinTaskExecutor(allocationService, rerouteService); + + final var masterNode = new DiscoveryNode(UUIDs.randomBase64UUID(random()), buildNewFakeTransportAddress(), Version.CURRENT); + final var clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes(DiscoveryNodes.builder().add(masterNode).localNodeId(masterNode.getId()).masterNodeId(masterNode.getId()).build()) + .metadata(Metadata.builder().coordinationMetadata(CoordinationMetadata.builder().term(currentTerm).build()).build()) + .build(); + + var tasks = Stream.concat( + Stream.generate(() -> createRandomTask(masterNode, "outdated", randomLongBetween(0, currentTerm - 1))) + .limit(randomLongBetween(1, 10)), + Stream.of(createRandomTask(masterNode, "current", currentTerm)) + ).toList(); + + ClusterStateTaskExecutorUtils.executeHandlingResults( + clusterState, + joinTaskExecutor, + tasks, + t -> assertThat(t.term(), equalTo(currentTerm)), + (t, e) -> { + assertThat(t.term(), lessThan(currentTerm)); + assertThat(e, instanceOf(NotMasterException.class)); + } + ); + } + + private static JoinTask createRandomTask(DiscoveryNode node, String reason, long term) { + return randomBoolean() + ? 
JoinTask.singleNode(node, reason, NOT_COMPLETED_LISTENER, term) + : JoinTask.completingElection(Stream.of(new JoinTask.NodeJoinTask(node, reason, NOT_COMPLETED_LISTENER)), term); + } + + private static AllocationService createAllocationService() { + final var allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + when(allocationService.disassociateDeadNodes(any(), anyBoolean(), any())).then( + invocationOnMock -> invocationOnMock.getArguments()[0] + ); + return allocationService; + } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index ea4afd586cf0..7b3e0cae1638 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -349,13 +349,14 @@ public class ClusterStateChanges { public ClusterState addNode(ClusterState clusterState, DiscoveryNode discoveryNode) { return runTasks( - new JoinTaskExecutor(allocationService, (s, p, r) -> {}, clusterState.term()), + new JoinTaskExecutor(allocationService, (s, p, r) -> {}), clusterState, List.of( JoinTask.singleNode( discoveryNode, "dummy reason", - ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) + ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }), + clusterState.term() ) ) ); @@ -363,7 +364,7 @@ public class ClusterStateChanges { public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List nodes) { return runTasks( - new JoinTaskExecutor(allocationService, (s, p, r) -> {}, clusterState.term() + between(1, 10)), + new JoinTaskExecutor(allocationService, (s, p, r) -> {}), clusterState, List.of( JoinTask.completingElection( @@ -374,7 +375,8 @@ public class 
ClusterStateChanges { "dummy reason", ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); }) ) - ) + ), + clusterState.term() + between(1, 10) ) ) );