Reindex data stream indices on different nodes (#125171)

This commit is contained in:
Keith Massey 2025-03-20 07:50:10 -05:00 committed by GitHub
parent 5182e733d7
commit 24132d3834
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 198 additions and 3 deletions

View file

@@ -0,0 +1,5 @@
pr: 125171
summary: Reindex data stream indices on different nodes
area: Data streams
type: enhancement
issues: []

View file

@ -12,6 +12,7 @@ import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
@ -35,9 +36,12 @@ import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.transport.NoNodeAvailableException;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Assertions; import org.elasticsearch.core.Assertions;
@ -52,6 +56,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate; import org.elasticsearch.xpack.core.deprecation.DeprecatedIndexPredicate;
import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry; import org.elasticsearch.xpack.migrate.MigrateTemplateRegistry;
@ -60,6 +65,7 @@ import java.util.Arrays;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.METADATA;
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY; import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.READ_ONLY;
@ -101,6 +107,14 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
private final ClusterService clusterService; private final ClusterService clusterService;
private final Client client; private final Client client;
private final TransportService transportService;
/*
 * Tracks the current round-robin position for picking which ingest node receives the next sliced
 * reindex request. The starting offset is randomized so that concurrent coordinators don't all
 * begin at the same ingest node. The random starting value is bounded by 2^30 (rather than the
 * nextInt() default bound of Integer.MAX_VALUE, i.e. 2^31 - 1) only so that the unit test doesn't
 * fail if the counter rolls over Integer.MAX_VALUE (the node selected is the same for
 * Integer.MAX_VALUE and Integer.MAX_VALUE + 1).
 *
 * NOTE(review): the original code wrote the bound as "2 ^ 30", but in Java '^' is bitwise XOR, so
 * 2 ^ 30 evaluates to 28 — the starting offset was being drawn from [0, 28) instead of [0, 2^30).
 * Harmless for correctness (Math.floorMod handles any start), but not what the comment intends;
 * fixed to the shift expression 1 << 30.
 */
private final AtomicInteger ingestNodeOffsetGenerator = new AtomicInteger(Randomness.get().nextInt(1 << 30));
@Inject @Inject
public ReindexDataStreamIndexTransportAction( public ReindexDataStreamIndexTransportAction(
@ -119,6 +133,7 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
); );
this.clusterService = clusterService; this.clusterService = clusterService;
this.client = client; this.client = client;
this.transportService = transportService;
} }
@Override @Override
@ -305,7 +320,28 @@ public class ReindexDataStreamIndexTransportAction extends HandledTransportActio
listener.onResponse(bulkByScrollResponse); listener.onResponse(bulkByScrollResponse);
} }
}, listener::onFailure); }, listener::onFailure);
client.execute(ReindexAction.INSTANCE, reindexRequest, checkForFailuresListener); /*
* Reindex will potentially run a pipeline for each document. If we run all reindex requests on the same node (locally), that
* becomes a bottleneck. This code round-robins reindex requests to all ingest nodes to spread out the pipeline workload. When a
* data stream has many indices, this can improve performance a good bit.
*/
final DiscoveryNode[] ingestNodes = clusterService.state().getNodes().getIngestNodes().values().toArray(DiscoveryNode[]::new);
if (ingestNodes.length == 0) {
listener.onFailure(new NoNodeAvailableException("No ingest nodes in cluster"));
} else {
DiscoveryNode ingestNode = ingestNodes[Math.floorMod(ingestNodeOffsetGenerator.incrementAndGet(), ingestNodes.length)];
logger.debug("Sending reindex request to {}", ingestNode.getName());
transportService.sendRequest(
ingestNode,
ReindexAction.NAME,
reindexRequest,
new ActionListenerResponseHandler<>(
checkForFailuresListener,
BulkByScrollResponse::new,
TransportResponseHandler.TRANSPORT_WORKER
)
);
}
} }
private void updateSettings( private void updateSettings(

View file

@ -10,6 +10,10 @@ package org.elasticsearch.xpack.migrate.action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -29,10 +33,16 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
import java.util.Collections; import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class ReindexDataStreamIndexTransportActionTests extends ESTestCase { public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
@ -112,7 +122,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
) )
); );
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any()); ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
action.reindex(sourceIndex, destIndex, listener, taskId); action.reindex(sourceIndex, destIndex, listener, taskId);
@ -137,7 +150,10 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING) Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
) )
); );
doNothing().when(client).execute(eq(ReindexAction.INSTANCE), request.capture(), any()); ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
doNothing().when(transportService).sendRequest(any(), eq(ReindexAction.NAME), request.capture(), any());
action.reindex(sourceIndex, destIndex, listener, taskId); action.reindex(sourceIndex, destIndex, listener, taskId);
@ -204,4 +220,142 @@ public class ReindexDataStreamIndexTransportActionTests extends ESTestCase {
e.getMessage() e.getMessage()
); );
} }
public void testRoundRobin() {
/*
* This tests that the action will round-robin through the list of ingest nodes in the cluster.
*/
String sourceIndex = randomAlphanumericOfLength(10);
String destIndex = randomAlphanumericOfLength(10);
// Flips to true if the reindex listener is ever notified of a failure; checked at the end.
AtomicBoolean failed = new AtomicBoolean(false);
ActionListener<BulkByScrollResponse> listener = new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) {}
@Override
public void onFailure(Exception e) {
failed.set(true);
}
};
TaskId taskId = TaskId.EMPTY_TASK_ID;
when(clusterService.getClusterSettings()).thenReturn(
new ClusterSettings(
Settings.EMPTY,
Collections.singleton(ReindexDataStreamIndexTransportAction.REINDEX_MAX_REQUESTS_PER_SECOND_SETTING)
)
);
// Stub the cluster state so the action sees a node set that is guaranteed to contain at
// least one ingest node (see getTestDiscoveryNodes()).
ClusterState clusterState = mock(ClusterState.class);
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodes());
when(clusterService.state()).thenReturn(clusterState);
// Capture which DiscoveryNode each reindex request is sent to; sendRequest itself is a no-op.
ArgumentCaptor<DiscoveryNode> nodeCaptor = ArgumentCaptor.captor();
doNothing().when(transportService).sendRequest(nodeCaptor.capture(), eq(ReindexAction.NAME), request.capture(), any());
action.reindex(sourceIndex, destIndex, listener, taskId);
DiscoveryNode node1 = nodeCaptor.getValue();
assertNotNull(node1);
action.reindex(sourceIndex, destIndex, listener, taskId);
DiscoveryNode node2 = nodeCaptor.getValue();
assertNotNull(node2);
int ingestNodeCount = clusterState.getNodes().getIngestNodes().size();
// With more than one ingest node, two consecutive requests must land on different nodes.
if (ingestNodeCount > 1) {
assertThat(node1.getName(), not(equalTo(node2.getName())));
}
// check that if we keep going we eventually get back to the original node:
// (after ingestNodeCount total requests the round-robin wraps back to node1; if
// ingestNodeCount == 1 the loop body never runs and node == node2 == node1)
DiscoveryNode node = node2;
for (int i = 0; i < ingestNodeCount - 1; i++) {
action.reindex(sourceIndex, destIndex, listener, taskId);
node = nodeCaptor.getValue();
}
assertNotNull(node);
assertThat(node1.getName(), equalTo(node.getName()));
assertThat(failed.get(), equalTo(false));
// make sure the listener gets notified of failure if there are no ingest nodes:
when(clusterState.getNodes()).thenReturn(getTestDiscoveryNodesNoIngest());
action.reindex(sourceIndex, destIndex, listener, taskId);
assertThat(failed.get(), equalTo(true));
}
/**
 * Builds a random collection of 1-10 discovery nodes, each with a random set of roles,
 * guaranteed to contain at least one node with the ingest role (an extra ingest-only node is
 * appended if the random assignment happened to produce none).
 *
 * @return a {@link DiscoveryNodes} instance containing at least one ingest node
 */
