Bring over merge metrics from stateless (#128617)

Relates to an effort to combine the merge schedulers from stateless and stateful. The stateless merge scheduler has MergeMetrics that we want in both stateless and stateful. This PR copies over the merge metrics from the stateless merge scheduler into the combined merge scheduler.

Relates ES-9687
This commit is contained in:
Brian Rothermich 2025-06-23 19:42:01 -04:00 committed by GitHub
parent a671505c8a
commit 0f39ff586c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 354 additions and 72 deletions

View file

@ -88,12 +88,14 @@ public class ThreadPoolMergeSchedulerStressTestIT extends ESSingleNodeTestCase {
protected ElasticsearchMergeScheduler createMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
ElasticsearchMergeScheduler mergeScheduler = super.createMergeScheduler(
shardId,
indexSettings,
threadPoolMergeExecutorService
threadPoolMergeExecutorService,
mergeMetrics
);
assertThat(mergeScheduler, instanceOf(ThreadPoolMergeScheduler.class));
// assert there is a single merge executor service for all shards

View file

@ -52,6 +52,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.MapperMetrics;
@ -680,7 +681,8 @@ public class IndexShardIT extends ESSingleNodeTestCase {
null,
MapperMetrics.NOOP,
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
}

View file

@ -88,7 +88,8 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
}

View file

@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.query.IndexQueryCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperMetrics;
@ -181,6 +182,7 @@ public final class IndexModule {
private final MapperMetrics mapperMetrics;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;
private final MergeMetrics mergeMetrics;
/**
* Construct the index module for the index with the specified index settings. The index module contains extension points for plugins
@ -190,6 +192,7 @@ public final class IndexModule {
* @param analysisRegistry the analysis registry
* @param engineFactory the engine factory
* @param directoryFactories the available store types
* @param mergeMetrics
*/
public IndexModule(
final IndexSettings indexSettings,
@ -203,7 +206,8 @@ public final class IndexModule {
final MapperMetrics mapperMetrics,
final List<SearchOperationListener> searchOperationListeners,
final IndexingStatsSettings indexingStatsSettings,
final SearchStatsSettings searchStatsSettings
final SearchStatsSettings searchStatsSettings,
final MergeMetrics mergeMetrics
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
@ -220,6 +224,7 @@ public final class IndexModule {
this.mapperMetrics = mapperMetrics;
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
this.mergeMetrics = mergeMetrics;
}
/**
@ -557,7 +562,8 @@ public final class IndexModule {
mapperMetrics,
queryRewriteInterceptor,
indexingStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
success = true;
return indexService;

View file

@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.fielddata.FieldDataContext;
import org.elasticsearch.index.fielddata.IndexFieldData;
@ -172,6 +173,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final QueryRewriteInterceptor queryRewriteInterceptor;
private final IndexingStatsSettings indexingStatsSettings;
private final SearchStatsSettings searchStatsSettings;
private final MergeMetrics mergeMetrics;
@SuppressWarnings("this-escape")
public IndexService(
@ -210,7 +212,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
MapperMetrics mapperMetrics,
QueryRewriteInterceptor queryRewriteInterceptor,
IndexingStatsSettings indexingStatsSettings,
SearchStatsSettings searchStatsSettings
SearchStatsSettings searchStatsSettings,
MergeMetrics mergeMetrics
) {
super(indexSettings);
assert indexCreationContext != IndexCreationContext.RELOAD_ANALYZERS
@ -297,6 +300,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
}
this.indexingStatsSettings = indexingStatsSettings;
this.searchStatsSettings = searchStatsSettings;
this.mergeMetrics = mergeMetrics;
updateFsyncTaskIfNecessary();
}
@ -588,7 +592,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
indexCommitListener,
mapperMetrics,
indexingStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);

View file

@ -149,6 +149,8 @@ public final class EngineConfig {
private final EngineResetLock engineResetLock;
private final MergeMetrics mergeMetrics;
/**
* Creates a new {@link org.elasticsearch.index.engine.EngineConfig}
*/
@ -181,7 +183,8 @@ public final class EngineConfig {
Engine.IndexCommitListener indexCommitListener,
boolean promotableToPrimary,
MapperService mapperService,
EngineResetLock engineResetLock
EngineResetLock engineResetLock,
MergeMetrics mergeMetrics
) {
this.shardId = shardId;
this.indexSettings = indexSettings;
@ -229,6 +232,7 @@ public final class EngineConfig {
// always use compound on flush - reduces # of file-handles on refresh
this.useCompoundFile = indexSettings.getSettings().getAsBoolean(USE_COMPOUND_FILE, true);
this.engineResetLock = engineResetLock;
this.mergeMetrics = mergeMetrics;
}
/**
@ -477,4 +481,8 @@ public final class EngineConfig {
public EngineResetLock getEngineResetLock() {
return engineResetLock;
}
public MergeMetrics getMergeMetrics() {
return mergeMetrics;
}
}

View file

@ -257,7 +257,8 @@ public class InternalEngine extends Engine {
mergeScheduler = createMergeScheduler(
engineConfig.getShardId(),
engineConfig.getIndexSettings(),
engineConfig.getThreadPoolMergeExecutorService()
engineConfig.getThreadPoolMergeExecutorService(),
engineConfig.getMergeMetrics()
);
scheduler = mergeScheduler.getMergeScheduler();
throttle = new IndexThrottle(pauseIndexingOnThrottle);
@ -2908,10 +2909,11 @@ public class InternalEngine extends Engine {
protected ElasticsearchMergeScheduler createMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService
@Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
if (threadPoolMergeExecutorService != null) {
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService);
return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService, mergeMetrics);
} else {
return new EngineConcurrentMergeScheduler(shardId, indexSettings);
}
@ -2921,9 +2923,10 @@ public class InternalEngine extends Engine {
EngineThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMetrics mergeMetrics
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes);
super(shardId, indexSettings, threadPoolMergeExecutorService, InternalEngine.this::estimateMergeBytes, mergeMetrics);
}
@Override

View file

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.MergePolicy;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Telemetry for segment merges. Publishes:
 * <ul>
 *   <li>counters for the total bytes of source segments merged, the total bytes of the
 *       resulting merged segments, and the total number of documents merged,</li>
 *   <li>a histogram of merge durations (in seconds),</li>
 *   <li>gauges for the bytes of merges currently queued, the bytes of merges currently
 *       running, and the estimated memory required by queued merges.</li>
 * </ul>
 *
 * <p>The gauges are backed by {@link AtomicLong}s that the merge scheduler updates as a
 * merge moves through its lifecycle: {@link #incrementQueuedMergeBytes} on submission,
 * {@link #moveQueuedMergeBytesToRunning} when the merge starts executing, and
 * {@link #decrementRunningMergeBytes} when it completes.
 */
public class MergeMetrics {

    public static final String MERGE_SEGMENTS_SIZE = "es.merge.segments.size";
    public static final String MERGE_DOCS_TOTAL = "es.merge.docs.total";
    public static final String MERGE_SEGMENTS_QUEUED_USAGE = "es.merge.segments.queued.usage";
    public static final String MERGE_SEGMENTS_RUNNING_USAGE = "es.merge.segments.running.usage";
    public static final String MERGE_SEGMENTS_MERGED_SIZE = "es.merge.segments.merged.size";
    public static final String MERGE_QUEUED_ESTIMATED_MEMORY_SIZE = "es.merge.segments.memory.size";
    public static final String MERGE_TIME_IN_SECONDS = "es.merge.time";

    /**
     * Shared no-op instance, backed by the no-op meter registry, for code paths that do not
     * report telemetry. Declared {@code final} so the shared instance cannot be reassigned.
     */
    public static final MergeMetrics NOOP = new MergeMetrics(TelemetryProvider.NOOP.getMeterRegistry());

    private final LongCounter mergeSizeInBytes;
    private final LongCounter mergeMergedSegmentSizeInBytes;
    private final LongCounter mergeNumDocs;
    private final LongHistogram mergeTimeInSeconds;

    // Backing state for the gauges registered in the constructor.
    private final AtomicLong runningMergeSizeInBytes = new AtomicLong();
    private final AtomicLong queuedMergeSizeInBytes = new AtomicLong();
    private final AtomicLong queuedEstimatedMergeMemoryInBytes = new AtomicLong();

    /**
     * Registers all merge metrics with the given registry. The gauge callbacks capture the
     * atomic fields above, so this instance must stay reachable for as long as the registry
     * polls the gauges.
     */
    public MergeMetrics(MeterRegistry meterRegistry) {
        mergeSizeInBytes = meterRegistry.registerLongCounter(MERGE_SEGMENTS_SIZE, "Total size of segments merged", "bytes");
        meterRegistry.registerLongGauge(
            MERGE_SEGMENTS_QUEUED_USAGE,
            "Total usage of segments queued to be merged",
            "bytes",
            () -> new LongWithAttributes(queuedMergeSizeInBytes.get())
        );
        meterRegistry.registerLongGauge(
            MERGE_SEGMENTS_RUNNING_USAGE,
            "Total usage of segments currently being merged",
            "bytes",
            () -> new LongWithAttributes(runningMergeSizeInBytes.get())
        );
        mergeMergedSegmentSizeInBytes = meterRegistry.registerLongCounter(
            MERGE_SEGMENTS_MERGED_SIZE,
            "Total size of the new merged segments",
            "bytes"
        );
        mergeNumDocs = meterRegistry.registerLongCounter(MERGE_DOCS_TOTAL, "Total number of documents merged", "documents");
        mergeTimeInSeconds = meterRegistry.registerLongHistogram(MERGE_TIME_IN_SECONDS, "Merge time in seconds", "seconds");
        meterRegistry.registerLongGauge(
            MERGE_QUEUED_ESTIMATED_MEMORY_SIZE,
            "Estimated memory usage for queued merges",
            "bytes",
            () -> new LongWithAttributes(queuedEstimatedMergeMemoryInBytes.get())
        );
    }

    /**
     * Records that a merge has been queued: adds its total input size to the queued-bytes
     * gauge and its estimated memory requirement to the queued-memory gauge.
     */
    public void incrementQueuedMergeBytes(OnGoingMerge currentMerge, long estimatedMemorySize) {
        queuedMergeSizeInBytes.getAndAdd(currentMerge.getTotalBytesSize());
        queuedEstimatedMergeMemoryInBytes.getAndAdd(estimatedMemorySize);
    }

    /**
     * Records that a queued merge has started running: moves its input bytes from the queued
     * gauge to the running gauge and releases its estimated memory from the queued-memory gauge.
     * {@code estimatedMemorySize} must match the value passed to
     * {@link #incrementQueuedMergeBytes} for the same merge, or the gauge will drift.
     */
    public void moveQueuedMergeBytesToRunning(OnGoingMerge currentMerge, long estimatedMemorySize) {
        long totalSize = currentMerge.getTotalBytesSize();
        queuedMergeSizeInBytes.getAndAdd(-totalSize);
        runningMergeSizeInBytes.getAndAdd(totalSize);
        queuedEstimatedMergeMemoryInBytes.getAndAdd(-estimatedMemorySize);
    }

    /** Records that a running merge has finished (or aborted): removes its bytes from the running gauge. */
    public void decrementRunningMergeBytes(OnGoingMerge currentMerge) {
        runningMergeSizeInBytes.getAndAdd(-currentMerge.getTotalBytesSize());
    }

    /**
     * Records the final counters/histogram for a completed merge: input bytes, resulting
     * segment size, document count, and duration. Note the duration is truncated to whole
     * seconds by integer division.
     */
    public void markMergeMetrics(MergePolicy.OneMerge currentMerge, long mergedSegmentSize, long tookMillis) {
        mergeSizeInBytes.incrementBy(currentMerge.totalBytesSize());
        mergeMergedSegmentSizeInBytes.incrementBy(mergedSegmentSize);
        mergeNumDocs.incrementBy(currentMerge.totalNumDocs());
        mergeTimeInSeconds.record(tookMillis / 1000);
    }

    // Visible for testing: current queued-merge bytes gauge value.
    public long getQueuedMergeSizeInBytes() {
        return queuedMergeSizeInBytes.get();
    }

    // Visible for testing: current running-merge bytes gauge value.
    public long getRunningMergeSizeInBytes() {
        return runningMergeSizeInBytes.get();
    }
}

View file

@ -64,6 +64,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private final MergeSchedulerConfig config;
protected final Logger logger;
private final MergeTracking mergeTracking;
private final MergeMetrics mergeMetrics;
private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService;
private final PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
16,
@ -86,16 +87,19 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
* @param mergeMetrics metrics related to merges
*/
public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService,
MergeMemoryEstimateProvider mergeMemoryEstimateProvider
MergeMemoryEstimateProvider mergeMemoryEstimateProvider,
MergeMetrics mergeMetrics
) {
this.shardId = shardId;
this.config = indexSettings.getMergeSchedulerConfig();
this.logger = Loggers.getLogger(getClass(), shardId);
this.mergeMetrics = mergeMetrics;
this.mergeTracking = new MergeTracking(
logger,
() -> this.config.isAutoThrottle()
@ -226,6 +230,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
try {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
mergeMetrics.incrementQueuedMergeBytes(mergeTask.getOnGoingMerge(), mergeTask.getMergeMemoryEstimateBytes());
mergeQueued(mergeTask.onGoingMerge);
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
} finally {
@ -310,6 +315,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private void mergeTaskDone(OnGoingMerge merge) {
doneMergeTaskCount.incrementAndGet();
mergeMetrics.decrementRunningMergeBytes(merge);
mergeExecutedOrAborted(merge);
checkMergeTaskThrottling();
}
@ -437,6 +443,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
assert hasStartedRunning() == false;
assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge())
: "runNowOrBacklog must be invoked before actually running the merge task";
boolean success = false;
try {
beforeMerge(onGoingMerge);
try {
@ -444,11 +451,13 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
throw new IllegalStateException("The merge task is already started or aborted");
}
mergeTracking.mergeStarted(onGoingMerge);
mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes);
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s start", this));
}
try {
doMerge(mergeSource, onGoingMerge.getMerge());
success = onGoingMerge.getMerge().isAborted() == false;
if (verbose()) {
message(
String.format(
@ -468,6 +477,10 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
}
} finally {
long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get());
if (success) {
long newSegmentSize = getNewSegmentSize(onGoingMerge.getMerge());
mergeMetrics.markMergeMetrics(onGoingMerge.getMerge(), newSegmentSize, tookMS);
}
mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS);
}
} finally {
@ -508,6 +521,8 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
// {@code IndexWriter} checks the abort flag internally, while running the merge.
// The segments of an aborted merge become available to subsequent merges.
onGoingMerge.getMerge().setAborted();
mergeMetrics.moveQueuedMergeBytesToRunning(onGoingMerge, mergeMemoryEstimateBytes);
try {
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s start abort", this));
@ -554,6 +569,21 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
return onGoingMerge;
}
/**
 * Returns the size in bytes of the segment produced by {@code currentMerge}, falling back
 * to the pre-merge size estimate when the merged segment's info is absent or unreadable.
 */
private static long getNewSegmentSize(MergePolicy.OneMerge currentMerge) {
    try {
        var mergeInfo = currentMerge.getMergeInfo();
        if (mergeInfo == null) {
            return currentMerge.estimatedMergeBytes;
        }
        return mergeInfo.sizeInBytes();
    } catch (IOException e) {
        // For stateless only: It is (rarely) possible that the merged segment could be merged away by the IndexWriter prior to
        // reaching this point. Once the IW creates the new segment, it could be exposed to be included in a new merge. That
        // merge can be executed concurrently if more than 1 merge threads are configured. That new merge allows this IW to
        // delete segment created by this merge. Although the files may still be available in the object store for executing
        // searches, the IndexDirectory will no longer have references to the underlying segment files and will throw file not
        // found if we try to read them. In this case, we will ignore that exception (which would otherwise fail the shard) and
        // use the originally estimated merge size for metrics.
        return currentMerge.estimatedMergeBytes;
    }
}
@Override
public String toString() {
return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : "");

View file

@ -91,6 +91,7 @@ import org.elasticsearch.index.engine.Engine.GetResult;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.engine.SafeCommitInfo;
@ -269,6 +270,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final MeanMetric externalRefreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
private final CounterMetric periodicFlushMetric = new CounterMetric();
private final MergeMetrics mergeMetrics;
private final ShardEventListener shardEventListener = new ShardEventListener();
@ -343,7 +345,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final Engine.IndexCommitListener indexCommitListener,
final MapperMetrics mapperMetrics,
final IndexingStatsSettings indexingStatsSettings,
final SearchStatsSettings searchStatsSettings
final SearchStatsSettings searchStatsSettings,
final MergeMetrics mergeMetrics
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
@ -432,6 +435,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.refreshFieldHasValueListener = new RefreshFieldHasValueListener();
this.relativeTimeInNanosSupplier = relativeTimeInNanosSupplier;
this.indexCommitListener = indexCommitListener;
this.mergeMetrics = mergeMetrics;
}
public ThreadPool getThreadPool() {
@ -3755,7 +3759,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexCommitListener,
routingEntry().isPromotableToPrimary(),
mapperService(),
engineResetLock
engineResetLock,
mergeMetrics
);
}

View file

@ -98,6 +98,7 @@ import org.elasticsearch.index.cache.request.ShardRequestCache;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.NoOpEngine;
import org.elasticsearch.index.engine.ReadOnlyEngine;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
@ -283,6 +284,7 @@ public class IndicesService extends AbstractLifecycleComponent
final SlowLogFieldProvider slowLogFieldProvider; // pkg-private for testingå
private final IndexingStatsSettings indexStatsSettings;
private final SearchStatsSettings searchStatsSettings;
private final MergeMetrics mergeMetrics;
@Override
protected void doStart() {
@ -358,6 +360,7 @@ public class IndicesService extends AbstractLifecycleComponent
this.requestCacheKeyDifferentiator = builder.requestCacheKeyDifferentiator;
this.queryRewriteInterceptor = builder.queryRewriteInterceptor;
this.mapperMetrics = builder.mapperMetrics;
this.mergeMetrics = builder.mergeMetrics;
// doClose() is called when shutting down a node, yet there might still be ongoing requests
// that we need to wait for before closing some resources such as the caches. In order to
// avoid closing these resources while ongoing requests are still being processed, we use a
@ -801,7 +804,8 @@ public class IndicesService extends AbstractLifecycleComponent
mapperMetrics,
searchOperationListeners,
indexStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
@ -900,7 +904,8 @@ public class IndicesService extends AbstractLifecycleComponent
mapperMetrics,
searchOperationListeners,
indexStatsSettings,
searchStatsSettings
searchStatsSettings,
mergeMetrics
);
pluginsService.forEach(p -> p.onIndexModule(indexModule));
return indexModule.newIndexMapperService(clusterService, parserConfig, mapperRegistry, scriptService);

View file

@ -27,6 +27,7 @@ import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
import org.elasticsearch.index.shard.SearchOperationListener;
@ -79,6 +80,7 @@ public class IndicesServiceBuilder {
@Nullable
CheckedBiConsumer<ShardSearchRequest, StreamOutput, IOException> requestCacheKeyDifferentiator;
MapperMetrics mapperMetrics;
MergeMetrics mergeMetrics;
List<SearchOperationListener> searchOperationListener = List.of();
QueryRewriteInterceptor queryRewriteInterceptor = null;
SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() {
@ -206,6 +208,11 @@ public class IndicesServiceBuilder {
return this;
}
public IndicesServiceBuilder mergeMetrics(MergeMetrics mergeMetrics) {
this.mergeMetrics = mergeMetrics;
return this;
}
public List<SearchOperationListener> searchOperationListeners() {
return searchOperationListener;
}
@ -244,6 +251,7 @@ public class IndicesServiceBuilder {
Objects.requireNonNull(indexFoldersDeletionListeners);
Objects.requireNonNull(snapshotCommitSuppliers);
Objects.requireNonNull(mapperMetrics);
Objects.requireNonNull(mergeMetrics);
Objects.requireNonNull(searchOperationListener);
Objects.requireNonNull(slowLogFieldProvider);

View file

@ -116,6 +116,7 @@ import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
import org.elasticsearch.index.search.stats.ShardSearchPhaseAPMMetrics;
@ -806,6 +807,9 @@ class NodeConstruction {
threadPool::relativeTimeInMillis
);
MapperMetrics mapperMetrics = new MapperMetrics(sourceFieldMetrics);
MergeMetrics mergeMetrics = new MergeMetrics(telemetryProvider.getMeterRegistry());
final List<SearchOperationListener> searchOperationListeners = List.of(
new ShardSearchPhaseAPMMetrics(telemetryProvider.getMeterRegistry())
);
@ -894,6 +898,7 @@ class NodeConstruction {
.valuesSourceRegistry(searchModule.getValuesSourceRegistry())
.requestCacheKeyDifferentiator(searchModule.getRequestCacheKeyDifferentiator())
.mapperMetrics(mapperMetrics)
.mergeMetrics(mergeMetrics)
.searchOperationListeners(searchOperationListeners)
.slowLogFieldProvider(slowLogFieldProvider)
.build();
@ -1290,6 +1295,7 @@ class NodeConstruction {
b.bind(FailureStoreMetrics.class).toInstance(failureStoreMetrics);
b.bind(ShutdownPrepareService.class).toInstance(shutdownPrepareService);
b.bind(OnlinePrewarmingService.class).toInstance(onlinePrewarmingService);
b.bind(MergeMetrics.class).toInstance(mergeMetrics);
});
if (ReadinessService.enabled(environment)) {

View file

@ -60,6 +60,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
@ -256,7 +257,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
module.setReaderWrapper(s -> new Wrapper());
@ -286,7 +288,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
final IndexService indexService = newIndexService(module);
@ -314,7 +317,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
module.setDirectoryWrapper(new TestDirectoryWrapper());
@ -670,7 +674,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
final IndexService indexService = newIndexService(module);
@ -695,7 +700,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
final AtomicLong lastAcquiredPrimaryTerm = new AtomicLong();
@ -800,7 +806,8 @@ public class IndexModuleTests extends ESTestCase {
MapperMetrics.NOOP,
emptyList(),
new IndexingStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings())
new SearchStatsSettings(ClusterSettings.createBuiltInClusterSettings()),
MergeMetrics.NOOP
);
}

View file

@ -3635,7 +3635,8 @@ public class InternalEngineTests extends EngineTestCase {
null,
true,
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
@ -7243,7 +7244,8 @@ public class InternalEngineTests extends EngineTestCase {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());

View file

@ -46,6 +46,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@ -74,12 +75,14 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
nodeEnvironment = newNodeEnvironment(settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
.getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment);
var mergeMetrics = mock(MergeMetrics.class);
try (
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
mergeMetrics
)
) {
List<OneMerge> executedMergesList = new ArrayList<>();
@ -97,9 +100,19 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
return null;
}).when(mergeSource).merge(any(OneMerge.class));
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
// verify queued byte metric is recorded for each merge
verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong());
}
threadPoolTaskQueue.runAllTasks();
assertThat(executedMergesList.size(), is(mergeCount));
// verify metrics are reported for each merge
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong());
// assert merges are executed in ascending size order
for (int i = 1; i < mergeCount; i++) {
assertThat(
@ -113,6 +126,7 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
public void testSimpleMergeTaskBacklogging() {
int mergeExecutorThreadCount = randomIntBetween(1, 5);
var mergeMetrics = mock(MergeMetrics.class);
Settings mergeSchedulerSettings = Settings.builder()
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount)
.build();
@ -122,7 +136,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
mergeMetrics
);
// more merge tasks than merge threads
int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
@ -143,6 +158,9 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
}
assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount));
assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(mergeCount - mergeExecutorThreadCount));
// verify no metrics are recorded as no merges have been queued or executed through the merge scheduler
verifyNoInteractions(mergeMetrics);
}
public void testSimpleMergeTaskReEnqueueingBySize() {
@ -156,7 +174,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
);
// sort backlogged merges by size
PriorityQueue<MergeTask> backloggedMergeTasks = new PriorityQueue<>(
@ -388,7 +407,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
MergeSource mergeSource = mock(MergeSource.class);
@ -454,6 +474,7 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
// disable fs available disk space feature for this test
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s")
.build();
var mergeMetrics = mock(MergeMetrics.class);
nodeEnvironment = newNodeEnvironment(settings);
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
@ -465,7 +486,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
mergeMetrics
)
) {
// at least 1 extra merge than there are concurrently allowed
@ -485,7 +507,11 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
return null;
}).when(mergeSource).merge(any(OneMerge.class));
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
// verify queued byte metric is recorded for each merge
verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong());
}
for (int completedMergesCount = 0; completedMergesCount < mergeCount
- mergeSchedulerMaxThreadCount; completedMergesCount++) {
int finalCompletedMergesCount = completedMergesCount;
@ -530,6 +556,11 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
runMergeSemaphore.release();
}
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone()));
// verify metrics are recorded for each merge
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
verify(mergeMetrics, times(mergeCount)).markMergeMetrics(any(), anyLong(), anyLong());
}
}
}
@ -553,7 +584,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", settings),
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
CountDownLatch mergeDoneLatch = new CountDownLatch(1);
@ -626,7 +658,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@ -656,7 +689,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
@ -673,7 +707,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
// merge submitted upon closing
@ -690,7 +725,8 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
new ShardId("index", "_na_", 1),
indexSettings,
threadPoolMergeExecutorService,
merge -> 0
merge -> 0,
MergeMetrics.NOOP
)
) {
// merge submitted upon closing
@ -705,29 +741,63 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
}
}
public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
// build a scheduler that always returns true for shouldSkipMerge
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
threadPoolMergeExecutorService,
merge -> 0
) {
@Override
protected boolean shouldSkipMerge() {
return true;
public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() throws IOException {
DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue();
Settings settings = Settings.builder()
// disable fs available disk space feature for this test
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "0s")
.build();
nodeEnvironment = newNodeEnvironment(settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests
.getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool(), settings, nodeEnvironment);
var mergeMetrics = mock(MergeMetrics.class);
try (
// build a scheduler that always returns true for shouldSkipMerge
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.EMPTY),
threadPoolMergeExecutorService,
merge -> 0,
mergeMetrics
) {
@Override
protected boolean shouldSkipMerge() {
return true;
}
}
};
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
// verify that calling schedule on the merge task indicates the merge should be aborted
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
assertThat(schedule, is(Schedule.ABORT));
) {
int mergeCount = randomIntBetween(2, 10);
for (int i = 0; i < mergeCount; i++) {
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
// create the merge task
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
// verify that calling schedule on the merge task indicates the merge should be aborted
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
assertThat(schedule, is(Schedule.ABORT));
// run the merge through the scheduler
threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values()));
// verify queued merge byte metrics are still recorded for each merge
verify(mergeMetrics, times(i + 1)).incrementQueuedMergeBytes(any(), anyLong());
}
// run all merges; they should all be aborted
threadPoolTaskQueue.runAllTasks();
// verify queued bytes metrics are moved to running and decremented
verify(mergeMetrics, times(mergeCount)).moveQueuedMergeBytesToRunning(any(), anyLong());
verify(mergeMetrics, times(mergeCount)).decrementRunningMergeBytes(any());
// verify we did not mark the merges as merged
verify(mergeMetrics, times(0)).markMergeMetrics(any(), anyLong(), anyLong());
}
}
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
@ -746,7 +816,7 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
IndexSettings indexSettings,
ThreadPoolMergeExecutorService threadPoolMergeExecutorService
) {
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0);
super(shardId, indexSettings, threadPoolMergeExecutorService, merge -> 0, MergeMetrics.NOOP);
}
@Override

