diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java index e54c8164c6ab..33ef06699c8c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -67,6 +67,14 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics private volatile boolean closed = false; private final MergeMemoryEstimateProvider mergeMemoryEstimateProvider; + /** + * Creates a merge scheduler that submits its merge tasks to the given {@link ThreadPoolMergeExecutorService}. + * + * @param shardId the shard id associated with this merge scheduler + * @param indexSettings used to obtain the {@link MergeSchedulerConfig} + * @param threadPoolMergeExecutorService the executor service used to execute merge tasks from this scheduler + * @param mergeMemoryEstimateProvider provides an estimate for how much memory a merge will take + */ public ThreadPoolMergeScheduler( ShardId shardId, IndexSettings indexSettings, @@ -146,6 +154,16 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics */ protected void afterMerge(OnGoingMerge merge) {} + /** + * A callback invoked with the merge of a newly created merge task, right before the task is submitted for execution. + */ + protected void mergeQueued(OnGoingMerge merge) {} + + /** + * A callback invoked when a merge task is done, whether it ran or was aborted, before throttling is re-checked. + */ + protected void mergeExecutedOrAborted(OnGoingMerge merge) {} + /** * A callback that's invoked when indexing should throttle down indexing in order to let merging to catch up. 
*/ @@ -157,6 +175,34 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics */ protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {} + /** + * Returns true if merge tasks should be aborted instead of run when they are scheduled; false by default. + */ + protected boolean shouldSkipMerge() { + return false; + } + + /** + * Returns true if IO auto-throttling is enabled; by default this is read from the config of this scheduler. + */ + protected boolean isAutoThrottle() { + return config.isAutoThrottle(); + } + + /** + * Returns the maximum number of active (running or enqueued) merges, as used by the throttling check. + */ + protected int getMaxMergeCount() { + return config.getMaxMergeCount(); + } + + /** + * Returns the maximum number of merge tasks of this scheduler that may be running at the same time. + */ + protected int getMaxThreadCount() { + return config.getMaxThreadCount(); + } + /** * A callback for exceptions thrown while merging. */ @@ -168,6 +214,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { try { MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); + mergeQueued(mergeTask.onGoingMerge); return threadPoolMergeExecutorService.submitMergeTask(mergeTask); } finally { checkMergeTaskThrottling(); @@ -183,7 +230,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics return new MergeTask( mergeSource, merge, - isAutoThrottle && config.isAutoThrottle(), + isAutoThrottle && isAutoThrottle(), "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId, estimateMergeMemoryBytes ); @@ -193,7 +240,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics long submittedMergesCount = submittedMergeTaskCount.get(); long doneMergesCount = doneMergeTaskCount.get(); int runningMergesCount = runningMergeTasks.size(); - int configuredMaxMergeCount = config.getMaxMergeCount(); + int 
configuredMaxMergeCount = getMaxMergeCount(); // both currently running and enqueued merge tasks are considered "active" for throttling purposes int activeMerges = (int) (submittedMergesCount - doneMergesCount); if (activeMerges > configuredMaxMergeCount @@ -223,7 +270,12 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics if (closed) { // do not run or backlog tasks when closing the merge scheduler, instead abort them return Schedule.ABORT; - } else if (runningMergeTasks.size() < config.getMaxThreadCount()) { + } else if (shouldSkipMerge()) { + if (verbose()) { + message(String.format(Locale.ROOT, "skipping merge task %s", mergeTask)); + } + return Schedule.ABORT; + } else if (runningMergeTasks.size() < getMaxThreadCount()) { boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null; assert added : "starting merge task [" + mergeTask + "] registered as already running"; return Schedule.RUN; @@ -243,8 +295,9 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics maybeSignalAllMergesDoneAfterClose(); } - private void mergeTaskDone() { + private void mergeTaskDone(OnGoingMerge merge) { doneMergeTaskCount.incrementAndGet(); + mergeExecutedOrAborted(merge); checkMergeTaskThrottling(); } @@ -255,7 +308,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics } private synchronized void enqueueBackloggedTasks() { - int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - runningMergeTasks.size(); + int maxBackloggedTasksToEnqueue = getMaxThreadCount() - runningMergeTasks.size(); // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back while (closed || maxBackloggedTasksToEnqueue-- > 0) { MergeTask backloggedMergeTask = backloggedMergeTasks.poll(); @@ -408,7 +461,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics try { mergeTaskFinishedRunning(this); } 
finally { - mergeTaskDone(); + mergeTaskDone(onGoingMerge); } try { // kick-off any follow-up merge @@ -452,7 +505,7 @@ public class ThreadPoolMergeScheduler extends MergeScheduler implements Elastics if (verbose()) { message(String.format(Locale.ROOT, "merge task %s end abort", this)); } - mergeTaskDone(); + mergeTaskDone(onGoingMerge); } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java index 6d39b72a107f..01a6150fd140 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -662,6 +662,31 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase { } } + public void testMergeSchedulerAbortsMergeWhenShouldSkipMergeIsTrue() { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + // build a scheduler that always returns true for shouldSkipMerge + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", Settings.builder().build()), + threadPoolMergeExecutorService, + merge -> 0 + ) { + @Override + protected boolean shouldSkipMerge() { + return true; + } + }; + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values())); + // verify that calling schedule on the merge task indicates the merge should be aborted + Schedule 
schedule = threadPoolMergeScheduler.schedule(mergeTask); + assertThat(schedule, is(Schedule.ABORT)); + } + private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) { return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt())); }