diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index 3098865d00a7..4ccaa55d69c3 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -410,7 +410,7 @@ public class IngestRestartIT extends ESIntegTestCase { private void blockSystemWriteThreadPool(CountDownLatch blockingLatch, ThreadPool threadPool) { assertThat(blockingLatch.getCount(), greaterThan(0L)); - final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); + final var executor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); // Add tasks repeatedly until we get an EsRejectedExecutionException which indicates that the threadpool and its queue are full. expectThrows(EsRejectedExecutionException.class, () -> { // noinspection InfiniteLoopStatement diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index c7fd31e529c1..262384817f79 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -69,7 +69,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction private final IngestActionForwarder ingestForwarder; protected final LongSupplier relativeTimeNanosProvider; protected final Executor coordinationExecutor; - protected final Executor systemCoordinationExecutor; + protected final Executor writeExecutor; + protected final Executor systemWriteExecutor; private final ActionType bulkAction; public TransportAbstractBulkAction( @@ -93,7 +94,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction this.systemIndices = systemIndices; this.projectResolver = projectResolver; this.coordinationExecutor = threadPool.executor(ThreadPool.Names.WRITE_COORDINATION); - this.systemCoordinationExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE_COORDINATION); + this.writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); + this.systemWriteExecutor = threadPool.executor(ThreadPool.Names.SYSTEM_WRITE); this.ingestForwarder = new IngestActionForwarder(transportService); clusterService.addStateApplier(this.ingestForwarder); this.relativeTimeNanosProvider = relativeTimeNanosProvider; @@ -132,14 +134,14 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } final ActionListener releasingListener = ActionListener.runBefore(listener, releasable::close); // Use coordinationExecutor for dispatching coordination tasks - final Executor executor = isOnlySystem ? systemCoordinationExecutor : coordinationExecutor; - ensureClusterStateThenForkAndExecute(task, bulkRequest, executor, releasingListener); + ensureClusterStateThenForkAndExecute(task, bulkRequest, coordinationExecutor, isOnlySystem, releasingListener); } private void ensureClusterStateThenForkAndExecute( Task task, BulkRequest bulkRequest, Executor executor, + boolean isOnlySystem, ActionListener releasingListener ) { final ClusterState initialState = clusterService.state(); @@ -161,7 +163,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction clusterStateObserver.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - forkAndExecute(task, bulkRequest, executor, releasingListener); + forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } @Override @@ -175,21 +177,32 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } }, newState -> false == newState.blocks().hasGlobalBlockWithLevel(projectId, ClusterBlockLevel.WRITE)); } else { - forkAndExecute(task, bulkRequest, executor, releasingListener); + forkAndExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } } - private void forkAndExecute(Task task, BulkRequest bulkRequest, Executor executor, ActionListener releasingListener) { + private void forkAndExecute( + Task task, + BulkRequest bulkRequest, + Executor executor, + boolean isOnlySystem, + ActionListener releasingListener + ) { executor.execute(new ActionRunnable<>(releasingListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, releasingListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, releasingListener); } }); } - private boolean applyPipelines(Task task, BulkRequest bulkRequest, Executor executor, ActionListener listener) - throws IOException { + private boolean applyPipelines( + Task task, + BulkRequest bulkRequest, + Executor executor, + boolean isOnlySystem, + ActionListener listener + ) throws IOException { boolean hasIndexRequestsWithPipelines = false; ClusterState state = clusterService.state(); ProjectId projectId = projectResolver.getProjectId(); @@ -278,7 +291,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction assert arePipelinesResolved : bulkRequest; } if (clusterService.localNode().isIngestNode()) { - processBulkIndexIngestRequest(task, bulkRequest, executor, project, l); + processBulkIndexIngestRequest(task, bulkRequest, executor, isOnlySystem, project, l); } else { ingestForwarder.forwardIngestRequest(bulkAction, bulkRequest, l); } @@ -292,6 +305,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest original, Executor executor, + boolean isOnlySystem, ProjectMetadata metadata, ActionListener listener ) { @@ -325,7 +339,7 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction ActionRunnable runnable = new ActionRunnable<>(actionListener) { @Override protected void doRun() throws IOException { - applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, actionListener); + applyPipelinesAndDoInternalExecute(task, bulkRequest, executor, isOnlySystem, actionListener); } @Override @@ -348,7 +362,8 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction } } }, - executor + // Use the appropriate write executor for actual ingest processing + isOnlySystem ? systemWriteExecutor : writeExecutor ); } @@ -404,10 +419,11 @@ public abstract class TransportAbstractBulkAction extends HandledTransportAction Task task, BulkRequest bulkRequest, Executor executor, + boolean isOnlySystem, ActionListener listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); - if (applyPipelines(task, bulkRequest, executor, listener) == false) { + if (applyPipelines(task, bulkRequest, executor, isOnlySystem, listener) == false) { doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index bac2df150045..336d978358b9 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -198,17 +198,6 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { true ) ); - result.put( - ThreadPool.Names.SYSTEM_WRITE_COORDINATION, - new FixedExecutorBuilder( - settings, - ThreadPool.Names.SYSTEM_WRITE_COORDINATION, - halfProcMaxAt5, - 1000, - new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA), - true - ) - ); result.put( ThreadPool.Names.SYSTEM_CRITICAL_READ, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java index a0c8795a388d..6d438298acff 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ExecutorBuilder.java @@ -42,7 +42,6 @@ public abstract class ExecutorBuilder, Scheduler, public static final String FETCH_SHARD_STORE = "fetch_shard_store"; public static final String SYSTEM_READ = "system_read"; public static final String SYSTEM_WRITE = "system_write"; - public static final String SYSTEM_WRITE_COORDINATION = "system_write_coordination"; public static final String SYSTEM_CRITICAL_READ = "system_critical_read"; public static final String SYSTEM_CRITICAL_WRITE = "system_critical_write"; } @@ -188,8 +187,8 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.CLUSTER_COORDINATION, ThreadPoolType.FIXED), entry(Names.GET, ThreadPoolType.FIXED), entry(Names.ANALYZE, ThreadPoolType.FIXED), - entry(Names.WRITE, ThreadPoolType.FIXED), entry(Names.WRITE_COORDINATION, ThreadPoolType.FIXED), + entry(Names.WRITE, ThreadPoolType.FIXED), entry(Names.SEARCH, ThreadPoolType.FIXED), entry(Names.SEARCH_COORDINATION, ThreadPoolType.FIXED), entry(Names.AUTO_COMPLETE, ThreadPoolType.FIXED), @@ -205,7 +204,6 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), entry(Names.SYSTEM_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_WRITE, ThreadPoolType.FIXED), - entry(Names.SYSTEM_WRITE_COORDINATION, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_READ, ThreadPoolType.FIXED), entry(Names.SYSTEM_CRITICAL_WRITE, ThreadPoolType.FIXED) ); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java index a8634d0a7dac..fc26c7d413fb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionIngestTests.java @@ -109,7 +109,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { private FeatureService mockFeatureService; private static final ExecutorService writeCoordinationExecutor = new NamedDirectExecutorService("write_coordination"); - private static final ExecutorService systemWriteCoordinationExecutor = new NamedDirectExecutorService("system_write_coordination"); + private static final ExecutorService writeExecutor = new NamedDirectExecutorService("write"); + private static final ExecutorService systemWriteExecutor = new NamedDirectExecutorService("system_write"); private final ProjectId projectId = randomProjectIdOrDefault(); @@ -295,7 +296,8 @@ public class TransportBulkActionIngestTests extends ESTestCase { // initialize captors, which must be members to use @Capture because of generics threadPool = mock(ThreadPool.class); when(threadPool.executor(eq(ThreadPool.Names.WRITE_COORDINATION))).thenReturn(writeCoordinationExecutor); - when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE_COORDINATION))).thenReturn(systemWriteCoordinationExecutor); + when(threadPool.executor(eq(ThreadPool.Names.WRITE))).thenReturn(writeExecutor); + when(threadPool.executor(eq(ThreadPool.Names.SYSTEM_WRITE))).thenReturn(systemWriteExecutor); MockitoAnnotations.openMocks(this); // setup services that will be called by action transportService = mock(TransportService.class); @@ -426,7 +428,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { redirectHandler.capture(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -477,7 +479,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -526,7 +528,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(systemWriteCoordinationExecutor) + same(systemWriteExecutor) ); completionHandler.getValue().accept(null, exception); assertTrue(failureCalled.get()); @@ -687,7 +689,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); assertEquals(indexRequest1.getPipeline(), "default_pipeline"); assertEquals(indexRequest2.getPipeline(), "default_pipeline"); @@ -738,7 +740,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); completionHandler.getValue().accept(null, exception); assertFalse(action.indexCreated); // still no index yet, the ingest node failed. @@ -832,7 +834,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); } @@ -873,7 +875,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); } @@ -903,7 +905,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); indexRequest1.autoGenerateId(); completionHandler.getValue().accept(Thread.currentThread(), null); @@ -943,7 +945,7 @@ public class TransportBulkActionIngestTests extends ESTestCase { any(), failureHandler.capture(), completionHandler.capture(), - same(writeCoordinationExecutor) + same(writeExecutor) ); assertEquals(indexRequest.getPipeline(), "default_pipeline"); completionHandler.getValue().accept(null, exception); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java index 481fdf5ea353..70964aef7cf0 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportBulkActionTests.java @@ -61,6 +61,7 @@ import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.VersionType; +import org.elasticsearch.indices.EmptySystemIndices; import org.elasticsearch.indices.SystemIndexDescriptorUtils; import org.elasticsearch.indices.SystemIndices; import org.elasticsearch.test.ESTestCase; @@ -125,15 +126,7 @@ public class TransportBulkActionTests extends ESTestCase { new ActionFilters(Collections.emptySet()), new Resolver(), new IndexingPressure(Settings.EMPTY), - new SystemIndices( - List.of( - new SystemIndices.Feature( - "plugin", - "test feature", - List.of(SystemIndexDescriptorUtils.createUnmanaged(".transport_bulk_tests_system*", "")) - ) - ) - ), + EmptySystemIndices.INSTANCE, new ProjectResolver() { @Override public void executeOnProject(ProjectId projectId, CheckedRunnable body) throws E { @@ -393,7 +386,7 @@ public class TransportBulkActionTests extends ESTestCase { }); } - public void testDispatchesToWriteCoordinationThreadPool() throws Exception { + public void testDispatchesToWriteCoordinationThreadPoolOnce() throws Exception { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); PlainActionFuture future = new PlainActionFuture<>(); ThreadPoolStats.Stats stats = threadPool.stats() @@ -408,7 +401,8 @@ public class TransportBulkActionTests extends ESTestCase { assertBusy(() -> { // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the - // index is created. + // index + // is created. assertThat( threadPool.stats() .stats() @@ -422,37 +416,6 @@ public class TransportBulkActionTests extends ESTestCase { }); } - public void testSystemWriteDispatchesToSystemWriteCoordinationThreadPool() throws Exception { - BulkRequest bulkRequest = new BulkRequest().add( - new IndexRequest(".transport_bulk_tests_system_1").id("id").source(Collections.emptyMap()) - ); - PlainActionFuture future = new PlainActionFuture<>(); - ThreadPoolStats.Stats stats = threadPool.stats() - .stats() - .stream() - .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) - .findAny() - .get(); - assertThat(stats.completed(), equalTo(0L)); - ActionTestUtils.execute(bulkAction, null, bulkRequest, future); - future.actionGet(); - - assertBusy(() -> { - // Will increment twice because it will dispatch on the first coordination attempt. And then dispatch a second time after the - // index is created. - assertThat( - threadPool.stats() - .stats() - .stream() - .filter(s -> s.name().equals(ThreadPool.Names.SYSTEM_WRITE_COORDINATION)) - .findAny() - .get() - .completed(), - equalTo(2L) - ); - }); - } - public void testRejectCoordination() { BulkRequest bulkRequest = new BulkRequest().add(new IndexRequest("index").id("id").source(Collections.emptyMap())); diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java index 17e118edb4bc..26e692fbb7c7 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/EnrichCoordinatorProxyAction.java @@ -77,7 +77,6 @@ public class EnrichCoordinatorProxyAction extends ActionType { ThreadPool.Names.WRITE, ThreadPool.Names.WRITE_COORDINATION, ThreadPool.Names.SYSTEM_WRITE, - ThreadPool.Names.SYSTEM_WRITE_COORDINATION, ThreadPool.Names.SEARCH, ThreadPool.Names.MANAGEMENT );