diff --git a/docs/changelog/83832.yaml b/docs/changelog/83832.yaml new file mode 100644 index 000000000000..5d50b0cf7a2d --- /dev/null +++ b/docs/changelog/83832.yaml @@ -0,0 +1,6 @@ +pr: 83832 +summary: Push back excessive stats requests +area: Stats +type: enhancement +issues: + - 51992 diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc index 3f99e754c3ea..185d19013dc2 100644 --- a/docs/reference/cluster/nodes-stats.asciidoc +++ b/docs/reference/cluster/nodes-stats.asciidoc @@ -91,6 +91,10 @@ using metrics. `transport`:: Transport statistics about sent and received bytes in cluster communication. + + `stats_requests`:: + Statistics about stats requests such as indices stats, nodes stats, + recovery stats etc. -- ``:: @@ -2633,6 +2637,37 @@ search requests on the keyed node. The rank of this node; used for shard selection when routing search requests. ====== +====== + +[[cluster-nodes-stats-api-response-body-stats-requests]] +`stats_requests`:: +(object) +Contains statistics about the stats requests the node has received. ++ +.Properties of `stats_requests` +[%collapsible%open] +====== +``:: +(object) +Contains statistics about a specific type of a stats request the node has received. ++ +.Properties of `` +[%collapsible%open] +======= +`current`:: +(integer) +Number of stats requests currently in progress. + +`completed`:: +(integer) +Number of stats requests that have been completed by the node (successfully or +not). + +`rejected`:: +(integer) +Number of stats requests that were rejected by the node because it had reached +the limit of concurrent stats requests (`node.stats.max_concurrent_requests`). +======= ===== ==== diff --git a/docs/reference/modules/cluster/misc.asciidoc b/docs/reference/modules/cluster/misc.asciidoc index 83adaef9ec1a..4590f6870205 100644 --- a/docs/reference/modules/cluster/misc.asciidoc +++ b/docs/reference/modules/cluster/misc.asciidoc @@ -101,6 +101,24 @@ number of shards for each node, use the setting. -- +[[stats-requests-limit]] +===== Stats request limit + +A stats request might require information from all nodes to be aggregated before it returns to the user. +These requests can be heavy and they put extra pressure on the coordinating node (the node collecting the +responses from all the nodes), for this reason there is a limit on the concurrent requests that a node can coordinate. + +-- + +[[node-stats-max-concurrent-requests]] +`node.stats.max_concurrent_requests`:: ++ +-- +(<>) +Limits the stats requests a coordinating node can concurrently handle. Defaults to `100`. + + + [[user-defined-data]] ===== User-defined cluster metadata diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java index 96f3f0d1495f..82850fd67b7d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/stats/IndexStatsIT.java @@ -79,6 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE; import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful; @@ -1386,52 +1387,57 @@ public class IndexStatsIT extends ESIntegTestCase { } // start threads that will get stats concurrently with indexing - for (int i = 0; i < numberOfStatsThreads; i++) { - final Thread thread = new Thread(() -> { - try { - barrier.await(); - } catch (final BrokenBarrierException | InterruptedException e) { - failed.set(true); - executionFailures.get().add(e); - latch.countDown(); - } - final IndicesStatsRequest request = new IndicesStatsRequest(); - request.all(); - request.indices(new String[0]); - while (stop.get() == false) { + try { + updateClusterSettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), numberOfStatsThreads + 1)); + for (int i = 0; i < numberOfStatsThreads; i++) { + final Thread thread = new Thread(() -> { try { - final IndicesStatsResponse response = client().admin().indices().stats(request).get(); - if (response.getFailedShards() > 0) { - failed.set(true); - shardFailures.get().addAll(Arrays.asList(response.getShardFailures())); - latch.countDown(); - } - } catch (final ExecutionException | InterruptedException e) { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { failed.set(true); executionFailures.get().add(e); latch.countDown(); } - } - }); - thread.setName("stats-" + i); - threads.add(thread); - thread.start(); + final IndicesStatsRequest request = new IndicesStatsRequest(); + request.all(); + request.indices(new String[0]); + while (stop.get() == false) { + try { + final IndicesStatsResponse response = client().admin().indices().stats(request).get(); + if (response.getFailedShards() > 0) { + failed.set(true); + shardFailures.get().addAll(Arrays.asList(response.getShardFailures())); + latch.countDown(); + } + } catch (final ExecutionException | InterruptedException e) { + failed.set(true); + executionFailures.get().add(e); + latch.countDown(); + } + } + }); + thread.setName("stats-" + i); + threads.add(thread); + thread.start(); + } + + // release the hounds + barrier.await(); + + // wait for a failure, or for fifteen seconds to elapse + latch.await(15, TimeUnit.SECONDS); + + // stop all threads and wait for them to complete + stop.set(true); + for (final Thread thread : threads) { + thread.join(); + } + + assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class)); + assertThat(executionFailures.get(), emptyCollectionOf(Exception.class)); + } finally { + updateClusterSettings(Settings.builder().putNull(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey())); } - - // release the hounds - barrier.await(); - - // wait for a failure, or for fifteen seconds to elapse - latch.await(15, TimeUnit.SECONDS); - - // stop all threads and wait for them to complete - stop.set(true); - for (final Thread thread : threads) { - thread.join(); - } - - assertThat(shardFailures.get(), emptyCollectionOf(DefaultShardOperationFailedException.class)); - assertThat(executionFailures.get(), emptyCollectionOf(Exception.class)); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index 85364d587989..40372dd6e974 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -8,8 +8,10 @@ package org.elasticsearch.action.admin.cluster.node.info; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -33,6 +35,7 @@ public class TransportNodesInfoAction extends TransportNodesAction< NodeInfo> { private final NodeService nodeService; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportNodesInfoAction( @@ -40,7 +43,8 @@ public class TransportNodesInfoAction extends TransportNodesAction< ClusterService clusterService, TransportService transportService, NodeService nodeService, - ActionFilters actionFilters + ActionFilters actionFilters, + StatsRequestLimiter statsRequestLimiter ) { super( NodesInfoAction.NAME, @@ -54,6 +58,7 @@ public class TransportNodesInfoAction extends TransportNodesAction< NodeInfo.class ); this.nodeService = nodeService; + this.statsRequestLimiter = statsRequestLimiter; } @Override @@ -94,6 +99,11 @@ public class TransportNodesInfoAction extends TransportNodesAction< ); } + @Override + protected void doExecute(Task task, NodesInfoRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } + public static class NodeInfoRequest extends TransportRequest { NodesInfoRequest request; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 0af4ddefd5bd..1d88bea494c4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -8,6 +8,8 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.StatsRequestStats; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -87,6 +89,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private IndexingPressureStats indexingPressureStats; + @Nullable + private StatsRequestStats statsRequestStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -107,6 +112,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { ingestStats = in.readOptionalWriteable(IngestStats::new); adaptiveSelectionStats = in.readOptionalWriteable(AdaptiveSelectionStats::new); indexingPressureStats = in.readOptionalWriteable(IndexingPressureStats::new); + if (in.getVersion().onOrAfter(Version.V_8_2_0)) { + statsRequestStats = in.readOptionalWriteable(StatsRequestStats::new); + } } public NodeStats( @@ -126,7 +134,8 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable IngestStats ingestStats, @Nullable AdaptiveSelectionStats adaptiveSelectionStats, @Nullable ScriptCacheStats scriptCacheStats, - @Nullable IndexingPressureStats indexingPressureStats + @Nullable IndexingPressureStats indexingPressureStats, + @Nullable StatsRequestStats statsRequestStats ) { super(node); this.timestamp = timestamp; @@ -145,6 +154,7 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { this.adaptiveSelectionStats = adaptiveSelectionStats; this.scriptCacheStats = scriptCacheStats; this.indexingPressureStats = indexingPressureStats; + this.statsRequestStats = statsRequestStats; } public long getTimestamp() { @@ -249,6 +259,11 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { return indexingPressureStats; } + @Nullable + public StatsRequestStats getStatsRequestStats() { + return statsRequestStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -272,6 +287,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { out.writeOptionalWriteable(ingestStats); out.writeOptionalWriteable(adaptiveSelectionStats); out.writeOptionalWriteable(indexingPressureStats); + if (out.getVersion().onOrAfter(Version.V_8_2_0)) { + out.writeOptionalWriteable(statsRequestStats); + } } @Override @@ -341,6 +359,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { if (getIndexingPressureStats() != null) { getIndexingPressureStats().toXContent(builder, params); } + if (getStatsRequestStats() != null) { + getStatsRequestStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java index 004a83e130f5..1eb55b0a897f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -189,7 +189,8 @@ public class NodesStatsRequest extends BaseNodesRequest { INGEST("ingest"), ADAPTIVE_SELECTION("adaptive_selection"), SCRIPT_CACHE("script_cache"), - INDEXING_PRESSURE("indexing_pressure"),; + INDEXING_PRESSURE("indexing_pressure"), + STATS_REQUESTS("stats_requests"),; private String metricName; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 16701e1ad01f..e0b38bc23cff 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -8,8 +8,10 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -36,6 +38,7 @@ public class TransportNodesStatsAction extends TransportNodesAction< NodeStats> { private final NodeService nodeService; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportNodesStatsAction( @@ -43,7 +46,8 @@ public class TransportNodesStatsAction extends TransportNodesAction< ClusterService clusterService, TransportService transportService, NodeService nodeService, - ActionFilters actionFilters + ActionFilters actionFilters, + StatsRequestLimiter statsRequestLimiter ) { super( NodesStatsAction.NAME, @@ -57,6 +61,7 @@ public class TransportNodesStatsAction extends TransportNodesAction< NodeStats.class ); this.nodeService = nodeService; + this.statsRequestLimiter = statsRequestLimiter; } @Override @@ -95,10 +100,16 @@ public class TransportNodesStatsAction extends TransportNodesAction< NodesStatsRequest.Metric.INGEST.containedIn(metrics), NodesStatsRequest.Metric.ADAPTIVE_SELECTION.containedIn(metrics), NodesStatsRequest.Metric.SCRIPT_CACHE.containedIn(metrics), - NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics) + NodesStatsRequest.Metric.INDEXING_PRESSURE.containedIn(metrics), + NodesStatsRequest.Metric.STATS_REQUESTS.containedIn(metrics) ); } + @Override + protected void doExecute(Task task, NodesStatsRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } + public static class NodeStatsRequest extends TransportRequest { NodesStatsRequest request; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java index 83bf017bd428..f0fd76b2f02f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/usage/TransportNodesUsageAction.java @@ -8,8 +8,10 @@ package org.elasticsearch.action.admin.cluster.node.usage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -36,6 +38,7 @@ public class TransportNodesUsageAction extends TransportNodesAction< private final UsageService restUsageService; private final AggregationUsageService aggregationUsageService; private final long sinceTime; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportNodesUsageAction( @@ -44,7 +47,8 @@ public class TransportNodesUsageAction extends TransportNodesAction< TransportService transportService, ActionFilters actionFilters, UsageService restUsageService, - AggregationUsageService aggregationUsageService + AggregationUsageService aggregationUsageService, + StatsRequestLimiter statsRequestLimiter ) { super( NodesUsageAction.NAME, @@ -59,6 +63,7 @@ public class TransportNodesUsageAction extends TransportNodesAction< ); this.restUsageService = restUsageService; this.aggregationUsageService = aggregationUsageService; + this.statsRequestLimiter = statsRequestLimiter; this.sinceTime = System.currentTimeMillis(); } @@ -85,6 +90,11 @@ public class TransportNodesUsageAction extends TransportNodesAction< return new NodeUsage(clusterService.localNode(), System.currentTimeMillis(), sinceTime, restUsage, aggsUsage); } + @Override + protected void doExecute(Task task, NodesUsageRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } + public static class NodeUsageRequest extends TransportRequest { NodesUsageRequest request; diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index dc4673b2ea56..cbc3219194a4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.nodes.TransportNodesAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; @@ -72,6 +73,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportClusterStatsAction( @@ -80,7 +82,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< TransportService transportService, NodeService nodeService, IndicesService indicesService, - ActionFilters actionFilters + ActionFilters actionFilters, + StatsRequestLimiter statsRequestLimiter ) { super( ClusterStatsAction.NAME, @@ -98,6 +101,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< this.indicesService = indicesService; this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); + this.statsRequestLimiter = statsRequestLimiter; } @Override @@ -183,6 +187,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< true, false, false, + false, false ); List shardsStats = new ArrayList<>(); @@ -233,6 +238,11 @@ public class TransportClusterStatsAction extends TransportNodesAction< } + @Override + protected void doExecute(Task task, ClusterStatsRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } + public static class ClusterStatsNodeRequest extends TransportRequest { ClusterStatsRequest request; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java index c5d39cdb5c19..18e062609405 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/TransportRecoveryAction.java @@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.indices.recovery; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,6 +45,7 @@ import java.util.Map; public class TransportRecoveryAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportRecoveryAction( @@ -51,7 +53,8 @@ public class TransportRecoveryAction extends TransportBroadcastByNodeAction listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } + @Nullable // unless running tests that inject extra behaviour private volatile Runnable onShardOperation; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java index 470aedb2895f..5947d556e2a2 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/TransportIndicesSegmentsAction.java @@ -11,6 +11,7 @@ package org.elasticsearch.action.admin.indices.segments; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -38,6 +39,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi ShardSegments> { private final IndicesService indicesService; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportIndicesSegmentsAction( @@ -45,7 +47,8 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + StatsRequestLimiter statsRequestLimiter ) { super( IndicesSegmentsAction.NAME, @@ -57,6 +60,7 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; + this.statsRequestLimiter = statsRequestLimiter; } /** @@ -120,4 +124,9 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastByNodeActi return new ShardSegments(indexShard.routingEntry(), indexShard.segments()); }); } + + @Override + protected void doExecute(Task task, IndicesSegmentsRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index 44bb62cd0f04..ef70043fa2c7 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -12,6 +12,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -40,6 +41,7 @@ import java.util.List; public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction { private final IndicesService indicesService; + private final StatsRequestLimiter statsRequestLimiter; @Inject public TransportIndicesStatsAction( @@ -47,7 +49,8 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + StatsRequestLimiter statsRequestLimiter ) { super( IndicesStatsAction.NAME, @@ -59,6 +62,7 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< ThreadPool.Names.MANAGEMENT ); this.indicesService = indicesService; + this.statsRequestLimiter = statsRequestLimiter; } /** @@ -144,4 +148,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastByNodeAction< ); }); } + + @Override + protected void doExecute(Task task, IndicesStatsRequest request, ActionListener listener) { + statsRequestLimiter.tryToExecute(task, request, listener, super::doExecute); + } } diff --git a/server/src/main/java/org/elasticsearch/action/support/StatsRequestLimiter.java b/server/src/main/java/org/elasticsearch/action/support/StatsRequestLimiter.java new file mode 100644 index 000000000000..7d502c95a438 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/StatsRequestLimiter.java @@ -0,0 +1,124 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AdjustableSemaphore; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.tasks.Task; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * This class guards the amount of stats requests a node can concurrently coordinate. + */ +public class StatsRequestLimiter { + + public static final Setting MAX_CONCURRENT_STATS_REQUESTS_PER_NODE = Setting.intSetting( + "node.stats.max_concurrent_requests", + 100, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private final AdjustableSemaphore maxConcurrentStatsRequestsPerNodeSemaphore; + private volatile int maxConcurrentStatsRequestsPerNode; + private final Map stats = new ConcurrentHashMap<>(); + + public StatsRequestLimiter(Settings settings, ClusterSettings clusterSettings) { + maxConcurrentStatsRequestsPerNode = MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.get(settings); + this.maxConcurrentStatsRequestsPerNodeSemaphore = new AdjustableSemaphore(maxConcurrentStatsRequestsPerNode, false); + clusterSettings.addSettingsUpdateConsumer(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE, this::setMaxConcurrentStatsRequestsPerNode); + } + + private void setMaxConcurrentStatsRequestsPerNode(int maxConcurrentStatsRequestsPerNode) { + this.maxConcurrentStatsRequestsPerNode = maxConcurrentStatsRequestsPerNode; + this.maxConcurrentStatsRequestsPerNodeSemaphore.setMaxPermits(maxConcurrentStatsRequestsPerNode); + } + + /** + * Checks if executing the action will remain within the limits of the max concurrent requests the node can handle. If the limit is + * respected the action will be executed otherwise it will throw an EsRejectedExecutionException. The method keeps track of current, + * completed and rejected requests per action type. + */ + public void tryToExecute( + Task task, + Request request, + ActionListener listener, + TriConsumer> executeAction + ) { + StatsHolder statsHolder = stats.computeIfAbsent(task.getAction(), ignored -> new StatsHolder(task.getAction())); + if (tryAcquire()) { + statsHolder.current.inc(); + final Runnable release = new RunOnce(() -> { + release(); + statsHolder.current.dec(); + statsHolder.completed.inc(); + }); + boolean success = false; + try { + executeAction.apply(task, request, ActionListener.runBefore(listener, release::run)); + success = true; + } finally { + if (success == false) { + release.run(); + } + } + } else { + listener.onFailure( + new EsRejectedExecutionException( + "this node is already coordinating [" + + maxConcurrentStatsRequestsPerNode + + "] stats requests and has reached the limit set by [" + + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey() + + "]" + ) + ); + statsHolder.rejected.inc(); + } + } + + public StatsRequestStats stats() { + return new StatsRequestStats(stats.values().stream().map(StatsHolder::stats).collect(Collectors.toList())); + } + + // visible for testing + boolean tryAcquire() { + return maxConcurrentStatsRequestsPerNodeSemaphore.tryAcquire(); + } + + // visible for testing + void release() { + maxConcurrentStatsRequestsPerNodeSemaphore.release(); + } + + static final class StatsHolder { + String request; + final CounterMetric current = new CounterMetric(); + final CounterMetric completed = new CounterMetric(); + final CounterMetric rejected = new CounterMetric(); + + StatsHolder(String request) { + this.request = request; + } + + StatsRequestStats.Stats stats() { + return new StatsRequestStats.Stats(request, current.count(), completed.count(), rejected.count()); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/StatsRequestStats.java b/server/src/main/java/org/elasticsearch/action/support/StatsRequestStats.java new file mode 100644 index 000000000000..aaa18a4fce7a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/StatsRequestStats.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.xcontent.ToXContentFragment; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +public class StatsRequestStats implements Writeable, ToXContentFragment, Iterable { + + public static class Stats implements Writeable, ToXContentFragment, Comparable { + + private final String request; + private final long current; + private final long completed; + private final long rejected; + + public Stats(String request, long current, long completed, long rejected) { + this.request = request; + this.current = current; + this.completed = completed; + this.rejected = rejected; + } + + public Stats(StreamInput in) throws IOException { + request = in.readString(); + current = in.readLong(); + completed = in.readLong(); + rejected = in.readLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(request); + out.writeLong(current); + out.writeLong(completed); + out.writeLong(rejected); + } + + public String getRequest() { + return this.request; + } + + public long getCurrent() { + return this.current; + } + + public long getCompleted() { + return this.completed; + } + + public long getRejected() { + return rejected; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(request); + if (current != -1) { + builder.field(Fields.CURRENT, current); + } + if (completed != -1) { + builder.field(Fields.COMPLETED, completed); + } + if (rejected != -1) { + builder.field(Fields.REJECTED, rejected); + } + builder.endObject(); + return builder; + } + + @Override + public int compareTo(Stats other) { + if ((getRequest() == null) && (other.getRequest() == null)) { + return 0; + } else if ((getRequest() != null) && (other.getRequest() == null)) { + return 1; + } else if (getRequest() == null) { + return -1; + } else { + int compare = getRequest().compareTo(other.getRequest()); + if (compare == 0) { + compare = Long.compare(getCompleted(), other.getCompleted()); + } + return compare; + } + } + } + + private List stats; + + public StatsRequestStats(List stats) { + Collections.sort(stats); + this.stats = stats; + } + + public StatsRequestStats(StreamInput in) throws IOException { + stats = in.readList(Stats::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(stats); + } + + @Override + public Iterator iterator() { + return stats.iterator(); + } + + static final class Fields { + static final String CURRENT = "current"; + static final String COMPLETED = "completed"; + static final String REJECTED = "rejected"; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("stats_requests"); + for (Stats stat : stats) { + stat.toXContent(builder, params); + } + builder.endObject(); + return builder; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java index d8ee643fb28b..f82bb516d14d 100644 --- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.DestructiveOperations; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.bootstrap.BootstrapSettings; import org.elasticsearch.client.internal.Client; @@ -508,7 +509,8 @@ public final class ClusterSettings extends AbstractScopedSettings { FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, IndexingPressure.MAX_INDEXING_BYTES, ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE_FROZEN, - DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING + DataTier.ENFORCE_DEFAULT_TIER_PREFERENCE_SETTING, + StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE ); static List> BUILT_IN_SETTING_UPGRADERS = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 37766ab2f656..8960925915cc 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.search.SearchExecutionStatsCollector; import org.elasticsearch.action.search.SearchPhaseController; import org.elasticsearch.action.search.SearchTransportService; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.bootstrap.BootstrapCheck; @@ -809,6 +810,8 @@ public class Node implements Closeable { ); clusterInfoService.addListener(diskThresholdMonitor::onNewInfo); + final StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(settings, settingsModule.getClusterSettings()); + final DiscoveryModule discoveryModule = new DiscoveryModule( settings, transportService, @@ -842,7 +845,8 @@ public class Node implements Closeable { responseCollectorService, searchTransportService, indexingLimits, - searchModule.getValuesSourceRegistry().getUsageService() + searchModule.getValuesSourceRegistry().getUsageService(), + statsRequestLimiter ); final SearchService searchService = newSearchService( @@ -983,6 +987,7 @@ public class Node implements Closeable { b.bind(IndexSettingProviders.class).toInstance(indexSettingProviders); b.bind(DesiredNodesSettingsValidator.class).toInstance(desiredNodesSettingsValidator); b.bind(HealthService.class).toInstance(healthService); + b.bind(StatsRequestLimiter.class).toInstance(statsRequestLimiter); }); injector = modules.createInjector(); diff --git a/server/src/main/java/org/elasticsearch/node/NodeService.java b/server/src/main/java/org/elasticsearch/node/NodeService.java index acfe00ef9e6c..70ed72e804e1 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeService.java +++ b/server/src/main/java/org/elasticsearch/node/NodeService.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.search.SearchTransportService; +import org.elasticsearch.action.support.StatsRequestLimiter; import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; @@ -52,6 +53,7 @@ public class NodeService implements Closeable { private final SearchTransportService searchTransportService; private final IndexingPressure indexingPressure; private final AggregationUsageService aggregationUsageService; + private final StatsRequestLimiter statsRequestLimiter; private final Coordinator coordinator; @@ -72,7 +74,8 @@ public class NodeService implements Closeable { ResponseCollectorService responseCollectorService, SearchTransportService searchTransportService, IndexingPressure indexingPressure, - AggregationUsageService aggregationUsageService + AggregationUsageService aggregationUsageService, + StatsRequestLimiter statsRequestLimiter ) { this.settings = settings; this.threadPool = threadPool; @@ -90,6 +93,7 @@ public class NodeService implements Closeable { this.searchTransportService = searchTransportService; this.indexingPressure = indexingPressure; this.aggregationUsageService = aggregationUsageService; + this.statsRequestLimiter = statsRequestLimiter; clusterService.addStateApplier(ingestService); } @@ -139,7 +143,8 @@ public class NodeService implements Closeable { boolean ingest, boolean adaptiveSelection, boolean scriptCache, - boolean indexingPressure + boolean indexingPressure, + boolean statsRequests ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -160,7 +165,8 @@ public class NodeService implements Closeable { ingest ? ingestService.stats() : null, adaptiveSelection ? responseCollectorService.getAdaptiveStats(searchTransportService.getPendingSearchRequests()) : null, scriptCache ? scriptService.cacheStats() : null, - indexingPressure ? this.indexingPressure.stats() : null + indexingPressure ? this.indexingPressure.stats() : null, + statsRequests ? this.statsRequestLimiter.stats() : null ); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index 402bb7468c16..95b145867e36 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.action.admin.cluster.node.stats; +import org.elasticsearch.action.support.StatsRequestStats; import org.elasticsearch.cluster.coordination.ClusterStateSerializationStats; import org.elasticsearch.cluster.coordination.PendingClusterStateStats; import org.elasticsearch.cluster.coordination.PublishClusterStateStats; @@ -509,6 +510,21 @@ public class NodeStatsTests extends ESTestCase { assertEquals(limited, sum.getCompilationLimitTriggered()); assertEquals(compilations, sum.getCompilations()); } + if (nodeStats.getStatsRequestStats() == null) { + assertNull(deserializedNodeStats.getStatsRequestStats()); + } else { + Iterator statsRequestsStatsIterator = nodeStats.getStatsRequestStats().iterator(); + Iterator deserializedStatsRequestsStatsIterator = deserializedNodeStats.getStatsRequestStats() + .iterator(); + while (statsRequestsStatsIterator.hasNext()) { + StatsRequestStats.Stats stats = statsRequestsStatsIterator.next(); + StatsRequestStats.Stats deserializedStats = deserializedStatsRequestsStatsIterator.next(); + assertEquals(stats.getRequest(), deserializedStats.getRequest()); + assertEquals(stats.getCurrent(), deserializedStats.getCurrent()); + assertEquals(stats.getCompleted(), deserializedStats.getCompleted()); + assertEquals(stats.getRejected(), deserializedStats.getRejected()); + } + } } } } @@ -885,6 +901,22 @@ public class NodeStatsTests extends ESTestCase { randomLongBetween(0, maxStatValue) ); } + StatsRequestStats statsRequestStats = null; + if (frequently()) { + int numStatsRequestsStats = randomIntBetween(0, 10); + List statsRequestsStatsList = new ArrayList<>(); + for (int i = 0; i < numStatsRequestsStats; i++) { + statsRequestsStatsList.add( + new StatsRequestStats.Stats( + randomAlphaOfLengthBetween(3, 10), + randomIntBetween(1, 10), + randomIntBetween(1, 1000), + randomIntBetween(1, 1000) + ) + ); + } + statsRequestStats = new StatsRequestStats(statsRequestsStatsList); + } // TODO NodeIndicesStats are not tested here, way too complicated to create, also they need to be migrated to Writeable yet return new NodeStats( node, @@ -903,7 +935,8 @@ public class NodeStatsTests extends ESTestCase { ingestStats, adaptiveSelectionStats, scriptCacheStats, - indexingPressureStats + indexingPressureStats, + statsRequestStats ); } diff --git a/server/src/test/java/org/elasticsearch/action/support/StatsRequestLimiterTests.java b/server/src/test/java/org/elasticsearch/action/support/StatsRequestLimiterTests.java new file mode 100644 index 000000000000..9b987e993752 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/support/StatsRequestLimiterTests.java @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.TriConsumer; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; + +import static java.util.Collections.emptyMap; +import static org.elasticsearch.action.support.StatsRequestLimiter.MAX_CONCURRENT_STATS_REQUESTS_PER_NODE; + +public class StatsRequestLimiterTests extends ESTestCase { + + public void testGrantsPermitsUpToMaxPermits() throws Exception { + final int maxPermits = randomIntBetween(1, 5); + final List threads = new ArrayList<>(maxPermits); + final CyclicBarrier barrier = new CyclicBarrier(1 + maxPermits); + TriConsumer> execute = (task, i, actionListener) -> { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + fail("Exception occurred while waiting for the barrier to be lifted"); + } + actionListener.onResponse(i); + }); + thread.setName("thread-" + i); + threads.add(thread); + thread.start(); + }; + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings settings = Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), maxPermits).build(); + StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter(settings, clusterSettings); + + for (int i = 0; i < maxPermits; i++) { + PlainActionFuture listener = new PlainActionFuture<>(); + statsRequestLimiter.tryToExecute(createTask(), i, listener, execute); + } + PlainActionFuture listener = new PlainActionFuture<>(); + statsRequestLimiter.tryToExecute(createTask(), maxPermits, listener, execute); + String expectedExceptionMessage = "this node is already coordinating [" + + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.get(settings) + + "] stats requests and has reached the limit set by [" + + MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey() + + "]"; + expectThrows(EsRejectedExecutionException.class, expectedExceptionMessage, listener::actionGet); + StatsRequestStats.Stats stats = getStats(statsRequestLimiter); + assertEquals(maxPermits, stats.getCurrent()); + + barrier.await(); + for (Thread thread : threads) { + thread.join(); + } + assertBusy(() -> assertTrue(statsRequestLimiter.tryAcquire())); + stats = getStats(statsRequestLimiter); + assertEquals(0, stats.getCurrent()); + assertEquals(maxPermits, stats.getCompleted()); + assertEquals(1, stats.getRejected()); + } + + public void testStatsRequestPermitCanBeDynamicallyUpdated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter( + Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(), + clusterSettings + ); + + assertTrue(statsRequestLimiter.tryAcquire()); + assertFalse(statsRequestLimiter.tryAcquire()); + + clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 2).build()); + + assertTrue(statsRequestLimiter.tryAcquire()); + + clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build()); + + assertFalse(statsRequestLimiter.tryAcquire()); + statsRequestLimiter.release(); + statsRequestLimiter.release(); + + assertTrue(statsRequestLimiter.tryAcquire()); + assertFalse(statsRequestLimiter.tryAcquire()); + } + + public void testMaxConcurrentStatsRequestsPerNodeIsValidated() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + Settings invalidSetting = Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 0).build(); + expectThrows(IllegalArgumentException.class, () -> new StatsRequestLimiter(invalidSetting, clusterSettings)); + new StatsRequestLimiter(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(), clusterSettings); + expectThrows( + IllegalArgumentException.class, + () -> clusterSettings.applySettings(Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 0).build()) + ); + } + + public void testReleasingAfterException() { + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + StatsRequestLimiter statsRequestLimiter = new StatsRequestLimiter( + Settings.builder().put(MAX_CONCURRENT_STATS_REQUESTS_PER_NODE.getKey(), 1).build(), + clusterSettings + ); + PlainActionFuture listener = new PlainActionFuture<>(); + TriConsumer> execute = (task, input, actionListener) -> { + // Verify that we hold the last permit + assertFalse(statsRequestLimiter.tryAcquire()); + throw new RuntimeException("simulated"); + }; + expectThrows(RuntimeException.class, () -> statsRequestLimiter.tryToExecute(createTask(), 10, listener, execute)); + StatsRequestStats.Stats stats = getStats(statsRequestLimiter); + assertEquals(0, stats.getCurrent()); + assertEquals(1, stats.getCompleted()); + assertTrue(statsRequestLimiter.tryAcquire()); + } + + private StatsRequestStats.Stats getStats(StatsRequestLimiter statsRequestLimiter) { + return statsRequestLimiter.stats().iterator().next(); + } + + private Task createTask() { + return new Task( + randomLong(), + "transport", + "stats_action", + "description", + new TaskId(randomLong() + ":" + randomLong()), + emptyMap() + ); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index 939788fc2e37..fe2de2a3341f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -170,6 +170,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ), new NodeStats( @@ -189,6 +190,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ), new NodeStats( @@ -208,6 +210,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ) ); @@ -258,6 +261,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ), new NodeStats( @@ -277,6 +281,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ), new NodeStats( @@ -296,6 +301,7 @@ public class DiskUsageTests extends ESTestCase { null, null, null, + null, null ) ); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index b47ea46a2be3..0b4d6b421b15 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -89,7 +89,8 @@ public class MockInternalClusterInfoService extends InternalClusterInfoService { nodeStats.getIngestStats(), nodeStats.getAdaptiveSelectionStats(), nodeStats.getScriptCacheStats(), - nodeStats.getIndexingPressureStats() + nodeStats.getIndexingPressureStats(), + nodeStats.getStatsRequestStats() ); }).collect(Collectors.toList()); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index c82ceacdb962..a060a4339c24 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -2458,6 +2458,7 @@ public final class InternalTestCluster extends TestCluster { false, false, false, + false, false ); assertThat( diff --git a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java index 67cb99ca3904..7bf3fd528d1e 100644 --- a/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java +++ b/x-pack/plugin/autoscaling/src/test/java/org/elasticsearch/xpack/autoscaling/capacity/memory/AutoscalingMemoryInfoServiceTests.java @@ -368,6 +368,7 @@ public class AutoscalingMemoryInfoServiceTests extends AutoscalingTestCase { null, null, null, + null, null ); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 8708f0506c4b..49973f9d9f7a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -358,6 +358,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { ingestStats, null, null, + null, null ); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 4b22722e323f..d0fbe7603ecf 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -445,6 +445,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa null, null, null, + null, null ); }