Add thread pool for write coordination (#129450)

This change adds a thread pool for write coordination to ensure that
bulk coordination does not get stuck on an overloaded primary node.
Tim Brooks 2025-06-17 23:25:05 -06:00 committed by GitHub
parent f48a3c3c46
commit 9ac6576aac
12 changed files with 109 additions and 38 deletions


@@ -33,7 +33,10 @@ $$$search-throttled$$$`search_throttled`
 : For analyze requests. Thread pool type is `fixed` with a size of `1`, queue size of `16`.

 `write`
-: For single-document index/delete/update, ingest processors, and bulk requests. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
+: For write operations and ingest processors. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
+
+`write_coordination`
+: For bulk request coordination operations. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).

 `snapshot`
 : For snapshot/restore operations. Thread pool type is `scaling` with a keep-alive of `5m`. On nodes with at least 750MB of heap the maximum size of this pool is `10` by default. On nodes with less than 750MB of heap the maximum size of this pool is `min(5, (`[`# of allocated processors`](#node.processors)`) / 2)` by default.
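
The new pool accepts the standard per-pool `thread_pool.write_coordination.*` settings, the same keys the EsThreadPoolExecutorTests change further down exercises. A minimal sketch of overriding its size and queue depth in node settings (the values here are illustrative assumptions, not recommendations):

```java
import org.elasticsearch.common.settings.Settings;

// Sketch only: illustrative override of the new write_coordination pool.
// The setting keys mirror those used in EsThreadPoolExecutorTests below; the values are assumptions.
public class WriteCoordinationPoolSettingsExample {
    public static Settings nodeSettings() {
        return Settings.builder()
            .put("thread_pool.write_coordination.size", 2)          // fixed number of coordination threads
            .put("thread_pool.write_coordination.queue_size", 1000) // queued tasks allowed before rejection
            .build();
    }
}
```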


@@ -232,7 +232,7 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         add512BRequests(requestsThrottle, index);

         CountDownLatch finishLatch = new CountDownLatch(1);
-        blockWritePool(threadPool, finishLatch);
+        blockWriteCoordinationPool(threadPool, finishLatch);
         IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
         refCounted.incRef();
         handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
@@ -295,8 +295,8 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         IncrementalBulkService incrementalBulkService = internalCluster().getInstance(IncrementalBulkService.class, randomNodeName);
         ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, randomNodeName);
-        blockWritePool(threadPool, blockingLatch);
-        fillWriteQueue(threadPool);
+        blockWriteCoordinationPool(threadPool, blockingLatch);
+        fillWriteCoordinationQueue(threadPool);

         IncrementalBulkService.Handler handler = incrementalBulkService.newBulkRequest();
         if (randomBoolean()) {
@@ -333,7 +333,7 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         AtomicBoolean nextRequested = new AtomicBoolean(true);
         AtomicLong hits = new AtomicLong(0);
         try {
-            blockWritePool(threadPool, blockingLatch1);
+            blockWriteCoordinationPool(threadPool, blockingLatch1);
             while (nextRequested.get()) {
                 nextRequested.set(false);
                 refCounted.incRef();
@@ -348,8 +348,8 @@ public class IncrementalBulkIT extends ESIntegTestCase {
             CountDownLatch blockingLatch2 = new CountDownLatch(1);
             try {
-                blockWritePool(threadPool, blockingLatch2);
-                fillWriteQueue(threadPool);
+                blockWriteCoordinationPool(threadPool, blockingLatch2);
+                fillWriteCoordinationQueue(threadPool);
                 handler.lastItems(List.of(indexRequest(index)), refCounted::decRef, future);
             } finally {
@@ -531,8 +531,8 @@ public class IncrementalBulkIT extends ESIntegTestCase {
         }
     }

-    private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
-        final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
+    private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
+        final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
         final var startBarrier = new CyclicBarrier(threadCount + 1);
         final var blockingTask = new AbstractRunnable() {
             @Override
@@ -552,13 +552,13 @@ public class IncrementalBulkIT extends ESIntegTestCase {
             }
         };
         for (int i = 0; i < threadCount; i++) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
+            threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
         }
         safeAwait(startBarrier);
     }

-    private static void fillWriteQueue(ThreadPool threadPool) {
-        final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE).getQueueSize().singles());
+    private static void fillWriteCoordinationQueue(ThreadPool threadPool) {
+        final var queueSize = Math.toIntExact(threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getQueueSize().singles());
         final var queueFilled = new AtomicBoolean(false);
         final var queueFillingTask = new AbstractRunnable() {
             @Override
@@ -577,7 +577,7 @@ public class IncrementalBulkIT extends ESIntegTestCase {
             }
         };
         for (int i = 0; i < queueSize; i++) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(queueFillingTask);
+            threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(queueFillingTask);
         }
         queueFilled.set(true);
     }
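
The renamed helpers above saturate the `write_coordination` pool and then fill its queue so that later submissions are rejected, but the hunks elide the bodies of the blocking tasks. A self-contained sketch of the same blocking technique against a plain JDK fixed pool (the ES `AbstractRunnable`/`safeAwait` details are assumed and replaced here with standard concurrency primitives):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of "block every pool thread, then fill the queue" against a plain fixed executor.
public class BlockPoolSketch {
    public static void main(String[] args) throws Exception {
        int poolSize = 2;
        int queueSize = 4;
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize));

        CountDownLatch finishLatch = new CountDownLatch(1);
        CyclicBarrier startBarrier = new CyclicBarrier(poolSize + 1);

        // 1. Occupy every thread: each task signals the barrier, then parks on the latch.
        for (int i = 0; i < poolSize; i++) {
            pool.execute(() -> {
                try {
                    startBarrier.await();
                    finishLatch.await();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }
        startBarrier.await(); // all workers are now blocked

        // 2. Fill the queue with no-op tasks; they cannot run until the latch is released.
        for (int i = 0; i < queueSize; i++) {
            pool.execute(() -> {});
        }

        // 3. Any further submission is rejected (ES surfaces this as EsRejectedExecutionException).
        try {
            pool.execute(() -> {});
        } catch (RejectedExecutionException expected) {
            System.out.println("rejected as expected");
        }

        finishLatch.countDown(); // unblock the workers and drain the queue
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```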


@@ -840,7 +840,7 @@ public class NodeIndexingMetricsIT extends ESIntegTestCase {
         add512BRequests(requestsThrottle, index);

         CountDownLatch finishLatch = new CountDownLatch(1);
-        blockWritePool(threadPool, finishLatch);
+        blockWriteCoordinationPool(threadPool, finishLatch);
         IncrementalBulkService.Handler handlerThrottled = incrementalBulkService.newBulkRequest();
         refCounted.incRef();
         handlerThrottled.addItems(requestsThrottle, refCounted::decRef, () -> nextPage.set(true));
@@ -919,8 +919,8 @@ public class NodeIndexingMetricsIT extends ESIntegTestCase {
         assertThat(total, lessThan(1024L));
     }

-    private static void blockWritePool(ThreadPool threadPool, CountDownLatch finishLatch) {
-        final var threadCount = threadPool.info(ThreadPool.Names.WRITE).getMax();
+    private static void blockWriteCoordinationPool(ThreadPool threadPool, CountDownLatch finishLatch) {
+        final var threadCount = threadPool.info(ThreadPool.Names.WRITE_COORDINATION).getMax();
         final var startBarrier = new CyclicBarrier(threadCount + 1);
         final var blockingTask = new AbstractRunnable() {
             @Override
@@ -940,7 +940,7 @@ public class NodeIndexingMetricsIT extends ESIntegTestCase {
             }
         };
         for (int i = 0; i < threadCount; i++) {
-            threadPool.executor(ThreadPool.Names.WRITE).execute(blockingTask);
+            threadPool.executor(ThreadPool.Names.WRITE_COORDINATION).execute(blockingTask);
        }
         safeAwait(startBarrier);
     }


@@ -68,6 +68,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
     private final IngestService ingestService;
     private final IngestActionForwarder ingestForwarder;
     protected final LongSupplier relativeTimeNanosProvider;
+    protected final Executor coordinationExecutor;
     protected final Executor writeExecutor;
     protected final Executor systemWriteExecutor;
     private final ActionType<BulkResponse> bulkAction;
@@ -92,6 +93,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         this.indexingPressure = indexingPressure;
         this.systemIndices = systemIndices;
         this.projectResolver = projectResolver;
+        this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
         this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
         this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
         this.ingestForwarder = new IngestActionForwarder(transportService);
@@ -106,8 +108,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
      * This is called on the Transport thread so we can check the indexing
      * memory pressure *quickly* but we don't want to keep the transport
      * thread busy. Then, as soon as we have the indexing pressure in we fork
-     * to one of the write thread pools. We do this because juggling the
-     * bulk request can get expensive for a few reasons:
+     * to the coordinator thread pool for coordination tasks. We do this because
+     * juggling the bulk request can get expensive for a few reasons:
      * 1. Figuring out which shard should receive a bulk request might require
      * parsing the _source.
      * 2. When dispatching the sub-requests to shards we may have to compress
@@ -131,14 +133,15 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
             releasable = indexingPressure.markCoordinatingOperationStarted(indexingOps, indexingBytes, isOnlySystem);
         }
         final ActionListener<BulkResponse> releasingListener = ActionListener.runBefore(listener, releasable::close);
-        final Executor executor = isOnlySystem ? systemWriteExecutor : writeExecutor;
-        ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener);
+        // Use coordinationExecutor for dispatching coordination tasks
+        ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener);
     }

     private void ensureClusterStateThenForkAndExecute(
         Task task,
         BulkRequest bulkRequest,
         Executor executor,
+        boolean isOnlySystem,
         ActionListener<BulkResponse> releasingListener
     ) {
         final ClusterState initialState = clusterService.state();
@@ -160,7 +163,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
             clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() {
                 @Override
                 public void onNewClusterState(ClusterState state) {
-                    forkAndExecute(task, bulkRequest, executor, releasingListener);
+                    forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
                 }

                 @Override
@@ -174,21 +177,32 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
                 }
             }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE));
         } else {
-            forkAndExecute(task, bulkRequest, executor, releasingListener);
+            forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
         }
     }

-    private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> releasingListener) {
+    private void forkAndExecute(
+        Task task,
+        BulkRequest bulkRequest,
+        Executor executor,
+        boolean isOnlySystem,
+        ActionListener<BulkResponse> releasingListener
+    ) {
         executor.execute(new ActionRunnable<>(releasingListener) {
             @Override
             protected void doRun() throws IOException {
-                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener);
+                applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener);
             }
         });
     }

-    private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener<BulkResponse> listener)
-        throws IOException {
+    private boolean applyPipelines(
+        Task task,
+        BulkRequest bulkRequest,
+        Executor executor,
+        boolean isOnlySystem,
+        ActionListener<BulkResponse> listener
+    ) throws IOException {
         boolean hasIndexRequestsWithPipelines = false;
         ClusterState state = clusterService.state();
         ProjectId projectId = projectResolver.getProjectId();
@@ -277,7 +291,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
                 assert arePipelinesResolved : bulkRequest;
             }
             if (clusterService.localNode().isIngestNode()) {
-                processBulkIndexIngestRequest(task, bulkRequest, executor, project, l);
+                processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l);
             } else {
                 ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l);
             }
@@ -291,6 +305,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         Task task,
         BulkRequest original,
         Executor executor,
+        boolean isOnlySystem,
         ProjectMetadata metadata,
         ActionListener<BulkResponse> listener
     ) {
@@ -324,12 +339,12 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
                 ActionRunnable<BulkResponse> runnable = new ActionRunnable<>(actionListener) {
                     @Override
                     protected void doRun() throws IOException {
-                        applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener);
+                        applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener);
                     }

                     @Override
                     public boolean isForceExecution() {
-                        // If we fork back to a write thread we **not** should fail, because tp queue is full.
+                        // If we fork back to a coordination thread we **not** should fail, because tp queue is full.
                         // (Otherwise the work done during ingest will be lost)
                         // It is okay to force execution here. Throttling of write requests happens prior to
                         // ingest when a node receives a bulk request.
@@ -337,7 +352,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
                     }
                 };
                 // If a processor went async and returned a response on a different thread then
-                // before we continue the bulk request we should fork back on a write thread:
+                // before we continue the bulk request we should fork back on a coordination thread. Otherwise it is fine to perform
+                // coordination steps on the write thread
                 if (originalThread == Thread.currentThread()) {
                     runnable.run();
                 } else {
@@ -346,7 +362,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
                     }
                 }
             },
-            executor
+            // Use the appropriate write executor for actual ingest processing
+            isOnlySystem ? systemWriteExecutor : writeExecutor
         );
     }
@@ -402,10 +419,11 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction
         Task task,
         BulkRequest bulkRequest,
         Executor executor,
+        boolean isOnlySystem,
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
-        if (applyPipelines(task, bulkRequest, executor, listener) == false) {
+        if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) {
             doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
         }
     }
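
Taken together, the changes above make the action fork coordination work (resolving pipelines, slicing the bulk into shard requests) to the new `write_coordination` pool, while ingest processing still runs on the `write` or `system_write` pool. A simplified sketch of that split; the class and method names here are illustrative stand-ins, not the real `TransportAbstractBulkAction` API:

```java
import java.util.concurrent.Executor;

import org.elasticsearch.threadpool.ThreadPool;

// Simplified sketch of the executor split above; BulkDispatchSketch and its methods are
// illustrative stand-ins, not the real TransportAbstractBulkAction code.
class BulkDispatchSketch {
    private final Executor coordinationExecutor;
    private final Executor writeExecutor;
    private final Executor systemWriteExecutor;

    BulkDispatchSketch(ThreadPool threadPool) {
        this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
        this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE);
        this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE);
    }

    /** Coordination work always forks to the dedicated write_coordination pool. */
    void forkCoordination(Runnable coordinationWork) {
        coordinationExecutor.execute(coordinationWork);
    }

    /** Ingest processing still picks the write or system_write pool, as in processBulkIndexIngestRequest. */
    Executor ingestExecutor(boolean isOnlySystem) {
        return isOnlySystem ? systemWriteExecutor : writeExecutor;
    }
}
```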


@@ -38,6 +38,16 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
             ThreadPool.Names.GENERIC,
             new ScalingExecutorBuilder(ThreadPool.Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), false)
         );
+        result.put(
+            ThreadPool.Names.WRITE_COORDINATION,
+            new FixedExecutorBuilder(
+                settings,
+                ThreadPool.Names.WRITE_COORDINATION,
+                allocatedProcessors,
+                10000,
+                EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
+            )
+        );
         result.put(
             ThreadPool.Names.WRITE,
             new FixedExecutorBuilder(


@@ -41,6 +41,7 @@ public abstract class ExecutorBuilder<U extends ExecutorBuilder.ExecutorSettings
     protected static int applyHardSizeLimit(final Settings settings, final String name) {
         if (name.equals("bulk")
+            || name.equals(ThreadPool.Names.WRITE_COORDINATION)
             || name.equals(ThreadPool.Names.WRITE)
             || name.equals(ThreadPool.Names.SYSTEM_WRITE)
             || name.equals(ThreadPool.Names.SYSTEM_CRITICAL_WRITE)) {


@@ -113,6 +113,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
         public static final String GET = "get";
         public static final String ANALYZE = "analyze";
         public static final String WRITE = "write";
+        public static final String WRITE_COORDINATION = "write_coordination";
         public static final String SEARCH = "search";
         public static final String SEARCH_COORDINATION = "search_coordination";
         public static final String AUTO_COMPLETE = "auto_complete";
@@ -186,6 +187,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
         entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED),
         entry(Names.GET, ThreadPoolType.FIXED),
         entry(Names.ANALYZE, ThreadPoolType.FIXED),
+        entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED),
         entry(Names.WRITE, ThreadPoolType.FIXED),
         entry(Names.SEARCH, ThreadPoolType.FIXED),
         entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED),
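
Because the pool is registered as `fixed` with a bounded queue (10000 entries by default, per the DefaultBuiltInExecutorBuilders change above), submissions fail with `EsRejectedExecutionException` once the threads and queue are saturated, which is what the tests below provoke. A small usage sketch; the helper name and back-pressure handling are illustrative:

```java
import java.util.concurrent.Executor;

import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;

// Sketch only: trySubmitCoordinationTask is an illustrative helper, not part of this change.
class WriteCoordinationUsageSketch {
    /** Submits to the write_coordination pool; returns false when the pool and its queue are saturated. */
    static boolean trySubmitCoordinationTask(ThreadPool threadPool, Runnable task) {
        Executor executor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
        try {
            executor.execute(task);
            return true;
        } catch (EsRejectedExecutionException e) {
            // Threads and the default 10000-slot queue are exhausted; callers see this as back pressure.
            return false;
        }
    }
}
```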


@@ -107,6 +107,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     private static final Thread DUMMY_WRITE_THREAD = new Thread(ThreadPool.Names.WRITE);
     private FeatureService mockFeatureService;

+    private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination");
     private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write");
     private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write");
@@ -293,6 +294,7 @@ public class TransportBulkActionIngestTests extends ESTestCase {
     public void setupAction() {
         // initialize captors, which must be members to use @Capture because of generics
         threadPool = mock(ThreadPool.class);
+        when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor);
         when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor);
         when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor);
         MockitoAnnotations.openMocks(this);


@@ -70,6 +70,7 @@ import org.elasticsearch.test.index.IndexVersionUtils;
 import org.elasticsearch.test.transport.CapturingTransport;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.threadpool.ThreadPoolStats;
 import org.elasticsearch.transport.TransportService;
 import org.junit.After;
 import org.junit.Before;
@@ -373,9 +374,9 @@ public class TransportBulkActionTests extends ESTestCase {
         assertFalse(TransportBulkAction.isOnlySystem(buildBulkStreamRequest(mixed), indicesLookup, systemIndices));
     }

-    private void blockWriteThreadPool(CountDownLatch blockingLatch) {
+    private void blockWriteCoordinationThreadPool(CountDownLatch blockingLatch) {
         assertThat(blockingLatch.getCount(), greaterThan(0L));
-        final var executor = threadPool.executor(ThreadPool.Names.WRITE);
+        final var executor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION);
         // Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full.
         expectThrows(EsRejectedExecutionException.class, () -> {
             // noinspection InfiniteLoopStatement
@@ -385,12 +386,42 @@ public class TransportBulkActionTests extends ESTestCase {
         });
     }

+    public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception {
+        BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));
+        PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
+        ThreadPoolStats.Stats stats = threadPool.stats()
+            .stats()
+            .stream()
+            .filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION))
+            .findAny()
+            .get();
+        assertThat(stats.completed(), equalTo(0L));
+        ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
+        future.actionGet();
+
+        assertBusy(() -> {
+            // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the
+            // index is created.
+            assertThat(
+                threadPool.stats()
+                    .stats()
+                    .stream()
+                    .filter(s -> s.name().equals(ThreadPool.Names.WRITE_COORDINATION))
+                    .findAny()
+                    .get()
+                    .completed(),
+                equalTo(2L)
+            );
+        });
+    }
+
     public void testRejectCoordination() {
         BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap()));

         final var blockingLatch = new CountDownLatch(1);
         try {
-            blockWriteThreadPool(blockingLatch);
+            blockWriteCoordinationThreadPool(blockingLatch);
             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
             ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
             expectThrows(EsRejectedExecutionException.class, future);
@@ -405,7 +436,7 @@ public class TransportBulkActionTests extends ESTestCase {
         bulkAction.failIndexCreationException = randomBoolean() ? new ResourceAlreadyExistsException("index already exists") : null;
         final var blockingLatch = new CountDownLatch(1);
         try {
-            bulkAction.beforeIndexCreation = () -> blockWriteThreadPool(blockingLatch);
+            bulkAction.beforeIndexCreation = () -> blockWriteCoordinationThreadPool(blockingLatch);
             PlainActionFuture<BulkResponse> future = new PlainActionFuture<>();
             ActionTestUtils.execute(bulkAction, null, bulkRequest, future);
             expectThrows(EsRejectedExecutionException.class, future);


@@ -32,6 +32,8 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
     protected Settings nodeSettings() {
         return Settings.builder()
             .put("node.name", "es-thread-pool-executor-tests")
+            .put("thread_pool.write_coordination.size", 1)
+            .put("thread_pool.write_coordination.queue_size", 0)
             .put("thread_pool.write.size", 1)
             .put("thread_pool.write.queue_size", 0)
             .put("thread_pool.search.size", 1)
@@ -41,7 +43,7 @@ public class EsThreadPoolExecutorTests extends ESSingleNodeTestCase {
     public void testRejectedExecutionExceptionContainsNodeName() {
         // we test a fixed and an auto-queue executor but not scaling since it does not reject
-        runThreadPoolExecutorTest(1, ThreadPool.Names.WRITE);
+        runThreadPoolExecutorTest(1, randomFrom(ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.WRITE));
         runThreadPoolExecutorTest(2, ThreadPool.Names.SEARCH);
     }


@@ -573,6 +573,7 @@ public class ThreadPoolTests extends ESTestCase {
             ThreadPool.Names.GENERIC,
             ThreadPool.Names.ANALYZE,
             ThreadPool.Names.WRITE,
+            ThreadPool.Names.WRITE_COORDINATION,
             ThreadPool.Names.SEARCH
         );
         final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);


@@ -75,6 +75,7 @@ public class EnrichCoordinatorProxyAction extends ActionType<SearchResponse> {
         // search thread, which could end up here again if there is more than one enrich processor in a pipeline.
         assert ThreadPool.assertCurrentThreadPool(
             ThreadPool.Names.WRITE,
+            ThreadPool.Names.WRITE_COORDINATION,
             ThreadPool.Names.SYSTEM_WRITE,
             ThreadPool.Names.SEARCH,
             ThreadPool.Names.MANAGEMENT