From 53dae7a3a20cda1828a287514d921aed4346869d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Mon, 23 Jun 2025 09:31:36 -0600 Subject: [PATCH] Dispatch ingest work to coordination thread pool (#129820) The vast majority of ingest pipelines are light CPU operations. We don't want these to be put behind IO work on the write executor. Instead, execute these on the coordination pool. --- .../ingest/common/IngestRestartIT.java | 2 +- .../bulk/TransportAbstractBulkAction.java | 44 ++++++----------- .../DefaultBuiltInExecutorBuilders.java | 11 +++++ .../threadpool/ExecutorBuilder.java | 1 + .../elasticsearch/threadpool/ThreadPool.java | 4 +- .../bulk/TransportBulkActionIngestTests.java | 24 +++++----- .../action/bulk/TransportBulkActionTests.java | 47 +++++++++++++++++-- .../action/EnrichCoordinatorProxyAction.java | 1 + 8 files changed, 84 insertions(+), 50 deletions(-) diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 4ccaa55d69c3..3098865d00a7 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -410,7 +410,7 @@ public class IngestRestartIT extends ESIntegTestCase { private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) { assertThat(blockingLatch.getCount(), greaterThan(0L)); - final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); + final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); // Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full. expectThrows(EsRejectedExecutionException.class, () -> { // noinspection InfiniteLoopStatement diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 262384817f79..c7fd31e529c1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -69,8 +69,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction private final IngestActionForwarder ingestForwarder; protected final LongSupplier relativeTimeNanosProvider; protected final Executor coordinationExecutor; - protected final Executor writeExecutor; - protected final Executor systemWriteExecutor; + protected final Executor systemCoordinationExecutor; private final ActionType bulkAction; public TransportAbstractBulkAction( @@ -94,8 +93,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction this.systemIndices = systemIndices; this.projectResolver = projectResolver; this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); - this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); - this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); + this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); this.ingestForwarder = new IngestActionForwarder(transportService); clusterService.addStateApplier(this.ingestForwarder); this.relativeTimeNanosProvider = relativeTimeNanosProvider; @@ -134,14 +132,14 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); // Use coordinationExecutor for dispatching coordination tasks - ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener); + final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor; + ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener); } private void ensureClusterStateThenForkAndExecute( Task task, BulkRequest bulkRequest, Executor executor, - boolean isOnlySystem, ActionListener releasingListener ) { final ClusterState initialState = clusterService.state(); @@ -163,7 +161,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + forkAndExecute(task, bulkRequest, executor, releasingListener); } @Override @@ -177,32 +175,21 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE)); } else { - forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + forkAndExecute(task, bulkRequest, executor, releasingListener); } } - private void forkAndExecute( - Task task, - BulkRequest bulkRequest, - Executor executor, - boolean isOnlySystem, - ActionListener releasingListener - ) { + private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener releasingListener) { executor.execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener); } }); } - private boolean applyPipelines( - Task task, - BulkRequest bulkRequest, - Executor executor, - boolean isOnlySystem, - ActionListener listener - ) throws IOException { + private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener listener) + throws IOException { boolean hasIndexRequestsWithPipelines = false; ClusterState state = clusterService.state(); ProjectId projectId = projectResolver.getProjectId(); @@ -291,7 +278,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l); + processBulkIndexIngestRequest(task, bulkRequest, executor, project, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -305,7 +292,6 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest original, Executor executor, - boolean isOnlySystem, ProjectMetadata metadata, ActionListener listener ) { @@ -339,7 +325,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction ActionRunnable runnable = new ActionRunnable<>(actionListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); } @Override @@ -362,8 +348,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } } }, - // Use the appropriate write executor for actual ingest processing - isOnlySystem ? systemWriteExecutor : writeExecutor + executor ); } @@ -419,11 +404,10 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest bulkRequest, Executor executor, - boolean isOnlySystem, ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) { + if (applyPipelines(task, bulkRequest, executor, listener) == false) { doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 336d978358b9..bac2df150045 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -198,6 +198,17 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { true ) ); + result.put( + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, + new FixedExecutorBuilder( + settings, + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, + halfProcMaxAt5, + 1000, + new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + true + ) + ); result.put( ThreadPool.Names.SYSTEM_CRITICAL_READ, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 6d438298acff..a0c8795a388d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -42,6 +42,7 @@ public abstract class ExecutorBuilder, Scheduler, public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; + public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination"; public static final String SYSTEM_CRITICAL_READ = "system_critical_read"; public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write"; } @@ -187,8 +188,8 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED), entry(Names.GET, ThreadPoolType.FIXED), entry(Names.ANALYZE, ThreadPoolType.FIXED), - entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.WRITE, ThreadPoolType.FIXED), + entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.SEARCH, ThreadPoolType.FIXED), entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED), entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED), @@ -204,6 +205,7 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), entry(Names.SYSTEM_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED), + entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED) ); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index ef24a7944d50..e2c3591092d8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -108,8 +108,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { private FeatureService mockFeatureService; private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination"); - private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write"); - private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write"); + private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination"); private final ProjectId projectId = randomProjectIdOrDefault(); @@ -295,8 +294,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { // initialize captors, which must be members to use @Capture because of generics threadPool = mock(ThreadPool.class); when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor); - when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor); - when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor); + when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor); MockitoAnnotations.openMocks(this); // setup services that will be called by action transportService = mock(TransportService.class); @@ -424,7 +422,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { redirectHandler.capture(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -475,7 +473,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -524,7 +522,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(systemWriteExecutor) + same(systemWriteCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -685,7 +683,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); @@ -736,7 +734,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. @@ -830,7 +828,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); } @@ -871,7 +869,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); } @@ -901,7 +899,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); indexRequest1.autoGenerateId(); completionHandler.getValue().accept(Thread.currentThread(), null); @@ -941,7 +939,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(null, exception); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 70964aef7cf0..481fdf5ea353 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -61,7 +61,6 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; -import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; @@ -126,7 +125,15 @@ public class TransportBulkActionTests extends ESTestCase { new ActionFilters(Collections.emptySet()), new Resolver(), new IndexingPressure(Settings.EMPTY), - EmptySystemIndices.INSTANCE, + new SystemIndices( + List.of( + new SystemIndices.Feature( + "plugin", + "test feature", + List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", "")) + ) + ) + ), new ProjectResolver() { @Override public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { @@ -386,7 +393,7 @@ public class TransportBulkActionTests extends ESTestCase { }); } - public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception { + public void testDispatchesToWriteCoordinationThreadPool() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); PlainActionFuture future = new PlainActionFuture<>(); ThreadPoolStats.Stats stats = threadPool.stats() @@ -401,8 +408,7 @@ public class TransportBulkActionTests extends ESTestCase { assertBusy(() -> { // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the - // index - // is created. + // index is created. assertThat( threadPool.stats() .stats() @@ -416,6 +422,37 @@ public class TransportBulkActionTests extends ESTestCase { }); } + public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception { + BulkRequest bulkRequest = new BulkRequest().add( + new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap()) + ); + PlainActionFuture future = new PlainActionFuture<>(); + ThreadPoolStats.Stats stats = threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) + .findAny() + .get(); + assertThat(stats.completed(), equalTo(0L)); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + + assertBusy(() -> { + // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the + // index is created. + assertThat( + threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) + .findAny() + .get() + .completed(), + equalTo(2L) + ); + }); + } + public void testRejectCoordination() { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 26e692fbb7c7..17e118edb4bc 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -77,6 +77,7 @@ public class EnrichCoordinatorProxyAction extends ActionType { ThreadPool.Names.WRITE, ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.SYSTEM_WRITE, + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, ThreadPool.Names.SEARCH, ThreadPool.Names.MANAGEMENT );