private DiscoveryNodes getTestDiscoveryNodes() {
    DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
    boolean nodeHasIngestRole = false;
    int nodeCount = randomIntBetween(1, 10);
    for (int i = 0; i < nodeCount; i++) {
        final DiscoveryNode discoveryNode = new DiscoveryNode(
            "test-name-" + i,
            "test-id-" + i,
            "test-ephemeral-id-" + i,
            "test-hostname-" + i,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            // Each candidate role is listed exactly once (the original listed MASTER_ROLE
            // twice, which only skewed the random distribution toward master), and the set
            // size bound matches the number of distinct candidates.
            randomSet(
                1,
                4,
                () -> randomFrom(
                    DiscoveryNodeRole.DATA_ROLE,
                    DiscoveryNodeRole.INGEST_ROLE,
                    DiscoveryNodeRole.SEARCH_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE
                )
            ),
            null,
            null
        );
        nodeHasIngestRole = nodeHasIngestRole || discoveryNode.getRoles().contains(DiscoveryNodeRole.INGEST_ROLE);
        builder.add(discoveryNode);
    }
    // The tests above require at least one ingest node, so add one if we got unlucky:
    if (nodeHasIngestRole == false) {
        final DiscoveryNode discoveryNode = new DiscoveryNode(
            "test-name-" + nodeCount,
            "test-id-" + nodeCount,
            "test-ephemeral-id-" + nodeCount,
            "test-hostname-" + nodeCount,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            Set.of(DiscoveryNodeRole.INGEST_ROLE),
            null,
            null
        );
        builder.add(discoveryNode);
    }
    return builder.build();
}
/**
 * Builds a random collection of 0-10 discovery nodes, none of which has the ingest role.
 * Used to verify that reindex fails over to the listener when no ingest node is available.
 *
 * @return a {@link DiscoveryNodes} instance (possibly empty) with no ingest nodes
 */
private DiscoveryNodes getTestDiscoveryNodesNoIngest() {
    DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
    int nodeCount = randomIntBetween(0, 10);
    for (int i = 0; i < nodeCount; i++) {
        final DiscoveryNode discoveryNode = new DiscoveryNode(
            "test-name-" + i,
            "test-id-" + i,
            "test-ephemeral-id-" + i,
            "test-hostname-" + i,
            "test-hostaddr",
            buildNewFakeTransportAddress(),
            Map.of(),
            // INGEST_ROLE is deliberately absent. Each candidate role is listed exactly once
            // (the original listed MASTER_ROLE twice, which only skewed the random
            // distribution), and the set size bound matches the number of distinct candidates.
            randomSet(
                1,
                3,
                () -> randomFrom(
                    DiscoveryNodeRole.DATA_ROLE,
                    DiscoveryNodeRole.SEARCH_ROLE,
                    DiscoveryNodeRole.MASTER_ROLE
                )
            ),
            null,
            null
        );
        builder.add(discoveryNode);
    }
    return builder.build();
}
} }