diff --git a/docs/changelog/120869.yaml b/docs/changelog/120869.yaml new file mode 100644 index 000000000000..024d51db894f --- /dev/null +++ b/docs/changelog/120869.yaml @@ -0,0 +1,5 @@ +pr: 120869 +summary: Threadpool merge scheduler +area: Engine +type: feature +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java index 80de2ffcaa7a..243e4219ffef 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/InternalEngineMergeIT.java @@ -8,24 +8,40 @@ */ package org.elasticsearch.index.engine; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.threadpool.ThreadPool; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; -@ClusterScope(supportsDedicatedMasters = false, 
numDataNodes = 1, scope = Scope.SUITE) +@ClusterScope(supportsDedicatedMasters = false, numDataNodes = 1, numClientNodes = 0, scope = Scope.TEST) public class InternalEngineMergeIT extends ESIntegTestCase { + private boolean useThreadPoolMerging; + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + useThreadPoolMerging = randomBoolean(); + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal, otherSettings)); + settings.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), useThreadPoolMerging); + return settings.build(); + } + public void testMergesHappening() throws Exception { final int numOfShards = randomIntBetween(1, 5); // some settings to keep num segments low @@ -83,4 +99,60 @@ public class InternalEngineMergeIT extends ESIntegTestCase { assertThat(count, lessThanOrEqualTo(upperNumberSegments)); } + public void testMergesUseTheMergeThreadPool() throws Exception { + final String indexName = randomIdentifier(); + createIndex(indexName, indexSettings(randomIntBetween(1, 3), 0).build()); + long id = 0; + final int minMerges = randomIntBetween(1, 5); + long totalDocs = 0; + + while (true) { + int docs = randomIntBetween(100, 200); + totalDocs += docs; + + BulkRequestBuilder request = client().prepareBulk(); + for (int j = 0; j < docs; ++j) { + request.add( + new IndexRequest(indexName).id(Long.toString(id++)) + .source(jsonBuilder().startObject().field("l", randomLong()).endObject()) + ); + } + BulkResponse response = request.get(); + assertNoFailures(response); + refresh(indexName); + + var mergesResponse = client().admin().indices().prepareStats(indexName).clear().setMerge(true).get(); + var primaries = mergesResponse.getIndices().get(indexName).getPrimaries(); + if (primaries.merge.getTotal() >= minMerges) { + break; + } + } + + forceMerge(); + refresh(indexName); + + // after a force merge there should only be 1 segment per shard + var shardsWithMultipleSegments = 
getShardSegments().stream() + .filter(shardSegments -> shardSegments.getSegments().size() > 1) + .toList(); + assertTrue("there are shards with multiple segments " + shardsWithMultipleSegments, shardsWithMultipleSegments.isEmpty()); + + final long expectedTotalDocs = totalDocs; + assertHitCount(prepareSearch(indexName).setQuery(QueryBuilders.matchAllQuery()).setTrackTotalHits(true), expectedTotalDocs); + + IndicesStatsResponse indicesStats = client().admin().indices().prepareStats(indexName).setMerge(true).get(); + long mergeCount = indicesStats.getIndices().get(indexName).getPrimaries().merge.getTotal(); + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().setThreadPool(true).get(); + assertThat(nodesStatsResponse.getNodes().size(), equalTo(1)); + + NodeStats nodeStats = nodesStatsResponse.getNodes().get(0); + if (useThreadPoolMerging) { + assertThat( + nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().get().completed(), + equalTo(mergeCount) + ); + } else { + assertTrue(nodeStats.getThreadPool().stats().stream().filter(s -> ThreadPool.Names.MERGE.equals(s.name())).findAny().isEmpty()); + } + } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java new file mode 100644 index 000000000000..1743ca199605 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java @@ -0,0 +1,313 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.store.Directory; +import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; +import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; +import org.elasticsearch.action.admin.indices.segments.ShardSegments; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.plugins.EnginePlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.test.ESSingleNodeTestCase; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static 
org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; + +public class ThreadPoolMergeSchedulerStressTestIT extends ESSingleNodeTestCase { + + private static final int MERGE_SCHEDULER_MAX_CONCURRENCY = 3; + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + // when there are more threads than scheduler(s)' concurrency capacity, excess merges will be backlogged + // alternatively, when scheduler(s)' concurrency capacity exceeds the executor's thread count, excess merges will be enqueued + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), MERGE_SCHEDULER_MAX_CONCURRENCY + randomFrom(-2, -1, 0, 1, 2)) + .build(); + } + + @Override + protected Collection> getPlugins() { + return CollectionUtils.appendToCopy(super.getPlugins(), ThreadPoolMergeSchedulerStressTestIT.TestEnginePlugin.class); + } + + public static class TestEnginePlugin extends Plugin implements EnginePlugin { + + final AtomicReference mergeExecutorServiceReference = new AtomicReference<>(); + final Set enqueuedMergesSet = ConcurrentCollections.newConcurrentSet(); + final Set runningMergesSet = ConcurrentCollections.newConcurrentSet(); + // maybe let a few merges run at the start + final int initialRunMergesCount = randomIntBetween(0, 5); + final Semaphore runMergeSemaphore = new Semaphore(initialRunMergesCount); + final int waitMergesEnqueuedCount = randomIntBetween(50, 100); + + void allowAllMerging() { + // even when indexing is done, queued and backlogged merges can themselves trigger further merging + // don't let this test be bothered by that, and simply let all merging run unhindered + runMergeSemaphore.release(Integer.MAX_VALUE - 
initialRunMergesCount); + } + + class TestInternalEngine extends org.elasticsearch.index.engine.InternalEngine { + + TestInternalEngine(EngineConfig engineConfig) { + super(engineConfig); + } + + protected ElasticsearchMergeScheduler createMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ) { + ElasticsearchMergeScheduler mergeScheduler = super.createMergeScheduler( + shardId, + indexSettings, + threadPoolMergeExecutorService + ); + assertThat(mergeScheduler, instanceOf(ThreadPoolMergeScheduler.class)); + // assert there is a single merge executor service for all shards + mergeExecutorServiceReference.compareAndSet(null, threadPoolMergeExecutorService); + assertThat(mergeExecutorServiceReference.get(), is(threadPoolMergeExecutorService)); + return new TestMergeScheduler((ThreadPoolMergeScheduler) mergeScheduler); + } + + class TestMergeScheduler implements ElasticsearchMergeScheduler { + + ThreadPoolMergeScheduler delegateMergeScheduler; + + TestMergeScheduler(ThreadPoolMergeScheduler threadPoolMergeScheduler) { + this.delegateMergeScheduler = threadPoolMergeScheduler; + } + + @Override + public Set onGoingMerges() { + return delegateMergeScheduler.onGoingMerges(); + } + + @Override + public MergeStats stats() { + return delegateMergeScheduler.stats(); + } + + @Override + public void refreshConfig() { + delegateMergeScheduler.refreshConfig(); + } + + @Override + public MergeScheduler getMergeScheduler() { + return new MergeScheduler() { + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) { + delegateMergeScheduler.merge(new MergeSource() { + @Override + public OneMerge getNextMerge() { + OneMerge nextMerge = mergeSource.getNextMerge(); + if (nextMerge != null) { + assertTrue(TestEnginePlugin.this.enqueuedMergesSet.add(nextMerge)); + // avoid excess merges pilling up + if (TestEnginePlugin.this.enqueuedMergesSet + .size() > 
TestEnginePlugin.this.waitMergesEnqueuedCount) { + runMergeSemaphore.release(); + } + } + return nextMerge; + } + + @Override + public void onMergeFinished(OneMerge merge) { + mergeSource.onMergeFinished(merge); + } + + @Override + public boolean hasPendingMerges() { + return mergeSource.hasPendingMerges(); + } + + @Override + public void merge(OneMerge merge) throws IOException { + assertNotNull(merge); + try { + // most merges need to acquire the semaphore in order to run + if (frequently()) { + runMergeSemaphore.acquire(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + // assert to-be-run merge was enqueued + assertTrue(TestEnginePlugin.this.enqueuedMergesSet.remove(merge)); + TestEnginePlugin.this.runningMergesSet.add(merge); + assertThat( + TestEnginePlugin.this.runningMergesSet.size(), + lessThanOrEqualTo( + TestEnginePlugin.this.mergeExecutorServiceReference.get().getMaxConcurrentMerges() + ) + ); + mergeSource.merge(merge); + assertTrue(TestEnginePlugin.this.runningMergesSet.remove(merge)); + } + }, trigger); + } + + @Override + public Directory wrapForMerge(OneMerge merge, Directory in) { + return delegateMergeScheduler.wrapForMerge(merge, in); + } + + @Override + public Executor getIntraMergeExecutor(OneMerge merge) { + return delegateMergeScheduler.getIntraMergeExecutor(merge); + } + + @Override + public void close() throws IOException { + delegateMergeScheduler.close(); + } + }; + } + } + } + + @Override + public Optional getEngineFactory(IndexSettings indexSettings) { + return Optional.of(TestInternalEngine::new); + } + + } + + public void testMergingFallsBehindAndThenCatchesUp() throws Exception { + createIndex( + "index", + // stress test merging across multiple shards + indexSettings(randomIntBetween(1, 10), 0) + // few segments per merge ought to result in more merging activity + .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), randomIntBetween(2, 3)) + 
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), randomIntBetween(2, 3)) + // few concurrent merges allowed per scheduler + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), randomIntBetween(1, MERGE_SCHEDULER_MAX_CONCURRENCY)) + // many pending merges allowed, in order to disable indexing throttle + .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), randomIntBetween(1, Integer.MAX_VALUE)) + .build() + ); + ensureGreen("index"); + // generate merging activity across many threads + Thread[] indexingThreads = new Thread[randomIntBetween(20, 30)]; + AtomicBoolean indexingDone = new AtomicBoolean(false); + for (int i = 0; i < indexingThreads.length; i++) { + int finalI = i; + indexingThreads[i] = new Thread(() -> { + long termUpto = 0; + while (indexingDone.get() == false) { + for (int j = 0; j < 100; j++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for (int k = 0; k < 100; k++) { + sb.append(' '); + sb.append(termUpto++); + } + prepareIndex("index").setId("thread_" + finalI + "_term_" + termUpto) + .setSource("field" + (j % 10), sb.toString()) + .get(); + if (j % 2 == 0) { + indicesAdmin().prepareRefresh("index").get(); + } + } + indicesAdmin().prepareRefresh("index").get(); + } + }); + indexingThreads[i].start(); + } + TestEnginePlugin testEnginePlugin = getTestEnginePlugin(); + assertBusy(() -> { + // wait for merges to enqueue or backlog + assertThat(testEnginePlugin.enqueuedMergesSet.size(), greaterThanOrEqualTo(testEnginePlugin.waitMergesEnqueuedCount)); + }, 1, TimeUnit.MINUTES); + // finish up indexing + indexingDone.set(true); + for (Thread indexingThread : indexingThreads) { + indexingThread.join(); + } + // unblock merge threads + testEnginePlugin.allowAllMerging(); + // await all merging to catch up + assertBusy(() -> { + assertThat(testEnginePlugin.runningMergesSet.size(), is(0)); + assertThat(testEnginePlugin.enqueuedMergesSet.size(), is(0)); + 
testEnginePlugin.mergeExecutorServiceReference.get().allDone(); + }, 1, TimeUnit.MINUTES); + var segmentsCountAfterMergingCaughtUp = getSegmentsCountForAllShards("index"); + // force merge should be a noop after all available merging was done + assertAllSuccessful(indicesAdmin().prepareForceMerge("index").get()); + var segmentsCountAfterForceMerge = getSegmentsCountForAllShards("index"); + assertThat(segmentsCountAfterForceMerge, is(segmentsCountAfterMergingCaughtUp)); + // let's also run a force-merge to 1 segment + assertAllSuccessful(indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get()); + assertAllSuccessful(indicesAdmin().prepareRefresh("index").get()); + // assert one segment per shard + { + IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments("index").get(); + Iterator indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get("index").iterator(); + while (indexShardSegmentsIterator.hasNext()) { + for (ShardSegments segments : indexShardSegmentsIterator.next()) { + assertThat(segments.getSegments().size(), is(1)); + } + } + } + } + + private int getSegmentsCountForAllShards(String indexName) { + // refresh, otherwise we'd be still seeing the old merged-away segments + assertAllSuccessful(indicesAdmin().prepareRefresh(indexName).get()); + int count = 0; + IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indexName).get(); + Iterator indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get(indexName).iterator(); + while (indexShardSegmentsIterator.hasNext()) { + for (ShardSegments segments : indexShardSegmentsIterator.next()) { + count += segments.getSegments().size(); + } + } + return count; + } + + private TestEnginePlugin getTestEnginePlugin() { + return getInstanceFromNode(PluginsService.class).filterPlugins(TestEnginePlugin.class).toList().get(0); + } +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java index a0b158ed34a5..dd32e6b30fb7 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -625,6 +625,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { indexService.getIndexEventListener(), wrapper, indexService.getThreadPool(), + indexService.getThreadPoolMergeExecutorService(), indexService.getBigArrays(), null, Collections.emptyList(), diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java index 74ccdce19d3a..0ac8c4d0b6fd 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/IndexingMemoryControllerIT.java @@ -59,6 +59,7 @@ public class IndexingMemoryControllerIT extends ESSingleNodeTestCase { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), indexSettings, config.getWarmer(), config.getStore(), diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index 53d50aea6e7b..025626fbed9d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -88,6 +88,7 @@ import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingPressure; import org.elasticsearch.index.MergePolicyConfig; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.indices.IndexingMemoryController; import 
org.elasticsearch.indices.IndicesQueryCache; import org.elasticsearch.indices.IndicesRequestCache; @@ -625,6 +626,7 @@ public final class ClusterSettings extends AbstractScopedSettings { TDigestExecutionHint.SETTING, MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING, MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING, + ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING, TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE, DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING, DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING, diff --git a/server/src/main/java/org/elasticsearch/index/IndexModule.java b/server/src/main/java/org/elasticsearch/index/IndexModule.java index 7d63a0432cdb..6cd63b3c0047 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexModule.java +++ b/server/src/main/java/org/elasticsearch/index/IndexModule.java @@ -43,6 +43,7 @@ import org.elasticsearch.index.cache.query.IndexQueryCache; import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; @@ -470,6 +471,7 @@ public final class IndexModule { CircuitBreakerService circuitBreakerService, BigArrays bigArrays, ThreadPool threadPool, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, ClusterService clusterService, Client client, @@ -523,6 +525,7 @@ public final class IndexModule { circuitBreakerService, bigArrays, threadPool, + threadPoolMergeExecutorService, scriptService, clusterService, client, diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 
6824f0d668c0..ee7a3038bb8b 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.IndexFieldData; import org.elasticsearch.index.fielddata.IndexFieldDataCache; @@ -154,6 +155,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final AsyncTrimTranslogTask trimTranslogTask; private final ThreadPool threadPool; + @Nullable + private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final BigArrays bigArrays; private final ScriptService scriptService; private final ClusterService clusterService; @@ -178,6 +181,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust CircuitBreakerService circuitBreakerService, BigArrays bigArrays, ThreadPool threadPool, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, ScriptService scriptService, ClusterService clusterService, Client client, @@ -261,6 +265,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust this.indexFoldersDeletionListener = indexFoldersDeletionListener; this.bigArrays = bigArrays; this.threadPool = threadPool; + this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; this.scriptService = scriptService; this.clusterService = clusterService; this.client = client; @@ -556,6 +561,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust eventListener, readerWrapper, threadPool, + threadPoolMergeExecutorService, bigArrays, engineWarmer, 
searchOperationListeners, @@ -820,6 +826,10 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust return threadPool; } + public @Nullable ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService() { + return threadPoolMergeExecutorService; + } + /** * The {@link BigArrays} to use for this index. */ diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index af3c2cd5172f..1ef42cdb922c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -58,6 +58,8 @@ public final class EngineConfig { private final MapperService mapperService; private final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier; private final ThreadPool threadPool; + @Nullable + private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final Engine.Warmer warmer; private final Store store; private final MergePolicy mergePolicy; @@ -150,6 +152,7 @@ public final class EngineConfig { public EngineConfig( ShardId shardId, ThreadPool threadPool, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService, IndexSettings indexSettings, Engine.Warmer warmer, Store store, @@ -179,6 +182,7 @@ public final class EngineConfig { this.shardId = shardId; this.indexSettings = indexSettings; this.threadPool = threadPool; + this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; this.warmer = warmer == null ? (a) -> {} : warmer; this.store = store; this.mergePolicy = mergePolicy; @@ -287,6 +291,10 @@ public final class EngineConfig { return threadPool; } + public @Nullable ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService() { + return threadPoolMergeExecutorService; + } + /** * Returns an {@link org.elasticsearch.index.engine.Engine.Warmer} used to warm new searchers before they are used for searching. 
*/ diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fc7f6eab0856..2f6f69862066 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -254,7 +254,11 @@ public class InternalEngine extends Engine { boolean success = false; try { this.lastDeleteVersionPruneTimeMSec = engineConfig.getThreadPool().relativeTimeInMillis(); - mergeScheduler = createMergeScheduler(engineConfig.getShardId(), engineConfig.getIndexSettings()); + mergeScheduler = createMergeScheduler( + engineConfig.getShardId(), + engineConfig.getIndexSettings(), + engineConfig.getThreadPoolMergeExecutorService() + ); scheduler = mergeScheduler.getMergeScheduler(); throttle = new IndexThrottle(); try { @@ -2818,15 +2822,95 @@ public class InternalEngine extends Engine { return indexWriter.getConfig(); } - protected ElasticsearchMergeScheduler createMergeScheduler(ShardId shardId, IndexSettings indexSettings) { - return new EngineMergeScheduler(shardId, indexSettings); + private void maybeFlushAfterMerge(OnGoingMerge merge) { + if (indexWriter.hasPendingMerges() == false && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { + // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the + // writer + // we deadlock on engine#close for instance. 
+ engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + if (isClosed.get() == false) { + logger.warn("failed to flush after merge has finished", e); + } else { + logger.info("failed to flush after merge has finished during shard close"); + } + } + + @Override + protected void doRun() { + // if we have no pending merges and we are supposed to flush once merges have finished to + // free up transient disk usage of the (presumably biggish) segments that were just merged + flush(); + } + }); + } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { + // we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change + // we should execute a flush on the next operation if that's a flush after inactive or indexing a document. + // we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events. 
+ shouldPeriodicallyFlushAfterBigMerge.set(true); + } } - private final class EngineMergeScheduler extends ElasticsearchConcurrentMergeScheduler { + protected ElasticsearchMergeScheduler createMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + @Nullable ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ) { + if (threadPoolMergeExecutorService != null) { + return new EngineThreadPoolMergeScheduler(shardId, indexSettings, threadPoolMergeExecutorService); + } else { + return new EngineConcurrentMergeScheduler(shardId, indexSettings); + } + } + + private final class EngineThreadPoolMergeScheduler extends ThreadPoolMergeScheduler { + EngineThreadPoolMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ) { + super(shardId, indexSettings, threadPoolMergeExecutorService); + } + + @Override + protected synchronized void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { + logger.info( + "now throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}", + numRunningMerges, + numQueuedMerges, + configuredMaxMergeCount + ); + InternalEngine.this.activateThrottling(); + } + + @Override + protected synchronized void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) { + logger.info( + "stop throttling indexing: numRunningMerges={}, numQueuedMerges={}, maxNumMergesConfigured={}", + numRunningMerges, + numQueuedMerges, + configuredMaxMergeCount + ); + InternalEngine.this.deactivateThrottling(); + } + + @Override + public synchronized void afterMerge(OnGoingMerge merge) { + maybeFlushAfterMerge(merge); + } + + @Override + protected void handleMergeException(final Throwable exc) { + mergeException(exc); + } + } + + private final class EngineConcurrentMergeScheduler extends ElasticsearchConcurrentMergeScheduler { private final AtomicInteger 
numMergesInFlight = new AtomicInteger(0); private final AtomicBoolean isThrottling = new AtomicBoolean(); - EngineMergeScheduler(ShardId shardId, IndexSettings indexSettings) { + EngineConcurrentMergeScheduler(ShardId shardId, IndexSettings indexSettings) { super(shardId, indexSettings); } @@ -2850,33 +2934,7 @@ public class InternalEngine extends Engine { deactivateThrottling(); } } - if (indexWriter.hasPendingMerges() == false - && System.nanoTime() - lastWriteNanos >= engineConfig.getFlushMergesAfter().nanos()) { - // NEVER do this on a merge thread since we acquire some locks blocking here and if we concurrently rollback the writer - // we deadlock on engine#close for instance. - engineConfig.getThreadPool().executor(ThreadPool.Names.FLUSH).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - if (isClosed.get() == false) { - logger.warn("failed to flush after merge has finished", e); - } else { - logger.info("failed to flush after merge has finished during shard close"); - } - } - - @Override - protected void doRun() { - // if we have no pending merges and we are supposed to flush once merges have finished to - // free up transient disk usage of the (presumably biggish) segments that were just merged - flush(); - } - }); - } else if (merge.getTotalBytesSize() >= engineConfig.getIndexSettings().getFlushAfterMergeThresholdSize().getBytes()) { - // we hit a significant merge which would allow us to free up memory if we'd commit it hence on the next change - // we should execute a flush on the next operation if that's a flush after inactive or indexing a document. - // we could fork a thread and do it right away but we try to minimize forking and piggyback on outside events. 
- shouldPeriodicallyFlushAfterBigMerge.set(true); - } + maybeFlushAfterMerge(merge); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java new file mode 100644 index 000000000000..5217edb5490d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorService.java @@ -0,0 +1,304 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.core.Nullable; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; +import org.elasticsearch.threadpool.ThreadPool; + +import java.util.Comparator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.LongUnaryOperator; + +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN; + +public class ThreadPoolMergeExecutorService { + 
/** + * Floor for IO write rate limit of individual merge tasks (we will never go any lower than this) + */ + static final ByteSizeValue MIN_IO_RATE = ByteSizeValue.ofMb(5L); + /** + * Ceiling for IO write rate limit of individual merge tasks (we will never go any higher than this) + */ + static final ByteSizeValue MAX_IO_RATE = ByteSizeValue.ofMb(10240L); + /** + * Initial value for IO write rate limit of individual merge tasks when doAutoIOThrottle is true + */ + static final ByteSizeValue START_IO_RATE = ByteSizeValue.ofMb(20L); + /** + * Total number of submitted merge tasks that support IO auto throttling and that have not yet been run (or aborted). + * This includes merge tasks that are currently running and that are backlogged (by their respective merge schedulers). + */ + private final AtomicInteger ioThrottledMergeTasksCount = new AtomicInteger(); + /** + * The merge tasks that are waiting execution. This does NOT include backlogged or currently executing merge tasks. + * For instance, this can be empty while there are backlogged merge tasks awaiting re-enqueuing. + */ + private final PriorityBlockingQueue queuedMergeTasks = new PriorityBlockingQueue<>( + 64, + Comparator.comparingLong(MergeTask::estimatedMergeSize) + ); + /** + * The set of all merge tasks currently being executed by merge threads from the pool. + * These are tracked notably in order to be able to update their disk IO throttle rate, after they have started, while executing. + */ + private final Set runningMergeTasks = ConcurrentCollections.newConcurrentSet(); + /** + * Current IO write throttle rate, in bytes per sec, that's in effect for all currently running merge tasks, + * across all {@link ThreadPoolMergeScheduler}s that use this instance of the queue. 
+ */ + private final AtomicIORate targetIORateBytesPerSec = new AtomicIORate(START_IO_RATE.getBytes()); + private final ExecutorService executorService; + /** + * The maximum number of concurrently running merges, given the number of threads in the pool. + */ + private final int maxConcurrentMerges; + private final int concurrentMergesFloorLimitForThrottling; + private final int concurrentMergesCeilLimitForThrottling; + + public static @Nullable ThreadPoolMergeExecutorService maybeCreateThreadPoolMergeExecutorService( + ThreadPool threadPool, + Settings settings + ) { + if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) { + return new ThreadPoolMergeExecutorService(threadPool); + } else { + return null; + } + } + + private ThreadPoolMergeExecutorService(ThreadPool threadPool) { + this.executorService = threadPool.executor(ThreadPool.Names.MERGE); + this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax(); + this.concurrentMergesFloorLimitForThrottling = maxConcurrentMerges * 2; + this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 4; + } + + boolean submitMergeTask(MergeTask mergeTask) { + assert mergeTask.isRunning() == false; + // first enqueue the runnable that runs exactly one merge task (the smallest it can find) + if (enqueueMergeTaskExecution() == false) { + // if the thread pool cannot run the merge, just abort it + mergeTask.abort(); + return false; + } else { + if (mergeTask.supportsIOThrottling()) { + // count enqueued merge tasks that support IO auto throttling, and maybe adjust IO rate for all + int currentTaskCount = ioThrottledMergeTasksCount.incrementAndGet(); + targetIORateBytesPerSec.update( + currentTargetIORateBytesPerSec -> newTargetIORateBytesPerSec( + currentTargetIORateBytesPerSec, + currentTaskCount, + concurrentMergesFloorLimitForThrottling, + concurrentMergesCeilLimitForThrottling + ), + (prevTargetIORateBytesPerSec, newTargetIORateBytesPerSec) -> { + // it's OK to have 
this method update merge tasks concurrently, with different targetMBPerSec values, + // as it's not important that all merge tasks are throttled to the same IO rate at all time. + // For performance reasons, we don't synchronize the updates to targetMBPerSec values with the update of running + // merges. + if (prevTargetIORateBytesPerSec != newTargetIORateBytesPerSec) { + runningMergeTasks.forEach(runningMergeTask -> { + if (runningMergeTask.supportsIOThrottling()) { + runningMergeTask.setIORateLimit(newTargetIORateBytesPerSec); + } + }); + } + } + ); + } + // then enqueue the merge task proper + queuedMergeTasks.add(mergeTask); + return true; + } + } + + void reEnqueueBackloggedMergeTask(MergeTask mergeTask) { + queuedMergeTasks.add(mergeTask); + } + + public boolean allDone() { + return queuedMergeTasks.isEmpty() && runningMergeTasks.isEmpty() && ioThrottledMergeTasksCount.get() == 0L; + } + + /** + * Enqueues a runnable that executes exactly one merge task, the smallest that is runnable at some point in time. + * A merge task is not runnable if its scheduler already reached the configured max-allowed concurrency level. + */ + private boolean enqueueMergeTaskExecution() { + try { + executorService.execute(() -> { + // one such runnable always executes a SINGLE merge task from the queue + // this is important for merge queue statistics, i.e. the executor's queue size represents the current amount of merges + while (true) { + MergeTask smallestMergeTask; + try { + // will block if there are backlogged merges until they're enqueued again + smallestMergeTask = queuedMergeTasks.take(); + } catch (InterruptedException e) { + // An active worker thread has been interrupted while waiting for backlogged merges to be re-enqueued. + // In this case, we terminate the worker thread promptly and forget about the backlogged merges. 
+ // It is OK to forget about merges in this case, because active worker threads are only interrupted + // when the node is shutting down, in which case in-memory accounting of merging activity is not relevant. + // As part of {@link java.util.concurrent.ThreadPoolExecutor#shutdownNow()} the thread pool's work queue + // is also drained, so any queued merge tasks are also forgotten. + break; + } + // let the task's scheduler decide if it can actually run the merge task now + ThreadPoolMergeScheduler.Schedule schedule = smallestMergeTask.schedule(); + if (schedule == RUN) { + runMergeTask(smallestMergeTask); + break; + } else if (schedule == ABORT) { + abortMergeTask(smallestMergeTask); + break; + } else { + assert schedule == BACKLOG; + // the merge task is backlogged by the merge scheduler, try to get the next smallest one + // it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run + } + } + }); + return true; + } catch (Throwable t) { + // cannot execute merges because the executor is shutting down + assert t instanceof RejectedExecutionException; + return false; + } + } + + private void runMergeTask(MergeTask mergeTask) { + assert mergeTask.isRunning() == false; + boolean added = runningMergeTasks.add(mergeTask); + assert added : "starting merge task [" + mergeTask + "] registered as already running"; + try { + if (mergeTask.supportsIOThrottling()) { + mergeTask.setIORateLimit(targetIORateBytesPerSec.get()); + } + mergeTask.run(); + } finally { + boolean removed = runningMergeTasks.remove(mergeTask); + assert removed : "completed merge task [" + mergeTask + "] not registered as running"; + if (mergeTask.supportsIOThrottling()) { + ioThrottledMergeTasksCount.decrementAndGet(); + } + } + } + + private void abortMergeTask(MergeTask mergeTask) { + assert mergeTask.isRunning() == false; + assert runningMergeTasks.contains(mergeTask) == false; + try { + mergeTask.abort(); + } finally { + if 
(mergeTask.supportsIOThrottling()) { + ioThrottledMergeTasksCount.decrementAndGet(); + } + } + } + + private static long newTargetIORateBytesPerSec( + long currentTargetIORateBytesPerSec, + int currentlySubmittedIOThrottledMergeTasks, + int concurrentMergesFloorLimitForThrottling, + int concurrentMergesCeilLimitForThrottling + ) { + final long newTargetIORateBytesPerSec; + if (currentlySubmittedIOThrottledMergeTasks < concurrentMergesFloorLimitForThrottling + && currentTargetIORateBytesPerSec > MIN_IO_RATE.getBytes()) { + // decrease target IO rate by 10% (capped) + newTargetIORateBytesPerSec = Math.max( + MIN_IO_RATE.getBytes(), + currentTargetIORateBytesPerSec - currentTargetIORateBytesPerSec / 10L + ); + } else if (currentlySubmittedIOThrottledMergeTasks > concurrentMergesCeilLimitForThrottling + && currentTargetIORateBytesPerSec < MAX_IO_RATE.getBytes()) { + // increase target IO rate by 10% (capped) + newTargetIORateBytesPerSec = Math.min( + MAX_IO_RATE.getBytes(), + currentTargetIORateBytesPerSec + currentTargetIORateBytesPerSec / 10L + ); + } else { + newTargetIORateBytesPerSec = currentTargetIORateBytesPerSec; + } + return newTargetIORateBytesPerSec; + } + + static class AtomicIORate { + private final AtomicLong ioRate; + + AtomicIORate(long initialIORate) { + ioRate = new AtomicLong(initialIORate); + } + + long get() { + return ioRate.get(); + } + + // Exactly like {@link AtomicLong#updateAndGet} but calls the consumer rather than return the new (updated) value. + // The consumer receives both the previous and the updated values (which can be equal). 
+ void update(LongUnaryOperator updateFunction, AtomicIORate.UpdateConsumer updateConsumer) { + long prev = ioRate.get(), next = 0L; + for (boolean haveNext = false;;) { + if (haveNext == false) next = updateFunction.applyAsLong(prev); + if (ioRate.weakCompareAndSetVolatile(prev, next)) { + updateConsumer.accept(prev, next); + return; + } + haveNext = (prev == (prev = ioRate.get())); + } + } + + @FunctionalInterface + interface UpdateConsumer { + void accept(long prev, long next); + } + } + + // exposed for tests + Set getRunningMergeTasks() { + return runningMergeTasks; + } + + // exposed for tests + PriorityBlockingQueue getQueuedMergeTasks() { + return queuedMergeTasks; + } + + // exposed for tests and stats + long getTargetIORateBytesPerSec() { + return targetIORateBytesPerSec.get(); + } + + // exposed for tests + int getMaxConcurrentMerges() { + return maxConcurrentMerges; + } + + // exposed for tests + int getConcurrentMergesFloorLimitForThrottling() { + return concurrentMergesFloorLimitForThrottling; + } + + // exposed for tests + int getConcurrentMergesCeilLimitForThrottling() { + return concurrentMergesCeilLimitForThrottling; + } +} diff --git a/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java new file mode 100644 index 000000000000..8cfdc5926836 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/engine/ThreadPoolMergeScheduler.java @@ -0,0 +1,529 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergeRateLimiter; +import org.apache.lucene.index.MergeScheduler; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RateLimitedIndexOutput; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.merge.OnGoingMerge; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class ThreadPoolMergeScheduler extends MergeScheduler implements ElasticsearchMergeScheduler { + public static final Setting USE_THREAD_POOL_MERGE_SCHEDULER_SETTING = Setting.boolSetting( + "indices.merge.scheduler.use_thread_pool", + true, + Setting.Property.NodeScope + ); + private final ShardId shardId; + private final MergeSchedulerConfig config; + private final Logger logger; + private final MergeTracking mergeTracking; + private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; + private final PriorityQueue backloggedMergeTasks = new PriorityQueue<>( + 16, + 
Comparator.comparingLong(MergeTask::estimatedMergeSize) + ); + private final Map runningMergeTasks = new HashMap<>(); + // set when incoming merges should be throttled (i.e. restrict the indexing rate) + private final AtomicBoolean shouldThrottleIncomingMerges = new AtomicBoolean(); + // how many {@link MergeTask}s have kicked off (this is used to name them). + private final AtomicLong submittedMergeTaskCount = new AtomicLong(); + private final AtomicLong doneMergeTaskCount = new AtomicLong(); + private final CountDownLatch closedWithNoRunningMerges = new CountDownLatch(1); + private volatile boolean closed = false; + + public ThreadPoolMergeScheduler( + ShardId shardId, + IndexSettings indexSettings, + ThreadPoolMergeExecutorService threadPoolMergeExecutorService + ) { + this.shardId = shardId; + this.config = indexSettings.getMergeSchedulerConfig(); + this.logger = Loggers.getLogger(getClass(), shardId); + this.mergeTracking = new MergeTracking( + logger, + () -> this.config.isAutoThrottle() + ? 
ByteSizeValue.ofBytes(threadPoolMergeExecutorService.getTargetIORateBytesPerSec()).getMbFrac() + : Double.POSITIVE_INFINITY + ); + this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; + } + + @Override + public Set onGoingMerges() { + return mergeTracking.onGoingMerges(); + } + + @Override + public MergeStats stats() { + return mergeTracking.stats(); + } + + @Override + public MergeScheduler getMergeScheduler() { + return this; + } + + @Override + public void refreshConfig() { + // if maxMergeCount changed, maybe we need to toggle merge task throttling + checkMergeTaskThrottling(); + // if maxThreadCount changed, maybe some backlogged merges are now allowed to run + enqueueBackloggedTasks(); + } + + @Override + public void merge(MergeSource mergeSource, MergeTrigger trigger) { + if (closed) { + // avoid pulling from the merge source when closing + return; + } + MergePolicy.OneMerge merge = null; + try { + merge = mergeSource.getNextMerge(); + } catch (IllegalStateException e) { + if (verbose()) { + message("merge task poll failed, likely that index writer is failed"); + } + // ignore exception, we expect the IW failure to be logged elsewhere + } + if (merge != null) { + submitNewMergeTask(mergeSource, merge, trigger); + } + } + + @Override + public MergeScheduler clone() { + // Lucene IW makes a clone internally but since we hold on to this instance + // the clone will just be the identity. + return this; + } + + /** + * A callback allowing for custom logic before an actual merge starts. + */ + protected void beforeMerge(OnGoingMerge merge) {} + + /** + * A callback allowing for custom logic after an actual merge starts. + */ + protected void afterMerge(OnGoingMerge merge) {} + + /** + * A callback that's invoked when indexing should throttle down indexing in order to let merging to catch up. 
+ */ + protected void enableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {} + + /** + * A callback that's invoked when indexing should un-throttle because merging caught up. + * This is invoked sometime after {@link #enableIndexingThrottling(int, int, int)} was invoked in the first place. + */ + protected void disableIndexingThrottling(int numRunningMerges, int numQueuedMerges, int configuredMaxMergeCount) {} + + /** + * A callback for exceptions thrown while merging. + */ + protected void handleMergeException(Throwable t) { + throw new MergePolicy.MergeException(t); + } + + // package-private for tests + boolean submitNewMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { + try { + MergeTask mergeTask = newMergeTask(mergeSource, merge, mergeTrigger); + return threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } finally { + checkMergeTaskThrottling(); + } + } + + // package-private for tests + MergeTask newMergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, MergeTrigger mergeTrigger) { + // forced merges, as well as merges triggered when closing a shard, always run un-IO-throttled + boolean isAutoThrottle = mergeTrigger != MergeTrigger.CLOSING && merge.getStoreMergeInfo().mergeMaxNumSegments() == -1; + // IO throttling cannot be toggled for existing merge tasks, only new merge tasks pick up the updated IO throttling setting + return new MergeTask( + mergeSource, + merge, + isAutoThrottle && config.isAutoThrottle(), + "Lucene Merge Task #" + submittedMergeTaskCount.incrementAndGet() + " for shard " + shardId + ); + } + + private void checkMergeTaskThrottling() { + long submittedMergesCount = submittedMergeTaskCount.get(); + long doneMergesCount = doneMergeTaskCount.get(); + int runningMergesCount = runningMergeTasks.size(); + int configuredMaxMergeCount = config.getMaxMergeCount(); + // both currently running and enqueued merge tasks are considered "active" for 
throttling purposes + int activeMerges = (int) (submittedMergesCount - doneMergesCount); + if (activeMerges > configuredMaxMergeCount && shouldThrottleIncomingMerges.get() == false) { + // maybe enable merge task throttling + synchronized (shouldThrottleIncomingMerges) { + if (shouldThrottleIncomingMerges.getAndSet(true) == false) { + enableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); + } + } + } else if (activeMerges <= configuredMaxMergeCount && shouldThrottleIncomingMerges.get()) { + // maybe disable merge task throttling + synchronized (shouldThrottleIncomingMerges) { + if (shouldThrottleIncomingMerges.getAndSet(false)) { + disableIndexingThrottling(runningMergesCount, activeMerges - runningMergesCount, configuredMaxMergeCount); + } + } + } + } + + // exposed for tests + // synchronized so that {@code #closed}, {@code #runningMergeTasks} and {@code #backloggedMergeTasks} are modified atomically + synchronized Schedule schedule(MergeTask mergeTask) { + assert mergeTask.isRunning() == false; + if (closed) { + // do not run or backlog tasks when closing the merge scheduler, instead abort them + return Schedule.ABORT; + } else if (runningMergeTasks.size() < config.getMaxThreadCount()) { + boolean added = runningMergeTasks.put(mergeTask.onGoingMerge.getMerge(), mergeTask) == null; + assert added : "starting merge task [" + mergeTask + "] registered as already running"; + return Schedule.RUN; + } else { + backloggedMergeTasks.add(mergeTask); + return Schedule.BACKLOG; + } + } + + // exposed for tests + synchronized void mergeTaskFinishedRunning(MergeTask mergeTask) { + boolean removed = runningMergeTasks.remove(mergeTask.onGoingMerge.getMerge()) != null; + assert removed : "completed merge task [" + mergeTask + "] not registered as running"; + // when one merge is done, maybe a backlogged one can now execute + enqueueBackloggedTasks(); + // signal here, because, when closing, we wait for all currently running 
merges to finish + maybeSignalAllMergesDoneAfterClose(); + } + + private void mergeTaskDone() { + doneMergeTaskCount.incrementAndGet(); + checkMergeTaskThrottling(); + } + + private synchronized void maybeSignalAllMergesDoneAfterClose() { + if (closed && runningMergeTasks.isEmpty()) { + closedWithNoRunningMerges.countDown(); + } + } + + private synchronized void enqueueBackloggedTasks() { + int maxBackloggedTasksToEnqueue = config.getMaxThreadCount() - runningMergeTasks.size(); + // enqueue all backlogged tasks when closing, as the queue expects all backlogged tasks to always be enqueued back + while (closed || maxBackloggedTasksToEnqueue-- > 0) { + MergeTask backloggedMergeTask = backloggedMergeTasks.poll(); + if (backloggedMergeTask == null) { + break; + } + // no need to abort merge tasks now, they will be aborted on the spot when the scheduler gets to run them + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(backloggedMergeTask); + } + } + + /** + * Does the actual merge, by calling {@link org.apache.lucene.index.MergeScheduler.MergeSource#merge} + */ + void doMerge(MergeSource mergeSource, MergePolicy.OneMerge oneMerge) { + try { + mergeSource.merge(oneMerge); + } catch (Throwable t) { + // OK to ignore MergeAbortedException. This is what Lucene's ConcurrentMergeScheduler does. + if (t instanceof MergePolicy.MergeAbortedException == false) { + handleMergeException(t); + } + } + } + + @Override + public Directory wrapForMerge(MergePolicy.OneMerge merge, Directory in) { + // Return a wrapped Directory which has rate-limited output. + // Note: the rate limiter is only per thread (per merge). So, if there are multiple merge threads running + // the combined IO rate per node is, roughly, 'thread_pool_size * merge_queue#targetMBPerSec', as + // the per-thread IO rate is updated, best effort, for all running merge threads concomitantly. 
+ if (merge.isAborted()) { + // merges can theoretically be aborted at any moment + return in; + } + MergeTask mergeTask = runningMergeTasks.get(merge); + if (mergeTask == null) { + throw new IllegalStateException("associated merge task for executing merge not found"); + } + return new FilterDirectory(in) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + ensureOpen(); + + // This Directory is only supposed to be used during merging, + // so all writes should have MERGE context, else there is a bug + // somewhere that is failing to pass down the right IOContext: + assert context.context() == IOContext.Context.MERGE : "got context=" + context.context(); + + return new RateLimitedIndexOutput(mergeTask.rateLimiter, in.createOutput(name, context)); + } + }; + } + + class MergeTask implements Runnable { + private final String name; + private final AtomicLong mergeStartTimeNS; + private final MergeSource mergeSource; + private final OnGoingMerge onGoingMerge; + private final MergeRateLimiter rateLimiter; + private final boolean supportsIOThrottling; + + MergeTask(MergeSource mergeSource, MergePolicy.OneMerge merge, boolean supportsIOThrottling, String name) { + this.name = name; + this.mergeStartTimeNS = new AtomicLong(); + this.mergeSource = mergeSource; + this.onGoingMerge = new OnGoingMerge(merge); + this.rateLimiter = new MergeRateLimiter(merge.getMergeProgress()); + this.supportsIOThrottling = supportsIOThrottling; + } + + Schedule schedule() { + return ThreadPoolMergeScheduler.this.schedule(this); + } + + public boolean supportsIOThrottling() { + return supportsIOThrottling; + } + + public void setIORateLimit(long ioRateLimitBytesPerSec) { + if (supportsIOThrottling == false) { + throw new IllegalArgumentException("merge task cannot be IO throttled"); + } + this.rateLimiter.setMBPerSec(ByteSizeValue.ofBytes(ioRateLimitBytesPerSec).getMbFrac()); + } + + public boolean isRunning() { + return mergeStartTimeNS.get() > 
0L; + } + + /** + * Runs the merge associated to this task. MUST be invoked after {@link #schedule()} returned {@link Schedule#RUN}, + * to confirm that the associated {@link MergeScheduler} assents to run the merge. + * Either one of {@link #run()} or {@link #abort()} MUST be invoked exactly once for evey {@link MergeTask}. + * After the merge is finished, this will also submit any follow-up merges from the task's merge source. + */ + @Override + public void run() { + assert isRunning() == false; + assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) + : "runNowOrBacklog must be invoked before actually running the merge task"; + try { + beforeMerge(onGoingMerge); + try { + if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) { + throw new IllegalStateException("The merge task is already started or aborted"); + } + mergeTracking.mergeStarted(onGoingMerge); + if (verbose()) { + message(String.format(Locale.ROOT, "merge task %s start", this)); + } + try { + doMerge(mergeSource, onGoingMerge.getMerge()); + if (verbose()) { + message( + String.format( + Locale.ROOT, + "merge task %s merge segment [%s] done estSize=%.1f MB (written=%.1f MB) " + + "runTime=%.1fs (stopped=%.1fs, paused=%.1fs) rate=%s", + this, + getSegmentName(onGoingMerge.getMerge()), + bytesToMB(onGoingMerge.getMerge().estimatedMergeBytes), + bytesToMB(rateLimiter.getTotalBytesWritten()), + nsToSec(System.nanoTime() - mergeStartTimeNS.get()), + nsToSec(rateLimiter.getTotalStoppedNS()), + nsToSec(rateLimiter.getTotalPausedNS()), + rateToString(rateLimiter.getMBPerSec()) + ) + ); + } + } finally { + long tookMS = TimeValue.nsecToMSec(System.nanoTime() - mergeStartTimeNS.get()); + mergeTracking.mergeFinished(onGoingMerge.getMerge(), onGoingMerge, tookMS); + } + } finally { + afterMerge(onGoingMerge); + } + } finally { + if (verbose()) { + message(String.format(Locale.ROOT, "merge task %s end", this)); + } + try { + mergeTaskFinishedRunning(this); + } 
finally { + mergeTaskDone(); + } + try { + // kick-off any follow-up merge + merge(mergeSource, MergeTrigger.MERGE_FINISHED); + } catch (@SuppressWarnings("unused") AlreadyClosedException ace) { + // OK, this is what the {@code ConcurrentMergeScheduler} does + } + } + } + + /** + * Aborts the merge task, for e.g. when the {@link MergeScheduler}, or the + * {@link ThreadPoolMergeExecutorService} are closing. Either one of {@link #run()} or {@link #abort()} + * MUST be invoked exactly once for evey {@link MergeTask}. + * An aborted merge means that the segments involved will be made available + * (by the {@link org.apache.lucene.index.IndexWriter}) to any subsequent merges. + */ + void abort() { + assert isRunning() == false; + assert ThreadPoolMergeScheduler.this.runningMergeTasks.containsKey(onGoingMerge.getMerge()) == false + : "cannot abort a merge task that's already running"; + if (verbose()) { + message(String.format(Locale.ROOT, "merge task %s aborted", this)); + } + // {@code IndexWriter} checks the abort flag internally, while running the merge. + // The segments of an aborted merge become available to subsequent merges. + onGoingMerge.getMerge().setAborted(); + try { + if (verbose()) { + message(String.format(Locale.ROOT, "merge task %s start abort", this)); + } + // mark the merge task as running, even though the merge itself is aborted and the task will run for a brief time only + if (mergeStartTimeNS.compareAndSet(0L, System.nanoTime()) == false) { + throw new IllegalStateException("The merge task is already started or aborted"); + } + // This ensures {@code OneMerge#close} gets invoked. + // {@code IndexWriter} considers a merge as "running" once it has been pulled from the {@code MergeSource#getNextMerge}, + // so in theory it's not enough to just call {@code MergeSource#onMergeFinished} on it (as for "pending" ones). 
+ doMerge(mergeSource, onGoingMerge.getMerge()); + } finally { + if (verbose()) { + message(String.format(Locale.ROOT, "merge task %s end abort", this)); + } + mergeTaskDone(); + } + } + + long estimatedMergeSize() { + // TODO is it possible that `estimatedMergeBytes` be `0` for correctly initialize merges, + // or is it always the case that if `estimatedMergeBytes` is `0` that means that the merge has not yet been initialized? + return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes(); + } + + @Override + public String toString() { + return name + (onGoingMerge.getMerge().isAborted() ? " (aborted)" : ""); + } + } + + @Override + /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. */ + protected boolean verbose() { + if (logger.isTraceEnabled()) { + return true; + } + return super.verbose(); + } + + @Override + /* Overridden to route messages to our logger too, in addition to the {@link org.apache.lucene.util.InfoStream} that lucene uses. 
*/ + protected void message(String message) { + if (logger.isTraceEnabled()) { + logger.trace("{}", message); + } + super.message(message); + } + + @Override + public void close() throws IOException { + synchronized (this) { + closed = true; + // enqueue any backlogged merge tasks, because the merge queue assumes that the backlogged tasks are always re-enqueued + enqueueBackloggedTasks(); + // signal if there aren't any currently running merges + maybeSignalAllMergesDoneAfterClose(); + } + try { + closedWithNoRunningMerges.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + // this closes an executor that may be used by ongoing merges, so better close it only after all running merges finished + super.close(); + } + } + + // exposed for tests + PriorityQueue getBackloggedMergeTasks() { + return backloggedMergeTasks; + } + + // exposed for tests + Map getRunningMergeTasks() { + return runningMergeTasks; + } + + private static double nsToSec(long ns) { + return ns / (double) TimeUnit.SECONDS.toNanos(1); + } + + private static double bytesToMB(long bytes) { + return bytes / 1024. / 1024.; + } + + private static String getSegmentName(MergePolicy.OneMerge merge) { + return merge.getMergeInfo() != null ? 
merge.getMergeInfo().info.name : "_na_"; + } + + private static String rateToString(double mbPerSec) { + if (mbPerSec == 0.0) { + return "stopped"; + } else if (mbPerSec == Double.POSITIVE_INFINITY) { + return "unlimited"; + } else { + return String.format(Locale.ROOT, "%.1f MB/sec", mbPerSec); + } + } + + enum Schedule { + ABORT, + RUN, + BACKLOG + } +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7fecc53826ff..c337929eea69 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -96,6 +96,7 @@ import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.SafeCommitInfo; import org.elasticsearch.index.engine.Segment; import org.elasticsearch.index.engine.SegmentsStats; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.fielddata.FieldDataStats; import org.elasticsearch.index.fielddata.ShardFieldData; import org.elasticsearch.index.flush.FlushStats; @@ -193,6 +194,8 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard { private final ThreadPool threadPool; + @Nullable + private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final MapperService mapperService; private final IndexCache indexCache; private final Store store; @@ -316,6 +319,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final IndexEventListener indexEventListener, final CheckedFunction indexReaderWrapper, final ThreadPool threadPool, + final ThreadPoolMergeExecutorService threadPoolMergeExecutorService, final BigArrays bigArrays, final Engine.Warmer warmer, final List searchOperationListener, @@ -342,6 
+346,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.indexSortSupplier = indexSortSupplier; this.indexEventListener = indexEventListener; this.threadPool = threadPool; + this.threadPoolMergeExecutorService = threadPoolMergeExecutorService; this.mapperService = mapperService; this.indexCache = indexCache; this.internalIndexingStats = new InternalIndexingStats(); @@ -3545,6 +3550,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new EngineConfig( shardId, threadPool, + threadPoolMergeExecutorService, indexSettings, warmer, store, diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 9931332eb3c2..6e0d53a176ce 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -99,6 +99,7 @@ import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.InternalEngineFactory; import org.elasticsearch.index.engine.NoOpEngine; import org.elasticsearch.index.engine.ReadOnlyEngine; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; @@ -232,6 +233,8 @@ public class IndicesService extends AbstractLifecycleComponent private final IndicesFieldDataCache indicesFieldDataCache; private final CacheCleaner cacheCleaner; private final ThreadPool threadPool; + @Nullable + private final ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private final CircuitBreakerService circuitBreakerService; private final BigArrays bigArrays; private final ScriptService scriptService; @@ -288,6 +291,10 @@ public class IndicesService extends AbstractLifecycleComponent IndicesService(IndicesServiceBuilder builder) { 
this.settings = builder.settings; this.threadPool = builder.threadPool; + this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + settings + ); this.pluginsService = builder.pluginsService; this.nodeEnv = builder.nodeEnv; this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE) @@ -784,6 +791,7 @@ public class IndicesService extends AbstractLifecycleComponent circuitBreakerService, bigArrays, threadPool, + threadPoolMergeExecutorService, scriptService, clusterService, client, @@ -1919,4 +1927,9 @@ public class IndicesService extends AbstractLifecycleComponent public BigArrays getBigArrays() { return bigArrays; } + + @Nullable + public ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService() { + return threadPoolMergeExecutorService; + } } diff --git a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java index 9698ce6b65cd..b8dddc20cc51 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java +++ b/server/src/main/java/org/elasticsearch/threadpool/DefaultBuiltInExecutorBuilders.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.threadpool.internal.BuiltInExecutorBuilders; import java.util.HashMap; @@ -141,6 +142,12 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders { false ) ); + if (ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.get(settings)) { + result.put( + ThreadPool.Names.MERGE, + new ScalingExecutorBuilder(ThreadPool.Names.MERGE, 1, 
allocatedProcessors, TimeValue.timeValueMinutes(5), true) + ); + } result.put( ThreadPool.Names.FORCE_MERGE, new FixedExecutorBuilder( diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 96d82793a3f4..85ee02b6db85 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -134,6 +134,7 @@ public class ThreadPool implements ReportingService, Scheduler, public static final String WARMER = "warmer"; public static final String SNAPSHOT = "snapshot"; public static final String SNAPSHOT_META = "snapshot_meta"; + public static final String MERGE = "merge"; public static final String FORCE_MERGE = "force_merge"; public static final String FETCH_SHARD_STARTED = "fetch_shard_started"; public static final String FETCH_SHARD_STORE = "fetch_shard_store"; @@ -192,6 +193,7 @@ public class ThreadPool implements ReportingService, Scheduler, entry(Names.WARMER, ThreadPoolType.SCALING), entry(Names.SNAPSHOT, ThreadPoolType.SCALING), entry(Names.SNAPSHOT_META, ThreadPoolType.SCALING), + entry(Names.MERGE, ThreadPoolType.SCALING), entry(Names.FORCE_MERGE, ThreadPoolType.FIXED), entry(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING), entry(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING), diff --git a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java index c519d4834148..cf1b05bc2963 100644 --- a/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -59,6 +59,8 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.InternalEngineFactory; +import 
org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperRegistry; @@ -158,6 +160,7 @@ public class IndexModuleTests extends ESTestCase { }; private MapperRegistry mapperRegistry; private ThreadPool threadPool; + private ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private CircuitBreakerService circuitBreakerService; private BigArrays bigArrays; private ScriptService scriptService; @@ -170,6 +173,7 @@ public class IndexModuleTests extends ESTestCase { settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) .build(); indicesQueryCache = new IndicesQueryCache(settings); indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); @@ -188,6 +192,7 @@ public class IndexModuleTests extends ESTestCase { emptyMap() ); threadPool = new TestThreadPool("test"); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); circuitBreakerService = new NoneCircuitBreakerService(); PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings); bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST); @@ -214,6 +219,7 @@ public class IndexModuleTests extends ESTestCase { circuitBreakerService, bigArrays, threadPool, + threadPoolMergeExecutorService, scriptService, clusterService, null, diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index e154a3d62b4e..e1128a329023 100644 --- 
a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2578,10 +2578,10 @@ public class InternalEngineTests extends EngineTestCase { public void append(LogEvent event) { final String formattedMessage = event.getMessage().getFormattedMessage(); if (event.getLevel() == Level.TRACE && event.getMarker().getName().contains("[index][0]")) { - if (formattedMessage.startsWith("merge thread")) { + if (formattedMessage.startsWith("merge task")) { messages.add(formattedMessage); } else if (event.getLoggerName().endsWith(".MS") - && formattedMessage.contains("MS: merge thread") + && formattedMessage.contains("MS: merge task") && formattedMessage.endsWith("end")) { luceneMergeSchedulerEnded.set(true); } @@ -2616,14 +2616,14 @@ public class InternalEngineTests extends EngineTestCase { }); assertBusy(() -> { - List threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge thread")).toList(); + List threadMsgs = mockAppender.messages().stream().filter(line -> line.startsWith("merge task")).toList(); assertThat("messages:" + threadMsgs, threadMsgs.size(), greaterThanOrEqualTo(3)); assertThat( threadMsgs, containsInRelativeOrder( - matchesRegex("^merge thread .* start$"), - matchesRegex("^merge thread .* merge segment.*$"), - matchesRegex("^merge thread .* end$") + matchesRegex("^merge task .* start$"), + matchesRegex("^merge task .* merge segment.*$"), + matchesRegex("^merge task .* end$") ) ); assertThat(mockAppender.mergeCompleted(), is(true)); @@ -3587,6 +3587,7 @@ public class InternalEngineTests extends EngineTestCase { EngineConfig brokenConfig = new EngineConfig( shardId, threadPool, + threadPoolMergeExecutorService, config.getIndexSettings(), null, store, @@ -7149,6 +7150,7 @@ public class InternalEngineTests extends EngineTestCase { EngineConfig configWithWarmer = new EngineConfig( config.getShardId(), config.getThreadPool(), + 
config.getThreadPoolMergeExecutorService(), config.getIndexSettings(), warmer, store, diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java new file mode 100644 index 000000000000..0a99c5002d5a --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeExecutorServiceTests.java @@ -0,0 +1,690 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.engine; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.MAX_IO_RATE; +import static org.elasticsearch.index.engine.ThreadPoolMergeExecutorService.MIN_IO_RATE; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.ABORT; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.BACKLOG; +import static org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule.RUN; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ThreadPoolMergeExecutorServiceTests extends ESTestCase { + + public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() { + TestThreadPool testThreadPool = new TestThreadPool("test"); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + // shutdown the thread pool + testThreadPool.shutdown(); + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + assertFalse(threadPoolMergeExecutorService.submitMergeTask(mergeTask)); + verify(mergeTask).abort(); + verify(mergeTask, times(0)).schedule(); + verify(mergeTask, times(0)).run(); + verify(mergeTask, times(1)).abort(); + assertTrue(threadPoolMergeExecutorService.allDone()); + } + + public void 
testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutdown() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + // more merges than threads so that some are enqueued + int mergesToSubmit = mergeExecutorThreadCount + randomIntBetween(1, 5); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + TestThreadPool testThreadPool = new TestThreadPool("test", settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + Semaphore runMergeSemaphore = new Semaphore(0); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); + AtomicInteger doneMergesCount = new AtomicInteger(0); + // submit more merge tasks than there are threads so that some are enqueued + for (int i = 0; i < mergesToSubmit; i++) { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + Schedule runOrAbort = randomFrom(RUN, ABORT); + doAnswer(mock -> { + // merges can be backlogged, but will be re-enqueued + Schedule schedule = randomFrom(BACKLOG, runOrAbort); + if (schedule == BACKLOG) { + // reenqueue backlogged merge task + new Thread(() -> threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask)).start(); + } + return schedule; + }).when(mergeTask).schedule(); + doAnswer(mock -> { + // wait to be signalled before completing + if (runOrAbort == ABORT) { + fail("merge task ran but it should've aborted instead"); + } + runMergeSemaphore.acquireUninterruptibly(); + doneMergesCount.incrementAndGet(); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before 
completing + if (runOrAbort == RUN) { + fail("merge task aborted but it should've ran instead"); + } + runMergeSemaphore.acquireUninterruptibly(); + doneMergesCount.incrementAndGet(); + return null; + }).when(mergeTask).abort(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } + // assert merges are running and enqueued + assertBusy(() -> { + // assert that there are merge tasks running concurrently at the max allowed concurrency rate + assertThat(threadPoolExecutor.getActiveCount(), is(mergeExecutorThreadCount)); + // with the other merge tasks enqueued + assertThat(threadPoolExecutor.getQueue().size(), is(mergesToSubmit - mergeExecutorThreadCount)); + }); + // shutdown prevents new merge tasks to be enqueued but existing ones should be allowed to continue + testThreadPool.shutdown(); + // assert all executors, except the merge one, are terminated + for (String executorName : ThreadPool.THREAD_POOL_TYPES.keySet()) { + assertTrue(testThreadPool.executor(executorName).isShutdown()); + if (ThreadPool.Names.MERGE.equals(executorName)) { + assertFalse(testThreadPool.executor(executorName).isTerminated()); + } else { + assertTrue(testThreadPool.executor(executorName).isTerminated()); + } + } + for (int i = 0; i < mergesToSubmit; i++) { + // closing the thread pool is delayed because there are running and/or enqueued merge tasks + assertFalse(testThreadPool.awaitTermination(1, TimeUnit.NANOSECONDS)); + assertTrue(threadPoolExecutor.isShutdown()); + assertFalse(threadPoolExecutor.isTerminated()); + // let merges run one by one and check thread pool + runMergeSemaphore.release(); + int completedMergesCount = i + 1; + assertBusy(() -> { + assertThat(doneMergesCount.get(), is(completedMergesCount)); + assertThat(threadPoolExecutor.getCompletedTaskCount(), is((long) completedMergesCount)); + // active threads still working on the remaining merges + assertThat( + threadPoolExecutor.getActiveCount(), + is(Math.min(mergeExecutorThreadCount, mergesToSubmit - 
completedMergesCount)) + ); + // with any of the other merges still enqueued + assertThat( + threadPoolExecutor.getQueue().size(), + is(Math.max(mergesToSubmit - mergeExecutorThreadCount - completedMergesCount, 0)) + ); + }); + } + assertBusy(() -> { + assertTrue(testThreadPool.awaitTermination(1, TimeUnit.NANOSECONDS)); + assertTrue(threadPoolExecutor.isShutdown()); + assertTrue(threadPoolExecutor.isTerminated()); + assertTrue(threadPoolMergeExecutorService.allDone()); + }); + } + + public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + int mergesStillToSubmit = randomIntBetween(10, 100); + int mergesStillToComplete = mergesStillToSubmit; + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + Semaphore runMergeSemaphore = new Semaphore(0); + AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger(); + while (mergesStillToComplete > 0) { + if (mergesStillToSubmit > 0 && (threadPoolMergeExecutorService.getRunningMergeTasks().isEmpty() || randomBoolean())) { + // submit new merge task + MergeTask mergeTask = mock(MergeTask.class); + boolean supportsIOThrottling = randomBoolean(); + when(mergeTask.supportsIOThrottling()).thenReturn(supportsIOThrottling); + doAnswer(mock -> { + Schedule schedule = randomFrom(Schedule.values()); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // reenqueue backlogged merge task + 
threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } + return schedule; + }).when(mergeTask).schedule(); + doAnswer(mock -> { + // wait to be signalled before completing + runMergeSemaphore.acquire(); + if (supportsIOThrottling) { + submittedIOThrottledMergeTasks.decrementAndGet(); + } + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing + runMergeSemaphore.acquire(); + if (supportsIOThrottling) { + submittedIOThrottledMergeTasks.decrementAndGet(); + } + return null; + }).when(mergeTask).abort(); + long currentIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + if (supportsIOThrottling) { + submittedIOThrottledMergeTasks.incrementAndGet(); + } + long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + if (supportsIOThrottling) { + if (submittedIOThrottledMergeTasks.get() < threadPoolMergeExecutorService + .getConcurrentMergesFloorLimitForThrottling()) { + // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued + assertThat(newIORate, either(is(MIN_IO_RATE.getBytes())).or(lessThan(currentIORate))); + } else if (submittedIOThrottledMergeTasks.get() > threadPoolMergeExecutorService + .getConcurrentMergesCeilLimitForThrottling()) { + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newIORate, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(currentIORate))); + } else { + // assert the IO rate does NOT change when there are a couple of merge tasks enqueued + assertThat(newIORate, equalTo(currentIORate)); + } + } else { + // assert the IO rate does not change, when the merge task doesn't support IO throttling + assertThat(newIORate, equalTo(currentIORate)); + } + mergesStillToSubmit--; + } else { + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) 
testThreadPool.executor(ThreadPool.Names.MERGE); + long completedMerges = threadPoolExecutor.getCompletedTaskCount(); + runMergeSemaphore.release(); + // await merge to finish + assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1))); + mergesStillToComplete--; + } + } + assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + } + } + + public void testIORateIsAdjustedForRunningMergeTasks() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 3); + int mergesStillToSubmit = randomIntBetween(1, 10); + int mergesStillToComplete = mergesStillToSubmit; + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); + Semaphore runMergeSemaphore = new Semaphore(0); + Set currentlyRunningMergeTasksSet = ConcurrentCollections.newConcurrentSet(); + while (mergesStillToComplete > 0) { + if (mergesStillToSubmit > 0 && (currentlyRunningMergeTasksSet.isEmpty() || randomBoolean())) { + MergeTask mergeTask = mock(MergeTask.class); + // all tasks support IO throttling in this test case + when(mergeTask.supportsIOThrottling()).thenReturn(true); + doAnswer(mock -> { + Schedule schedule = randomFrom(Schedule.values()); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // reenqueue backlogged merge task + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } + return 
schedule; + }).when(mergeTask).schedule(); + doAnswer(mock -> { + currentlyRunningMergeTasksSet.add(mergeTask); + // wait to be signalled before completing + runMergeSemaphore.acquire(); + currentlyRunningMergeTasksSet.remove(mergeTask); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + // wait to be signalled before completing + runMergeSemaphore.acquire(); + return null; + }).when(mergeTask).abort(); + int activeMergeTasksCount = threadPoolExecutor.getActiveCount(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + long newIORate = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + // all currently running merge tasks must be IO throttled + assertBusy(() -> { + // await new merge to start executing + if (activeMergeTasksCount < mergeExecutorThreadCount) { + assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeTasksCount + 1)); + } + // assert IO throttle is set on the running merge tasks + for (MergeTask currentlyRunningMergeTask : currentlyRunningMergeTasksSet) { + var ioRateCaptor = ArgumentCaptor.forClass(Long.class); + // only interested in the last invocation + verify(currentlyRunningMergeTask, atLeastOnce()).setIORateLimit(ioRateCaptor.capture()); + assertThat(ioRateCaptor.getValue(), is(newIORate)); + } + }); + mergesStillToSubmit--; + } else { + long completedMerges = threadPoolExecutor.getCompletedTaskCount(); + runMergeSemaphore.release(); + // await merge to finish + assertBusy(() -> assertThat(threadPoolExecutor.getCompletedTaskCount(), is(completedMerges + 1))); + mergesStillToComplete--; + } + } + assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + } + } + + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() { + // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted + int submittedVsExecutedRateOutOf1000 = randomIntBetween(0, 250); + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), 
submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); + // executor starts running merges only after a considerable amount of merge tasks have already been submitted + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); + } + + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() { + // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted + int submittedVsExecutedRateOutOf1000 = randomIntBetween(750, 1000); + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); + // executor starts running merges only after a considerable amount of merge tasks have already been submitted + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); + } + + public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() { + // the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted + int submittedVsExecutedRateOutOf1000 = randomIntBetween(250, 750); + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5)); + // executor starts running merges only after a considerable amount of merge tasks have already been submitted + testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50)); + } + + private void testIORateAdjustedForSubmittedTasks( + int totalTasksToSubmit, + int submittedVsExecutedRateOutOf1000, + int initialTasksToSubmit + ) { + DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue(); + ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool); + final AtomicInteger 
currentlySubmittedMergeTaskCount = new AtomicInteger(); + final AtomicLong targetIORateLimit = new AtomicLong(ThreadPoolMergeExecutorService.START_IO_RATE.getBytes()); + final AtomicReference lastRunTask = new AtomicReference<>(); + int initialTasksCounter = Math.min(initialTasksToSubmit, totalTasksToSubmit); + while (totalTasksToSubmit > 0 || mergeExecutorTaskQueue.hasAnyTasks()) { + if (mergeExecutorTaskQueue.hasAnyTasks() == false // always submit if there are no outstanding merge tasks + || initialTasksCounter > 0 // first submit all the initial tasks + || (randomIntBetween(0, 1000) < submittedVsExecutedRateOutOf1000 && totalTasksToSubmit > 0)) { + // submit new merge task + MergeTask mergeTask = mock(MergeTask.class); + // all merge tasks support IO throttling in this test + when(mergeTask.supportsIOThrottling()).thenReturn(true); + // always run the task + when(mergeTask.schedule()).thenReturn(RUN); + doAnswer(mock -> { + lastRunTask.set(mergeTask); + return null; + }).when(mergeTask).run(); + currentlySubmittedMergeTaskCount.incrementAndGet(); + totalTasksToSubmit--; + initialTasksCounter--; + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + long newTargetIORateLimit = threadPoolMergeExecutorService.getTargetIORateBytesPerSec(); + if (currentlySubmittedMergeTaskCount.get() < threadPoolMergeExecutorService.getConcurrentMergesFloorLimitForThrottling()) { + // assert the IO rate decreases, with a floor limit, when there are few merge tasks enqueued + assertThat(newTargetIORateLimit, either(is(MIN_IO_RATE.getBytes())).or(lessThan(targetIORateLimit.get()))); + } else if (currentlySubmittedMergeTaskCount.get() > threadPoolMergeExecutorService + .getConcurrentMergesCeilLimitForThrottling()) { + // assert the IO rate increases, with a ceiling limit, when there are many merge tasks enqueued + assertThat(newTargetIORateLimit, either(is(MAX_IO_RATE.getBytes())).or(greaterThan(targetIORateLimit.get()))); + } else { + // assert the IO rate does change, when 
there are a couple of merge tasks enqueued + assertThat(newTargetIORateLimit, equalTo(targetIORateLimit.get())); + } + targetIORateLimit.set(newTargetIORateLimit); + } else { + // execute already submitted merge task + if (runOneTask(mergeExecutorTaskQueue)) { + // task is done, no longer just submitted + currentlySubmittedMergeTaskCount.decrementAndGet(); + // assert IO rate is invoked on the merge task that just ran + assertNotNull(lastRunTask.get()); + var ioRateCaptor = ArgumentCaptor.forClass(Long.class); + verify(lastRunTask.get(), times(1)).setIORateLimit(ioRateCaptor.capture()); + assertThat(ioRateCaptor.getValue(), is(targetIORateLimit.get())); + lastRunTask.set(null); + } + } + } + assertTrue(threadPoolMergeExecutorService.allDone()); + } + + public void testMergeTasksRunConcurrently() throws Exception { + // at least 2 merges allowed to run concurrently + int mergeExecutorThreadCount = randomIntBetween(2, 5); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + // more merge tasks than max concurrent merges allowed to run concurrently + int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween(1, 5); + Semaphore runMergeSemaphore = new Semaphore(0); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); + // submit all merge tasks + for (int i = 0; i < totalMergeTasksCount; i++) { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + doAnswer(mock -> { + // 
each individual merge task can either "run" or be "backlogged" + Schedule schedule = randomFrom(RUN, BACKLOG); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // reenqueue backlogged merge task + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } + return schedule; + }).when(mergeTask).schedule(); + doAnswer(mock -> { + // wait to be signalled before completing + runMergeSemaphore.acquire(); + return null; + }).when(mergeTask).run(); + doAnswer(mock -> { + fail("This test doesn't deal with aborted merge tasks"); + return null; + }).when(mergeTask).abort(); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } + // assert stats while merge tasks finish + for (int completedTasksCount = 0; completedTasksCount < totalMergeTasksCount + - mergeExecutorThreadCount; completedTasksCount++) { + int finalCompletedTasksCount = completedTasksCount; + assertBusy(() -> { + // assert that there are merge tasks running concurrently at the max allowed concurrency rate + assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); + // with the other merge tasks enqueued + assertThat( + threadPoolMergeExecutorService.getQueuedMergeTasks().size(), + is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount) + ); + // also check thread-pool stats for the same + assertThat(threadPoolExecutor.getActiveCount(), is(mergeExecutorThreadCount)); + assertThat( + threadPoolExecutor.getQueue().size(), + is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount) + ); + }); + // let one merge task finish running + runMergeSemaphore.release(); + } + // there are now fewer merge tasks still running than available threads + for (int remainingMergeTasksCount = mergeExecutorThreadCount; remainingMergeTasksCount >= 0; remainingMergeTasksCount--) { + int finalRemainingMergeTasksCount = remainingMergeTasksCount; + assertBusy(() -> { 
+ // there are fewer available merges than available threads + assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergeTasksCount)); + // no more merges enqueued + assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + // also check thread-pool stats for the same + assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount)); + assertThat(threadPoolExecutor.getQueue().size(), is(0)); + }); + // let one merge task finish running + runMergeSemaphore.release(); + } + assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + } + } + + public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 3); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + int totalMergeTasksCount = randomIntBetween(1, 10); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); + List backloggedMergeTasksList = new ArrayList<>(); + for (int i = 0; i < totalMergeTasksCount; i++) { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + boolean runNowOrBacklog = randomBoolean(); + if (runNowOrBacklog) { + when(mergeTask.schedule()).thenReturn(randomFrom(RUN, ABORT)); + } else { + // first backlog, then run + when(mergeTask.schedule()).thenReturn(BACKLOG, randomFrom(RUN, ABORT)); + backloggedMergeTasksList.add(mergeTask); + } + 
threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } + assertBusy(() -> { + // all runnable merge tasks should show as "completed" + assertThat(threadPoolExecutor.getCompletedTaskCount(), is((long) (totalMergeTasksCount - backloggedMergeTasksList.size()))); + if (backloggedMergeTasksList.size() >= mergeExecutorThreadCount) { + // active tasks waiting for backlogged merge tasks to be re-enqueued + assertThat(threadPoolExecutor.getActiveCount(), is(mergeExecutorThreadCount)); + assertThat(threadPoolExecutor.getQueue().size(), is(backloggedMergeTasksList.size() - mergeExecutorThreadCount)); + } else { + assertThat(threadPoolExecutor.getActiveCount(), is(backloggedMergeTasksList.size())); + assertThat(threadPoolExecutor.getQueue().size(), is(0)); + } + assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + }); + // re-enqueue backlogged merge tasks + for (MergeTask backloggedMergeTask : backloggedMergeTasksList) { + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(backloggedMergeTask); + } + assertBusy(() -> { + // all merge tasks should now show as "completed" + assertThat(threadPoolExecutor.getCompletedTaskCount(), is((long) totalMergeTasksCount)); + assertThat(threadPoolExecutor.getActiveCount(), is(0)); + assertThat(threadPoolExecutor.getQueue().size(), is(0)); + assertTrue(threadPoolMergeExecutorService.allDone()); + }); + } + } + + public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception { + int mergeExecutorThreadCount = randomIntBetween(1, 3); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true) + // few merge threads, in order to increase contention + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = 
getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + // many merge tasks concurrently + int mergeTaskCount = randomIntBetween(10, 100); + CountDownLatch mergeTasksReadyLatch = new CountDownLatch(mergeTaskCount); + CountDownLatch submitTaskLatch = new CountDownLatch(1); + Collection runMergeTasks = ConcurrentCollections.newConcurrentSet(); + Collection abortMergeTasks = ConcurrentCollections.newConcurrentSet(); + for (int i = 0; i < mergeTaskCount; i++) { + new Thread(() -> { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + doAnswer(mock -> { + // each individual merge task can either "run" or be "backlogged" + Schedule schedule = randomFrom(RUN, ABORT, BACKLOG); + if (schedule == BACKLOG) { + testThreadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + // reenqueue backlogged merge task + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + }); + } + if (schedule == RUN) { + runMergeTasks.add(mergeTask); + } + if (schedule == ABORT) { + abortMergeTasks.add(mergeTask); + } + return schedule; + }).when(mergeTask).schedule(); + mergeTasksReadyLatch.countDown(); + // make all threads submit merge tasks at once + safeAwait(submitTaskLatch); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + }).start(); + } + safeAwait(mergeTasksReadyLatch); + submitTaskLatch.countDown(); + assertBusy(() -> { + assertThat(runMergeTasks.size() + abortMergeTasks.size(), is(mergeTaskCount)); + for (MergeTask mergeTask : runMergeTasks) { + verify(mergeTask, times(1)).run(); + verify(mergeTask, times(0)).abort(); + if (mergeTask.supportsIOThrottling() == false) { + verify(mergeTask, times(0)).setIORateLimit(anyLong()); + } + } + for (MergeTask mergeTask : abortMergeTasks) { + verify(mergeTask, times(0)).run(); + verify(mergeTask, times(1)).abort(); + verify(mergeTask, 
times(0)).setIORateLimit(anyLong()); + } + assertTrue(threadPoolMergeExecutorService.allDone()); + }); + } + } + + public void testMergeTasksExecuteInSizeOrder() { + DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue(); + ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool); + DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new DeterministicTaskQueue(); + int mergeTaskCount = randomIntBetween(10, 100); + // sort merge tasks available to run by size + PriorityQueue mergeTasksAvailableToRun = new PriorityQueue<>( + mergeTaskCount, + Comparator.comparingLong(MergeTask::estimatedMergeSize) + ); + for (int i = 0; i < mergeTaskCount; i++) { + MergeTask mergeTask = mock(MergeTask.class); + when(mergeTask.supportsIOThrottling()).thenReturn(randomBoolean()); + // merge tasks of various sizes (0 might be a valid value) + when(mergeTask.estimatedMergeSize()).thenReturn(randomLongBetween(0, 10)); + doAnswer(mock -> { + // each individual merge task can either "run" or be "backlogged" at any point in time + Schedule schedule = randomFrom(Schedule.values()); + // in either case, the merge task is, at least temporarily, not "available" to run + mergeTasksAvailableToRun.remove(mergeTask); + // if merge task cannot run, it is backlogged, and should be re enqueued some time in the future + if (schedule == BACKLOG) { + // reenqueue backlogged merge task sometime in the future + reEnqueueBackloggedTaskQueue.scheduleNow(() -> { + // reenqueue backlogged merge task sometime in the future + threadPoolMergeExecutorService.reEnqueueBackloggedMergeTask(mergeTask); + // the merge task should once again be "available" to run + mergeTasksAvailableToRun.add(mergeTask); + }); + } + // hack: avoid blocking for unavailable merge task by running one re-enqueuing task now + if (schedule == BACKLOG && 
mergeTasksAvailableToRun.isEmpty()) { + assertTrue(runOneTask(reEnqueueBackloggedTaskQueue)); + } + if (schedule == RUN && mergeTasksAvailableToRun.isEmpty() == false) { + // assert the merge task that's now going to run is the smallest of the ones currently available to run + assertThat(mergeTask.estimatedMergeSize(), lessThanOrEqualTo(mergeTasksAvailableToRun.peek().estimatedMergeSize())); + } + return schedule; + }).when(mergeTask).schedule(); + mergeTasksAvailableToRun.add(mergeTask); + threadPoolMergeExecutorService.submitMergeTask(mergeTask); + } + while (true) { + // re-enqueue merge tasks + if (mergeTasksAvailableToRun.isEmpty() || randomBoolean()) { + boolean backlogReEnqueued = runOneTask(reEnqueueBackloggedTaskQueue); + if (mergeTasksAvailableToRun.isEmpty() && backlogReEnqueued == false) { + // test complete, all merges ran, and none is backlogged + assertFalse(mergeExecutorTaskQueue.hasAnyTasks()); + assertFalse(reEnqueueBackloggedTaskQueue.hasAnyTasks()); + assertTrue(threadPoolMergeExecutorService.allDone()); + break; + } + } else { + // run one merge task + runOneTask(mergeExecutorTaskQueue); + } + } + } + + static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService + .maybeCreateThreadPoolMergeExecutorService( + threadPool, + randomBoolean() + ? 
Settings.EMPTY + : Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build() + ); + assertNotNull(threadPoolMergeExecutorService); + assertTrue(threadPoolMergeExecutorService.allDone()); + return threadPoolMergeExecutorService; + } + + private static boolean runOneTask(DeterministicTaskQueue deterministicTaskQueue) { + while (deterministicTaskQueue.hasAnyTasks()) { + if (deterministicTaskQueue.hasRunnableTasks()) { + deterministicTaskQueue.runRandomTask(); + return true; + } else { + deterministicTaskQueue.advanceTime(); + } + } + return false; + } +} diff --git a/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java new file mode 100644 index 000000000000..5e085c083b78 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerTests.java @@ -0,0 +1,496 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.index.engine; + +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.MergePolicy.OneMerge; +import org.apache.lucene.index.MergeScheduler.MergeSource; +import org.apache.lucene.index.MergeTrigger; +import org.apache.lucene.store.MergeInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.MergeSchedulerConfig; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +public class ThreadPoolMergeSchedulerTests extends ESTestCase { + + public void 
testMergesExecuteInSizeOrder() throws IOException { + DeterministicTaskQueue threadPoolTaskQueue = new DeterministicTaskQueue(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests + .getThreadPoolMergeExecutorService(threadPoolTaskQueue.getThreadPool()); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), + threadPoolMergeExecutorService + ) + ) { + List executedMergesList = new ArrayList<>(); + int mergeCount = randomIntBetween(2, 10); + for (int i = 0; i < mergeCount; i++) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + doAnswer(invocation -> { + OneMerge merge = (OneMerge) invocation.getArguments()[0]; + assertFalse(merge.isAborted()); + executedMergesList.add(merge); + return null; + }).when(mergeSource).merge(any(OneMerge.class)); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + } + threadPoolTaskQueue.runAllTasks(); + assertThat(executedMergesList.size(), is(mergeCount)); + // assert merges are executed in ascending size order + for (int i = 1; i < mergeCount; i++) { + assertThat( + executedMergesList.get(i - 1).getStoreMergeInfo().estimatedMergeBytes(), + lessThanOrEqualTo(executedMergesList.get(i).getStoreMergeInfo().estimatedMergeBytes()) + ); + } + } + assertTrue(threadPoolMergeExecutorService.allDone()); + } + + public void testSimpleMergeTaskBacklogging() { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + Settings mergeSchedulerSettings = Settings.builder() + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), 
mergeExecutorThreadCount) + .build(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + // close method waits for running merges to finish, but this test leaves running merges around + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), + threadPoolMergeExecutorService + ); + // more merge tasks than merge threads + int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 5); + for (int i = 0; i < mergeCount; i++) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + Schedule schedule = threadPoolMergeScheduler.schedule( + threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values())) + ); + if (i < mergeExecutorThreadCount) { + assertThat(schedule, is(Schedule.RUN)); + } else { + assertThat(schedule, is(Schedule.BACKLOG)); + } + } + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(mergeCount - mergeExecutorThreadCount)); + } + + public void testSimpleMergeTaskReEnqueueingBySize() { + int mergeExecutorThreadCount = randomIntBetween(1, 5); + Settings mergeSchedulerSettings = Settings.builder() + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + // close method waits for running merges to finish, but this test leaves running merges around + 
ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", mergeSchedulerSettings), + threadPoolMergeExecutorService + ); + // sort backlogged merges by size + PriorityQueue backloggedMergeTasks = new PriorityQueue<>(16, Comparator.comparingLong(MergeTask::estimatedMergeSize)); + // more merge tasks than merge threads + int mergeCount = mergeExecutorThreadCount + randomIntBetween(2, 10); + for (int i = 0; i < mergeCount; i++) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + MergeTask mergeTask = threadPoolMergeScheduler.newMergeTask(mergeSource, oneMerge, randomFrom(MergeTrigger.values())); + Schedule schedule = threadPoolMergeScheduler.schedule(mergeTask); + if (i < mergeExecutorThreadCount) { + assertThat(schedule, is(Schedule.RUN)); + } else { + assertThat(schedule, is(Schedule.BACKLOG)); + backloggedMergeTasks.add(mergeTask); + } + } + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(backloggedMergeTasks.size())); + int enqueuedTasksCount = mergeCount - mergeExecutorThreadCount; + for (int i = 0; i < enqueuedTasksCount; i++) { + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(enqueuedTasksCount - i)); + MergeTask runningMergeTask = randomFrom(threadPoolMergeScheduler.getRunningMergeTasks().values()); + runningMergeTask.run(); + var submittedMergeTaskCaptor = ArgumentCaptor.forClass(MergeTask.class); + verify(threadPoolMergeExecutorService, times(i + 
1)).reEnqueueBackloggedMergeTask(submittedMergeTaskCaptor.capture()); + assertThat(submittedMergeTaskCaptor.getValue(), is(backloggedMergeTasks.poll())); + Schedule schedule = threadPoolMergeScheduler.schedule(submittedMergeTaskCaptor.getValue()); + assertThat(schedule, is(Schedule.RUN)); + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeExecutorThreadCount)); + } + } + + public void testMergeSourceWithFollowUpMergesRunSequentially() throws Exception { + // test with min 2 allowed concurrent merges + int mergeExecutorThreadCount = randomIntBetween(2, 5); + Settings settings = Settings.builder() + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeExecutorThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests + .getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", settings), + threadPoolMergeExecutorService + ) + ) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge firstMerge = mock(OneMerge.class); + when(firstMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(firstMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + // at least one followup merge + null (i.e. 
no more followups) + int followUpMergeCount = randomIntBetween(2, 10); + OneMerge[] followUpMerges = new OneMerge[followUpMergeCount]; + followUpMerges[followUpMergeCount - 1] = null; + for (int i = 0; i < followUpMergeCount - 1; i++) { + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + followUpMerges[i] = oneMerge; + } + // the merge source with follow-up merges + when(mergeSource.getNextMerge()).thenReturn(firstMerge, followUpMerges); + AtomicBoolean isMergeInProgress = new AtomicBoolean(); + AtomicInteger runMergeIdx = new AtomicInteger(); + Semaphore runMergeSemaphore = new Semaphore(0); + Semaphore nextMergeSemaphore = new Semaphore(0); + doAnswer(invocation -> { + // assert only one merge can be in-progress at any point-in-time + assertTrue(isMergeInProgress.compareAndSet(false, true)); + OneMerge mergeInvocation = (OneMerge) invocation.getArguments()[0]; + assertFalse(mergeInvocation.isAborted()); + // assert merges run in the order they are produced by the merge source + if (runMergeIdx.get() == 0) { + assertThat(mergeInvocation, is(firstMerge)); + } else { + assertThat(mergeInvocation, is(followUpMerges[runMergeIdx.get() - 1])); + } + runMergeIdx.incrementAndGet(); + // await before returning from the merge in order to really ensure that follow-up merges don't run concurrently + nextMergeSemaphore.release(); + runMergeSemaphore.acquire(); + assertTrue(isMergeInProgress.compareAndSet(true, false)); + return null; + }).when(mergeSource).merge(any(OneMerge.class)); + // trigger run merges on the merge source + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + do { + // let merges run, but wait for the in-progress one to signal it is running + nextMergeSemaphore.acquire(); + runMergeSemaphore.release(); + } while (runMergeIdx.get() < followUpMergeCount); + 
assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + } + } + } + + public void testMergesRunConcurrently() throws Exception { + // min 2 allowed concurrent merges, per scheduler + int mergeSchedulerMaxThreadCount = randomIntBetween(2, 4); + // the merge executor has at least 1 extra thread available + int mergeExecutorThreadCount = mergeSchedulerMaxThreadCount + randomIntBetween(1, 3); + Settings settings = Settings.builder() + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests + .getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", settings), + threadPoolMergeExecutorService + ) + ) { + // at least 1 extra merge than there are concurrently allowed + int mergeCount = mergeExecutorThreadCount + randomIntBetween(1, 10); + Semaphore runMergeSemaphore = new Semaphore(0); + for (int i = 0; i < mergeCount; i++) { + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + doAnswer(invocation -> { + OneMerge merge = (OneMerge) invocation.getArguments()[0]; + 
assertFalse(merge.isAborted()); + // wait to be signalled before completing + runMergeSemaphore.acquire(); + return null; + }).when(mergeSource).merge(any(OneMerge.class)); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + } + for (int completedMergesCount = 0; completedMergesCount < mergeCount + - mergeSchedulerMaxThreadCount; completedMergesCount++) { + int finalCompletedMergesCount = completedMergesCount; + assertBusy(() -> { + // assert that there are merges running concurrently at the max allowed concurrency rate + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount)); + // with the other merges backlogged + assertThat( + threadPoolMergeScheduler.getBackloggedMergeTasks().size(), + is(mergeCount - mergeSchedulerMaxThreadCount - finalCompletedMergesCount) + ); + // also check the same for the thread-pool executor + assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeSchedulerMaxThreadCount)); + // queued merge tasks do not include backlogged merges + assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + // also check thread-pool stats for the same + // there are active thread-pool threads waiting for the backlogged merge tasks to be re-enqueued + int activeMergeThreads = Math.min(mergeCount - finalCompletedMergesCount, mergeExecutorThreadCount); + assertThat(threadPoolExecutor.getActiveCount(), is(activeMergeThreads)); + assertThat(threadPoolExecutor.getQueue().size(), is(mergeCount - finalCompletedMergesCount - activeMergeThreads)); + }); + // let one merge task finish running + runMergeSemaphore.release(); + } + // there are now fewer merges still running than available threads + for (int remainingMergesCount = mergeSchedulerMaxThreadCount; remainingMergesCount >= 0; remainingMergesCount--) { + int finalRemainingMergesCount = remainingMergesCount; + assertBusy(() -> { + // there are fewer available merges than available 
threads + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(finalRemainingMergesCount)); + // no more backlogged merges + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0)); + // also check thread-pool executor for the same + assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergesCount)); + // no more backlogged merges + assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0)); + // also check thread-pool stats for the same + assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergesCount)); + assertThat(threadPoolExecutor.getQueue().size(), is(0)); + }); + // let one merge task finish running + runMergeSemaphore.release(); + } + assertBusy(() -> assertTrue(threadPoolMergeExecutorService.allDone())); + } + } + } + + public void testSchedulerCloseWaitsForRunningMerge() throws Exception { + int mergeSchedulerMaxThreadCount = randomIntBetween(1, 3); + int mergeExecutorThreadCount = randomIntBetween(1, 3); + Settings settings = Settings.builder() + .put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount) + .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), mergeSchedulerMaxThreadCount) + .build(); + try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) { + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorServiceTests + .getThreadPoolMergeExecutorService(testThreadPool); + assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount)); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + IndexSettingsModule.newIndexSettings("index", settings), + threadPoolMergeExecutorService + ) + ) { + CountDownLatch mergeDoneLatch = new CountDownLatch(1); + MergeSource mergeSource = mock(MergeSource.class); + OneMerge oneMerge = mock(OneMerge.class); + 
when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomLongBetween(1L, 10L))); + when(oneMerge.getMergeProgress()).thenReturn(new MergePolicy.OneMergeProgress()); + when(mergeSource.getNextMerge()).thenReturn(oneMerge, (OneMerge) null); + doAnswer(invocation -> { + OneMerge merge = (OneMerge) invocation.getArguments()[0]; + assertFalse(merge.isAborted()); + // wait to be signalled before completing the merge + mergeDoneLatch.await(); + return null; + }).when(mergeSource).merge(any(OneMerge.class)); + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + Thread t = new Thread(() -> { + try { + threadPoolMergeScheduler.close(); + } catch (IOException e) { + fail(e); + } + }); + t.start(); + try { + assertTrue(t.isAlive()); + // ensure the merge scheduler is effectively "closed" + assertBusy(() -> { + MergeSource mergeSource2 = mock(MergeSource.class); + threadPoolMergeScheduler.merge(mergeSource2, randomFrom(MergeTrigger.values())); + // when the merge scheduler is closed it won't pull in any new merges from the merge source + verifyNoInteractions(mergeSource2); + }); + // assert the merge still shows up as "running" + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().keySet(), contains(oneMerge)); + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0)); + assertTrue(t.isAlive()); + // signal the merge to finish + mergeDoneLatch.countDown(); + } finally { + t.join(); + } + assertBusy(() -> { + assertThat(threadPoolMergeScheduler.getRunningMergeTasks().size(), is(0)); + assertThat(threadPoolMergeScheduler.getBackloggedMergeTasks().size(), is(0)); + assertTrue(threadPoolMergeExecutorService.allDone()); + }); + } + } + } + + public void testAutoIOThrottleForMergeTasksWhenSchedulerDisablesIt() throws Exception { + // merge scheduler configured with auto IO throttle disabled + Settings settings = Settings.builder().put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), false).build(); + 
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settings); + ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress(); + OneMerge oneMerge = mock(OneMerge.class); + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong())); + when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress); + MergeSource mergeSource = mock(MergeSource.class); + when(mergeSource.getNextMerge()).thenReturn(oneMerge); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + indexSettings, + threadPoolMergeExecutorService + ) + ) { + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + var submittedMergeTaskCaptor = ArgumentCaptor.forClass(MergeTask.class); + verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture()); + assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling()); + } + } + + public void testAutoIOThrottleForMergeTasks() throws Exception { + final Settings.Builder settingsBuilder = Settings.builder(); + // merge scheduler configured with auto IO throttle enabled + if (randomBoolean()) { + settingsBuilder.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), true); + } + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settingsBuilder.build()); + MergePolicy.OneMergeProgress oneMergeProgress = new MergePolicy.OneMergeProgress(); + OneMerge oneMerge = mock(OneMerge.class); + // forced merge with a set number of segments + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong(), randomNonNegativeInt())); + when(oneMerge.getMergeProgress()).thenReturn(oneMergeProgress); + MergeSource mergeSource = mock(MergeSource.class); + when(mergeSource.getNextMerge()).thenReturn(oneMerge); + 
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + indexSettings, + threadPoolMergeExecutorService + ) + ) { + threadPoolMergeScheduler.merge(mergeSource, randomFrom(MergeTrigger.values())); + var submittedMergeTaskCaptor = ArgumentCaptor.forClass(MergeTask.class); + verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture()); + // forced merge tasks should not be IO throttled + assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling()); + } + // NOT a forced merge + when(oneMerge.getStoreMergeInfo()).thenReturn(getNewMergeInfo(randomNonNegativeLong(), -1)); + threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + indexSettings, + threadPoolMergeExecutorService + ) + ) { + // merge submitted upon closing + threadPoolMergeScheduler.merge(mergeSource, MergeTrigger.CLOSING); + var submittedMergeTaskCaptor = ArgumentCaptor.forClass(MergeTask.class); + verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture()); + // merge tasks submitted when closing should not be IO throttled + assertFalse(submittedMergeTaskCaptor.getValue().supportsIOThrottling()); + } + // otherwise, merge tasks should be auto IO throttled + threadPoolMergeExecutorService = mock(ThreadPoolMergeExecutorService.class); + try ( + ThreadPoolMergeScheduler threadPoolMergeScheduler = new ThreadPoolMergeScheduler( + new ShardId("index", "_na_", 1), + indexSettings, + threadPoolMergeExecutorService + ) + ) { + // merge submitted with a trigger other than closing + threadPoolMergeScheduler.merge( + mergeSource, + randomValueOtherThan(MergeTrigger.CLOSING, () -> randomFrom(MergeTrigger.values())) + ); + var 
submittedMergeTaskCaptor = ArgumentCaptor.forClass(MergeTask.class); + verify(threadPoolMergeExecutorService).submitMergeTask(submittedMergeTaskCaptor.capture()); + // merge tasks should be auto IO throttled + assertTrue(submittedMergeTaskCaptor.getValue().supportsIOThrottling()); + } + } + + private static MergeInfo getNewMergeInfo(long estimatedMergeBytes) { + return getNewMergeInfo(estimatedMergeBytes, randomFrom(-1, randomNonNegativeInt())); + } + + private static MergeInfo getNewMergeInfo(long estimatedMergeBytes, int maxNumSegments) { + return new MergeInfo(randomNonNegativeInt(), estimatedMergeBytes, randomBoolean(), maxNumSegments); + } +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 8c325c945a7a..38d89f08378b 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -41,8 +41,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { private final AtomicLong currentTimeMillis = new AtomicLong(); @Override - protected ThreadPool setUpThreadPool() { - return new TestThreadPool(getClass().getName(), threadPoolSettings()) { + protected ThreadPool setUpThreadPool(Settings settings) { + return new TestThreadPool(getClass().getName(), settings) { @Override public long absoluteTimeInMillis() { return currentTimeMillis.get(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 975565b73a0d..faf782e02c6c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -5033,6 +5033,7 @@ public class IndexShardTests extends IndexShardTestCase { EngineConfig 
configWithWarmer = new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), config.getIndexSettings(), warmer, config.getStore(), diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index ca616dc619ec..4e280f544378 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -41,6 +41,8 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.LuceneDocument; import org.elasticsearch.index.mapper.MapperService; @@ -89,6 +91,7 @@ public class RefreshListenersTests extends ESTestCase { private Engine engine; private volatile int maxListeners; private ThreadPool threadPool; + private ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private Store store; @Before @@ -97,6 +100,11 @@ public class RefreshListenersTests extends ESTestCase { maxListeners = randomIntBetween(2, 1000); // Now setup the InternalEngine which is much more complicated because we aren't mocking anything threadPool = new TestThreadPool(getTestName()); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) + .build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", settings); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); listeners = new 
RefreshListeners( () -> maxListeners, () -> engine.refresh("too-many-listeners"), @@ -105,7 +113,6 @@ public class RefreshListenersTests extends ESTestCase { new MeanMetric() ); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); ShardId shardId = new ShardId(new Index("index", "_na_"), 1); Directory directory = newDirectory(); store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId)); @@ -134,6 +141,7 @@ public class RefreshListenersTests extends ESTestCase { EngineConfig config = new EngineConfig( shardId, threadPool, + threadPoolMergeExecutorService, indexSettings, null, store, diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 7a2f37500187..d4554df1617e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -155,6 +155,7 @@ public abstract class EngineTestCase extends ESTestCase { protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY); protected ThreadPool threadPool; + protected ThreadPoolMergeExecutorService threadPoolMergeExecutorService; protected TranslogHandler translogHandler; protected Store store; @@ -197,6 +198,7 @@ public abstract class EngineTestCase extends ESTestCase { between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)) ) .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), between(0, 1000)) + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) .build(); } @@ -241,6 +243,11 @@ public abstract class EngineTestCase extends ESTestCase { } defaultSettings = IndexSettingsModule.newIndexSettings("index", indexSettings()); threadPool = new TestThreadPool(getClass().getName()); 
+ threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService( + threadPool, + defaultSettings.getNodeSettings() + ); + store = createStore(); storeReplica = createStore(); Lucene.cleanLuceneIndex(store.directory()); @@ -272,6 +279,7 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), config.getIndexSettings(), config.getWarmer(), config.getStore(), @@ -304,6 +312,7 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), config.getIndexSettings(), config.getWarmer(), config.getStore(), @@ -336,6 +345,7 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), config.getIndexSettings(), config.getWarmer(), config.getStore(), @@ -840,6 +850,7 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig( shardId, threadPool, + threadPoolMergeExecutorService, indexSettings, null, store, @@ -880,6 +891,7 @@ public abstract class EngineTestCase extends ESTestCase { return new EngineConfig( config.getShardId(), config.getThreadPool(), + config.getThreadPoolMergeExecutorService(), indexSettings, config.getWarmer(), store, diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 2ae4bb034310..e8286835e9cf 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -54,6 +54,8 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import 
org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngineFactory; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.MapperMetrics; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.SourceToParse; @@ -152,6 +154,7 @@ public abstract class IndexShardTestCase extends ESTestCase { }; protected ThreadPool threadPool; + protected ThreadPoolMergeExecutorService threadPoolMergeExecutorService; protected Executor writeExecutor; protected long primaryTerm; @@ -166,14 +169,16 @@ public abstract class IndexShardTestCase extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - threadPool = setUpThreadPool(); + Settings settings = threadPoolSettings(); + threadPool = setUpThreadPool(settings); + threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); writeExecutor = threadPool.executor(ThreadPool.Names.WRITE); primaryTerm = randomIntBetween(1, 100); // use random but fixed term for creating shards failOnShardFailures(); } - protected ThreadPool setUpThreadPool() { - return new TestThreadPool(getClass().getName(), threadPoolSettings()); + protected ThreadPool setUpThreadPool(Settings settings) { + return new TestThreadPool(getClass().getName(), settings); } @Override @@ -203,7 +208,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } public Settings threadPoolSettings() { - return Settings.EMPTY; + return Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()).build(); } protected Store createStore(IndexSettings indexSettings, ShardPath shardPath) throws IOException { @@ -537,6 +542,7 @@ public abstract class IndexShardTestCase extends ESTestCase { indexEventListener, indexReaderWrapper, threadPool, + 
threadPoolMergeExecutorService, BigArrays.NON_RECYCLING_INSTANCE, warmer, Collections.emptyList(), diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 9e1dd348bf13..33ad36e22b45 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -134,6 +134,7 @@ import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.MockEngineFactoryPlugin; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.mapper.MockFieldFilterPlugin; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesQueryCache; @@ -1635,12 +1636,39 @@ public abstract class ESIntegTestCase extends ESTestCase { * Waits for all relocations and force merge all indices in the cluster to 1 segment. */ protected BroadcastResponse forceMerge() { + return forceMerge(randomBoolean()); + } + + /** + * Waits for all relocations and force merge all indices in the cluster to 1 segment. + */ + protected BroadcastResponse forceMerge(boolean assertOneSegment) { waitForRelocation(); BroadcastResponse actionGet = indicesAdmin().prepareForceMerge().setMaxNumSegments(1).get(); assertNoFailures(actionGet); + if (assertOneSegment) { + // after a force merge there should only be 1 segment per shard + var shardsWithMultipleSegments = getShardSegments().stream() + .filter(shardSegments -> shardSegments.getSegments().size() > 1) + .toList(); + assertTrue("there are shards with multiple segments " + shardsWithMultipleSegments, shardsWithMultipleSegments.isEmpty()); + } return actionGet; } + /** + * Returns the segments of the shards of the indices. + */ + protected List getShardSegments(String... 
indices) { + IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indices).get(); + return indicesSegmentResponse.getIndices() + .values() + .stream() + .flatMap(indexSegments -> indexSegments.getShards().values().stream()) + .flatMap(indexShardSegments -> Stream.of(indexShardSegments.shards())) + .toList(); + } + /** * Returns true iff the given index exists otherwise false */ @@ -2076,6 +2104,7 @@ public abstract class ESIntegTestCase extends ESTestCase { builder.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK.getKey(), randomFrom("1KB", "16KB", "64KB")); builder.put(IndexingPressure.SPLIT_BULK_HIGH_WATERMARK_SIZE.getKey(), "256B"); } + builder.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6bc1833e1036..7494b3a8bc9c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1418,6 +1418,20 @@ public final class InternalTestCluster extends TestCluster { } } + public void assertMergeExecutorIsDone() throws Exception { + assertBusy(() -> { + for (String nodeName : getNodeNames()) { + IndicesService indicesService = getInstance(IndicesService.class, nodeName); + if (indicesService.getThreadPoolMergeExecutorService() != null) { + assertTrue( + "thread pool merge executor is not done after test", + indicesService.getThreadPoolMergeExecutorService().allDone() + ); + } + } + }); + } + public void assertNoInFlightDocsInEngine() throws Exception { assertBusy(() -> { for (String nodeName : getNodeNames()) { @@ -2526,6 +2540,7 @@ public final class InternalTestCluster extends TestCluster { assertRequestsFinished(); assertSearchContextsReleased(); assertNoInFlightDocsInEngine(); + 
assertMergeExecutorIsDone(); awaitIndexShardCloseAsyncTasks(); for (NodeAndClient nodeAndClient : nodes.values()) { NodeEnvironment env = nodeAndClient.node().getNodeEnvironment(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 62dc3313a117..957570918cde 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -34,6 +34,8 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService; +import org.elasticsearch.index.engine.ThreadPoolMergeScheduler; import org.elasticsearch.index.engine.TranslogHandler; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; @@ -82,6 +84,7 @@ import static org.hamcrest.Matchers.instanceOf; public class FollowingEngineTests extends ESTestCase { private ThreadPool threadPool; + private ThreadPoolMergeExecutorService threadPoolMergeExecutorService; private Index index; private ShardId shardId; private AtomicLong primaryTerm = new AtomicLong(); @@ -91,7 +94,11 @@ public class FollowingEngineTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - threadPool = new TestThreadPool("following-engine-tests"); + Settings settings = Settings.builder() + .put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), randomBoolean()) + .build(); + threadPool = new TestThreadPool("following-engine-tests", settings); + threadPoolMergeExecutorService = 
ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings); index = new Index("index", "uuid"); shardId = new ShardId(index, 0); primaryTerm.set(randomLongBetween(1, Long.MAX_VALUE)); @@ -113,7 +120,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new FollowingEngine(engineConfig)); assertThat(e, hasToString(containsString("a following engine can not be constructed for a non-following index"))); } @@ -137,7 +144,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final VersionType versionType = randomFrom(VersionType.INTERNAL, VersionType.EXTERNAL, VersionType.EXTERNAL_GTE); final List ops = EngineTestCase.generateSingleDocHistory(true, versionType, 2, 2, 20, "id"); @@ -156,7 +163,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = 
IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final Engine.Index indexToTest = indexForFollowing("id", seqNo, origin); consumer.accept(followingEngine, indexToTest); @@ -182,7 +189,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { final String id = "id"; final Engine.Delete delete = new Engine.Delete( @@ -208,7 +215,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { 
followingEngine.index(indexForFollowing("id", 128, Engine.Operation.Origin.PRIMARY)); int addedNoops = followingEngine.fillSeqNoGaps(primaryTerm.get()); @@ -221,6 +228,7 @@ public class FollowingEngineTests extends ESTestCase { final ShardId shardIdValue, final IndexSettings indexSettings, final ThreadPool threadPool, + final ThreadPoolMergeExecutorService threadPoolMergeExecutorService, final Store store ) throws IOException { final IndexWriterConfig indexWriterConfig = newIndexWriterConfig(); @@ -235,6 +243,7 @@ public class FollowingEngineTests extends ESTestCase { return new EngineConfig( shardIdValue, threadPool, + threadPoolMergeExecutorService, indexSettings, null, store, @@ -506,7 +515,13 @@ public class FollowingEngineTests extends ESTestCase { IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, Settings.EMPTY); try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { - EngineConfig followerConfig = engineConfig(shardId, followerIndexSettings, threadPool, followerStore); + EngineConfig followerConfig = engineConfig( + shardId, + followerIndexSettings, + threadPool, + threadPoolMergeExecutorService, + followerStore + ); followerStore.createEmpty(); String translogUuid = Translog.createEmptyTranslog( followerConfig.getTranslogConfig().getTranslogPath(), @@ -613,7 +628,7 @@ public class FollowingEngineTests extends ESTestCase { IndexSettings leaderIndexSettings = new IndexSettings(leaderIndexMetadata, leaderSettings); try (Store leaderStore = createStore(shardId, leaderIndexSettings, newDirectory())) { leaderStore.createEmpty(); - EngineConfig leaderConfig = engineConfig(shardId, leaderIndexSettings, threadPool, leaderStore); + EngineConfig leaderConfig = engineConfig(shardId, leaderIndexSettings, threadPool, threadPoolMergeExecutorService, leaderStore); 
leaderStore.associateIndexWithNewTranslog( Translog.createEmptyTranslog( leaderConfig.getTranslogConfig().getTranslogPath(), @@ -629,7 +644,13 @@ public class FollowingEngineTests extends ESTestCase { IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, leaderSettings); try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { - EngineConfig followerConfig = engineConfig(shardId, followerIndexSettings, threadPool, followerStore); + EngineConfig followerConfig = engineConfig( + shardId, + followerIndexSettings, + threadPool, + threadPoolMergeExecutorService, + followerStore + ); try (FollowingEngine followingEngine = createEngine(followerStore, followerConfig)) { wrappedTask.accept(leaderEngine, followingEngine); } @@ -809,7 +830,7 @@ public class FollowingEngineTests extends ESTestCase { final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); primaryTerm.set(oldTerm); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); final Map operationWithTerms = new HashMap<>(); @@ -882,7 +903,7 @@ public class FollowingEngineTests extends ESTestCase { final IndexMetadata indexMetadata = IndexMetadata.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetadata, settings); try (Store store = createStore(shardId, indexSettings, newDirectory())) { - final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store); + final EngineConfig 
engineConfig = engineConfig(shardId, indexSettings, threadPool, threadPoolMergeExecutorService, store); try (FollowingEngine engine = createEngine(store, engineConfig)) { AtomicBoolean running = new AtomicBoolean(true); Thread rollTranslog = new Thread(() -> { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java index c9a1a82b3411..ef7fd2c6b065 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/RetrySearchIntegTests.java @@ -57,7 +57,8 @@ public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase equalTo(0) ); refresh(indexName); - forceMerge(); + // force merge with expunge deletes is not merging down to one segment only + forceMerge(false); final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createRepository(repositoryName, "fs"); @@ -125,7 +126,8 @@ public class RetrySearchIntegTests extends BaseSearchableSnapshotsIntegTestCase equalTo(0) ); refresh(indexName); - forceMerge(); + // force merge with expunge deletes is not merging down to one segment only + forceMerge(false); final String repositoryName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); createRepository(repositoryName, "fs"); diff --git a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java index 4564187299bd..7c596bff281e 100644 --- 
a/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java +++ b/x-pack/plugin/snapshot-based-recoveries/src/internalClusterTest/java/org/elasticsearch/xpack/snapshotbasedrecoveries/recovery/SnapshotBasedIndexRecoveryIT.java @@ -465,7 +465,7 @@ public class SnapshotBasedIndexRecoveryIT extends AbstractSnapshotIntegTestCase int numDocs = randomIntBetween(300, 1000); indexDocs(indexName, 0, numDocs); - forceMerge(); + forceMerge(false); String repoName = "repo"; createRepo(repoName, TestRepositoryPlugin.INSTRUMENTED_TYPE);