From 864ff66f68277db5b31e59fe5780e3772b22ecc7 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 4 Mar 2021 12:05:24 +0000 Subject: [PATCH] Unique names for bulk processor scheduler threads (#69432) Today every `BulkProcessor` creates two scheduler threads, both called `[node-name][scheduler][T#1]`, which is also the name of the main scheduler thread for the node. The duplicated thread names make it harder to interpret a thread dump. This commit makes the names of these threads distinct. Closes #68470 --- .../elasticsearch/client/BulkProcessorIT.java | 2 +- .../client/BulkProcessorRetryIT.java | 4 +- .../java/org/elasticsearch/client/CrudIT.java | 2 +- .../documentation/CRUDDocumentationIT.java | 4 +- .../threadpool/EvilThreadPoolTests.java | 4 +- .../org/elasticsearch/search/CCSDuelIT.java | 2 +- .../action/bulk/BulkProcessorIT.java | 11 ++-- .../action/bulk/BulkProcessorRetryIT.java | 2 +- .../ConcurrentSeqNoVersioningIT.java | 2 +- .../action/bulk/BulkProcessor.java | 29 +++++++--- .../elasticsearch/threadpool/Scheduler.java | 5 +- .../elasticsearch/threadpool/ThreadPool.java | 2 +- .../action/bulk/BulkProcessorTests.java | 58 +++++++++++++++++++ .../threadpool/SchedulerTests.java | 14 +++-- .../AbstractCoordinatorTestCase.java | 2 +- .../xpack/ccr/IndexFollowingIT.java | 2 +- .../ccr/action/ShardFollowNodeTaskTests.java | 2 +- .../logging/DeprecationIndexingComponent.java | 2 +- .../xpack/ilm/history/ILMHistoryStore.java | 2 +- .../xpack/rollup/v2/RollupShardIndexer.java | 2 +- .../elasticsearch/xpack/watcher/Watcher.java | 2 +- .../execution/TriggeredWatchStoreTests.java | 2 +- .../watcher/history/HistoryStoreTests.java | 3 +- 23 files changed, 121 insertions(+), 39 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java index 1a4aa3e4c819..7ced361005c9 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -58,7 +58,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { return BulkProcessor.builder( (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, - bulkListener), listener); + bulkListener), listener, "BulkProcessorIT"); } public void testThatBulkProcessorCountIsCorrect() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java index 9fcf93c7dfae..a7693fe28568 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorRetryIT.java @@ -37,8 +37,8 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase { private static final String INDEX_NAME = "index"; private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) { - return BulkProcessor.builder( - (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); + return BulkProcessor.builder((request, bulkListener) + -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener, "BulkProcessorRetryIT"); } public void testBulkRejectionLoadWithoutBackoff() throws Exception { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 4a98a70b2b15..55e45166061e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -810,7 +810,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase { try (BulkProcessor processor = BulkProcessor.builder( (request, bulkListener) -> highLevelClient().bulkAsync(request, - RequestOptions.DEFAULT, bulkListener), listener) + RequestOptions.DEFAULT, bulkListener), listener, "CrudIT") .setConcurrentRequests(0) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB)) .setBulkActions(nbItems + 1) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index 8ea8dc7ac73a..fb631b6e68ae 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -1554,7 +1554,7 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), - listener).build(); // <5> + listener, "bulk-processor-name").build(); // <5> // end::bulk-processor-init assertNotNull(bulkProcessor); @@ -1616,7 +1616,7 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), - listener); + listener, "bulk-processor-name"); builder.setBulkActions(500); // <1> builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2> builder.setConcurrentRequests(0); // <3> diff --git a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java index 33e3c71f4920..78bd4a231093 100644 --- a/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java +++ b/qa/evil-tests/src/test/java/org/elasticsearch/threadpool/EvilThreadPoolTests.java @@ -98,7 +98,7 @@ public class EvilThreadPoolTests extends ESTestCase { } public void testExecutionErrorOnScheduler() throws InterruptedException { - final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); try { checkExecutionError(getExecuteRunner(scheduler)); checkExecutionError(getSubmitRunner(scheduler)); @@ -197,7 +197,7 @@ public class EvilThreadPoolTests extends ESTestCase { } public void testExecutionExceptionOnScheduler() throws InterruptedException { - final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); try { checkExecutionException(getExecuteRunner(scheduler), true); // while submit does return a Future, we choose to log exceptions anyway, diff --git a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java index 5b1dbb0c86ce..b4aafc797e5e 100644 --- a/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java +++ b/qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java @@ -193,7 +193,7 @@ public class CCSDuelIT extends ESRestTestCase { public void afterBulk(long executionId, BulkRequest request, Throwable failure) { throw new AssertionError("Failed to execute bulk", failure); } - }).build(); + }, "CCSDuelIT").build(); int numQuestions = randomIntBetween(50, 100); for (int i = 0; i < numQuestions; i++) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java index 8d303679a327..5a59adb90816 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorIT.java @@ -41,12 +41,13 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class BulkProcessorIT extends ESIntegTestCase { + public void testThatBulkProcessorCountIsCorrect() throws Exception { final CountDownLatch latch = new CountDownLatch(1); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) @@ -70,7 +71,7 @@ public class BulkProcessorIT extends ESIntegTestCase { int numDocs = randomIntBetween(10, 100); - try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") //let's make sure that this bulk won't be automatically flushed .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100)) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -105,7 +106,7 @@ public class BulkProcessorIT extends ESIntegTestCase { MultiGetRequestBuilder multiGetRequestBuilder; - try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { @@ -144,7 +145,7 @@ public class BulkProcessorIT extends ESIntegTestCase { BulkProcessorTestListener listener = new BulkProcessorTestListener(); int numDocs = randomIntBetween(10, 100); - BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) + BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") //let's make sure that the bulk action limit trips, one single execution will index all the documents .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), @@ -191,7 +192,7 @@ public class BulkProcessorIT extends ESIntegTestCase { MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet(); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch); - try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener) + try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT") .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions) //set interval and size to high values .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) { diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java index 49a770a7ae41..8b060369321d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/bulk/BulkProcessorRetryIT.java @@ -85,7 +85,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase { responses.add(failure); latch.countDown(); } - }).setBulkActions(1) + }, "BulkProcssorRetryIT").setBulkActions(1) // zero means that we're in the sync case, more means that we're in the async case .setConcurrentRequests(randomIntBetween(0, 100)) .setBackoffPolicy(internalPolicy) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java b/server/src/internalClusterTest/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java index 3bb4773b71ea..028ce2ac3ee3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/versioning/ConcurrentSeqNoVersioningIT.java @@ -436,7 +436,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase { LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion); boolean linearizable = false; try { - final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); final AtomicBoolean abort = new AtomicBoolean(); // Large histories can be problematic and have the linearizability checker run OOM // Bound the time how long the checker can run on such histories (Values empirically determined) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 8965fd011df1..7c2419208a63 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -42,6 +42,9 @@ import java.util.function.Supplier; */ public class BulkProcessor implements Closeable { + static final String FLUSH_SCHEDULER_NAME_SUFFIX = "-flush-scheduler"; + static final String RETRY_SCHEDULER_NAME_SUFFIX = "-retry-scheduler"; + /** * A listener for the execution. */ @@ -198,7 +201,7 @@ public class BulkProcessor implements Closeable { * @param client The client that executes the bulk operations * @param listener The BulkProcessor listener that gets called on bulk events * @return the builder for BulkProcessor - * @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)} + * @deprecated Use {@link #builder(BiConsumer, Listener, String)} * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client, * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler, * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly @@ -214,19 +217,31 @@ public class BulkProcessor implements Closeable { * @param consumer The consumer that is called to fulfil bulk operations * @param listener The BulkProcessor listener that gets called on bulk events * @return the builder for BulkProcessor + * @deprecated use {@link #builder(BiConsumer, Listener, String)} instead */ + @Deprecated public static Builder builder(BiConsumer> consumer, Listener listener) { + return builder(consumer, listener, "anonymous-bulk-processor"); + } + + /** + * @param consumer The consumer that is called to fulfil bulk operations + * @param listener The BulkProcessor listener that gets called on bulk events + * @param name The name of this processor, e.g. to identify the scheduler threads + * @return the builder for BulkProcessor + */ + public static Builder builder(BiConsumer> consumer, Listener listener, String name) { Objects.requireNonNull(consumer, "consumer"); Objects.requireNonNull(listener, "listener"); - final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); - final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor flushScheduler = Scheduler.initScheduler(Settings.EMPTY, name + FLUSH_SCHEDULER_NAME_SUFFIX); + final ScheduledThreadPoolExecutor retryScheduler = Scheduler.initScheduler(Settings.EMPTY, name + RETRY_SCHEDULER_NAME_SUFFIX); return new Builder(consumer, listener, - buildScheduler(flushScheduledThreadPoolExecutor), - buildScheduler(retryScheduledThreadPoolExecutor), + buildScheduler(flushScheduler), + buildScheduler(retryScheduler), () -> { - Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); - Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS); + Scheduler.terminate(flushScheduler, 10, TimeUnit.SECONDS); + Scheduler.terminate(retryScheduler, 10, TimeUnit.SECONDS); }); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java index c1c9330d0d12..00fa4b22cdde 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java +++ b/server/src/main/java/org/elasticsearch/threadpool/Scheduler.java @@ -38,11 +38,12 @@ public interface Scheduler { * Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will * be logged as a warning. This includes jobs started using execute, submit and schedule. * @param settings the settings to use + * @param schedulerName a string that identifies the threads belonging to this scheduler * @return executor */ - static ScheduledThreadPoolExecutor initScheduler(Settings settings) { + static ScheduledThreadPoolExecutor initScheduler(Settings settings, String schedulerName) { final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1, - EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy()); + EsExecutors.daemonThreadFactory(settings, schedulerName), new EsAbortPolicy()); scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); scheduler.setRemoveOnCancelPolicy(true); diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 4465cd70e156..699cc72dc94b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -206,7 +206,7 @@ public class ThreadPool implements ReportingService, Scheduler { .map(holder -> holder.info) .collect(Collectors.toList()); this.threadPoolInfo = new ThreadPoolInfo(infos); - this.scheduler = Scheduler.initScheduler(settings); + this.scheduler = Scheduler.initScheduler(settings, "scheduler"); TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings); this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis()); this.cachedTimeThread.start(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index bb3a8f33b9c4..8d4cad5bd41c 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.bulk; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.index.IndexRequest; @@ -17,12 +18,14 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteTransportException; import org.junit.After; import org.junit.Before; @@ -41,6 +44,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + public class BulkProcessorTests extends ESTestCase { private ThreadPool threadPool; @@ -96,6 +104,56 @@ public class BulkProcessorTests extends ESTestCase { bulkProcessor.close(); } + public void testRetry() throws Exception { + final int maxAttempts = between(1, 3); + final AtomicInteger attemptRef = new AtomicInteger(); + + final BiConsumer> consumer = (request, listener) -> { + final int attempt = attemptRef.incrementAndGet(); + assertThat(attempt, lessThanOrEqualTo(maxAttempts)); + if (attempt != 1) { + assertThat(Thread.currentThread().getName(), containsString("[BulkProcessorTests-retry-scheduler]")); + } + + if (attempt == maxAttempts) { + listener.onFailure(new ElasticsearchException("final failure")); + } else { + listener.onFailure(new RemoteTransportException("remote", new EsRejectedExecutionException("retryable failure"))); + } + }; + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final BulkProcessor.Listener listener = new BulkProcessor.Listener() { + + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + fail("afterBulk should not return success"); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + assertThat(failure, instanceOf(ElasticsearchException.class)); + assertThat(failure.getMessage(), equalTo("final failure")); + countDownLatch.countDown(); + } + }; + + try (BulkProcessor bulkProcessor = BulkProcessor + .builder(consumer, listener, "BulkProcessorTests") + .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.ZERO, Integer.MAX_VALUE)) + .build()) { + bulkProcessor.add(new IndexRequest()); + bulkProcessor.flush(); + assertTrue(countDownLatch.await(5, TimeUnit.SECONDS)); + } + + assertThat(attemptRef.get(), equalTo(maxAttempts)); + } + public void testConcurrentExecutions() throws Exception { final AtomicBoolean called = new AtomicBoolean(false); final AtomicReference exceptionRef = new AtomicReference<>(); diff --git a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java index 8b03aa6a2419..c1aafbc7fa3c 100644 --- a/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/threadpool/SchedulerTests.java @@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.LongStream; +import static org.hamcrest.Matchers.containsString; + public class SchedulerTests extends ESTestCase { public void testCancelOnThreadPool() { @@ -51,7 +53,7 @@ public class SchedulerTests extends ESTestCase { } public void testCancelOnScheduler() { - ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); Scheduler scheduler = (command, delay, name) -> Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); @@ -130,13 +132,17 @@ public class SchedulerTests extends ESTestCase { // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests public void testScheduledOnScheduler() throws InterruptedException { - ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + final String schedulerName = "test-scheduler"; + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, schedulerName); Scheduler scheduler = (command, delay, name) -> Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); CountDownLatch missingExecutions = new CountDownLatch(1); try { - scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME); + scheduler.schedule(() -> { + assertThat(Thread.currentThread().getName(), containsString("[" + schedulerName + "]")); + missingExecutions.countDown(); + }, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME); assertTrue(missingExecutions.await(30, TimeUnit.SECONDS)); } finally { Scheduler.terminate(executor, 10, TimeUnit.SECONDS); @@ -144,7 +150,7 @@ public class SchedulerTests extends ESTestCase { } public void testScheduleAtFixedRate() throws InterruptedException { - ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY); + ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); try { CountDownLatch missingExecutions = new CountDownLatch(randomIntBetween(1, 10)); executor.scheduleAtFixedRate(missingExecutions::countDown, diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 115b70ab1d55..1f709251b2d0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -588,7 +588,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { final AtomicBoolean abort = new AtomicBoolean(); // Large histories can be problematic and have the linearizability checker run OOM // Bound the time how long the checker can run on such histories (Values empirically determined) - final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); try { if (history.size() > 300) { scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS); diff --git a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index 3b5a8c405041..ac8bfc8730d0 100644 --- a/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/internalClusterTest/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -464,7 +464,7 @@ public class IndexFollowingIT extends CcrIntegTestCase { public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }; int bulkSize = between(1, 20); - BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener) + BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener, "IndexFollowingIT") .setBulkActions(bulkSize) .setConcurrentRequests(4) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index b699336f5cd1..969682ee58f7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -1390,7 +1390,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { @Override protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) { if (scheduleRetentionLeaseRenewal.get()) { - final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY); + final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler"); final ScheduledFuture future = scheduler.scheduleWithFixedDelay( () -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()), 0, diff --git a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java index 47538e613817..b6ac1f174dd7 100644 --- a/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java +++ b/x-pack/plugin/deprecation/src/main/java/org/elasticsearch/xpack/deprecation/logging/DeprecationIndexingComponent.java @@ -125,7 +125,7 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp // This configuration disables the size count and size thresholds, // and instead uses a scheduled flush only. This means that calling // processor.add() will not block the calling thread. - return BulkProcessor.builder(client::bulk, listener) + return BulkProcessor.builder(client::bulk, listener, "deprecation-indexing") .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3)) .setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings))) .setBulkActions(-1) diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java index 607bb7f7bb17..99eebc5d6838 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java @@ -110,7 +110,7 @@ public class ILMHistoryStore implements Closeable { long items = request.numberOfActions(); logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure); } - }) + }, "ilm-history-store") .setBulkActions(100) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java index d73b8e97b218..d0966e20110f 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/v2/RollupShardIndexer.java @@ -232,7 +232,7 @@ class RollupShardIndexer { numSent.addAndGet(-items); } }; - return BulkProcessor.builder(client::bulk, listener) + return BulkProcessor.builder(client::bulk, listener, "rollup-shard-indexer") .setBulkActions(10000) .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) // execute the bulk request on the same thread diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index dcf1d3bc2243..8c7e1d0e173f 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -379,7 +379,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin, public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("error executing bulk", failure); } - }) + }, "watcher") .setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings)) .setBulkActions(SETTING_BULK_ACTIONS.get(settings)) .setBulkSize(SETTING_BULK_SIZE.get(settings)) diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index 4ae8cee8e982..26357b4607d3 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -130,7 +130,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); parser = mock(TriggeredWatch.Parser.class); BulkProcessor bulkProcessor = BulkProcessor. - builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); + builder(client::bulk, listener, "TriggeredWatchStoreTests").setConcurrentRequests(0).setBulkActions(1).build(); triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor); } diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java index 431650c55841..74854945b85b 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/history/HistoryStoreTests.java @@ -72,7 +72,8 @@ public class HistoryStoreTests extends ESTestCase { when(client.settings()).thenReturn(settings); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings)); BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class); - BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build(); + BulkProcessor bulkProcessor + = BulkProcessor.builder(client::bulk, listener, "HistoryStoreTests").setConcurrentRequests(0).setBulkActions(1).build(); historyStore = new HistoryStore(bulkProcessor); }