Bring over merge scheduler features from stateless (#128155)

Relates to an effort to consolidate the stateless merge scheduler with the current (stateful) merge scheduler from main ES. This PR brings over features required to maintain parity with the stateless scheduler. Specifically, a few methods are added for the stateless scheduler to override:

Adds an overridable method shouldSkipMerge to test for skipping merges
Adds 2 additional lifecycle callbacks to the scheduler for when a merge is enqueued and when a merge is executed or aborted. This is used by stateless to track active + queued merges per-shard
Adds overridable methods for enabling/disabling IO/thread/merge count throttling

Other functionality required by the stateless merge scheduler can use the existing callbacks from the stateful scheduler:

beforeMerge can be overridden to prewarm
afterMerge can be overridden to refresh after big merges

Relates ES-10264
---------

Co-authored-by: elasticsearchmachine <infra-root+elasticsearchmachine@elastic.co>
This commit is contained in:
Brian Rothermich 2025-05-22 16:52:53 -04:00 committed by GitHub
parent 4c6b141736
commit 1a641e5d65
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 85 additions and 7 deletions

View file

@ -67,6 +67,14 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
private volatile boolean closed = false;
private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider;
/**
* Creates a thread-pool-based merge scheduler that runs merges in a thread pool.
*
* @param shardId the shard id associated with this merge scheduler
* @param indexSettings used to obtain the {@link MergeSchedulerConfig}
* @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler
* @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take
*/
public ThreadPoolMergeScheduler(
ShardId shardId,
IndexSettings indexSettings,
@ -146,6 +154,16 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
*/
protected void afterMerge(OnGoingMerge merge) {}
/**
* A callback allowing for custom logic when a merge is queued.
*/
protected void mergeQueued(OnGoingMerge merge) {}
/**
* A callback allowing for custom logic after a merge is executed or aborted.
*/
protected void mergeExecutedOrAborted(OnGoingMerge merge) {}
/**
* A callback that's invoked when indexing should throttle down indexing in order to let merging to catch up.
*/
@ -157,6 +175,34 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
*/
protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {}
/**
* Returns true if scheduled merges should be skipped (aborted)
*/
protected boolean shouldSkipMerge() {
return false;
}
/**
* Returns true if IO-throttling is enabled
*/
protected boolean isAutoThrottle() {
return config.isAutoThrottle();
}
/**
* Returns the maximum number of active merges before being throttled
*/
protected int getMaxMergeCount() {
return config.getMaxMergeCount();
}
/**
* Returns the maximum number of threads running merges before being throttled
*/
protected int getMaxThreadCount() {
return config.getMaxThreadCount();
}
/**
* A callback for exceptions thrown while merging.
*/
@ -168,6 +214,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) {
try {
MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger);
mergeQueued(mergeTask.onGoingMerge);
return threadPoolMergeExecutorService.submitMergeTask(mergeTask);
} finally {
checkMergeTaskThrottling();
@ -183,7 +230,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
return new MergeTask(
mergeSource,
merge,
isAutoThrottle && config.isAutoThrottle(),
isAutoThrottle && isAutoThrottle(),
"Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId,
estimateMergeMemoryBytes
);
@ -193,7 +240,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
long submittedMergesCount = submittedMergeTaskCount.get();
long doneMergesCount = doneMergeTaskCount.get();
int runningMergesCount = runningMergeTasks.size();
int configuredMaxMergeCount = config.getMaxMergeCount();
int configuredMaxMergeCount = getMaxMergeCount();
// both currently running and enqueued merge tasks are considered "active" for throttling purposes
int activeMerges = (int) (submittedMergesCount - doneMergesCount);
if (activeMerges > configuredMaxMergeCount
@ -223,7 +270,12 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
if (closed) {
// do not run or backlog tasks when closing the merge scheduler, instead abort them
return Schedule.ABORT;
} else if (runningMergeTasks.size() < config.getMaxThreadCount()) {
} else if (shouldSkipMerge()) {
if (verbose()) {
message(String.format(Locale.ROOT, "skipping merge task %s", mergeTask));
}
return Schedule.ABORT;
} else if (runningMergeTasks.size() < getMaxThreadCount()) {
boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null;
assert added : "starting merge task [" + mergeTask + "] registered as already running";
return Schedule.RUN;
@ -243,8 +295,9 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
maybeSignalAllMergesDoneAfterClose();
}
private void mergeTaskDone() {
private void mergeTaskDone(OnGoingMerge merge) {
doneMergeTaskCount.incrementAndGet();
mergeExecutedOrAborted(merge);
checkMergeTaskThrottling();
}
@ -255,7 +308,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
}
private synchronized void enqueueBackloggedTasks() {
int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - runningMergeTasks.size();
int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size();
// enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back
while (closed || maxBackloggedTasksToEnqueue-- > 0) {
MergeTask backloggedMergeTask = backloggedMergeTasks.poll();
@ -408,7 +461,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
try {
mergeTaskFinishedRunning(this);
} finally {
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
try {
// kick-off any follow-up merge
@ -452,7 +505,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics
if (verbose()) {
message(String.format(Locale.ROOT, "merge task %s end abort", this));
}
mergeTaskDone();
mergeTaskDone(onGoingMerge);
}
}

View file

@ -662,6 +662,31 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
}
}
public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class);
// build a scheduler that always returns true for shouldSkipMerge
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler(
new ShardId("index", "_na_", 1),
IndexSettingsModule.newIndexSettings("index", Settings.builder().build()),
threadPoolMergeExecutorService,
merge -> 0
) {
@Override
protected boolean shouldSkipMerge() {
return true;
}
};
MergeSource mergeSource = mock(MergeSource.class);
OneMerge oneMerge = mock(OneMerge.class);
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L)));
when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress());
when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null);
MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values()));
// verify that calling schedule on the merge task indicates the merge should be aborted
Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask);
assertThat(schedule, is(Schedule.ABORT));
}
private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) {
return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt()));
}