Unique names for bulk processor scheduler threads (#69432)

Today every `BulkProcessor` creates two scheduler threads, both called
`[node-name][scheduler][T#1]`, which is also the name of the main
scheduler thread for the node. The duplicated thread names make it
harder to interpret a thread dump.

This commit makes the names of these threads distinct.

Closes #68470
David Turner committed 2021-03-04 12:05:24 +00:00 (committed by GitHub)
parent ae7fd00208
commit 864ff66f68
23 changed files with 121 additions and 39 deletions
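
The fix, visible throughout the diff below, is simply to give each scheduler its own thread-factory name prefix instead of the shared "scheduler" prefix. As a minimal standalone sketch of that naming scheme (plain JDK only; the node and scheduler names here are illustrative, not code from this commit):

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedSchedulers {
    // One thread factory per scheduler, each with its own name prefix.
    static ThreadFactory namedDaemonFactory(String nodeName, String schedulerName) {
        final AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable,
                "[" + nodeName + "][" + schedulerName + "][T#" + counter.incrementAndGet() + "]");
            thread.setDaemon(true);
            return thread;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor flush = new ScheduledThreadPoolExecutor(1,
            namedDaemonFactory("node-0", "my-processor-flush-scheduler"));
        ScheduledThreadPoolExecutor retry = new ScheduledThreadPoolExecutor(1,
            namedDaemonFactory("node-0", "my-processor-retry-scheduler"));
        // Each prints a distinct name, e.g. [node-0][my-processor-flush-scheduler][T#1],
        // where previously both threads would appear as [node-0][scheduler][T#1].
        flush.execute(() -> System.out.println(Thread.currentThread().getName()));
        retry.execute(() -> System.out.println(Thread.currentThread().getName()));
        Thread.sleep(100); // daemon threads: give them a moment to print
        flush.shutdown();
        retry.shutdown();
    }
}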


@@ -58,7 +58,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
     private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
         return BulkProcessor.builder(
                 (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT,
-                        bulkListener), listener);
+                        bulkListener), listener, "BulkProcessorIT");
     }
 
     public void testThatBulkProcessorCountIsCorrect() throws Exception {


@@ -37,8 +37,8 @@ public class BulkProcessorRetryIT extends ESRestHighLevelClientTestCase {
     private static final String INDEX_NAME = "index";
 
     private static BulkProcessor.Builder initBulkProcessorBuilder(BulkProcessor.Listener listener) {
-        return BulkProcessor.builder(
-            (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
+        return BulkProcessor.builder((request, bulkListener)
+            -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener, "BulkProcessorRetryIT");
     }
 
     public void testBulkRejectionLoadWithoutBackoff() throws Exception {


@@ -810,7 +810,7 @@ public class CrudIT extends ESRestHighLevelClientTestCase {
         try (BulkProcessor processor = BulkProcessor.builder(
                 (request, bulkListener) -> highLevelClient().bulkAsync(request,
-                        RequestOptions.DEFAULT, bulkListener), listener)
+                        RequestOptions.DEFAULT, bulkListener), listener, "CrudIT")
                 .setConcurrentRequests(0)
                 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.GB))
                 .setBulkActions(nbItems + 1)


@@ -1554,7 +1554,7 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
         BulkProcessor bulkProcessor = BulkProcessor.builder(
                 (request, bulkListener) ->
                     client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
-                listener).build(); // <5>
+                listener, "bulk-processor-name").build(); // <5>
         // end::bulk-processor-init
 
         assertNotNull(bulkProcessor);
@@ -1616,7 +1616,7 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase {
         BulkProcessor.Builder builder = BulkProcessor.builder(
                 (request, bulkListener) ->
                     client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
-                listener);
+                listener, "bulk-processor-name");
         builder.setBulkActions(500); // <1>
         builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2>
         builder.setConcurrentRequests(0); // <3>


@@ -98,7 +98,7 @@ public class EvilThreadPoolTests extends ESTestCase {
     }
 
     public void testExecutionErrorOnScheduler() throws InterruptedException {
-        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
         try {
             checkExecutionError(getExecuteRunner(scheduler));
             checkExecutionError(getSubmitRunner(scheduler));
@@ -197,7 +197,7 @@ public class EvilThreadPoolTests extends ESTestCase {
     }
 
     public void testExecutionExceptionOnScheduler() throws InterruptedException {
-        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
         try {
             checkExecutionException(getExecuteRunner(scheduler), true);
             // while submit does return a Future, we choose to log exceptions anyway,


@@ -193,7 +193,7 @@ public class CCSDuelIT extends ESRestTestCase {
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                 throw new AssertionError("Failed to execute bulk", failure);
             }
-        }).build();
+        }, "CCSDuelIT").build();
 
         int numQuestions = randomIntBetween(50, 100);
         for (int i = 0; i < numQuestions; i++) {


@@ -41,12 +41,13 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 public class BulkProcessorIT extends ESIntegTestCase {
     public void testThatBulkProcessorCountIsCorrect() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
 
         int numDocs = randomIntBetween(10, 100);
-        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
+        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
                 //let's make sure that the bulk action limit trips, one single execution will index all the documents
                 .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
@@ -70,7 +71,7 @@ public class BulkProcessorIT extends ESIntegTestCase {
         int numDocs = randomIntBetween(10, 100);
 
-        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
+        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
                 //let's make sure that this bulk won't be automatically flushed
                 .setConcurrentRequests(randomIntBetween(0, 10)).setBulkActions(numDocs + randomIntBetween(1, 100))
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -105,7 +106,7 @@ public class BulkProcessorIT extends ESIntegTestCase {
         MultiGetRequestBuilder multiGetRequestBuilder;
 
-        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
+        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
                 .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
                 //set interval and size to high values
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {
@@ -144,7 +145,7 @@ public class BulkProcessorIT extends ESIntegTestCase {
         BulkProcessorTestListener listener = new BulkProcessorTestListener();
         int numDocs = randomIntBetween(10, 100);
-        BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
+        BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
                 //let's make sure that the bulk action limit trips, one single execution will index all the documents
                 .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
@@ -191,7 +192,7 @@ public class BulkProcessorIT extends ESIntegTestCase {
         MultiGetRequestBuilder multiGetRequestBuilder = client().prepareMultiGet();
         BulkProcessorTestListener listener = new BulkProcessorTestListener(latch, closeLatch);
-        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener)
+        try (BulkProcessor processor = BulkProcessor.builder(client()::bulk, listener, "BulkProcessorIT")
                 .setConcurrentRequests(concurrentRequests).setBulkActions(bulkActions)
                 //set interval and size to high values
                 .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)).build()) {


@@ -85,7 +85,7 @@ public class BulkProcessorRetryIT extends ESIntegTestCase {
                 responses.add(failure);
                 latch.countDown();
             }
-        }).setBulkActions(1)
+        }, "BulkProcessorRetryIT").setBulkActions(1)
             // zero means that we're in the sync case, more means that we're in the async case
             .setConcurrentRequests(randomIntBetween(0, 100))
             .setBackoffPolicy(internalPolicy)


@@ -436,7 +436,7 @@ public class ConcurrentSeqNoVersioningIT extends AbstractDisruptionTestCase {
         LinearizabilityChecker.SequentialSpec spec = new CASSequentialSpec(initialVersion);
         boolean linearizable = false;
         try {
-            final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+            final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
             final AtomicBoolean abort = new AtomicBoolean();
             // Large histories can be problematic and have the linearizability checker run OOM
             // Bound the time how long the checker can run on such histories (Values empirically determined)


@@ -42,6 +42,9 @@ import java.util.function.Supplier;
  */
 public class BulkProcessor implements Closeable {
 
+    static final String FLUSH_SCHEDULER_NAME_SUFFIX = "-flush-scheduler";
+    static final String RETRY_SCHEDULER_NAME_SUFFIX = "-retry-scheduler";
+
     /**
      * A listener for the execution.
     */
@@ -198,7 +201,7 @@ public class BulkProcessor implements Closeable {
     * @param client The client that executes the bulk operations
     * @param listener The BulkProcessor listener that gets called on bulk events
     * @return the builder for BulkProcessor
-    * @deprecated Use {@link #builder(java.util.function.BiConsumer, org.elasticsearch.action.bulk.BulkProcessor.Listener)}
+    * @deprecated Use {@link #builder(BiConsumer, Listener, String)}
     * with client::bulk as the first argument, or {@link #builder(org.elasticsearch.client.Client,
     * org.elasticsearch.action.bulk.BulkProcessor.Listener, org.elasticsearch.threadpool.Scheduler,
     * org.elasticsearch.threadpool.Scheduler, java.lang.Runnable)} and manage the flush and retry schedulers explicitly
@@ -214,19 +217,31 @@ public class BulkProcessor implements Closeable {
     * @param consumer The consumer that is called to fulfil bulk operations
     * @param listener The BulkProcessor listener that gets called on bulk events
     * @return the builder for BulkProcessor
+    * @deprecated use {@link #builder(BiConsumer, Listener, String)} instead
     */
+    @Deprecated
     public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
+        return builder(consumer, listener, "anonymous-bulk-processor");
+    }
+
+    /**
+     * @param consumer The consumer that is called to fulfil bulk operations
+     * @param listener The BulkProcessor listener that gets called on bulk events
+     * @param name The name of this processor, e.g. to identify the scheduler threads
+     * @return the builder for BulkProcessor
+     */
+    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener, String name) {
         Objects.requireNonNull(consumer, "consumer");
         Objects.requireNonNull(listener, "listener");
-        final ScheduledThreadPoolExecutor flushScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
-        final ScheduledThreadPoolExecutor retryScheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
+        final ScheduledThreadPoolExecutor flushScheduler = Scheduler.initScheduler(Settings.EMPTY, name + FLUSH_SCHEDULER_NAME_SUFFIX);
+        final ScheduledThreadPoolExecutor retryScheduler = Scheduler.initScheduler(Settings.EMPTY, name + RETRY_SCHEDULER_NAME_SUFFIX);
         return new Builder(consumer, listener,
-                buildScheduler(flushScheduledThreadPoolExecutor),
-                buildScheduler(retryScheduledThreadPoolExecutor),
+                buildScheduler(flushScheduler),
+                buildScheduler(retryScheduler),
                () ->
                {
-                   Scheduler.terminate(flushScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
-                   Scheduler.terminate(retryScheduledThreadPoolExecutor, 10, TimeUnit.SECONDS);
+                   Scheduler.terminate(flushScheduler, 10, TimeUnit.SECONDS);
+                   Scheduler.terminate(retryScheduler, 10, TimeUnit.SECONDS);
                });
     }
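
To make the new overload concrete, a hedged usage sketch: the component name and builder settings below are hypothetical, but the calls match the API added above, and the name is suffixed with the -flush-scheduler / -retry-scheduler constants when the two scheduler threads are named.

import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;

class MyComponent {
    // Hypothetical caller: "my-component" seeds the scheduler thread names, so a
    // thread dump shows e.g. [...][my-component-flush-scheduler][T#1] and
    // [...][my-component-retry-scheduler][T#1] rather than two identical names.
    static BulkProcessor buildProcessor(Client client, BulkProcessor.Listener listener) {
        return BulkProcessor.builder(client::bulk, listener, "my-component")
            .setBulkActions(1000)                            // flush after 1000 actions...
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // ...or after 5s, whichever comes first
            .build();
    }
}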


@@ -38,11 +38,12 @@ public interface Scheduler {
     * Notice that if any scheduled jobs fail with an exception, these will bubble up to the uncaught exception handler where they will
     * be logged as a warning. This includes jobs started using execute, submit and schedule.
     * @param settings the settings to use
+    * @param schedulerName a string that identifies the threads belonging to this scheduler
     * @return executor
     */
-    static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
+    static ScheduledThreadPoolExecutor initScheduler(Settings settings, String schedulerName) {
        final ScheduledThreadPoolExecutor scheduler = new SafeScheduledThreadPoolExecutor(1,
-            EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
+            EsExecutors.daemonThreadFactory(settings, schedulerName), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
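
A short sketch of the observable effect, mirroring the assertions in the tests further down (it assumes, as those tests do, that EsExecutors.daemonThreadFactory embeds the given schedulerName in each worker thread's name):

// Illustrative fragment, not part of this commit.
ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, "demo-scheduler");
executor.execute(() -> System.out.println(Thread.currentThread().getName()));
// prints a thread name containing [demo-scheduler][T#1]
Scheduler.terminate(executor, 10, TimeUnit.SECONDS);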


@@ -206,7 +206,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler {
             .map(holder -> holder.info)
             .collect(Collectors.toList());
         this.threadPoolInfo = new ThreadPoolInfo(infos);
-        this.scheduler = Scheduler.initScheduler(settings);
+        this.scheduler = Scheduler.initScheduler(settings, "scheduler");
         TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
         this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
         this.cachedTimeThread.start();


@@ -10,6 +10,7 @@ package org.elasticsearch.action.bulk;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexRequest;
@@ -17,12 +18,14 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.Scheduler;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.RemoteTransportException;
 import org.junit.After;
 import org.junit.Before;
@@ -41,6 +44,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+
 public class BulkProcessorTests extends ESTestCase {
 
     private ThreadPool threadPool;
@@ -96,6 +104,56 @@ public class BulkProcessorTests extends ESTestCase {
         bulkProcessor.close();
     }
 
+    public void testRetry() throws Exception {
+        final int maxAttempts = between(1, 3);
+        final AtomicInteger attemptRef = new AtomicInteger();
+
+        final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer = (request, listener) -> {
+            final int attempt = attemptRef.incrementAndGet();
+            assertThat(attempt, lessThanOrEqualTo(maxAttempts));
+            if (attempt != 1) {
+                assertThat(Thread.currentThread().getName(), containsString("[BulkProcessorTests-retry-scheduler]"));
+            }
+
+            if (attempt == maxAttempts) {
+                listener.onFailure(new ElasticsearchException("final failure"));
+            } else {
+                listener.onFailure(new RemoteTransportException("remote", new EsRejectedExecutionException("retryable failure")));
+            }
+        };
+
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        final BulkProcessor.Listener listener = new BulkProcessor.Listener() {
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request) {
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
+                fail("afterBulk should not return success");
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
+                assertThat(failure, instanceOf(ElasticsearchException.class));
+                assertThat(failure.getMessage(), equalTo("final failure"));
+                countDownLatch.countDown();
+            }
+        };
+
+        try (BulkProcessor bulkProcessor = BulkProcessor
+                .builder(consumer, listener, "BulkProcessorTests")
+                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.ZERO, Integer.MAX_VALUE))
+                .build()) {
+            bulkProcessor.add(new IndexRequest());
+            bulkProcessor.flush();
+            assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+        }
+
+        assertThat(attemptRef.get(), equalTo(maxAttempts));
+    }
+
     public void testConcurrentExecutions() throws Exception {
         final AtomicBoolean called = new AtomicBoolean(false);
         final AtomicReference<Throwable> exceptionRef = new AtomicReference<>();


@@ -23,6 +23,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import static org.hamcrest.Matchers.containsString;
+
 public class SchedulerTests extends ESTestCase {
 
     public void testCancelOnThreadPool() {
@@ -51,7 +53,7 @@ public class SchedulerTests extends ESTestCase {
     }
 
     public void testCancelOnScheduler() {
-        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
+        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
         Scheduler scheduler = (command, delay, name) ->
             Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
@@ -130,13 +132,17 @@ public class SchedulerTests extends ESTestCase {
     // simple test for successful scheduling, exceptions tested more thoroughly in EvilThreadPoolTests
     public void testScheduledOnScheduler() throws InterruptedException {
-        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
+        final String schedulerName = "test-scheduler";
+        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, schedulerName);
         Scheduler scheduler = (command, delay, name) ->
             Scheduler.wrapAsScheduledCancellable(executor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS));
         CountDownLatch missingExecutions = new CountDownLatch(1);
         try {
-            scheduler.schedule(missingExecutions::countDown, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME);
+            scheduler.schedule(() -> {
+                assertThat(Thread.currentThread().getName(), containsString("[" + schedulerName + "]"));
+                missingExecutions.countDown();
+            }, TimeValue.timeValueMillis(randomInt(5)), ThreadPool.Names.SAME);
             assertTrue(missingExecutions.await(30, TimeUnit.SECONDS));
         } finally {
             Scheduler.terminate(executor, 10, TimeUnit.SECONDS);
@@ -144,7 +150,7 @@ public class SchedulerTests extends ESTestCase {
     }
 
     public void testScheduleAtFixedRate() throws InterruptedException {
-        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY);
+        ScheduledThreadPoolExecutor executor = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
         try {
             CountDownLatch missingExecutions = new CountDownLatch(randomIntBetween(1, 10));
             executor.scheduleAtFixedRate(missingExecutions::countDown,


@@ -588,7 +588,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         final AtomicBoolean abort = new AtomicBoolean();
         // Large histories can be problematic and have the linearizability checker run OOM
         // Bound the time how long the checker can run on such histories (Values empirically determined)
-        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+        final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
         try {
             if (history.size() > 300) {
                 scheduler.schedule(() -> abort.set(true), 10, TimeUnit.SECONDS);


@@ -464,7 +464,7 @@ public class IndexFollowingIT extends CcrIntegTestCase {
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
         };
         int bulkSize = between(1, 20);
-        BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener)
+        BulkProcessor bulkProcessor = BulkProcessor.builder(leaderClient()::bulk, listener, "IndexFollowingIT")
             .setBulkActions(bulkSize)
             .setConcurrentRequests(4)
             .build();


@@ -1390,7 +1390,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase {
         @Override
         protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(final LongSupplier followerGlobalCheckpoint) {
             if (scheduleRetentionLeaseRenewal.get()) {
-                final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY);
+                final ScheduledThreadPoolExecutor scheduler = Scheduler.initScheduler(Settings.EMPTY, "test-scheduler");
                 final ScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
                     () -> retentionLeaseRenewal.accept(followerGlobalCheckpoint.getAsLong()),
                     0,


@@ -125,7 +125,7 @@ public class DeprecationIndexingComponent extends AbstractLifecycleComponent imp
         // This configuration disables the size count and size thresholds,
         // and instead uses a scheduled flush only. This means that calling
         // processor.add() will not block the calling thread.
-        return BulkProcessor.builder(client::bulk, listener)
+        return BulkProcessor.builder(client::bulk, listener, "deprecation-indexing")
             .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 3))
             .setConcurrentRequests(Math.max(2, EsExecutors.allocatedProcessors(settings)))
             .setBulkActions(-1)


@@ -110,7 +110,7 @@ public class ILMHistoryStore implements Closeable {
                 long items = request.numberOfActions();
                 logger.error(new ParameterizedMessage("failed to index {} items into ILM history index", items), failure);
             }
-        })
+        }, "ilm-history-store")
             .setBulkActions(100)
             .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
             .setFlushInterval(TimeValue.timeValueSeconds(5))


@@ -232,7 +232,7 @@ class RollupShardIndexer {
                 numSent.addAndGet(-items);
             }
         };
-        return BulkProcessor.builder(client::bulk, listener)
+        return BulkProcessor.builder(client::bulk, listener, "rollup-shard-indexer")
            .setBulkActions(10000)
            .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
            // execute the bulk request on the same thread


@@ -379,7 +379,7 @@ public class Watcher extends Plugin implements SystemIndexPlugin, ScriptPlugin,
             public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                 logger.error("error executing bulk", failure);
             }
-        })
+        }, "watcher")
            .setFlushInterval(SETTING_BULK_FLUSH_INTERVAL.get(settings))
            .setBulkActions(SETTING_BULK_ACTIONS.get(settings))
            .setBulkSize(SETTING_BULK_SIZE.get(settings))


@@ -130,7 +130,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
         parser = mock(TriggeredWatch.Parser.class);
         BulkProcessor bulkProcessor = BulkProcessor.
-            builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
+            builder(client::bulk, listener, "TriggeredWatchStoreTests").setConcurrentRequests(0).setBulkActions(1).build();
         triggeredWatchStore = new TriggeredWatchStore(settings, client, parser, bulkProcessor);
     }


@@ -72,7 +72,8 @@ public class HistoryStoreTests extends ESTestCase {
         when(client.settings()).thenReturn(settings);
         when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
         BulkProcessor.Listener listener = mock(BulkProcessor.Listener.class);
-        BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulk, listener).setConcurrentRequests(0).setBulkActions(1).build();
+        BulkProcessor bulkProcessor
+            = BulkProcessor.builder(client::bulk, listener, "HistoryStoreTests").setConcurrentRequests(0).setBulkActions(1).build();
         historyStore = new HistoryStore(bulkProcessor);
     }