From 24132d3834cbadee6597a31494493b06532dc21f Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Thu, 20 Mar 2025 07:50:10 -0500 Subject: [PATCH] Reindex data stream indices on different nodes (#125171) --- docs/changelog/125171.yaml | 5 + ...ReindexDataStreamIndexTransportAction.java | 38 ++++- ...exDataStreamIndexTransportActionTests.java | 158 +++++++++++++++++- 3 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 docs/changelog/125171.yaml diff --git a/docs/changelog/125171.yaml b/docs/changelog/125171.yaml new file mode 100644 index 000000000000..717e23d8d67b --- /dev/null +++ b/docs/changelog/125171.yaml @@ -0,0 +1,5 @@ +pr: 125171 +summary: Reindex data stream indices on different nodes +area: Data streams +type: enhancement +issues: [] diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java index 6a98b2b43645..189bdc101279 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportAction.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; @@ -35,9 +36,12 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.client.internal.transport.NoNodeAvailableException; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Assertions; @@ -52,6 +56,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; @@ -60,6 +65,7 @@ import java.util.Arrays; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY; @@ -101,6 +107,14 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio private final ClusterService clusterService; private final Client client; + private final TransportService transportService; + /* + * The following is incremented in order to keep track of the current round-robin position for ingest nodes that we send sliced requests + * to. We bound its random starting value to less than or equal to 2 ^ 30 (the default is Integer.MAX_VALUE or 2 ^ 31 - 1) only so that + * the unit test doesn't fail if it rolls over Integer.MAX_VALUE (since the node selected is the same for Integer.MAX_VALUE and + * Integer.MAX_VALUE + 1). + */ + private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(2 ^ 30)); @Inject public ReindexDataStreamIndexTransportAction( @@ -119,6 +133,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio ); this.clusterService = clusterService; this.client = client; + this.transportService = transportService; } @Override @@ -305,7 +320,28 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio listener.onResponse(bulkByScrollResponse); } }, listener::onFailure); - client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener); + /* + * Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that + * becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a + * data stream has many indices, this can improve performance a good bit. + */ + final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new); + if (ingestNodes.length == 0) { + listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster")); + } else { + DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)]; + logger.debug("Sending reindex request to {}", ingestNode.getName()); + transportService.sendRequest( + ingestNode, + ReindexAction.NAME, + reindexRequest, + new ActionListenerResponseHandler<>( + checkForFailuresListener, + BulkByScrollResponse::new, + TransportResponseHandler.TRANSPORT_WORKER + ) + ); + } } private void updateSettings( diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java index 55e4da30cdf1..f73f5e5d725e 100644 --- a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamIndexTransportActionTests.java @@ -10,6 +10,10 @@ package org.elasticsearch.xpack.migrate.action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -29,10 +33,16 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { @@ -112,7 +122,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { ) ); - doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any()); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes()); + when(clusterService.state()).thenReturn(clusterState); + doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any()); action.reindex(sourceIndex, destIndex, listener, taskId); @@ -137,7 +150,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) ) ); - doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any()); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes()); + when(clusterService.state()).thenReturn(clusterState); + doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any()); action.reindex(sourceIndex, destIndex, listener, taskId); @@ -204,4 +220,142 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { e.getMessage() ); } + + public void testRoundRobin() { + /* + * This tests that the action will round-robin through the list of ingest nodes in the cluster. + */ + String sourceIndex = randomAlphanumericOfLength(10); + String destIndex = randomAlphanumericOfLength(10); + AtomicBoolean failed = new AtomicBoolean(false); + ActionListener listener = new ActionListener<>() { + @Override + public void onResponse(BulkByScrollResponse bulkByScrollResponse) {} + + @Override + public void onFailure(Exception e) { + failed.set(true); + } + }; + TaskId taskId = TaskId.EMPTY_TASK_ID; + + when(clusterService.getClusterSettings()).thenReturn( + new ClusterSettings( + Settings.EMPTY, + Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) + ) + ); + + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes()); + when(clusterService.state()).thenReturn(clusterState); + ArgumentCaptor nodeCaptor = ArgumentCaptor.captor(); + doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any()); + + action.reindex(sourceIndex, destIndex, listener, taskId); + DiscoveryNode node1 = nodeCaptor.getValue(); + assertNotNull(node1); + + action.reindex(sourceIndex, destIndex, listener, taskId); + DiscoveryNode node2 = nodeCaptor.getValue(); + assertNotNull(node2); + + int ingestNodeCount = clusterState.getNodes().getIngestNodes().size(); + if (ingestNodeCount > 1) { + assertThat(node1.getName(), not(equalTo(node2.getName()))); + } + + // check that if we keep going we eventually get back to the original node: + DiscoveryNode node = node2; + for (int i = 0; i < ingestNodeCount - 1; i++) { + action.reindex(sourceIndex, destIndex, listener, taskId); + node = nodeCaptor.getValue(); + } + assertNotNull(node); + assertThat(node1.getName(), equalTo(node.getName())); + assertThat(failed.get(), equalTo(false)); + + // make sure the listener gets notified of failure if there are no ingest nodes: + when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest()); + action.reindex(sourceIndex, destIndex, listener, taskId); + assertThat(failed.get(), equalTo(true)); + } + + private DiscoveryNodes getTestDiscoveryNodes() { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + boolean nodeHasIngestRole = false; + int nodeCount = randomIntBetween(1, 10); + for (int i = 0; i < nodeCount; i++) { + final DiscoveryNode discoveryNode = new DiscoveryNode( + "test-name-" + i, + "test-id-" + i, + "test-ephemeral-id-" + i, + "test-hostname-" + i, + "test-hostaddr", + buildNewFakeTransportAddress(), + Map.of(), + randomSet( + 1, + 5, + () -> randomFrom( + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.INGEST_ROLE, + DiscoveryNodeRole.SEARCH_ROLE, + DiscoveryNodeRole.MASTER_ROLE, + DiscoveryNodeRole.MASTER_ROLE + ) + ), + null, + null + ); + nodeHasIngestRole = nodeHasIngestRole || discoveryNode.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE); + builder.add(discoveryNode); + } + if (nodeHasIngestRole == false) { + final DiscoveryNode discoveryNode = new DiscoveryNode( + "test-name-" + nodeCount, + "test-id-" + nodeCount, + "test-ephemeral-id-" + nodeCount, + "test-hostname-" + nodeCount, + "test-hostaddr", + buildNewFakeTransportAddress(), + Map.of(), + Set.of(DiscoveryNodeRole.INGEST_ROLE), + null, + null + ); + builder.add(discoveryNode); + } + return builder.build(); + } + + private DiscoveryNodes getTestDiscoveryNodesNoIngest() { + DiscoveryNodes.Builder builder = DiscoveryNodes.builder(); + int nodeCount = randomIntBetween(0, 10); + for (int i = 0; i < nodeCount; i++) { + final DiscoveryNode discoveryNode = new DiscoveryNode( + "test-name-" + i, + "test-id-" + i, + "test-ephemeral-id-" + i, + "test-hostname-" + i, + "test-hostaddr", + buildNewFakeTransportAddress(), + Map.of(), + randomSet( + 1, + 4, + () -> randomFrom( + DiscoveryNodeRole.DATA_ROLE, + DiscoveryNodeRole.SEARCH_ROLE, + DiscoveryNodeRole.MASTER_ROLE, + DiscoveryNodeRole.MASTER_ROLE + ) + ), + null, + null + ); + builder.add(discoveryNode); + } + return builder.build(); + } }