Dispatch ingest work to coordination thread pool (#129820)

The vast majority of ingest pipelines are lightweight CPU
operations. We don't want them queued behind IO-heavy work on the write
executor. Instead, execute them on the coordination pool.
Author: Tim Brooks, 2025-06-23 09:31:36 -06:00 (committed by GitHub)
parent bae6e3c66d
commit 53dae7a3a2
8 changed files with 84 additions and 50 deletions
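For orientation, a minimal sketch of the dispatch decision this change introduces. The real logic lives in TransportAbstractBulkAction (see the diff below); the class and field names here are simplified stand-ins for illustration, not the actual implementation.

```java
import java.util.concurrent.Executor;

// Simplified stand-in for the dispatch logic in TransportAbstractBulkAction.
// Bulk requests that touch only system indices fork onto the system write
// coordination pool; everything else forks onto the regular write coordination
// pool. Ingest processing later re-dispatches on the same coordination
// executor rather than on the write pool.
class CoordinationDispatchSketch {
    private final Executor coordinationExecutor;        // ThreadPool.Names.WRITE_COORDINATION
    private final Executor systemCoordinationExecutor;  // ThreadPool.Names.SYSTEM_WRITE_COORDINATION

    CoordinationDispatchSketch(Executor coordinationExecutor, Executor systemCoordinationExecutor) {
        this.coordinationExecutor = coordinationExecutor;
        this.systemCoordinationExecutor = systemCoordinationExecutor;
    }

    void dispatch(boolean isOnlySystem, Runnable bulkWork) {
        Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
        executor.execute(bulkWork);
    }
}
```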


@@ -410,7 +410,7 @@ public class IngestRestartIT extends ESIntegTestCase {
private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) {
assertThat(blockingLatch.getCount(), greaterThan(0L));
- final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
+ final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
// Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
expectThrows(EsRejectedExecutionException.class, () -> {
// noinspection InfiniteLoopStatement
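The diff only shows the head of this helper. As a rough sketch of how a blocking helper like this typically works (the loop body below is an assumption for illustration, not the test's actual code):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

// Illustrative sketch only: each submitted task parks on the latch, so the
// fixed pool's workers and its bounded queue fill up until the executor
// starts throwing EsRejectedExecutionException.
class BlockPoolSketch {
    static void blockUntilRejected(CountDownLatch blockingLatch, Executor executor) {
        try {
            while (true) {
                executor.execute(() -> {
                    try {
                        blockingLatch.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } catch (EsRejectedExecutionException expected) {
            // The pool and its queue are saturated; further work dispatched to
            // this pool is rejected until the latch is released and tasks drain.
        }
    }
}
```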


@@ -69,8 +69,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
private final IngestActionForwarder ingestForwarder;
protected final LongSupplier relativeTimeNanosProvider;
protected final Executor coordinationExecutor;
- protected final Executor writeExecutor;
- protected final Executor systemWriteExecutor;
+ protected final Executor systemCoordinationExecutor;
private final ActionType<BulkResponse> bulkAction;
public TransportAbstractBulkAction(
@@ -94,8 +93,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
this.systemIndices = systemIndices;
this.projectResolver = projectResolver;
this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
- this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
- this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
+ this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION);
this.ingestForwarder = new IngestActionForwarder(transportService);
clusterService.addStateApplier(this.ingestForwarder);
this.relativeTimeNanosProvider = relativeTimeNanosProvider;
@@ -134,14 +132,14 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
}
final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
// Use coordinationExecutor for dispatching coordination tasks
- ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
+ final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor;
+ ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
}
private void ensureClusterStateThenForkAndExecute(
Task task,
BulkRequest bulkRequest,
Executor executor,
- boolean isOnlySystem,
ActionListener<BulkResponse> releasingListener
) {
final ClusterState initialState = clusterService.state();
@@ -163,7 +161,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
- forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
+ forkAndExecute(task, bulkRequest, executor, releasingListener);
}
@Override
@@ -177,32 +175,21 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
}
}, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
} else {
- forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
+ forkAndExecute(task, bulkRequest, executor, releasingListener);
}
}
- private void forkAndExecute(
- Task task,
- BulkRequest bulkRequest,
- Executor executor,
- boolean isOnlySystem,
- ActionListener<BulkResponse> releasingListener
- ) {
+ private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
executor.execute(new ActionRunnable<>(releasingListener) {
@Override
protected void doRun() throws IOException {
- applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
+ applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
}
});
}
- private boolean applyPipelines(
- Task task,
- BulkRequest bulkRequest,
- Executor executor,
- boolean isOnlySystem,
- ActionListener<BulkResponse> listener
- ) throws IOException {
+ private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
+ throws IOException {
boolean hasIndexRequestsWithPipelines = false;
ClusterState state = clusterService.state();
ProjectId projectId = projectResolver.getProjectId();
@@ -291,7 +278,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
assert arePipelinesResolved : bulkRequest;
}
if (clusterService.localNode().isIngestNode()) {
- processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
+ processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
} else {
ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
}
@@ -305,7 +292,6 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
Task task,
BulkRequest original,
Executor executor,
- boolean isOnlySystem,
ProjectMetadata metadata,
ActionListener<BulkResponse> listener
) {
@@ -339,7 +325,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
@Override
protected void doRun() throws IOException {
- applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
+ applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
}
@Override
@@ -362,8 +348,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
}
}
},
- // Use the appropriate write executor for actual ingest processing
- isOnlySystem ? systemWriteExecutor : writeExecutor
+ executor
);
}
@@ -419,11 +404,10 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
Task task,
BulkRequest bulkRequest,
Executor executor,
- boolean isOnlySystem,
ActionListener<BulkResponse> listener
) throws IOException {
final long relativeStartTimeNanos = relativeTimeNanos();
- if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
+ if (applyPipelines(task, bulkRequest, executor, listener) == false) {
doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
}
}


@@ -198,6 +198,17 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
true
)
);
+ result.put(
+ ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
+ new FixedExecutorBuilder(
+ settings,
+ ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
+ halfProcMaxAt5,
+ 1000,
+ new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA),
+ true
+ )
+ );
result.put(
ThreadPool.Names.SYSTEM_CRITICAL_READ,
new FixedExecutorBuilder(
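Because the new pool is registered through FixedExecutorBuilder, it should pick up the usual fixed-pool settings keyed by the pool name. A hedged example of overriding them is below; the setting keys assume the standard thread_pool.<name>.* convention and the values are purely illustrative, not recommendations (the defaults above are half of the allocated processors capped at 5, with a queue of 1000).

```java
import org.elasticsearch.common.settings.Settings;

// Assumed setting keys following the thread_pool.<name>.size / .queue_size
// convention for fixed executors; values are illustrative only.
public class SystemWriteCoordinationSettingsSketch {
    public static Settings example() {
        return Settings.builder()
            .put("thread_pool.system_write_coordination.size", 2)
            .put("thread_pool.system_write_coordination.queue_size", 500)
            .build();
    }
}
```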


@@ -42,6 +42,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
protected static int applyHardSizeLimit(final Settings settings, final String name) {
if (name.equals("bulk")
|| name.equals(ThreadPool.Names.WRITE_COORDINATION)
+ || name.equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)
|| name.equals(ThreadPool.Names.WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_WRITE)
|| name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {


@@ -142,6 +142,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
public static final String FETCH_SHARD_STORE = "fetch_shard_store";
public static final String SYSTEM_READ = "system_read";
public static final String SYSTEM_WRITE = "system_write";
+ public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination";
public static final String SYSTEM_CRITICAL_READ = "system_critical_read";
public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write";
}
@@ -187,8 +188,8 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
entry(Names.GET, ThreadPoolType.FIXED),
entry(Names.ANALYZE, ThreadPoolType.FIXED),
- entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.WRITE, ThreadPoolType.FIXED),
+ entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.SEARCH, ThreadPoolType.FIXED),
entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED),
@@ -204,6 +205,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING),
entry(Names.SYSTEM_READ, ThreadPoolType.FIXED),
entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED),
+ entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED),
entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED),
entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED)
);


@@ -108,8 +108,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
private FeatureService mockFeatureService;
private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
- private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
- private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
+ private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination");
private final ProjectId projectId = randomProjectIdOrDefault();
@@ -295,8 +294,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
// initialize captors, which must be members to use @Capture because of generics
threadPool = mock(ThreadPool.class);
when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
- when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
- when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
+ when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor);
MockitoAnnotations.openMocks(this);
// setup services that will be called by action
transportService = mock(TransportService.class);
@@ -424,7 +422,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
redirectHandler.capture(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
@@ -475,7 +473,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
@@ -524,7 +522,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(systemWriteExecutor)
+ same(systemWriteCoordinationExecutor)
);
completionHandler.getValue().accept(null, exception);
assertTrue(failureCalled.get());
@@ -685,7 +683,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
assertEquals(indexRequest1.getPipeline(), "default_pipeline");
assertEquals(indexRequest2.getPipeline(), "default_pipeline");
@@ -736,7 +734,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
completionHandler.getValue().accept(null, exception);
assertFalse(action.indexCreated); // still no index yet, the ingest node failed.
@@ -830,7 +828,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
}
@@ -871,7 +869,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
}
@@ -901,7 +899,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
indexRequest1.autoGenerateId();
completionHandler.getValue().accept(Thread.currentThread(), null);
@@ -941,7 +939,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
any(),
failureHandler.capture(),
completionHandler.capture(),
- same(writeExecutor)
+ same(writeCoordinationExecutor)
);
assertEquals(indexRequest.getPipeline(), "default_pipeline");
completionHandler.getValue().accept(null, exception);


@@ -61,7 +61,6 @@ import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
- import org.elasticsearch.indices.EmptySystemIndices;
import org.elasticsearch.indices.SystemIndexDescriptorUtils;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.test.ESTestCase;
@@ -126,7 +125,15 @@ public class TransportBulkActionTests extends ESTestCase {
new ActionFilters(Collections.emptySet()),
new Resolver(),
new IndexingPressure(Settings.EMPTY),
- EmptySystemIndices.INSTANCE,
+ new SystemIndices(
+ List.of(
+ new SystemIndices.Feature(
+ "plugin",
+ "test feature",
+ List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", ""))
+ )
+ )
+ ),
new ProjectResolver() {
@Override
public <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E {
@@ -386,7 +393,7 @@
});
}
- public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
+ public void testDispatchesToWriteCoordinationThreadPool() throws Exception {
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
ThreadPoolStats.Stats stats = threadPool.stats()
@@ -401,8 +408,7 @@
assertBusy(() -> {
// Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
- // index
- // is created.
+ // index is created.
assertThat(
threadPool.stats()
.stats()
@@ -416,6 +422,37 @@
});
}
+ public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception {
+ BulkRequest bulkRequest = new BulkRequest().add(
+ new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap())
+ );
+ PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+ ThreadPoolStats.Stats stats = threadPool.stats()
+ .stats()
+ .stream()
+ .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
+ .findAny()
+ .get();
+ assertThat(stats.completed(), equalTo(0L));
+ ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+ future.actionGet();
+ assertBusy(() -> {
+ // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
+ // index is created.
+ assertThat(
+ threadPool.stats()
+ .stats()
+ .stream()
+ .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))
+ .findAny()
+ .get()
+ .completed(),
+ equalTo(2L)
+ );
+ });
+ }
public void testRejectCoordination() {
BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));


@@ -77,6 +77,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
ThreadPool.Names.WRITE,
ThreadPool.Names.WRITE_COORDINATION,
ThreadPool.Names.SYSTEM_WRITE,
+ ThreadPool.Names.SYSTEM_WRITE_COORDINATION,
ThreadPool.Names.SEARCH,
ThreadPool.Names.MANAGEMENT
);