View file

@ -5064,7 +5064,8 @@ public class IndexShardTests extends IndexShardTestCase {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
return new InternalEngine(configWithWarmer);
});
@ -5346,7 +5347,8 @@ public class IndexShardTests extends IndexShardTestCase {
config.getIndexCommitListener(),
config.isPromotableToPrimary(),
config.getMapperService(),
config.getEngineResetLock()
config.getEngineResetLock(),
config.getMergeMetrics()
);
lazyEngineConfig.set(engineConfigWithBlockingRefreshListener);
return new InternalEngine(engineConfigWithBlockingRefreshListener) {

View file

@ -43,6 +43,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.mapper.IdFieldMapper;
@ -175,7 +176,8 @@ public class RefreshListenersTests extends ESTestCase {
null,
true,
EngineTestCase.createMapperService(),
new EngineResetLock()
new EngineResetLock(),
MergeMetrics.NOOP
);
engine = new InternalEngine(config);
EngineTestCase.recoverFromTranslog(engine, (e, s) -> 0, Long.MAX_VALUE);

View file

@ -145,6 +145,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.MergeMetrics;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MapperRegistry;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
@ -2483,6 +2484,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
.client(client)
.metaStateService(new MetaStateService(nodeEnv, namedXContentRegistry))
.mapperMetrics(MapperMetrics.NOOP)
.mergeMetrics(MergeMetrics.NOOP)
.build();
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
snapshotShardsService = new SnapshotShardsService(