Add thread pool utilisation metric (#120363)

There are existing metrics for the active number of threads, but it seems tricky to go from those to a "utilisation" number because all the pools have different sizes.

This commit adds `es.thread_pool.{name}.threads.utilization.current` which will be published by all  `TaskExecutionTimeTrackingEsThreadPoolExecutor` thread pools (where `EsExecutors.TaskTrackingConfig#trackExecutionTime` is true).

The metric is a double gauge indicating what fraction (in [0.0, 1.0]) of the maximum possible execution time was utilised over the polling interval.

It's calculated as actualTaskExecutionTime / maximumTaskExecutionTime, so effectively a "mean" value. The metric interval is 60s so brief spikes won't be apparent in the measure, but the initial goal is to use it to detect hot-spotting so the 60s average will probably suffice.

Relates ES-10530
This commit is contained in:
Nick Tindall 2025-04-17 11:49:30 +10:00 committed by GitHub
parent e53d3ff64b
commit 270ca0a80a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 316 additions and 33 deletions

View file

@ -0,0 +1,5 @@
pr: 120363
summary: Add thread pool utilization metric
area: "Infra/Metrics"
type: enhancement
issues: []

View file

@ -11,6 +11,7 @@ package org.elasticsearch.threadpool;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
@ -31,22 +32,18 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.function.Function.identity;
import static org.elasticsearch.common.util.Maps.toUnmodifiableSortedMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.threadpool.ThreadPool.DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;
import static org.elasticsearch.threadpool.ThreadPool.WRITE_THREAD_POOLS_EWMA_ALPHA_SETTING;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.matchesRegex;
@ -165,37 +162,78 @@ public class SimpleThreadPoolIT extends ESIntegTestCase {
registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));
tps[0].forEach(stats -> {
Map<String, Long> threadPoolStats = List.of(
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, stats.completed()),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, 0L),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, 0L),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, (long) stats.largest()),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE, 0L)
).stream().collect(toUnmodifiableSortedMap(e -> stats.name() + e.getKey(), Entry::getValue));
Function<String, List<Long>> measurementExtractor = name -> {
String metricName = ThreadPool.THREAD_POOL_METRIC_PREFIX + name;
assertThat(metricName, in(registeredMetrics));
List<Measurement> measurements = name.endsWith(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED)
? plugin.getLongAsyncCounterMeasurement(metricName)
: plugin.getLongGaugeMeasurement(metricName);
return measurements.stream().map(Measurement::getLong).toList();
};
Map<String, List<Long>> measurements = threadPoolStats.keySet()
.stream()
.collect(toUnmodifiableSortedMap(identity(), measurementExtractor));
logger.info("Stats of `{}`: {}", stats.name(), threadPoolStats);
logger.info("Measurements of `{}`: {}", stats.name(), measurements);
threadPoolStats.forEach(
(metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(greaterThanOrEqualTo(value))))
Map<String, MetricDefinition<?>> metricDefinitions = Map.of(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
new MetricDefinition<>(stats.completed(), TestTelemetryPlugin::getLongAsyncCounterMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
new MetricDefinition<>((long) stats.largest(), TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong),
ThreadPool.THREAD_POOL_METRIC_NAME_QUEUE,
new MetricDefinition<>(0L, TestTelemetryPlugin::getLongGaugeMeasurement, Measurement::getLong)
);
// TaskExecutionTimeTrackingEsThreadPoolExecutor also publishes a utilization metric
if (tp.executor(stats.name()) instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor) {
metricDefinitions = Maps.copyMapWithAddedEntry(
metricDefinitions,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
new MetricDefinition<>(0.0d, TestTelemetryPlugin::getDoubleGaugeMeasurement, Measurement::getDouble)
);
}
metricDefinitions = metricDefinitions.entrySet()
.stream()
.collect(Collectors.toUnmodifiableMap(e -> stats.name() + e.getKey(), Map.Entry::getValue));
logger.info(
"Measurements of `{}`: {}",
stats.name(),
metricDefinitions.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getMeasurements(plugin, e.getKey())))
);
// Validate all metrics
metricDefinitions.forEach((name, md) -> md.assertValid(plugin, name));
});
}
private static class MetricDefinition<T extends Comparable<T>> {
private final T minimumValue;
private final BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor;
private final Function<Measurement, T> valueExtractor;
MetricDefinition(
T minimumValue,
BiFunction<TestTelemetryPlugin, String, List<Measurement>> metricExtractor,
Function<Measurement, T> valueExtractor
) {
this.minimumValue = minimumValue;
this.metricExtractor = metricExtractor;
this.valueExtractor = valueExtractor;
}
public List<T> getMeasurements(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
return metricExtractor.apply(testTelemetryPlugin, ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix)
.stream()
.map(valueExtractor)
.toList();
}
public void assertValid(TestTelemetryPlugin testTelemetryPlugin, String metricSuffix) {
List<T> metrics = getMeasurements(testTelemetryPlugin, metricSuffix);
assertThat(
ThreadPool.THREAD_POOL_METRIC_PREFIX + metricSuffix + " is populated",
metrics,
contains(greaterThanOrEqualTo(minimumValue))
);
}
}
public void testWriteThreadpoolEwmaAlphaSetting() {
Settings settings = Settings.EMPTY;
var ewmaAlpha = DEFAULT_INDEX_AUTOSCALING_EWMA_ALPHA;

View file

@ -33,6 +33,8 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
private final boolean trackOngoingTasks;
// The set of currently running tasks and the timestamp of when they started execution in the Executor.
private final Map<Runnable, Long> ongoingTasks = new ConcurrentHashMap<>();
private volatile long lastPollTime = System.nanoTime();
private volatile long lastTotalExecutionTime = 0;
TaskExecutionTimeTrackingEsThreadPoolExecutor(
String name,
@ -89,6 +91,26 @@ public final class TaskExecutionTimeTrackingEsThreadPoolExecutor extends EsThrea
return getQueue().size();
}
/**
* Returns the fraction of the maximum possible thread time that was actually used since the last time
* this method was called.
*
* @return the utilization as a fraction, in the range [0, 1]
*/
public double pollUtilization() {
final long currentTotalExecutionTimeNanos = totalExecutionTime.sum();
final long currentPollTimeNanos = System.nanoTime();
final long totalExecutionTimeSinceLastPollNanos = currentTotalExecutionTimeNanos - lastTotalExecutionTime;
final long timeSinceLastPoll = currentPollTimeNanos - lastPollTime;
final long maximumExecutionTimeSinceLastPollNanos = timeSinceLastPoll * getMaximumPoolSize();
final double utilizationSinceLastPoll = (double) totalExecutionTimeSinceLastPollNanos / maximumExecutionTimeSinceLastPollNanos;
lastTotalExecutionTime = currentTotalExecutionTimeNanos;
lastPollTime = currentPollTimeNanos;
return utilizationSinceLastPoll;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (trackOngoingTasks) {

View file

@ -24,12 +24,14 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionHandler;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.ReportingService;
import org.elasticsearch.telemetry.metric.DoubleWithAttributes;
import org.elasticsearch.telemetry.metric.Instrument;
import org.elasticsearch.telemetry.metric.LongAsyncCounter;
import org.elasticsearch.telemetry.metric.LongGauge;
@ -149,6 +151,7 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
public static final String THREAD_POOL_METRIC_NAME_CURRENT = ".threads.count.current";
public static final String THREAD_POOL_METRIC_NAME_QUEUE = ".threads.queue.size";
public static final String THREAD_POOL_METRIC_NAME_ACTIVE = ".threads.active.current";
public static final String THREAD_POOL_METRIC_NAME_UTILIZATION = ".threads.utilization.current";
public static final String THREAD_POOL_METRIC_NAME_LARGEST = ".threads.largest.current";
public static final String THREAD_POOL_METRIC_NAME_REJECTED = ".threads.rejected.total";
@ -374,6 +377,17 @@ public class ThreadPool implements ReportingService<ThreadPoolInfo>, Scheduler,
if (rejectedExecutionHandler instanceof EsRejectedExecutionHandler handler) {
handler.registerCounter(meterRegistry, prefix + THREAD_POOL_METRIC_NAME_REJECTED, name);
}
if (threadPoolExecutor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor timeTrackingExecutor) {
instruments.add(
meterRegistry.registerDoubleGauge(
prefix + THREAD_POOL_METRIC_NAME_UTILIZATION,
"fraction of maximum thread time utilized for " + name,
"fraction",
() -> new DoubleWithAttributes(timeTrackingExecutor.pollUtilization(), at)
)
);
}
}
return instruments;
}

View file

@ -17,15 +17,28 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.util.concurrent.TaskExecutionTimeTrackingEsThreadPoolExecutor;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.telemetry.InstrumentType;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.RecordingMeterRegistry;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders;
import org.hamcrest.Matcher;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DEFAULT;
import static org.elasticsearch.common.util.concurrent.EsExecutors.TaskTrackingConfig.DO_NOT_TRACK;
@ -35,8 +48,10 @@ import static org.elasticsearch.threadpool.ThreadPool.assertCurrentMethodIsNotCa
import static org.elasticsearch.threadpool.ThreadPool.getMaxSnapshotThreadPoolSize;
import static org.elasticsearch.threadpool.ThreadPool.halfAllocatedProcessorsMaxFive;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
public class ThreadPoolTests extends ESTestCase {
@ -473,6 +488,195 @@ public class ThreadPoolTests extends ESTestCase {
}
}
public void testDetailedUtilizationMetric() throws Exception {
final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();
final ThreadPool threadPool = new ThreadPool(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
meterRegistry,
builtInExecutorBuilders
);
try {
// write thread pool is tracked
final String threadPoolName = ThreadPool.Names.WRITE;
final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);
final TaskExecutionTimeTrackingEsThreadPoolExecutor executor = asInstanceOf(
TaskExecutionTimeTrackingEsThreadPoolExecutor.class,
threadPool.executor(threadPoolName)
);
final long beforePreviousCollectNanos = System.nanoTime();
meterRegistry.getRecorder().collect();
final long afterPreviousCollectNanos = System.nanoTime();
metricAsserter.assertLatestMetricValueMatches(
InstrumentType.DOUBLE_GAUGE,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
Measurement::getDouble,
equalTo(0.0d)
);
final AtomicLong minimumDurationNanos = new AtomicLong(Long.MAX_VALUE);
final long beforeStartNanos = System.nanoTime();
final CyclicBarrier barrier = new CyclicBarrier(2);
Future<?> future = executor.submit(() -> {
long innerStartTimeNanos = System.nanoTime();
safeSleep(100);
safeAwait(barrier);
minimumDurationNanos.set(System.nanoTime() - innerStartTimeNanos);
});
safeAwait(barrier);
safeGet(future);
final long maxDurationNanos = System.nanoTime() - beforeStartNanos;
// Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to run
assertBusy(() -> assertThat(executor.getTotalTaskExecutionTime(), greaterThan(0L)));
final long beforeMetricsCollectedNanos = System.nanoTime();
meterRegistry.getRecorder().collect();
final long afterMetricsCollectedNanos = System.nanoTime();
// Calculate upper bound on utilisation metric
final long minimumPollIntervalNanos = beforeMetricsCollectedNanos - afterPreviousCollectNanos;
final long minimumMaxExecutionTimeNanos = minimumPollIntervalNanos * threadPoolInfo.getMax();
final double maximumUtilization = (double) maxDurationNanos / minimumMaxExecutionTimeNanos;
// Calculate lower bound on utilisation metric
final long maximumPollIntervalNanos = afterMetricsCollectedNanos - beforePreviousCollectNanos;
final long maximumMaxExecutionTimeNanos = maximumPollIntervalNanos * threadPoolInfo.getMax();
final double minimumUtilization = (double) minimumDurationNanos.get() / maximumMaxExecutionTimeNanos;
logger.info("Utilization must be in [{}, {}]", minimumUtilization, maximumUtilization);
Matcher<Double> matcher = allOf(greaterThan(minimumUtilization), lessThan(maximumUtilization));
metricAsserter.assertLatestMetricValueMatches(
InstrumentType.DOUBLE_GAUGE,
ThreadPool.THREAD_POOL_METRIC_NAME_UTILIZATION,
Measurement::getDouble,
matcher
);
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
public void testThreadCountMetrics() throws Exception {
final RecordingMeterRegistry meterRegistry = new RecordingMeterRegistry();
final BuiltInExecutorBuilders builtInExecutorBuilders = new DefaultBuiltInExecutorBuilders();
final ThreadPool threadPool = new ThreadPool(
Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), "test").build(),
meterRegistry,
builtInExecutorBuilders
);
try {
final String threadPoolName = randomFrom(
ThreadPool.Names.GENERIC,
ThreadPool.Names.ANALYZE,
ThreadPool.Names.WRITE,
ThreadPool.Names.SEARCH
);
final ThreadPool.Info threadPoolInfo = threadPool.info(threadPoolName);
final MetricAsserter metricAsserter = new MetricAsserter(meterRegistry, threadPoolName);
meterRegistry.getRecorder().collect();
metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, InstrumentType.LONG_GAUGE, equalTo(0L));
metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT, InstrumentType.LONG_GAUGE, equalTo(0L));
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
InstrumentType.LONG_ASYNC_COUNTER,
equalTo(0L)
);
metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST, InstrumentType.LONG_GAUGE, equalTo(0L));
final int numThreads = randomIntBetween(1, Math.min(10, threadPoolInfo.getMax()));
final CyclicBarrier barrier = new CyclicBarrier(numThreads + 1);
final List<Future<?>> futures = new ArrayList<>();
final EsThreadPoolExecutor executor = asInstanceOf(EsThreadPoolExecutor.class, threadPool.executor(threadPoolName));
for (int i = 0; i < numThreads; i++) {
futures.add(executor.submit(() -> {
safeAwait(barrier);
safeAwait(barrier);
}));
}
// Wait for all threads to start
safeAwait(barrier);
meterRegistry.getRecorder().collect();
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE,
InstrumentType.LONG_GAUGE,
equalTo((long) numThreads)
);
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
InstrumentType.LONG_GAUGE,
equalTo((long) numThreads)
);
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
InstrumentType.LONG_ASYNC_COUNTER,
equalTo(0L)
);
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
InstrumentType.LONG_GAUGE,
equalTo((long) numThreads)
);
// Let all threads complete
safeAwait(barrier);
futures.forEach(ESTestCase::safeGet);
// Wait for TaskExecutionTimeTrackingEsThreadPoolExecutor#afterExecute to complete
assertBusy(() -> assertThat(executor.getActiveCount(), equalTo(0)));
meterRegistry.getRecorder().collect();
metricAsserter.assertLatestLongValueMatches(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, InstrumentType.LONG_GAUGE, equalTo(0L));
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_CURRENT,
InstrumentType.LONG_GAUGE,
equalTo((long) numThreads)
);
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED,
InstrumentType.LONG_ASYNC_COUNTER,
equalTo((long) numThreads)
);
metricAsserter.assertLatestLongValueMatches(
ThreadPool.THREAD_POOL_METRIC_NAME_LARGEST,
InstrumentType.LONG_GAUGE,
equalTo((long) numThreads)
);
} finally {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
private static class MetricAsserter {
private final RecordingMeterRegistry meterRegistry;
private final String threadPoolName;
MetricAsserter(RecordingMeterRegistry meterRegistry, String threadPoolName) {
this.meterRegistry = meterRegistry;
this.threadPoolName = threadPoolName;
}
void assertLatestLongValueMatches(String metricName, InstrumentType instrumentType, Matcher<Long> matcher) {
assertLatestMetricValueMatches(instrumentType, metricName, Measurement::getLong, matcher);
}
<T> void assertLatestMetricValueMatches(
InstrumentType instrumentType,
String name,
Function<Measurement, T> valueExtractor,
Matcher<T> matcher
) {
List<Measurement> measurements = meterRegistry.getRecorder()
.getMeasurements(instrumentType, ThreadPool.THREAD_POOL_METRIC_PREFIX + threadPoolName + name);
assertFalse(name + " has no measurements", measurements.isEmpty());
assertThat(valueExtractor.apply(measurements.getLast()), matcher);
}
}
private static AbstractRunnable forceExecution(AbstractRunnable delegate) {
return new AbstractRunnable() {
@Override