diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 4ccaa55d69c3..3098865d00a7 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -410,7 +410,7 @@ public class IngestRestartIT extends ESIntegTestCase { private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) { assertThat(blockingLatch.getCount(), greaterThan(0L)); - final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); + final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); // Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full. expectThrows(EsRejectedExecutionException.class, () -> { // noinspection InfiniteLoopStatement diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 262384817f79..c7fd31e529c1 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -69,8 +69,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction private final IngestActionForwarder ingestForwarder; protected final LongSupplier relativeTimeNanosProvider; protected final Executor coordinationExecutor; - protected final Executor writeExecutor; - protected final Executor systemWriteExecutor; + protected final Executor systemCoordinationExecutor; private final ActionType bulkAction; public TransportAbstractBulkAction( @@ -94,8 +93,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction this.systemIndices = systemIndices; this.projectResolver = projectResolver; this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); - this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); - this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); + this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); this.ingestForwarder = new IngestActionForwarder(transportService); clusterService.addStateApplier(this.ingestForwarder); this.relativeTimeNanosProvider = relativeTimeNanosProvider; @@ -134,14 +132,14 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); // Use coordinationExecutor for dispatching coordination tasks - ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener); + final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor; + ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener); } private void ensureClusterStateThenForkAndExecute( Task task, BulkRequest bulkRequest, Executor executor, - boolean isOnlySystem, ActionListener releasingListener ) { final ClusterState initialState = clusterService.state(); @@ -163,7 +161,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + forkAndExecute(task, bulkRequest, executor, releasingListener); } @Override @@ -177,32 +175,21 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE)); } else { - forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + forkAndExecute(task, bulkRequest, executor, releasingListener); } } - private void forkAndExecute( - Task task, - BulkRequest bulkRequest, - Executor executor, - boolean isOnlySystem, - ActionListener releasingListener - ) { + private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener releasingListener) { executor.execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener); } }); } - private boolean applyPipelines( - Task task, - BulkRequest bulkRequest, - Executor executor, - boolean isOnlySystem, - ActionListener listener - ) throws IOException { + private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener listener) + throws IOException { boolean hasIndexRequestsWithPipelines = false; ClusterState state = clusterService.state(); ProjectId projectId = projectResolver.getProjectId(); @@ -291,7 +278,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l); + processBulkIndexIngestRequest(task, bulkRequest, executor, project, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -305,7 +292,6 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest original, Executor executor, - boolean isOnlySystem, ProjectMetadata metadata, ActionListener listener ) { @@ -339,7 +325,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction ActionRunnable runnable = new ActionRunnable<>(actionListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); } @Override @@ -362,8 +348,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } } }, - // Use the appropriate write executor for actual ingest processing - isOnlySystem ? systemWriteExecutor : writeExecutor + executor ); } @@ -419,11 +404,10 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest bulkRequest, Executor executor, - boolean isOnlySystem, ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) { + if (applyPipelines(task, bulkRequest, executor, listener) == false) { doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 336d978358b9..bac2df150045 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -198,6 +198,17 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { true ) ); + result.put( + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, + new FixedExecutorBuilder( + settings, + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, + halfProcMaxAt5, + 1000, + new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), + true + ) + ); result.put( ThreadPool.Names.SYSTEM_CRITICAL_READ, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index 6d438298acff..a0c8795a388d 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -42,6 +42,7 @@ public abstract class ExecutorBuilder, Scheduler, public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; + public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination"; public static final String SYSTEM_CRITICAL_READ = "system_critical_read"; public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write"; } @@ -187,8 +188,8 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED), entry(Names.GET, ThreadPoolType.FIXED), entry(Names.ANALYZE, ThreadPoolType.FIXED), - entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.WRITE, ThreadPoolType.FIXED), + entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.SEARCH, ThreadPoolType.FIXED), entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED), entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED), @@ -204,6 +205,7 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), entry(Names.SYSTEM_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED), + entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED) ); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index ef24a7944d50..e2c3591092d8 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -108,8 +108,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { private FeatureService mockFeatureService; private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination"); - private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write"); - private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write"); + private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination"); private final ProjectId projectId = randomProjectIdOrDefault(); @@ -295,8 +294,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { // initialize captors, which must be members to use @Capture because of generics threadPool = mock(ThreadPool.class); when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor); - when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor); - when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor); + when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor); MockitoAnnotations.openMocks(this); // setup services that will be called by action transportService = mock(TransportService.class); @@ -424,7 +422,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { redirectHandler.capture(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -475,7 +473,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -524,7 +522,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(systemWriteExecutor) + same(systemWriteCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -685,7 +683,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); @@ -736,7 +734,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. @@ -830,7 +828,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); } @@ -871,7 +869,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); } @@ -901,7 +899,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); indexRequest1.autoGenerateId(); completionHandler.getValue().accept(Thread.currentThread(), null); @@ -941,7 +939,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeExecutor) + same(writeCoordinationExecutor) ); assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(null, exception); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 70964aef7cf0..481fdf5ea353 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -61,7 +61,6 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; -import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; @@ -126,7 +125,15 @@ public class TransportBulkActionTests extends ESTestCase { new ActionFilters(Collections.emptySet()), new Resolver(), new IndexingPressure(Settings.EMPTY), - EmptySystemIndices.INSTANCE, + new SystemIndices( + List.of( + new SystemIndices.Feature( + "plugin", + "test feature", + List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", "")) + ) + ) + ), new ProjectResolver() { @Override public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { @@ -386,7 +393,7 @@ public class TransportBulkActionTests extends ESTestCase { }); } - public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception { + public void testDispatchesToWriteCoordinationThreadPool() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); PlainActionFuture future = new PlainActionFuture<>(); ThreadPoolStats.Stats stats = threadPool.stats() @@ -401,8 +408,7 @@ public class TransportBulkActionTests extends ESTestCase { assertBusy(() -> { // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the - // index - // is created. + // index is created. assertThat( threadPool.stats() .stats() @@ -416,6 +422,37 @@ public class TransportBulkActionTests extends ESTestCase { }); } + public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception { + BulkRequest bulkRequest = new BulkRequest().add( + new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap()) + ); + PlainActionFuture future = new PlainActionFuture<>(); + ThreadPoolStats.Stats stats = threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) + .findAny() + .get(); + assertThat(stats.completed(), equalTo(0L)); + ActionTestUtils.execute(bulkAction, null, bulkRequest, future); + future.actionGet(); + + assertBusy(() -> { + // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the + // index is created. + assertThat( + threadPool.stats() + .stats() + .stream() + .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) + .findAny() + .get() + .completed(), + equalTo(2L) + ); + }); + } + public void testRejectCoordination() { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 26e692fbb7c7..17e118edb4bc 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -77,6 +77,7 @@ public class EnrichCoordinatorProxyAction extends ActionType { ThreadPool.Names.WRITE, ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.SYSTEM_WRITE, + ThreadPool.Names.SYSTEM_WRITE_COORDINATION, ThreadPool.Names.SEARCH, ThreadPool.Names.MANAGEMENT );