From 1fe3b77a2aab57aec4e77f77d7ff3a231c3ecad6 Mon Sep 17 00:00:00 2001 From: Pete Gillin Date: Wed, 21 May 2025 19:04:22 +0100 Subject: [PATCH] ES-10063 Add multi-project support for more stats APIs (#127650) * Add multi-project support for more stats APIs This affects the following APIs: - `GET _nodes/stats`: - For `indices`, it now prefixes the index name with the project ID (for non-default projects). Previously, it didn't tell you which project an index was in, and it failed if two projects had the same index name. - For `ingest`, it now gets the pipeline and processor stats for all projects, and prefixes the pipeline ID with the project ID. Previously, it only got them for the default project. - `GET /_cluster/stats`: - For `ingest`, it now aggregates the pipeline and processor stats for all projects. Previously, it only got them for the default project. - `GET /_info`: - For `ingest`, same as for `GET /_nodes/stats`. This is done by making `IndicesService.stats()` and `IngestService.stats()` include project IDs in the `NodeIndicesStats` and `IngestStats` objects they return, and making those stats objects incorporate the project IDs when converting to XContent. The transitive callers of these two methods are rather extensive (including all callers to `NodeService.stats()`, all callers of `TransportNodesStatsAction`, and so on). To ensure the change is safe, the callers were all checked out, and they fall into the following cases: - The behaviour change is one of the desired enhancements described above. - There is no behaviour change because it was getting node stats but neither `indices` nor `ingest` stats were requested. - There is no behaviour change because it was getting `indices` and/or `ingest` stats but only using aggregate values. - In `MachineLearningUsageTransportAction` and `TransportGetTrainedModelsStatsAction`, the `IngestStats` returned will return stats from all projects instead of just the default with this change, but they have been changed to filter the non-default project stats out, so this change is a noop there. (These actions are not MP-ready yet.) - `MonitoringService` will be affected, but this is the legacy monitoring module which is not in use anywhere that MP is going to be enabled. (If anything, the behaviour is probably improved by this change, as it will now include project IDs, rather than producing ambiguous unqualified results and failing in the case of duplicates.) * Update test/external-modules/multi-project/build.gradle Change suggested by Niels. Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> * Respond to review comments * fix merge weirdness * [CI] Auto commit changes from spotless * Fix test compilation following upstream change to base class * Update x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datatiers/DataTierUsageFixtures.java Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> * Make projects-by-index map nullable and omit in single-project; always include project prefix in XContent in multip-project, even if default; also incorporate one other review comment * Add a TODO * update IT to reflect changed behaviour * Switch to using XContent.Params to indicate whether it is multi-project or not * Refactor NodesStatsMultiProjectIT to common up repeated assertions * Defer use of ProjectIdResolver in REST handlers to keep tests happy * Include index UUID in "unknown project" case * Make the index-to-project map empty rather than null in the BWC deserialization case. This works out fine, for the reasons given in the comment. As it happens, I'd already forgotten to do the null check in the one place it's actively used. * remove a TODO that is done, and add a comment * fix typo * Get REST YAML tests working with project ID prefix TODO finish this * As a drive-by, fix and un-suppress one of the health REST tests * [CI] Auto commit changes from spotless * TODO ugh * Experiment with different stashing behaviour * [CI] Auto commit changes from spotless * Try a more sensible stash behaviour for assertions * clarify comment * Make checkstyle happy * Make the way `Assertion` works more consistent, and simplify implementation * [CI] Auto commit changes from spotless * In RestNodesStatsAction, make the XContent params to channel.request(), which is the value it would have had before this change --------- Co-authored-by: Niels Bauman <33722607+nielsbauman@users.noreply.github.com> Co-authored-by: elasticsearchmachine --- .../ingest/common/IngestRestartIT.java | 8 +- .../test/ingest/15_info_ingest.yml | 78 ++-- .../rest-api-spec/test/ingest/70_bulk.yml | 16 +- .../test/health/40_diagnosis.yml | 2 +- .../test/nodes.stats/11_indices_metrics.yml | 6 +- .../ingest/IngestStatsNamesAndTypesIT.java | 6 +- .../org/elasticsearch/TransportVersions.java | 1 + .../elasticsearch/action/ActionModule.java | 4 +- .../admin/cluster/node/stats/NodeStats.java | 2 + .../cluster/stats/ClusterStatsNodes.java | 51 +-- .../cluster/project/ProjectIdResolver.java | 11 + .../cluster/project/ProjectResolver.java | 10 - .../elasticsearch/indices/IndicesService.java | 18 +- .../indices/NodeIndicesStats.java | 48 ++- .../elasticsearch/ingest/IngestService.java | 28 +- .../org/elasticsearch/ingest/IngestStats.java | 92 +++-- ...stRefCountedChunkedToXContentListener.java | 5 + .../admin/cluster/RestNodesStatsAction.java | 21 +- .../action/info/RestClusterInfoAction.java | 17 +- .../cluster/node/stats/NodeStatsTests.java | 18 +- .../cluster/stats/ClusterStatsNodesTests.java | 4 +- .../indices/NodeIndicesStatsTests.java | 12 +- .../ingest/IngestServiceTests.java | 144 ++++---- .../ingest/IngestStatsTests.java | 143 +++++--- .../cluster/RestNodesStatsActionTests.java | 3 +- .../info/RestClusterInfoActionTests.java | 3 +- .../multi-project/build.gradle | 1 + .../node/stats/NodesStatsMultiProjectIT.java | 341 ++++++++++++++++++ .../stats/ClusterStatsMultiProjectIT.java | 44 ++- .../test/rest/ESRestTestCase.java | 10 +- .../org/elasticsearch/test/rest/Stash.java | 24 +- .../rest/yaml/ESClientYamlSuiteTestCase.java | 2 + .../test/rest/yaml/section/Assertion.java | 4 +- .../core/datatiers/DataTierUsageFixtures.java | 9 +- ...TrainedModelsStatsActionResponseTests.java | 21 +- .../MachineLearningUsageTransportAction.java | 23 +- .../TransportGetTrainedModelsStatsAction.java | 23 +- ...chineLearningInfoTransportActionTests.java | 77 ++-- ...sportGetTrainedModelsStatsActionTests.java | 85 ++++- .../node/NodeStatsMonitoringDocTests.java | 2 +- .../build.gradle | 6 +- 41 files changed, 1066 insertions(+), 357 deletions(-) create mode 100644 test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsMultiProjectIT.java diff --git a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java index f17ad979ac9b..4ccaa55d69c3 100644 --- a/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java +++ b/modules/ingest-common/src/internalClusterTest/java/org/elasticsearch/ingest/common/IngestRestartIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.internal.Requests; import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -112,7 +113,12 @@ public class IngestRestartIT extends ESIntegTestCase { NodesStatsResponse r = clusterAdmin().prepareNodesStats(internalCluster().getNodeNames()).setIngest(true).get(); int nodeCount = r.getNodes().size(); for (int k = 0; k < nodeCount; k++) { - List stats = r.getNodes().get(k).getIngestStats().processorStats().get(pipelineId); + List stats = r.getNodes() + .get(k) + .getIngestStats() + .processorStats() + .get(ProjectId.DEFAULT) + .get(pipelineId); for (IngestStats.ProcessorStat st : stats) { assertThat(st.stats().ingestCurrent(), greaterThanOrEqualTo(0L)); } diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml index a48b188e2306..082a580b9fee 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/15_info_ingest.yml @@ -85,22 +85,22 @@ teardown: - gte: { ingest.total.failed: 0 } # Pipelines section - - is_true: ingest.pipelines.ingest_info_pipeline - - gte: { ingest.pipelines.ingest_info_pipeline.count: 2 } - - gte: { ingest.pipelines.ingest_info_pipeline.time_in_millis: 0 } - - match: { ingest.pipelines.ingest_info_pipeline.current: 0 } - - match: { ingest.pipelines.ingest_info_pipeline.failed: 0 } - - gt: { ingest.pipelines.ingest_info_pipeline.ingested_as_first_pipeline_in_bytes: 0 } - - gt: { ingest.pipelines.ingest_info_pipeline.produced_as_first_pipeline_in_bytes: 0 } + - is_true: "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline" + - gte: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.count": 2 } + - gte: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.time_in_millis": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.current": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.failed": 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.ingested_as_first_pipeline_in_bytes": 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.produced_as_first_pipeline_in_bytes": 0 } # Processors section - - is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set - - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.type: "set" } - - is_true: ingest.pipelines.ingest_info_pipeline.processors.0.set.stats - - gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.count: 2 } - - gte: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.time_in_millis: 0 } - - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.current: 0 } - - match: { ingest.pipelines.ingest_info_pipeline.processors.0.set.stats.failed: 0 } + - is_true: "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set" + - match: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.type": "set" } + - is_true: "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.stats" + - gte: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.stats.count": 2 } + - gte: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.stats.time_in_millis": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.stats.current": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}ingest_info_pipeline.processors.0.set.stats.failed": 0 } --- "Test bytes_produced not increased when pipeline fails": @@ -128,9 +128,9 @@ teardown: - do: cluster.info: target: [ ingest ] - - match: { ingest.pipelines.pipeline-1.failed: 1 } - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - match: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.failed": 1 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": 0 } --- "Test drop processor": @@ -156,8 +156,8 @@ teardown: - do: cluster.info: target: [ ingest ] - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - match: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": 0 } --- "Test that pipeline processor has byte stats recorded in first pipeline": @@ -210,11 +210,11 @@ teardown: - do: cluster.info: target: [ ingest ] - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - set: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: ingest_bytes } - - gt: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: $ingest_bytes } - - match: { ingest.pipelines.pipeline-2.ingested_as_first_pipeline_in_bytes: 0 } - - match: { ingest.pipelines.pipeline-2.produced_as_first_pipeline_in_bytes: 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - set: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": ingest_bytes } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": $ingest_bytes } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.ingested_as_first_pipeline_in_bytes": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.produced_as_first_pipeline_in_bytes": 0 } --- "Test that final pipeline has byte stats recorded in first pipeline": @@ -262,11 +262,11 @@ teardown: - do: cluster.info: target: [ ingest ] - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - set: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: ingest_bytes } - - gt: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: $ingest_bytes } - - match: { ingest.pipelines.pipeline-2.ingested_as_first_pipeline_in_bytes: 0 } - - match: { ingest.pipelines.pipeline-2.produced_as_first_pipeline_in_bytes: 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - set: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": ingest_bytes } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": $ingest_bytes } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.ingested_as_first_pipeline_in_bytes": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.produced_as_first_pipeline_in_bytes": 0 } --- "Test that reroute processor has byte stats recorded in first pipeline": @@ -327,11 +327,11 @@ teardown: - do: cluster.info: target: [ ingest ] - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - set: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: ingest_bytes } - - gt: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: $ingest_bytes } - - match: { ingest.pipelines.pipeline-2.ingested_as_first_pipeline_in_bytes: 0 } - - match: { ingest.pipelines.pipeline-2.produced_as_first_pipeline_in_bytes: 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - set: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": ingest_bytes } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": $ingest_bytes } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.ingested_as_first_pipeline_in_bytes": 0 } + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-2.produced_as_first_pipeline_in_bytes": 0 } --- "Test human readable byte stat fields": @@ -360,8 +360,8 @@ teardown: target: [ ingest ] human: true - - match: { ingest.pipelines.pipeline-1.count: 1 } - - gt: { ingest.pipelines.pipeline-1.ingested_as_first_pipeline_in_bytes: 0 } - - gt: { ingest.pipelines.pipeline-1.produced_as_first_pipeline_in_bytes: 0 } - - is_true: ingest.pipelines.pipeline-1.ingested_as_first_pipeline - - is_true: ingest.pipelines.pipeline-1.produced_as_first_pipeline + - match: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.count": 1 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline_in_bytes": 0 } + - gt: { "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline_in_bytes": 0 } + - is_true: "ingest.pipelines.${_project_id_prefix_}pipeline-1.ingested_as_first_pipeline" + - is_true: "ingest.pipelines.${_project_id_prefix_}pipeline-1.produced_as_first_pipeline" diff --git a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml index c8ef06a18fe7..31983f63f26e 100644 --- a/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml +++ b/modules/ingest-common/src/yamlRestTest/resources/rest-api-spec/test/ingest/70_bulk.yml @@ -88,10 +88,10 @@ teardown: - gte: {nodes.$master.ingest.total.failed: 0} - gte: {nodes.$master.ingest.total.time_in_millis: 0} - match: {nodes.$master.ingest.total.current: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline1.count: 0} - - match: {nodes.$master.ingest.pipelines.pipeline1.failed: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline1.time_in_millis: 0} - - match: {nodes.$master.ingest.pipelines.pipeline1.current: 0} + - gte: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline1.count": 0} + - match: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline1.failed": 0} + - gte: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline1.time_in_millis": 0} + - match: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline1.current": 0} --- "Test bulk request with default pipeline": @@ -124,10 +124,10 @@ teardown: - gte: {nodes.$master.ingest.total.failed: 0} - gte: {nodes.$master.ingest.total.time_in_millis: 0} - match: {nodes.$master.ingest.total.current: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline2.count: 0} - - match: {nodes.$master.ingest.pipelines.pipeline2.failed: 0} - - gte: {nodes.$master.ingest.pipelines.pipeline2.time_in_millis: 0} - - match: {nodes.$master.ingest.pipelines.pipeline2.current: 0} + - gte: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline2.count": 0} + - match: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline2.failed": 0} + - gte: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline2.time_in_millis": 0} + - match: { "nodes.$master.ingest.pipelines.${_project_id_prefix_}pipeline2.current": 0} - do: get: diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/health/40_diagnosis.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/health/40_diagnosis.yml index 0d9ac3017420..cc627b2af6b9 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/health/40_diagnosis.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/health/40_diagnosis.yml @@ -26,4 +26,4 @@ - length: { indicators.shards_availability.diagnosis: 1 } - is_true: indicators.shards_availability.diagnosis.0.affected_resources - length: { indicators.shards_availability.diagnosis.0.affected_resources: 1 } - - match: { indicators.shards_availability.diagnosis.0.affected_resources.indices.0: "red_index" } + - match: { indicators.shards_availability.diagnosis.0.affected_resources.indices.0: "${_project_id_prefix_}red_index" } diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml index 195da7b8e685..372b9a9469cc 100644 --- a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml @@ -503,9 +503,9 @@ - gte: { nodes.$node_id.indices.mappings.total_count: 28 } - is_true: nodes.$node_id.indices.mappings.total_estimated_overhead - gte: { nodes.$node_id.indices.mappings.total_estimated_overhead_in_bytes: 26624 } - - match: { nodes.$node_id.indices.indices.index1.mappings.total_count: 28 } - - is_true: nodes.$node_id.indices.indices.index1.mappings.total_estimated_overhead - - match: { nodes.$node_id.indices.indices.index1.mappings.total_estimated_overhead_in_bytes: 28672 } + - match: { "nodes.$node_id.indices.indices.${_project_id_prefix_}index1.mappings.total_count": 28 } + - is_true: "nodes.$node_id.indices.indices.${_project_id_prefix_}index1.mappings.total_estimated_overhead" + - match: { "nodes.$node_id.indices.indices.${_project_id_prefix_}index1.mappings.total_estimated_overhead_in_bytes": 28672 } --- "Lucene segment level fields stats": diff --git a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java index d32803476ae7..99d586610cc3 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/ingest/IngestStatsNamesAndTypesIT.java @@ -15,6 +15,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.MockScriptEngine; @@ -109,7 +110,10 @@ public class IngestStatsNamesAndTypesIT extends ESIntegTestCase { assertThat(pipelineStat.pipelineId(), equalTo("pipeline1")); assertThat(pipelineStat.stats().ingestCount(), equalTo(1L)); - List processorStats = stats.getIngestStats().processorStats().get("pipeline1"); + List processorStats = stats.getIngestStats() + .processorStats() + .get(ProjectId.DEFAULT) + .get("pipeline1"); assertThat(processorStats.size(), equalTo(4)); IngestStats.ProcessorStat setA = processorStats.get(0); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 7ed5e29a65dc..2c119df3f8f3 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -260,6 +260,7 @@ public class TransportVersions { public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00); public static final TransportVersion ESQL_HASH_OPERATOR_STATUS_OUTPUT_TIME = def(9_077_0_00); public static final TransportVersion ML_INFERENCE_HUGGING_FACE_CHAT_COMPLETION_ADDED = def(9_078_0_00); + public static final TransportVersion NODES_STATS_SUPPORTS_MULTI_PROJECT = def(9_079_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/ActionModule.java b/server/src/main/java/org/elasticsearch/action/ActionModule.java index f59fa1a4f83a..2f9f4340bfa7 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/server/src/main/java/org/elasticsearch/action/ActionModule.java @@ -846,7 +846,7 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestNodesInfoAction(settingsFilter)); registerHandler.accept(new RestRemoteClusterInfoAction()); registerHandler.accept(new RestNodesCapabilitiesAction()); - registerHandler.accept(new RestNodesStatsAction()); + registerHandler.accept(new RestNodesStatsAction(projectIdResolver)); registerHandler.accept(new RestNodesUsageAction()); registerHandler.accept(new RestNodesHotThreadsAction()); registerHandler.accept(new RestClusterAllocationExplainAction()); @@ -981,7 +981,7 @@ public class ActionModule extends AbstractModule { registerHandler.accept(new RestShardsAction()); registerHandler.accept(new RestMasterAction()); registerHandler.accept(new RestNodesAction()); - registerHandler.accept(new RestClusterInfoAction()); + registerHandler.accept(new RestClusterInfoAction(projectIdResolver)); registerHandler.accept(new RestTasksAction(nodesInCluster)); registerHandler.accept(new RestIndicesAction(projectIdResolver)); registerHandler.accept(new RestSegmentsAction()); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index ae4d6cb92c08..541ed3a78f8f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -52,6 +52,8 @@ import static org.elasticsearch.common.xcontent.ChunkedToXContentHelper.chunk; */ public class NodeStats extends BaseNodeResponse implements ChunkedToXContent { + public static final String MULTI_PROJECT_ENABLED_XCONTENT_PARAM_KEY = "multi_project_enabled_node_stats"; + private final long timestamp; @Nullable diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java index 2b214dcc3ad8..db532e28caa8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodes.java @@ -12,6 +12,7 @@ package org.elasticsearch.action.admin.cluster.stats; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.PluginsAndModules; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.common.Strings; import org.elasticsearch.common.network.NetworkModule; @@ -23,6 +24,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.TimeValue; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.index.stats.IndexingPressureStats; +import org.elasticsearch.ingest.IngestStats.ProcessorStat; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.os.OsInfo; @@ -709,37 +711,38 @@ public class ClusterStatsNodes implements ToXContentFragment { final SortedMap stats; IngestStats(final List nodeStats) { - Set pipelineIds = new HashSet<>(); + Map> pipelineIdsByProject = new HashMap<>(); SortedMap stats = new TreeMap<>(); for (NodeStats nodeStat : nodeStats) { if (nodeStat.getIngestStats() != null) { - for (Map.Entry> processorStats : nodeStat - .getIngestStats() - .processorStats() - .entrySet()) { - pipelineIds.add(processorStats.getKey()); - for (org.elasticsearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) { - stats.compute(stat.type(), (k, v) -> { - org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats(); - if (v == null) { - return new long[] { - nodeIngestStats.ingestCount(), - nodeIngestStats.ingestFailedCount(), - nodeIngestStats.ingestCurrent(), - nodeIngestStats.ingestTimeInMillis() }; - } else { - v[0] += nodeIngestStats.ingestCount(); - v[1] += nodeIngestStats.ingestFailedCount(); - v[2] += nodeIngestStats.ingestCurrent(); - v[3] += nodeIngestStats.ingestTimeInMillis(); - return v; - } - }); + Map>> nodeProcessorStats = nodeStat.getIngestStats().processorStats(); + for (Map.Entry>> processorStatsForProject : nodeProcessorStats.entrySet()) { + ProjectId projectId = processorStatsForProject.getKey(); + for (Map.Entry> processorStats : processorStatsForProject.getValue().entrySet()) { + pipelineIdsByProject.computeIfAbsent(projectId, k -> new HashSet<>()).add(processorStats.getKey()); + for (ProcessorStat stat : processorStats.getValue()) { + stats.compute(stat.type(), (k, v) -> { + org.elasticsearch.ingest.IngestStats.Stats nodeIngestStats = stat.stats(); + if (v == null) { + return new long[] { + nodeIngestStats.ingestCount(), + nodeIngestStats.ingestFailedCount(), + nodeIngestStats.ingestCurrent(), + nodeIngestStats.ingestTimeInMillis() }; + } else { + v[0] += nodeIngestStats.ingestCount(); + v[1] += nodeIngestStats.ingestFailedCount(); + v[2] += nodeIngestStats.ingestCurrent(); + v[3] += nodeIngestStats.ingestTimeInMillis(); + return v; + } + }); + } } } } } - this.pipelineCount = pipelineIds.size(); + this.pipelineCount = pipelineIdsByProject.values().stream().mapToInt(Set::size).sum(); this.stats = Collections.unmodifiableSortedMap(stats); } diff --git a/server/src/main/java/org/elasticsearch/cluster/project/ProjectIdResolver.java b/server/src/main/java/org/elasticsearch/cluster/project/ProjectIdResolver.java index ca9f8af4c3cd..e953316cd44a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/project/ProjectIdResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/project/ProjectIdResolver.java @@ -9,6 +9,7 @@ package org.elasticsearch.cluster.project; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; /** @@ -27,4 +28,14 @@ public interface ProjectIdResolver { * @return The identifier of the current project. */ ProjectId getProjectId(); + + /** + * Returns {@code false} if the cluster runs in a setup that always expects only a single default project (see also + * {@link Metadata#DEFAULT_PROJECT_ID}). + * Otherwise, it should return {@code true} to indicate the cluster can accommodate multiple projects regardless + * how many project it current has. + */ + default boolean supportsMultipleProjects() { + return false; + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java b/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java index dd6de4215906..b237bca5c295 100644 --- a/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java +++ b/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java @@ -113,14 +113,4 @@ public interface ProjectResolver extends ProjectIdResolver { } }; } - - /** - * Returns {@code false} if the cluster runs in a setup that always expects only a single default project (see also - * {@link Metadata#DEFAULT_PROJECT_ID}). - * Otherwise, it should return {@code true} to indicate the cluster can accommodate multiple projects regardless - * how many project it current has. - */ - default boolean supportsMultipleProjects() { - return false; - } } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 9ff23a73ff7a..27e7273ac506 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.project.ProjectResolver; @@ -482,7 +483,13 @@ public class IndicesService extends AbstractLifecycleComponent } } - return new NodeIndicesStats(commonStats, statsByIndex(this, flags), statsByShard(this, flags), includeShardsStats); + return new NodeIndicesStats( + commonStats, + statsByIndex(this, flags), + statsByShard(this, flags), + projectsByIndex(), + includeShardsStats + ); } static Map statsByIndex(final IndicesService indicesService, final CommonStatsFlags flags) { @@ -564,6 +571,15 @@ public class IndicesService extends AbstractLifecycleComponent ); } + private Map projectsByIndex() { + Map map = new HashMap<>(indices.size()); + for (IndexService indexShards : indices.values()) { + Index index = indexShards.index(); + clusterService.state().metadata().lookupProject(index).ifPresent(project -> map.put(index, project.id())); + } + return map; + } + /** * Checks if changes (adding / removing) indices, shards and so on are allowed. * diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index 68c0ec870211..83add510c6b0 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -12,9 +12,11 @@ package org.elasticsearch.indices; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.NodeStatsLevel; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndexShardStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -55,6 +57,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import static java.util.Objects.requireNonNull; + /** * Global information on indices stats running on a specific node. */ @@ -66,6 +70,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { private final CommonStats stats; private final Map> statsByShard; private final Map statsByIndex; + private final Map projectsByIndex; public NodeIndicesStats(StreamInput in) throws IOException { stats = new CommonStats(in); @@ -87,20 +92,29 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { } else { statsByIndex = new HashMap<>(); } + + if (in.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) { + projectsByIndex = in.readMap(Index::new, ProjectId::readFrom); + } else { + // Older nodes do not include the index-to-project map, so we leave it empty. This means all indices will be treated as if the + // project is unknown. This does not matter as the map is only used in multi-project clusters which will not have old nodes. + projectsByIndex = Map.of(); + } } public NodeIndicesStats( CommonStats oldStats, Map statsByIndex, Map> statsByShard, + Map projectsByIndex, boolean includeShardsStats ) { if (includeShardsStats) { - this.statsByShard = Objects.requireNonNull(statsByShard); + this.statsByShard = requireNonNull(statsByShard); } else { this.statsByShard = EMPTY_STATS_BY_SHARD; } - this.statsByIndex = Objects.requireNonNull(statsByIndex); + this.statsByIndex = requireNonNull(statsByIndex); // make a total common stats from old ones and current ones this.stats = oldStats; @@ -114,6 +128,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { for (CommonStats indexStats : statsByIndex.values()) { stats.add(indexStats); } + this.projectsByIndex = requireNonNull(projectsByIndex); } @Nullable @@ -228,6 +243,9 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { if (out.getTransportVersion().onOrAfter(VERSION_SUPPORTING_STATS_BY_INDEX)) { out.writeMap(statsByIndex); } + if (out.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) { + out.writeMap(projectsByIndex); + } } @Override @@ -235,12 +253,15 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; NodeIndicesStats that = (NodeIndicesStats) o; - return stats.equals(that.stats) && statsByShard.equals(that.statsByShard) && statsByIndex.equals(that.statsByIndex); + return stats.equals(that.stats) + && statsByShard.equals(that.statsByShard) + && statsByIndex.equals(that.statsByIndex) + && projectsByIndex.equals(that.projectsByIndex); } @Override public int hashCode() { - return Objects.hash(stats, statsByShard, statsByIndex); + return Objects.hash(stats, statsByShard, statsByIndex, projectsByIndex); } @Override @@ -260,7 +281,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { case INDICES -> ChunkedToXContentHelper.object( Fields.INDICES, Iterators.map(createCommonStatsByIndex().entrySet().iterator(), entry -> (builder, params) -> { - builder.startObject(entry.getKey().getName()); + builder.startObject(xContentKey(entry.getKey(), outerParams)); entry.getValue().toXContent(builder, outerParams); return builder.endObject(); }) @@ -271,7 +292,7 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { Iterators.flatMap( statsByShard.entrySet().iterator(), entry -> ChunkedToXContentHelper.array( - entry.getKey().getName(), + xContentKey(entry.getKey(), outerParams), Iterators.flatMap( entry.getValue().iterator(), indexShardStats -> Iterators.concat( @@ -291,6 +312,21 @@ public class NodeIndicesStats implements Writeable, ChunkedToXContent { ); } + private String xContentKey(Index index, ToXContent.Params outerParams) { + if (outerParams.paramAsBoolean(NodeStats.MULTI_PROJECT_ENABLED_XCONTENT_PARAM_KEY, false)) { + ProjectId projectId = projectsByIndex.get(index); + if (projectId == null) { + // This can happen if the stats were captured after the IndexService was created but before the state was updated. + // The best we can do is handle it gracefully. We include the UUID as well as the name to ensure it is unambiguous. + return "/" + index.getName() + "/" + index.getUUID(); + } else { + return projectId + "/" + index.getName(); + } + } else { + return index.getName(); + } + } + private Map createCommonStatsByIndex() { Map statsMap = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index 165632a69c78..59cd9c11c9b6 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -39,7 +39,6 @@ import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexTemplateMetadata; -import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService; import org.elasticsearch.cluster.metadata.ProjectId; @@ -60,7 +59,6 @@ import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.TimeValue; @@ -1244,23 +1242,23 @@ public class IngestService implements ClusterStateApplier, ReportingService { - Pipeline pipeline = holder.pipeline; - CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); - statsBuilder.addPipelineMetrics(id, pipeline.getMetrics()); - List> processorMetrics = new ArrayList<>(); - collectProcessorMetrics(rootProcessor, processorMetrics); - processorMetrics.forEach(t -> { - Processor processor = t.v1(); - IngestMetric processorMetric = t.v2(); - statsBuilder.addProcessorMetrics(id, getProcessorName(processor), processor.getType(), processorMetric); + for (ProjectId projectId : pipelines.keySet()) { + pipelines.getOrDefault(projectId, ImmutableOpenMap.of()).forEach((id, holder) -> { + Pipeline pipeline = holder.pipeline; + CompoundProcessor rootProcessor = pipeline.getCompoundProcessor(); + statsBuilder.addPipelineMetrics(projectId, id, pipeline.getMetrics()); + List> processorMetrics = new ArrayList<>(); + collectProcessorMetrics(rootProcessor, processorMetrics); + processorMetrics.forEach(t -> { + Processor processor = t.v1(); + IngestMetric processorMetric = t.v2(); + statsBuilder.addProcessorMetrics(projectId, id, getProcessorName(processor), processor.getType(), processorMetric); + }); }); - }); + } return statsBuilder.build(); } diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java index 8aab867af5da..686c350d267c 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestStats.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestStats.java @@ -10,12 +10,14 @@ package org.elasticsearch.ingest; import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xcontent.ToXContent; @@ -24,7 +26,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -33,10 +34,14 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; -public record IngestStats(Stats totalStats, List pipelineStats, Map> processorStats) - implements - Writeable, - ChunkedToXContent { +import static java.util.Collections.unmodifiableList; +import static java.util.Collections.unmodifiableMap; + +public record IngestStats( + Stats totalStats, + List pipelineStats, + Map>> processorStats +) implements Writeable, ChunkedToXContent { private static final Comparator PIPELINE_STAT_COMPARATOR = Comparator.comparingLong( (PipelineStat p) -> p.stats.ingestTimeInMillis @@ -69,13 +74,17 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma return IDENTITY; } var pipelineStats = new ArrayList(size); - var processorStats = Maps.>newMapWithExpectedSize(size); + Map>> processorStats = new HashMap<>(); for (var i = 0; i < size; i++) { + ProjectId projectId = in.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT) + ? ProjectId.readFrom(in) + // We will not have older nodes in a multi-project cluster, so we can assume that everything is in the default project. + : Metadata.DEFAULT_PROJECT_ID; var pipelineId = in.readString(); var pipelineStat = readStats(in); var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? readByteStats(in) : ByteStats.IDENTITY; - pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat)); + pipelineStats.add(new PipelineStat(projectId, pipelineId, pipelineStat, byteStat)); int processorsSize = in.readVInt(); var processorStatsPerPipeline = new ArrayList(processorsSize); for (var j = 0; j < processorsSize; j++) { @@ -87,10 +96,10 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma processorType = namesAndTypesCache.computeIfAbsent(processorType, Function.identity()); processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat)); } - processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline)); + processorStats.computeIfAbsent(projectId, k -> new HashMap<>()).put(pipelineId, unmodifiableList(processorStatsPerPipeline)); } - return new IngestStats(stats, pipelineStats, processorStats); + return new IngestStats(stats, pipelineStats, unmodifiableMap(processorStats)); } @Override @@ -98,12 +107,16 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma totalStats.writeTo(out); out.writeVInt(pipelineStats.size()); for (PipelineStat pipelineStat : pipelineStats) { + if (out.getTransportVersion().onOrAfter(TransportVersions.NODES_STATS_SUPPORTS_MULTI_PROJECT)) { + pipelineStat.projectId().writeTo(out); + } out.writeString(pipelineStat.pipelineId()); pipelineStat.stats().writeTo(out); if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0)) { pipelineStat.byteStats().writeTo(out); } - List processorStatsForPipeline = processorStats.get(pipelineStat.pipelineId()); + List processorStatsForPipeline = processorStats.getOrDefault(pipelineStat.projectId(), Map.of()) + .get(pipelineStat.pipelineId()); if (processorStatsForPipeline == null) { out.writeVInt(0); } else { @@ -134,7 +147,10 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma pipelineStat -> Iterators.concat( Iterators.single((builder, params) -> { - builder.startObject(pipelineStat.pipelineId()); + String key = outerParams.paramAsBoolean(NodeStats.MULTI_PROJECT_ENABLED_XCONTENT_PARAM_KEY, false) + ? pipelineStat.projectId() + "/" + pipelineStat.pipelineId() + : pipelineStat.pipelineId(); + builder.startObject(key); pipelineStat.stats().toXContent(builder, params); pipelineStat.byteStats().toXContent(builder, params); builder.startArray("processors"); @@ -142,7 +158,9 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma }), Iterators.map( - processorStats.getOrDefault(pipelineStat.pipelineId(), List.of()).iterator(), + processorStats.getOrDefault(pipelineStat.projectId(), Map.of()) + .getOrDefault(pipelineStat.pipelineId(), List.of()) + .iterator(), processorStat -> (builder, params) -> { builder.startObject(); builder.startObject(processorStat.name()); @@ -170,7 +188,20 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma ); } - static Map> merge(Map> first, Map> second) { + static Map>> merge( + Map>> first, + Map>> second + ) { + Map>> totals = new HashMap<>(); + first.forEach((projectId, statsByPipeline) -> totals.merge(projectId, statsByPipeline, IngestStats::innerMerge)); + second.forEach((projectId, statsByPipeline) -> totals.merge(projectId, statsByPipeline, IngestStats::innerMerge)); + return totals; + } + + private static Map> innerMerge( + Map> first, + Map> second + ) { var totalsPerPipelineProcessor = new HashMap>(); first.forEach((pipelineId, stats) -> totalsPerPipelineProcessor.merge(pipelineId, stats, ProcessorStat::merge)); @@ -234,7 +265,7 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma static class Builder { private Stats totalStats = null; private final List pipelineStats = new ArrayList<>(); - private final Map> processorStats = new HashMap<>(); + private final Map>> processorStats = new HashMap<>(); Builder addTotalMetrics(IngestMetric totalMetric) { assert totalStats == null; @@ -242,44 +273,55 @@ public record IngestStats(Stats totalStats, List pipelineStats, Ma return this; } - Builder addPipelineMetrics(String pipelineId, IngestPipelineMetric ingestPipelineMetrics) { + Builder addPipelineMetrics(ProjectId projectId, String pipelineId, IngestPipelineMetric ingestPipelineMetrics) { this.pipelineStats.add( - new PipelineStat(pipelineId, ingestPipelineMetrics.createStats(), ingestPipelineMetrics.createByteStats()) + new PipelineStat(projectId, pipelineId, ingestPipelineMetrics.createStats(), ingestPipelineMetrics.createByteStats()) ); return this; } - Builder addProcessorMetrics(String pipelineId, String processorName, String processorType, IngestMetric metric) { - this.processorStats.computeIfAbsent(pipelineId, k -> new ArrayList<>()) + Builder addProcessorMetrics( + ProjectId projectId, + String pipelineId, + String processorName, + String processorType, + IngestMetric metric + ) { + this.processorStats.computeIfAbsent(projectId, k -> new HashMap<>()) + .computeIfAbsent(pipelineId, k -> new ArrayList<>()) .add(new ProcessorStat(processorName, processorType, metric.createStats())); return this; } IngestStats build() { - return new IngestStats(totalStats, Collections.unmodifiableList(pipelineStats), Collections.unmodifiableMap(processorStats)); + return new IngestStats(totalStats, unmodifiableList(pipelineStats), unmodifiableMap(processorStats)); } } /** * Container for pipeline stats. */ - public record PipelineStat(String pipelineId, Stats stats, ByteStats byteStats) { + public record PipelineStat(ProjectId projectId, String pipelineId, Stats stats, ByteStats byteStats) { static List merge(List first, List second) { - var totalsPerPipeline = new HashMap(); + record MergeKey(ProjectId projectId, String pipelineId) {} - first.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps, PipelineStat::merge)); - second.forEach(ps -> totalsPerPipeline.merge(ps.pipelineId, ps, PipelineStat::merge)); + var totalsPerPipeline = new HashMap(); + + first.forEach(ps -> totalsPerPipeline.merge(new MergeKey(ps.projectId, ps.pipelineId), ps, PipelineStat::merge)); + second.forEach(ps -> totalsPerPipeline.merge(new MergeKey(ps.projectId, ps.pipelineId), ps, PipelineStat::merge)); return totalsPerPipeline.entrySet() .stream() - .map(v -> new PipelineStat(v.getKey(), v.getValue().stats, v.getValue().byteStats)) + .map(v -> new PipelineStat(v.getKey().projectId(), v.getKey().pipelineId(), v.getValue().stats, v.getValue().byteStats)) .sorted(PIPELINE_STAT_COMPARATOR) .toList(); } private static PipelineStat merge(PipelineStat first, PipelineStat second) { + assert first.projectId.equals(second.projectId) : "Can only merge stats from the same project"; assert first.pipelineId.equals(second.pipelineId) : "Can only merge stats from the same pipeline"; return new PipelineStat( + first.projectId, first.pipelineId, Stats.merge(first.stats, second.stats), ByteStats.merge(first.byteStats, second.byteStats) diff --git a/server/src/main/java/org/elasticsearch/rest/action/RestRefCountedChunkedToXContentListener.java b/server/src/main/java/org/elasticsearch/rest/action/RestRefCountedChunkedToXContentListener.java index ce44b52fc947..22585bdfd98e 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/RestRefCountedChunkedToXContentListener.java +++ b/server/src/main/java/org/elasticsearch/rest/action/RestRefCountedChunkedToXContentListener.java @@ -14,6 +14,7 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.xcontent.ToXContent; /** * Same as {@link RestChunkedToXContentListener} but decrements the ref count on the response it receives by one after serialization of the @@ -25,6 +26,10 @@ public class RestRefCountedChunkedToXContentListener SUPPORTED_CAPABILITIES = Set.of("dense_vector_off_heap_stats"); + private final ProjectIdResolver projectIdResolver; + @Override public List routes() { return List.of( @@ -63,6 +68,10 @@ public class RestNodesStatsAction extends BaseRestHandler { FLAGS = Collections.unmodifiableMap(flags); } + public RestNodesStatsAction(ProjectIdResolver projectIdResolver) { + this.projectIdResolver = projectIdResolver; + } + @Override public String getName() { return "nodes_stats_action"; @@ -179,7 +188,17 @@ public class RestNodesStatsAction extends BaseRestHandler { return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .cluster() - .nodesStats(nodesStatsRequest, new RestRefCountedChunkedToXContentListener<>(channel)); + .nodesStats( + nodesStatsRequest, + new RestRefCountedChunkedToXContentListener<>(channel, xContentParamsForRequest(channel.request())) + ); + } + + private ToXContent.DelegatingMapParams xContentParamsForRequest(RestRequest request) { + return new ToXContent.DelegatingMapParams( + Map.of(NodeStats.MULTI_PROJECT_ENABLED_XCONTENT_PARAM_KEY, Boolean.toString(projectIdResolver.supportsMultipleProjects())), + request + ); } private final Set RESPONSE_PARAMS = Collections.singleton("level"); diff --git a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java index cee76dc9433e..dd031cf0258c 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/info/RestClusterInfoAction.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.project.ProjectIdResolver; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.xcontent.ChunkedToXContent; @@ -31,6 +32,7 @@ import org.elasticsearch.rest.action.RestCancellableNodeClient; import org.elasticsearch.rest.action.RestResponseListener; import org.elasticsearch.script.ScriptStats; import org.elasticsearch.threadpool.ThreadPoolStats; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; import java.util.List; @@ -47,7 +49,6 @@ import static org.elasticsearch.action.admin.cluster.node.stats.NodesStatsReques import static org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric.INGEST; import static org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric.SCRIPT; import static org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric.THREAD_POOL; -import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; @ServerlessScope(Scope.PUBLIC) public class RestClusterInfoAction extends BaseRestHandler { @@ -77,6 +78,12 @@ public class RestClusterInfoAction extends BaseRestHandler { static final Set AVAILABLE_TARGETS = RESPONSE_MAPPER.keySet(); static final Set AVAILABLE_TARGET_NAMES = AVAILABLE_TARGETS.stream().map(Metric::metricName).collect(toUnmodifiableSet()); + private final ProjectIdResolver projectIdResolver; + + public RestClusterInfoAction(ProjectIdResolver projectIdResolver) { + this.projectIdResolver = projectIdResolver; + } + @Override public String getName() { return "cluster_info_action"; @@ -137,7 +144,7 @@ public class RestClusterInfoAction extends BaseRestHandler { Iterators.flatMap(chunkedResponses, chunk -> chunk.toXContentChunked(outerParams)), ChunkedToXContentHelper.endObject() ), - EMPTY_PARAMS, + xContentParams(), channel ), null @@ -145,4 +152,10 @@ public class RestClusterInfoAction extends BaseRestHandler { } }); } + + private ToXContent.MapParams xContentParams() { + return new ToXContent.MapParams( + Map.of(NodeStats.MULTI_PROJECT_ENABLED_XCONTENT_PARAM_KEY, Boolean.toString(projectIdResolver.supportsMultipleProjects())) + ); + } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java index b99ae142dabc..70f278bd8d36 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.coordination.ClusterStateSerializationStats; import org.elasticsearch.cluster.coordination.PendingClusterStateStats; import org.elasticsearch.cluster.coordination.PublishClusterStateStats; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.routing.RecoverySource; @@ -29,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterStateUpdateStats; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.network.HandlingTimeTracker; -import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.Tuple; import org.elasticsearch.discovery.DiscoveryStats; @@ -533,7 +533,12 @@ public class NodeStatsTests extends ESTestCase { private static int expectedChunks(IngestStats ingestStats) { return 2 + ingestStats.pipelineStats() .stream() - .mapToInt(pipelineStats -> 2 + ingestStats.processorStats().getOrDefault(pipelineStats.pipelineId(), List.of()).size()) + .mapToInt( + pipelineStats -> 2 + ingestStats.processorStats() + .getOrDefault(pipelineStats.projectId(), Map.of()) + .getOrDefault(pipelineStats.pipelineId(), List.of()) + .size() + ) .sum(); } @@ -689,7 +694,8 @@ public class NodeStatsTests extends ESTestCase { statsByShard.put(indexTest, indexShardStats); CommonStats oldStats = new CommonStats(CommonStatsFlags.ALL); - nodeIndicesStats = new NodeIndicesStats(oldStats, statsByIndex, statsByShard, true); + Map projectsByIndex = Map.of(indexTest, randomUniqueProjectId()); + nodeIndicesStats = new NodeIndicesStats(oldStats, statsByIndex, statsByShard, projectsByIndex, true); } OsStats osStats = null; if (frequently()) { @@ -971,11 +977,13 @@ public class NodeStatsTests extends ESTestCase { randomLongBetween(0, maxStatValue) ); List ingestPipelineStats = new ArrayList<>(numPipelines); - Map> ingestProcessorStats = Maps.newMapWithExpectedSize(numPipelines); + Map>> ingestProcessorStats = new HashMap<>(); for (int i = 0; i < numPipelines; i++) { + ProjectId projectId = randomProjectIdOrDefault(); String pipelineId = randomAlphaOfLengthBetween(3, 10); ingestPipelineStats.add( new IngestStats.PipelineStat( + projectId, pipelineId, new IngestStats.Stats( randomLongBetween(0, maxStatValue), @@ -999,7 +1007,7 @@ public class NodeStatsTests extends ESTestCase { new IngestStats.ProcessorStat(randomAlphaOfLengthBetween(3, 10), randomAlphaOfLengthBetween(3, 10), processorStats) ); } - ingestProcessorStats.put(pipelineId, processorPerPipeline); + ingestProcessorStats.computeIfAbsent(projectId, k -> new HashMap<>()).put(pipelineId, processorPerPipeline); } ingestStats = new IngestStats(totalStats, ingestPipelineStats, ingestProcessorStats); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index 93fa1006ca3b..65611ca85fa1 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -66,7 +66,7 @@ public class ClusterStatsNodesTests extends ESTestCase { public void testIngestStats() throws Exception { NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); SortedMap processorStats = new TreeMap<>(); - nodeStats.getIngestStats().processorStats().values().forEach(stats -> { + nodeStats.getIngestStats().processorStats().values().stream().flatMap(map -> map.values().stream()).forEach(stats -> { stats.forEach(stat -> { processorStats.compute(stat.type(), (key, value) -> { if (value == null) { @@ -87,7 +87,7 @@ public class ClusterStatsNodesTests extends ESTestCase { }); ClusterStatsNodes.IngestStats stats = new ClusterStatsNodes.IngestStats(List.of(nodeStats)); - assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().processorStats().size())); + assertThat(stats.pipelineCount, equalTo(nodeStats.getIngestStats().pipelineStats().size())); StringBuilder processorStatsString = new StringBuilder("{"); Iterator> iter = processorStats.entrySet().iterator(); while (iter.hasNext()) { diff --git a/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java index 36d526e70e37..3f7d0f6303ee 100644 --- a/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/elasticsearch/indices/NodeIndicesStatsTests.java @@ -27,7 +27,13 @@ import static org.hamcrest.object.HasToString.hasToString; public class NodeIndicesStatsTests extends ESTestCase { public void testInvalidLevel() { - final NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), Collections.emptyMap(), randomBoolean()); + final NodeIndicesStats stats = new NodeIndicesStats( + null, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap(), + randomBoolean() + ); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> stats.toXContentChunked(params)); @@ -42,9 +48,9 @@ public class NodeIndicesStatsTests extends ESTestCase { final Map> statsByShards = new HashMap<>(); final List emptyList = List.of(); statsByShards.put(index, emptyList); - NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), statsByShards, true); + NodeIndicesStats stats = new NodeIndicesStats(null, Collections.emptyMap(), statsByShards, Collections.emptyMap(), true); assertThat(stats.getShardStats(index), sameInstance(emptyList)); - stats = new NodeIndicesStats(null, Collections.emptyMap(), statsByShards, false); + stats = new NodeIndicesStats(null, Collections.emptyMap(), statsByShards, Collections.emptyMap(), false); assertThat(stats.getShardStats(index), nullValue()); } diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 2ab28c215119..6350b882d86c 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -52,7 +52,6 @@ import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Predicates; import org.elasticsearch.core.Strings; import org.elasticsearch.core.TimeValue; @@ -2132,19 +2131,22 @@ public class IngestServiceTests extends ESTestCase { // n.b. this 'pipeline' processor will always run the '_id3' pipeline, see the mocking/plumbing above and below PutPipelineRequest putRequest2 = putJsonPipelineRequest("_id2", "{\"processors\": [{\"pipeline\" : {}}]}"); PutPipelineRequest putRequest3 = putJsonPipelineRequest("_id3", "{\"processors\": [{\"mock\" : {}}]}"); - @FixForMultiProject(description = "Do not use default project id once stats are project aware") - var projectId = DEFAULT_PROJECT_ID; + var projectId1 = randomProjectIdOrDefault(); + var projectId2 = randomProjectIdOrDefault(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .putProjectMetadata(ProjectMetadata.builder(projectId1).build()) .build(); + if (projectId2.equals(projectId1) == false) { + clusterState = ClusterState.builder(clusterState).putProjectMetadata(ProjectMetadata.builder(projectId2).build()).build(); + } ClusterState previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest1, clusterState); - clusterState = executePut(projectId, putRequest2, clusterState); - clusterState = executePut(projectId, putRequest3, clusterState); + clusterState = executePut(projectId1, putRequest1, clusterState); + clusterState = executePut(projectId1, putRequest2, clusterState); + clusterState = executePut(projectId2, putRequest3, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); // hook up the mock ingest service to return pipeline3 when asked by the pipeline processor - pipelineToReturn[0] = ingestService.getPipeline(projectId, "_id3"); + pipelineToReturn[0] = ingestService.getPipeline(projectId2, "_id3"); { final IngestStats ingestStats = ingestService.stats(); @@ -2153,13 +2155,13 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(ingestStats.totalStats(), 0, 0, 0); // pipeline - assertPipelineStats(ingestStats.pipelineStats(), "_id1", 0, 0, 0, 0, 0); - assertPipelineStats(ingestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0); - assertPipelineStats(ingestStats.pipelineStats(), "_id3", 0, 0, 0, 0, 0); + assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id1", 0, 0, 0, 0, 0); + assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id2", 0, 0, 0, 0, 0); + assertPipelineStats(ingestStats.pipelineStats(), projectId2, "_id3", 0, 0, 0, 0, 0); // processor - assertProcessorStats(0, ingestStats, "_id1", 0, 0, 0); - assertProcessorStats(0, ingestStats, "_id2", 0, 0, 0); - assertProcessorStats(0, ingestStats, "_id3", 0, 0, 0); + assertProcessorStats(0, ingestStats, projectId1, "_id1", 0, 0, 0); + assertProcessorStats(0, ingestStats, projectId1, "_id2", 0, 0, 0); + assertProcessorStats(0, ingestStats, projectId2, "_id3", 0, 0, 0); } // put a single document through ingest processing @@ -2168,7 +2170,7 @@ public class IngestServiceTests extends ESTestCase { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); var startSize = indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId1, 1, List.of(indexRequest), indexReq -> {}, @@ -2186,13 +2188,13 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(ingestStats.totalStats(), 1, 0, 0); // pipeline - assertPipelineStats(ingestStats.pipelineStats(), "_id1", 1, 0, 0, startSize, indexRequest.ramBytesUsed()); - assertPipelineStats(ingestStats.pipelineStats(), "_id2", 1, 0, 0, 0, 0); - assertPipelineStats(ingestStats.pipelineStats(), "_id3", 1, 0, 0, 0, 0); + assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize, indexRequest.ramBytesUsed()); + assertPipelineStats(ingestStats.pipelineStats(), projectId1, "_id2", 1, 0, 0, 0, 0); + assertPipelineStats(ingestStats.pipelineStats(), projectId2, "_id3", 1, 0, 0, 0, 0); // processor - assertProcessorStats(0, ingestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, ingestStats, "_id2", 1, 0, 0); - assertProcessorStats(0, ingestStats, "_id3", 1, 0, 0); + assertProcessorStats(0, ingestStats, projectId1, "_id1", 1, 0, 0); + assertProcessorStats(0, ingestStats, projectId1, "_id2", 1, 0, 0); + assertProcessorStats(0, ingestStats, projectId2, "_id3", 1, 0, 0); } } @@ -2228,17 +2230,20 @@ public class IngestServiceTests extends ESTestCase { assertStats(initialStats.totalStats(), 0, 0, 0); PutPipelineRequest putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}]}"); - @FixForMultiProject(description = "Do not use default project id once stats are project aware") - var projectId = DEFAULT_PROJECT_ID; + var projectId1 = randomProjectIdOrDefault(); + var projectId2 = randomProjectIdOrDefault(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .putProjectMetadata(ProjectMetadata.builder(projectId).build()) + .putProjectMetadata(ProjectMetadata.builder(projectId1).build()) .build(); + if (projectId2.equals(projectId1) == false) { + clusterState = ClusterState.builder(clusterState).putProjectMetadata(ProjectMetadata.builder(projectId2).build()).build(); + } ClusterState previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePut(projectId1, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); putRequest = putJsonPipelineRequest("_id2", "{\"processors\": [{\"mock\" : {}}]}"); previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePut(projectId2, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); @SuppressWarnings("unchecked") @@ -2251,7 +2256,7 @@ public class IngestServiceTests extends ESTestCase { indexRequest.source(randomAlphaOfLength(10), randomAlphaOfLength(10)); var startSize1 = indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId1, 1, List.of(indexRequest), indexReq -> {}, @@ -2265,22 +2270,22 @@ public class IngestServiceTests extends ESTestCase { var endSize1 = indexRequest.ramBytesUsed(); assertThat(afterFirstRequestStats.pipelineStats().size(), equalTo(2)); - afterFirstRequestStats.processorStats().get("_id1").forEach(p -> assertEquals(p.name(), "mock:mockTag")); - afterFirstRequestStats.processorStats().get("_id2").forEach(p -> assertEquals(p.name(), "mock:mockTag")); + afterFirstRequestStats.processorStats().get(projectId1).get("_id1").forEach(p -> assertEquals(p.name(), "mock:mockTag")); + afterFirstRequestStats.processorStats().get(projectId2).get("_id2").forEach(p -> assertEquals(p.name(), "mock:mockTag")); // total assertStats(afterFirstRequestStats.totalStats(), 1, 0, 0); // pipeline - assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1); - assertPipelineStats(afterFirstRequestStats.pipelineStats(), "_id2", 0, 0, 0, 0, 0); + assertPipelineStats(afterFirstRequestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize1, endSize1); + assertPipelineStats(afterFirstRequestStats.pipelineStats(), projectId2, "_id2", 0, 0, 0, 0, 0); // processor - assertProcessorStats(0, afterFirstRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterFirstRequestStats, "_id2", 0, 0, 0); + assertProcessorStats(0, afterFirstRequestStats, projectId1, "_id1", 1, 0, 0); + assertProcessorStats(0, afterFirstRequestStats, projectId2, "_id2", 0, 0, 0); indexRequest.setPipeline("_id2"); var startSize2 = indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId2, 1, List.of(indexRequest), indexReq -> {}, @@ -2296,21 +2301,21 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(afterSecondRequestStats.totalStats(), 2, 0, 0); // pipeline - assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id1", 1, 0, 0, startSize1, endSize1); - assertPipelineStats(afterSecondRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2); + assertPipelineStats(afterSecondRequestStats.pipelineStats(), projectId1, "_id1", 1, 0, 0, startSize1, endSize1); + assertPipelineStats(afterSecondRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2); // processor - assertProcessorStats(0, afterSecondRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterSecondRequestStats, "_id2", 1, 0, 0); + assertProcessorStats(0, afterSecondRequestStats, projectId1, "_id1", 1, 0, 0); + assertProcessorStats(0, afterSecondRequestStats, projectId2, "_id2", 1, 0, 0); // update cluster state and ensure that new stats are added to old stats putRequest = putJsonPipelineRequest("_id1", "{\"processors\": [{\"mock\" : {}}, {\"mock\" : {}}]}"); previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePut(projectId1, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); startSize1 += indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId1, 1, List.of(indexRequest), indexReq -> {}, @@ -2326,26 +2331,26 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(afterThirdRequestStats.totalStats(), 3, 0, 0); // pipeline - assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id1", 2, 0, 0, startSize1, endSize1); - assertPipelineStats(afterThirdRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2); + assertPipelineStats(afterThirdRequestStats.pipelineStats(), projectId1, "_id1", 2, 0, 0, startSize1, endSize1); + assertPipelineStats(afterThirdRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2); // The number of processors for the "id1" pipeline changed, so the per-processor metrics are not carried forward. This is // due to the parallel array's used to identify which metrics to carry forward. Without unique ids or semantic equals for each // processor, parallel arrays are the best option for of carrying forward metrics between pipeline changes. However, in some cases, // like this one it may not be readily obvious why the metrics were not carried forward. - assertProcessorStats(0, afterThirdRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(1, afterThirdRequestStats, "_id1", 1, 0, 0); - assertProcessorStats(0, afterThirdRequestStats, "_id2", 1, 0, 0); + assertProcessorStats(0, afterThirdRequestStats, projectId1, "_id1", 1, 0, 0); + assertProcessorStats(1, afterThirdRequestStats, projectId1, "_id1", 1, 0, 0); + assertProcessorStats(0, afterThirdRequestStats, projectId2, "_id2", 1, 0, 0); // test a failure, and that the processor stats are added from the old stats putRequest = putJsonPipelineRequest("_id1", """ {"processors": [{"failure-mock" : { "on_failure": [{"mock" : {}}]}}, {"mock" : {}}]}"""); previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePut(projectId1, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id1"); startSize1 += indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId1, 1, List.of(indexRequest), indexReq -> {}, @@ -2361,22 +2366,22 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(afterForthRequestStats.totalStats(), 4, 0, 0); // pipeline - assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1); - assertPipelineStats(afterForthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2); + assertPipelineStats(afterForthRequestStats.pipelineStats(), projectId1, "_id1", 3, 0, 0, startSize1, endSize1); + assertPipelineStats(afterForthRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2); // processor - assertProcessorStats(0, afterForthRequestStats, "_id1", 1, 1, 0); // not carried forward since type changed - assertProcessorStats(1, afterForthRequestStats, "_id1", 2, 0, 0); // carried forward and added from old stats - assertProcessorStats(0, afterForthRequestStats, "_id2", 1, 0, 0); + assertProcessorStats(0, afterForthRequestStats, projectId1, "_id1", 1, 1, 0); // not carried forward since type changed + assertProcessorStats(1, afterForthRequestStats, projectId1, "_id1", 2, 0, 0); // carried forward and added from old stats + assertProcessorStats(0, afterForthRequestStats, projectId2, "_id2", 1, 0, 0); // test with drop processor putRequest = putJsonPipelineRequest("_id3", "{\"processors\": [{\"drop\" : {}}]}"); previousClusterState = clusterState; - clusterState = executePut(projectId, putRequest, clusterState); + clusterState = executePut(projectId1, putRequest, clusterState); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); indexRequest.setPipeline("_id3"); long startSize3 = indexRequest.ramBytesUsed(); ingestService.executeBulkRequest( - projectId, + projectId1, 1, List.of(indexRequest), indexReq -> {}, @@ -2391,13 +2396,13 @@ public class IngestServiceTests extends ESTestCase { // total assertStats(afterFifthRequestStats.totalStats(), 5, 0, 0); // pipeline - assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id1", 3, 0, 0, startSize1, endSize1); - assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id2", 1, 0, 0, startSize2, endSize2); - assertPipelineStats(afterFifthRequestStats.pipelineStats(), "_id3", 1, 0, 0, startSize3, 0); + assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId1, "_id1", 3, 0, 0, startSize1, endSize1); + assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId2, "_id2", 1, 0, 0, startSize2, endSize2); + assertPipelineStats(afterFifthRequestStats.pipelineStats(), projectId1, "_id3", 1, 0, 0, startSize3, 0); // processor - assertProcessorStats(0, afterFifthRequestStats, "_id1", 1, 1, 0); - assertProcessorStats(1, afterFifthRequestStats, "_id1", 2, 0, 0); - assertProcessorStats(0, afterFifthRequestStats, "_id2", 1, 0, 0); + assertProcessorStats(0, afterFifthRequestStats, projectId1, "_id1", 1, 1, 0); + assertProcessorStats(1, afterFifthRequestStats, projectId1, "_id1", 2, 0, 0); + assertProcessorStats(0, afterFifthRequestStats, projectId2, "_id2", 1, 0, 0); } public void testStatName() { @@ -3405,12 +3410,21 @@ public class IngestServiceTests extends ESTestCase { } } - private void assertProcessorStats(int processor, IngestStats stats, String pipelineId, long count, long failed, long time) { - assertStats(stats.processorStats().get(pipelineId).get(processor).stats(), count, failed, time); + private void assertProcessorStats( + int processor, + IngestStats stats, + ProjectId projectId, + String pipelineId, + long count, + long failed, + long time + ) { + assertStats(stats.processorStats().get(projectId).get(pipelineId).get(processor).stats(), count, failed, time); } private void assertPipelineStats( List pipelineStats, + ProjectId projectId, String pipelineId, long count, long failed, @@ -3418,7 +3432,7 @@ public class IngestServiceTests extends ESTestCase { long ingested, long produced ) { - var pipeline = getPipeline(pipelineStats, pipelineId); + var pipeline = getPipeline(pipelineStats, projectId, pipelineId); assertStats(pipeline.stats(), count, failed, time); assertByteStats(pipeline.byteStats(), ingested, produced); } @@ -3435,8 +3449,8 @@ public class IngestServiceTests extends ESTestCase { assertThat(byteStats.bytesProduced(), equalTo(produced)); } - private IngestStats.PipelineStat getPipeline(List pipelineStats, String id) { - return pipelineStats.stream().filter(p1 -> p1.pipelineId().equals(id)).findFirst().orElse(null); + private IngestStats.PipelineStat getPipeline(List pipelineStats, ProjectId projectId, String id) { + return pipelineStats.stream().filter(p1 -> p1.projectId().equals(projectId) && p1.pipelineId().equals(id)).findFirst().orElse(null); } private static List oneTask(ProjectId projectId, DeletePipelineRequest request) { diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java index 6586f180db87..27c8162b7e13 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.ingest; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.test.ESTestCase; @@ -28,7 +29,7 @@ public class IngestStatsTests extends ESTestCase { public void testSerialization() throws IOException { IngestStats.Stats totalStats = new IngestStats.Stats(50, 100, 200, 300); List pipelineStats = createPipelineStats(); - Map> processorStats = createProcessorStats(pipelineStats); + Map>> processorStats = createProcessorStats(pipelineStats); IngestStats ingestStats = new IngestStats(totalStats, pipelineStats, processorStats); IngestStats serializedStats = serialize(ingestStats); assertIngestStats(ingestStats, serializedStats); @@ -41,14 +42,15 @@ public class IngestStatsTests extends ESTestCase { public void testProcessorNameAndTypeIdentitySerialization() throws IOException { IngestStats.Builder builder = new IngestStats.Builder(); - builder.addPipelineMetrics("pipeline_id", new IngestPipelineMetric()); - builder.addProcessorMetrics("pipeline_id", "set", "set", new IngestMetric()); - builder.addProcessorMetrics("pipeline_id", "set:foo", "set", new IngestMetric()); - builder.addProcessorMetrics("pipeline_id", "set:bar", "set", new IngestMetric()); + ProjectId projectId = randomProjectIdOrDefault(); + builder.addPipelineMetrics(projectId, "pipeline_id", new IngestPipelineMetric()); + builder.addProcessorMetrics(projectId, "pipeline_id", "set", "set", new IngestMetric()); + builder.addProcessorMetrics(projectId, "pipeline_id", "set:foo", "set", new IngestMetric()); + builder.addProcessorMetrics(projectId, "pipeline_id", "set:bar", "set", new IngestMetric()); builder.addTotalMetrics(new IngestMetric()); IngestStats serializedStats = serialize(builder.build()); - List processorStats = serializedStats.processorStats().get("pipeline_id"); + List processorStats = serializedStats.processorStats().get(projectId).get("pipeline_id"); // these are just table stakes assertThat(processorStats.get(0).name(), is("set")); @@ -115,39 +117,44 @@ public class IngestStatsTests extends ESTestCase { public void testPipelineStatsMerge() { var first = List.of( - randomPipelineStat("pipeline-1"), - randomPipelineStat("pipeline-1"), - randomPipelineStat("pipeline-2"), - randomPipelineStat("pipeline-3"), - randomPipelineStat("pipeline-5") + randomPipelineStat("project-1", "pipeline-1"), + randomPipelineStat("project-1", "pipeline-1"), + randomPipelineStat("project-1", "pipeline-2"), + randomPipelineStat("project-2", "pipeline-1"), + randomPipelineStat("project-1", "pipeline-3") ); var second = List.of( - randomPipelineStat("pipeline-2"), - randomPipelineStat("pipeline-1"), - randomPipelineStat("pipeline-4"), - randomPipelineStat("pipeline-3") + randomPipelineStat("project-1", "pipeline-2"), + randomPipelineStat("project-1", "pipeline-1"), + randomPipelineStat("project-2", "pipeline-2"), + randomPipelineStat("project-2", "pipeline-1"), + randomPipelineStat("project-3", "pipeline-1") ); assertThat( IngestStats.PipelineStat.merge(first, second), containsInAnyOrder( new IngestStats.PipelineStat( + ProjectId.fromId("project-1"), "pipeline-1", merge(first.get(0).stats(), first.get(1).stats(), second.get(1).stats()), merge(first.get(0).byteStats(), first.get(1).byteStats(), second.get(1).byteStats()) ), new IngestStats.PipelineStat( + ProjectId.fromId("project-1"), "pipeline-2", merge(first.get(2).stats(), second.get(0).stats()), IngestStats.ByteStats.merge(first.get(2).byteStats(), second.get(0).byteStats()) ), new IngestStats.PipelineStat( - "pipeline-3", + ProjectId.fromId("project-2"), + "pipeline-1", merge(first.get(3).stats(), second.get(3).stats()), IngestStats.ByteStats.merge(first.get(3).byteStats(), second.get(3).byteStats()) ), - new IngestStats.PipelineStat("pipeline-4", second.get(2).stats(), second.get(2).byteStats()), - new IngestStats.PipelineStat("pipeline-5", first.get(4).stats(), first.get(4).byteStats()) + new IngestStats.PipelineStat(ProjectId.fromId("project-2"), "pipeline-2", second.get(2).stats(), second.get(2).byteStats()), + new IngestStats.PipelineStat(ProjectId.fromId("project-1"), "pipeline-3", first.get(4).stats(), first.get(4).byteStats()), + new IngestStats.PipelineStat(ProjectId.fromId("project-3"), "pipeline-1", second.get(4).stats(), second.get(4).byteStats()) ) ); } @@ -155,64 +162,75 @@ public class IngestStatsTests extends ESTestCase { public void testProcessorStatsMergeZeroCounts() { { var expected = randomPipelineProcessorStats(); - var first = Map.of("pipeline-1", expected); + var first = Map.of(ProjectId.fromId("project-1"), Map.of("pipeline-1", expected)); // merging with an empty map yields the non-empty map assertEquals(IngestStats.merge(Map.of(), first), first); assertEquals(IngestStats.merge(first, Map.of()), first); // it's the same exact reference, in fact - assertSame(expected, IngestStats.merge(Map.of(), first).get("pipeline-1")); - assertSame(expected, IngestStats.merge(first, Map.of()).get("pipeline-1")); + assertSame(expected, IngestStats.merge(Map.of(), first).get(ProjectId.fromId("project-1")).get("pipeline-1")); + assertSame(expected, IngestStats.merge(first, Map.of()).get(ProjectId.fromId("project-1")).get("pipeline-1")); } { var expected = randomPipelineProcessorStats(); - var first = Map.of("pipeline-1", expected); + var first = Map.of(ProjectId.fromId("project-1"), Map.of("pipeline-1", expected)); var zero = List.of( new IngestStats.ProcessorStat("proc-1", "type-1", zeroStats()), new IngestStats.ProcessorStat("proc-1", "type-2", zeroStats()), new IngestStats.ProcessorStat("proc-2", "type-1", zeroStats()), new IngestStats.ProcessorStat("proc-3", "type-3", zeroStats()) ); - var second = Map.of("pipeline-1", zero); + var second = Map.of(ProjectId.fromId("project-1"), Map.of("pipeline-1", zero)); // merging with a zero map yields the non-zero map assertEquals(IngestStats.merge(second, first), first); assertEquals(IngestStats.merge(first, second), first); // it's the same exact reference, in fact - assertSame(expected, IngestStats.merge(second, first).get("pipeline-1")); - assertSame(expected, IngestStats.merge(first, second).get("pipeline-1")); + assertSame(expected, IngestStats.merge(second, first).get(ProjectId.fromId("project-1")).get("pipeline-1")); + assertSame(expected, IngestStats.merge(first, second).get(ProjectId.fromId("project-1")).get("pipeline-1")); } } public void testProcessorStatsMerge() { var first = Map.of( - "pipeline-1", - randomPipelineProcessorStats(), - "pipeline-2", - randomPipelineProcessorStats(), - "pipeline-3", - randomPipelineProcessorStats() + ProjectId.fromId("project-1"), + Map.of("pipeline-1", randomPipelineProcessorStats(), "pipeline-2", randomPipelineProcessorStats()), + ProjectId.fromId("project-2"), + Map.of("pipeline-1", randomPipelineProcessorStats()) ); var second = Map.of( - "pipeline-2", - randomPipelineProcessorStats(), - "pipeline-3", - randomPipelineProcessorStats(), - "pipeline-1", - randomPipelineProcessorStats() + ProjectId.fromId("project-2"), + Map.of("pipeline-1", randomPipelineProcessorStats()), + ProjectId.fromId("project-1"), + Map.of("pipeline-2", randomPipelineProcessorStats(), "pipeline-1", randomPipelineProcessorStats()) ); assertEquals( IngestStats.merge(first, second), Map.of( - "pipeline-1", - expectedPipelineProcessorStats(first.get("pipeline-1"), second.get("pipeline-1")), - "pipeline-2", - expectedPipelineProcessorStats(first.get("pipeline-2"), second.get("pipeline-2")), - "pipeline-3", - expectedPipelineProcessorStats(first.get("pipeline-3"), second.get("pipeline-3")) + ProjectId.fromId("project-1"), + Map.of( + "pipeline-1", + expectedPipelineProcessorStats( + first.get(ProjectId.fromId("project-1")).get("pipeline-1"), + second.get(ProjectId.fromId("project-1")).get("pipeline-1") + ), + "pipeline-2", + expectedPipelineProcessorStats( + first.get(ProjectId.fromId("project-1")).get("pipeline-2"), + second.get(ProjectId.fromId("project-1")).get("pipeline-2") + ) + ), + ProjectId.fromId("project-2"), + Map.of( + "pipeline-1", + expectedPipelineProcessorStats( + first.get(ProjectId.fromId("project-2")).get("pipeline-1"), + second.get(ProjectId.fromId("project-2")).get("pipeline-1") + ) + ) ) ); } @@ -221,17 +239,20 @@ public class IngestStatsTests extends ESTestCase { // if a pipeline has heterogeneous *non-zero* stats, then we defer to the one with a smaller total ingest count var first = Map.of( - "pipeline-1", - List.of( - new IngestStats.ProcessorStat("name-1", "type-1", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0)), - new IngestStats.ProcessorStat("name-2", "type-2", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0)) + ProjectId.fromId("project-1"), + Map.of( + "pipeline-1", + List.of( + new IngestStats.ProcessorStat("name-1", "type-1", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0)), + new IngestStats.ProcessorStat("name-2", "type-2", new IngestStats.Stats(randomLongBetween(1, 100), 0, 0, 0)) + ) ) ); var expected = List.of(new IngestStats.ProcessorStat("name-1", "type-1", new IngestStats.Stats(1, 0, 0, 0))); - var second = Map.of("pipeline-1", expected); + var second = Map.of(ProjectId.fromId("project-1"), Map.of("pipeline-1", expected)); assertEquals(second, IngestStats.merge(first, second)); - assertSame(expected, IngestStats.merge(second, first).get("pipeline-1")); + assertSame(expected, IngestStats.merge(second, first).get(ProjectId.fromId("project-1")).get("pipeline-1")); } private static List expectedPipelineProcessorStats( @@ -265,16 +286,19 @@ public class IngestStatsTests extends ESTestCase { private static List createPipelineStats() { IngestStats.PipelineStat pipeline1Stats = new IngestStats.PipelineStat( + ProjectId.fromId("project1"), "pipeline1", new IngestStats.Stats(3, 3, 3, 3), new IngestStats.ByteStats(123, 456) ); IngestStats.PipelineStat pipeline2Stats = new IngestStats.PipelineStat( + ProjectId.fromId("project2"), "pipeline2", new IngestStats.Stats(47, 97, 197, 297), new IngestStats.ByteStats(1234567, 34567890) ); IngestStats.PipelineStat pipeline3Stats = new IngestStats.PipelineStat( + ProjectId.fromId("project1"), "pipeline3", new IngestStats.Stats(0, 0, 0, 0), new IngestStats.ByteStats(0, 0) @@ -282,7 +306,9 @@ public class IngestStatsTests extends ESTestCase { return List.of(pipeline1Stats, pipeline2Stats, pipeline3Stats); } - private static Map> createProcessorStats(List pipelineStats) { + private static Map>> createProcessorStats( + List pipelineStats + ) { assert (pipelineStats.size() >= 2); IngestStats.ProcessorStat processor1Stat = new IngestStats.ProcessorStat("processor1", "type", new IngestStats.Stats(1, 1, 1, 1)); IngestStats.ProcessorStat processor2Stat = new IngestStats.ProcessorStat("processor2", "type", new IngestStats.Stats(2, 2, 2, 2)); @@ -293,10 +319,10 @@ public class IngestStatsTests extends ESTestCase { ); // pipeline1 -> processor1,processor2; pipeline2 -> processor3 return Map.of( - pipelineStats.get(0).pipelineId(), - List.of(processor1Stat, processor2Stat), - pipelineStats.get(1).pipelineId(), - List.of(processor3Stat) + ProjectId.fromId("project1"), + Map.of(pipelineStats.get(0).pipelineId(), List.of(processor1Stat, processor2Stat)), + ProjectId.fromId("project2"), + Map.of(pipelineStats.get(1).pipelineId(), List.of(processor3Stat)) ); } @@ -333,8 +359,11 @@ public class IngestStatsTests extends ESTestCase { serializedPipelineStat.byteStats() ); List serializedProcessorStats = serializedStats.processorStats() + .getOrDefault(serializedPipelineStat.projectId(), Map.of()) + .get(serializedPipelineStat.pipelineId()); + List processorStat = ingestStats.processorStats() + .getOrDefault(serializedPipelineStat.projectId(), Map.of()) .get(serializedPipelineStat.pipelineId()); - List processorStat = ingestStats.processorStats().get(serializedPipelineStat.pipelineId()); if (processorStat != null) { Iterator it = processorStat.iterator(); // intentionally enforcing the identical ordering @@ -369,8 +398,8 @@ public class IngestStatsTests extends ESTestCase { return new IngestStats.ProcessorStat(name, type, randomStats()); } - private static IngestStats.PipelineStat randomPipelineStat(String id) { - return new IngestStats.PipelineStat(id, randomStats(), randomByteStats()); + private static IngestStats.PipelineStat randomPipelineStat(String projectId, String pipelineId) { + return new IngestStats.PipelineStat(ProjectId.fromId(projectId), pipelineId, randomStats(), randomByteStats()); } private static IngestStats.Stats randomStats() { diff --git a/server/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java index 8bdbcd9204cf..dc12a94d7079 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ClusterStatsLevel; import org.elasticsearch.action.NodeStatsLevel; import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParameters.Metric; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; @@ -33,7 +34,7 @@ public class RestNodesStatsActionTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - action = new RestNodesStatsAction(); + action = new RestNodesStatsAction(() -> ProjectId.DEFAULT); } public void testUnrecognizedMetric() { diff --git a/server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java b/server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java index eb638b777cfa..8c15df5940d9 100644 --- a/server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java +++ b/server/src/test/java/org/elasticsearch/rest/action/info/RestClusterInfoActionTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequestParame import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.http.HttpRouteStats; import org.elasticsearch.http.HttpStats; @@ -42,7 +43,7 @@ public class RestClusterInfoActionTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); - action = new RestClusterInfoAction(); + action = new RestClusterInfoAction(() -> ProjectId.DEFAULT); } public void testUnrecognizediTarget() { diff --git a/test/external-modules/multi-project/build.gradle b/test/external-modules/multi-project/build.gradle index dfb7cc0b7431..670719a805d9 100644 --- a/test/external-modules/multi-project/build.gradle +++ b/test/external-modules/multi-project/build.gradle @@ -11,6 +11,7 @@ dependencies { testImplementation project(path: ':test:test-clusters') clusterModules project(':test:external-modules:test-multi-project') clusterModules project(':modules:analysis-common') + clusterModules project(":modules:ingest-common") } tasks.named("javaRestTest").configure { diff --git a/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsMultiProjectIT.java b/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsMultiProjectIT.java new file mode 100644 index 000000000000..522829499c9f --- /dev/null +++ b/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsMultiProjectIT.java @@ -0,0 +1,341 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.node.stats; + +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.multiproject.MultiProjectRestTestCase; +import org.elasticsearch.test.cluster.ElasticsearchCluster; +import org.elasticsearch.test.cluster.local.distribution.DistributionType; +import org.elasticsearch.test.rest.ObjectPath; +import org.junit.After; +import org.junit.ClassRule; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +/** + * Multi-project integration tests for the parts of the _nodes/stats and _info APIs which return project-scoped stats. + */ +public class NodesStatsMultiProjectIT extends MultiProjectRestTestCase { + + private static final String PASSWORD = "hunter2"; + + @ClassRule + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .nodes(1) + .distribution(DistributionType.INTEG_TEST) + .module("test-multi-project") + .setting("test.multi_project.enabled", "true") + .setting("xpack.security.enabled", "true") + .user("admin", PASSWORD) + .module("ingest-common") + .build(); + + @Override + protected String getTestRestCluster() { + return cluster.getHttpAddresses(); + } + + @Override + protected Settings restClientSettings() { + final String token = basicAuthHeaderValue("admin", new SecureString(PASSWORD.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + @After + public void deletePipelines() throws IOException { + for (String projectId : getProjectIds(adminClient())) { + for (String pipelineId : getPipelineIds(projectId)) { + client().performRequest(setRequestProjectId(new Request("DELETE", "/_ingest/pipeline/" + pipelineId), projectId)); + } + } + } + + private static Set getPipelineIds(String projectId) throws IOException { + try { + return responseAsMap(client().performRequest(setRequestProjectId(new Request("GET", "/_ingest/pipeline"), projectId))).keySet(); + } catch (ResponseException e) { + assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404)); // expected if there are no pipelines + return Set.of(); + } + } + + public void testIndicesStats() throws IOException { + // Create two projects. We will use the default project as the third project in this test. + createProject("project-1"); + createProject("project-2"); + + // Create and write data into a number of indices. + // Some of the index names are used only in one project, some in two projects, some in all three. + int numDocs1Only = createPopulatedIndex("project-1", "my-index-project-1-only"); + int numDocs2Only = createPopulatedIndex("project-2", "my-index-project-2-only"); + int numDocsDefaultOnly = createPopulatedIndex("default", "my-index-default-project-only"); + int numDocs1Of1And2 = createPopulatedIndex("project-1", "my-index-projects-1-and-2"); + int numDocs2Of1And2 = createPopulatedIndex("project-2", "my-index-projects-1-and-2"); + int numDocs2Of2AndDefault = createPopulatedIndex("project-2", "my-index-projects-2-and-default"); + int numDocsDefaultOf2AndDefault = createPopulatedIndex("default", "my-index-projects-2-and-default"); + int numDocs1All = createPopulatedIndex("project-1", "my-index-all-projects"); + int numDocs2All = createPopulatedIndex("project-2", "my-index-all-projects"); + int numDocsDefaultAll = createPopulatedIndex("default", "my-index-all-projects"); + + String nodeId = findNodeId(); + + // Get indices stats at node level, and assert that the total docs count is correct: + int totalCount = ObjectPath.evaluate(getAsMap("/_nodes/stats/indices?level=node"), "nodes." + nodeId + ".indices.docs.count"); + assertThat( + totalCount, + equalTo( + numDocs1Only + numDocs2Only + numDocsDefaultOnly + numDocs1Of1And2 + numDocs2Of1And2 + numDocs2Of2AndDefault + + numDocsDefaultOf2AndDefault + numDocs1All + numDocs2All + numDocsDefaultAll + ) + ); + + // Get indices stats at index level... + Map indexStats = ObjectPath.evaluate( + getAsMap("/_nodes/stats/indices?level=indices"), + "nodes." + nodeId + ".indices.indices" + ); + // ...assert that all the indices are present, prefixed by their project IDs... + Set expectedProjectIdAndIndexNames = Set.of( + "project-1/my-index-project-1-only", + "project-2/my-index-project-2-only", + "default/my-index-default-project-only", + "project-1/my-index-projects-1-and-2", + "project-2/my-index-projects-1-and-2", + "project-2/my-index-projects-2-and-default", + "default/my-index-projects-2-and-default", + "project-1/my-index-all-projects", + "project-2/my-index-all-projects", + "default/my-index-all-projects" + ); + assertThat(indexStats.keySet(), equalTo(expectedProjectIdAndIndexNames)); + // ...and assert that the docs counts are correct for some of the indices: + assertThat(ObjectPath.evaluate(indexStats, "project-1/my-index-project-1-only.docs.count"), equalTo(numDocs1Only)); + assertThat(ObjectPath.evaluate(indexStats, "project-2/my-index-projects-2-and-default.docs.count"), equalTo(numDocs2Of2AndDefault)); + assertThat(ObjectPath.evaluate(indexStats, "default/my-index-all-projects.docs.count"), equalTo(numDocsDefaultAll)); + + // Get indices stats at shard level... + Map shardStats = ObjectPath.evaluate( + getAsMap("/_nodes/stats/indices?level=shards"), + "nodes." + nodeId + ".indices.shards" + ); + // ...assert that all the indices are present, prefixed by their project IDs... + assertThat(shardStats.keySet(), equalTo(expectedProjectIdAndIndexNames)); + // ...assert that the entry for the first index has exactly one entry (for its single shard)... + List> index1ShardStats = ObjectPath.evaluate(shardStats, "project-1/my-index-project-1-only"); + assertThat(index1ShardStats, hasSize(1)); + // ...and assert that that is shard 0 and that the doc count is correct: + assertThat(ObjectPath.evaluate(index1ShardStats.getFirst(), "0.docs.count"), equalTo(numDocs1Only)); + } + + // Warning: Some ingest stats are not reset as part of test tear-down. This only works because we do all the ingest tests in one method. + public void testIngestStats() throws IOException { + // Create two projects. We will use the default project as the third project in this test. + createProject("project-1"); + createProject("project-2"); + + // Create and run data through a number of indices. + // Some of the pipeline names are used only in one project, some in two projects, some in all three. + int numDocs1Only = createAndUsePipeline("project-1", "my-pipeline-project-1-only"); + int numDocs2Only = createAndUsePipeline("project-2", "my-pipeline-project-2-only"); + int numDocsDefaultOnly = createAndUsePipeline("default", "my-pipeline-default-project-only"); + int numDocs1Of1And2 = createAndUsePipeline("project-1", "my-pipeline-projects-1-and-2"); + int numDocs2Of1And2 = createAndUsePipeline("project-2", "my-pipeline-projects-1-and-2"); + int numDocs2Of2AndDefault = createAndUsePipeline("project-2", "my-pipeline-projects-2-and-default"); + int numDocsDefaultOf2AndDefault = createAndUsePipeline("default", "my-pipeline-projects-2-and-default"); + int numDocs1All = createAndUsePipeline("project-1", "my-pipeline-all-projects"); + int numDocs2All = createAndUsePipeline("project-2", "my-pipeline-all-projects"); + int numDocsDefaultAll = createAndUsePipeline("default", "my-pipeline-all-projects"); + + // Get the ingest stats from _nodes/stats and assert they are correct: + Map ingestNodesStats = ObjectPath.evaluate(getAsMap("/_nodes/stats/ingest"), "nodes." + findNodeId() + ".ingest"); + assertIngestStats( + ingestNodesStats, + numDocs1Only, + numDocs2Only, + numDocsDefaultOnly, + numDocs1Of1And2, + numDocs2Of1And2, + numDocs2Of2AndDefault, + numDocsDefaultOf2AndDefault, + numDocs1All, + numDocs2All, + numDocsDefaultAll + ); + // Do the same thing for the ingest stats from _info: + Map ingestInfo = ObjectPath.evaluate(getAsMap("/_info/ingest"), "ingest"); + assertIngestStats( + ingestInfo, + numDocs1Only, + numDocs2Only, + numDocsDefaultOnly, + numDocs1Of1And2, + numDocs2Of1And2, + numDocs2Of2AndDefault, + numDocsDefaultOf2AndDefault, + numDocs1All, + numDocs2All, + numDocsDefaultAll + ); + } + + private static void assertIngestStats( + Map ingestNodesStats, + int numDocs1Only, + int numDocs2Only, + int numDocsDefaultOnly, + int numDocs1Of1And2, + int numDocs2Of1And2, + int numDocs2Of2AndDefault, + int numDocsDefaultOf2AndDefault, + int numDocs1All, + int numDocs2All, + int numDocsDefaultAll + ) throws IOException { + // Assert that the total count is correct... + assertThat( + ObjectPath.evaluate(ingestNodesStats, "total.count"), + equalTo( + numDocs1Only + numDocs2Only + numDocsDefaultOnly + numDocs1Of1And2 + numDocs2Of1And2 + numDocs2Of2AndDefault + + numDocsDefaultOf2AndDefault + numDocs1All + numDocs2All + numDocsDefaultAll + ) + ); + // ...assert that all the pipelines are present, prefixed by their project IDs... + Map pipelineStats = ObjectPath.evaluate(ingestNodesStats, "pipelines"); + assertThat( + pipelineStats.keySet(), + containsInAnyOrder( + "project-1/my-pipeline-project-1-only", + "project-2/my-pipeline-project-2-only", + "default/my-pipeline-default-project-only", + "project-1/my-pipeline-projects-1-and-2", + "project-2/my-pipeline-projects-1-and-2", + "project-2/my-pipeline-projects-2-and-default", + "default/my-pipeline-projects-2-and-default", + "project-1/my-pipeline-all-projects", + "project-2/my-pipeline-all-projects", + "default/my-pipeline-all-projects" + ) + ); + // ...assert that the pipeline doc counts are for some of the pipelines correct... + assertThat(ObjectPath.evaluate(pipelineStats, "project-1/my-pipeline-project-1-only.count"), equalTo(numDocs1Only)); + assertThat( + ObjectPath.evaluate(pipelineStats, "project-2/my-pipeline-projects-2-and-default.count"), + equalTo(numDocs2Of2AndDefault) + ); + assertThat(ObjectPath.evaluate(pipelineStats, "default/my-pipeline-all-projects.count"), equalTo(numDocsDefaultAll)); + // ...and that the processors are correct with the correct counts: + // (the counts for the lowercase processors should be halved because it has an if condition which triggers half the time) + Map processorStatsPipeline1Only = extractMergedProcessorStats( + pipelineStats, + "project-1/my-pipeline-project-1-only" + ); + assertThat(ObjectPath.evaluate(processorStatsPipeline1Only, "set.stats.count"), equalTo(numDocs1Only)); + assertThat(ObjectPath.evaluate(processorStatsPipeline1Only, "lowercase.stats.count"), equalTo(numDocs1Only / 2)); + Map processorStatsPipeline2Of2AndDefault = extractMergedProcessorStats( + pipelineStats, + "project-2/my-pipeline-projects-2-and-default" + ); + assertThat(ObjectPath.evaluate(processorStatsPipeline2Of2AndDefault, "set.stats.count"), equalTo(numDocs2Of2AndDefault)); + assertThat(ObjectPath.evaluate(processorStatsPipeline2Of2AndDefault, "lowercase.stats.count"), equalTo(numDocs2Of2AndDefault / 2)); + Map processorStatsPipelineDefaultAll = extractMergedProcessorStats( + pipelineStats, + "default/my-pipeline-all-projects" + ); + assertThat(ObjectPath.evaluate(processorStatsPipelineDefaultAll, "set.stats.count"), equalTo(numDocsDefaultAll)); + assertThat(ObjectPath.evaluate(processorStatsPipelineDefaultAll, "lowercase.stats.count"), equalTo(numDocsDefaultAll / 2)); + } + + private int createPopulatedIndex(String projectId, String indexName) throws IOException { + createIndex(req -> { + setRequestProjectId(req, projectId); + return client().performRequest(req); + }, indexName, null, null, null); + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + Request request = new Request("POST", "/" + indexName + "/_doc"); + request.setJsonEntity(Strings.format("{ \"num\": %d, \"str\": \"%s\" }", randomInt(), randomAlphaOfLengthBetween(5, 10))); + setRequestProjectId(request, projectId); + client().performRequest(request); + } + client().performRequest(setRequestProjectId(new Request("POST", "/" + indexName + "/_refresh"), projectId)); + return numDocs; + } + + private int createAndUsePipeline(String projectId, String pipelineId) throws IOException { + Request createPipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId); + setRequestProjectId(createPipelineRequest, projectId); + createPipelineRequest.setJsonEntity(""" + { + "processors": [ + { + "set": { + "field": "foo", + "value": "bar" + } + }, + { + "lowercase": { + "field": "str", + "if": "ctx.needs_lower" + } + } + ] + } + """); + client().performRequest(createPipelineRequest); + + int numDocs = randomIntBetween(2, 6) * 2; + for (int i = 0; i < numDocs; i++) { + Request request = new Request("POST", "/my-index/_doc?pipeline=" + pipelineId); + boolean needsLower = (i % 2) == 0; // run the lowercase processor for every other doc + request.setJsonEntity( + Strings.format( + "{ \"num\": %d, \"str\": \"%s\", \"needs_lower\": %s }", + randomInt(), + randomAlphaOfLengthBetween(5, 10), + needsLower + ) + ); + setRequestProjectId(request, projectId); + client().performRequest(request); + } + client().performRequest(setRequestProjectId(new Request("POST", "/my-index/_refresh"), projectId)); + + return numDocs; + } + + private static String findNodeId() throws IOException { + return ObjectPath.>evaluate(getAsMap("/_nodes"), "nodes").keySet().stream().findAny().orElseThrow(); + } + + /** + * Given the map of all the stats for all the pipelines, extracts the processor stats for the given pipeline ID. The list of maps is + * merged into a single map. + */ + private static Map extractMergedProcessorStats(Map pipelineStats, String pipelineId) + throws IOException { + Map merged = new HashMap<>(); + ObjectPath.>>evaluate(pipelineStats, pipelineId + ".processors").forEach(merged::putAll); + return merged; + } +} diff --git a/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsMultiProjectIT.java b/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsMultiProjectIT.java index d62651afa06a..759a0d1daac3 100644 --- a/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsMultiProjectIT.java +++ b/test/external-modules/multi-project/src/javaRestTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsMultiProjectIT.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.core.Nullable; import org.elasticsearch.multiproject.MultiProjectRestTestCase; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; @@ -25,6 +26,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { @@ -39,6 +41,7 @@ public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { .setting("test.multi_project.enabled", "true") .setting("xpack.security.enabled", "true") .user("admin", PASSWORD) + .module("ingest-common") .build(); @Override @@ -70,12 +73,14 @@ public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { createIndex(projectId3, "idx"); + createPipeline(projectId1, "my-pipeline"); + for (int i = 0; i < 5; i++) { - createDocument(projectId1, "i1", "{ \"proj\":1, \"id\":" + i + ", \"b1\":true, \"b2\":false }"); + createDocument(projectId1, "i1", "{ \"proj\":1, \"id\":" + i + ", \"b1\":true, \"b2\":false }", "my-pipeline"); } refreshIndex(projectId1, "i1"); for (int i = 0; i < 3; i++) { - createDocument(projectId3, "idx", "{ \"proj\":3, \"date\":\"2020-02-20T20:20:20\" }"); + createDocument(projectId3, "idx", "{ \"proj\":3, \"date\":\"2020-02-20T20:20:20\" }", null); } refreshIndex(projectId3, "idx"); @@ -83,7 +88,11 @@ public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { assertThat(response.evaluate("status"), equalTo("green")); assertThat(response.evaluate("indices.count"), equalTo(3 + 2 + 1)); assertThat(response.evaluate("indices.docs.count"), equalTo(5 + 3)); - assertThat(response.evaluate("indices.mappings.total_field_count"), equalTo(4 + 2)); + // We expect: + // - 4 fields (2 long and 2 boolean) from the input for project 1, index i1 + // - 1 long field from the pipeline for project 1, index i1 + // - 2 fields (1 long and 1 date) for project 3, index idx + assertThat(response.evaluate("indices.mappings.total_field_count"), equalTo(4 + 1 + 2)); final List> fieldTypes = response.evaluate("indices.mappings.field_types"); assertThat(fieldTypes.size(), equalTo(3)); @@ -98,8 +107,13 @@ public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { assertThat(fieldTypes.get(1).get("index_count"), equalTo(1)); assertThat(fieldTypes.get(2).get("name"), equalTo("long")); - assertThat(fieldTypes.get(2).get("count"), equalTo(3)); + assertThat(fieldTypes.get(2).get("count"), equalTo(2 + 1 + 1)); assertThat(fieldTypes.get(2).get("index_count"), equalTo(2)); + + assertThat(response.evaluate("nodes.ingest.number_of_pipelines"), equalTo(1)); + Map ingestStats = response.evaluate("nodes.ingest.processor_stats"); + assertThat(ingestStats.keySet(), containsInAnyOrder("set")); + assertThat(ObjectPath.evaluate(ingestStats, "set.count"), equalTo(5)); } private void createIndex(String projectId, String indexName) throws IOException { @@ -109,8 +123,26 @@ public class ClusterStatsMultiProjectIT extends MultiProjectRestTestCase { }, indexName, null, null, null); } - private void createDocument(String projectId, String indexName, String body) throws IOException { - Request request = new Request("POST", "/" + indexName + "/_doc"); + private void createPipeline(String projectId, String pipelineId) throws IOException { + Request createPipelineRequest = new Request("PUT", "/_ingest/pipeline/" + pipelineId); + setRequestProjectId(createPipelineRequest, projectId); + createPipelineRequest.setJsonEntity(""" + { + "processors": [ + { + "set": { + "field": "foo", + "value": 999 + } + } + ] + } + """); + client().performRequest(createPipelineRequest); + } + + private void createDocument(String projectId, String indexName, String body, @Nullable String pipelineId) throws IOException { + Request request = new Request("POST", "/" + indexName + "/_doc" + (pipelineId != null ? "?pipeline=" + pipelineId : "")); request.setJsonEntity(body); setRequestProjectId(request, projectId); client().performRequest(request); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index ef77154c76b5..e6022779d529 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -2804,6 +2804,14 @@ public abstract class ESRestTestCase extends ESTestCase { } } + /** + * If multi-project is enabled, returns the active project ID followed by a slash, which is used to prefix various keys in REST + * responses. Otherwise, returns the empty string. + */ + protected String activeProjectPrefix() { + return multiProjectEnabled ? (activeProject + "/") : ""; + } + protected void createProject(String project) throws IOException { assert multiProjectEnabled; final Request request = new Request("PUT", "/_project/" + project); @@ -2826,7 +2834,7 @@ public abstract class ESRestTestCase extends ESTestCase { ); } - private Collection getProjectIds(RestClient client) throws IOException { + protected Collection getProjectIds(RestClient client) throws IOException { assert multiProjectEnabled; final Request request = new Request("GET", "/_cluster/state/routing_table?multi_project=true"); try { diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/Stash.java b/test/framework/src/main/java/org/elasticsearch/test/rest/Stash.java index a59f416bba97..61b8f2ee554c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/Stash.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/Stash.java @@ -11,7 +11,6 @@ package org.elasticsearch.test.rest; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.elasticsearch.common.Strings; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; @@ -59,24 +58,37 @@ public class Stash implements ToXContentFragment { /** * Tells whether a particular key needs to be looked up in the stash based on its name. - * Returns true if the string representation of the key starts with "$", false otherwise + * Returns true if the string representation of the key either starts with {@code $} or contains a {@code ${...}} reference. * The stash contains fields eventually extracted from previous responses that can be reused * as arguments for following requests (e.g. scroll_id) */ public boolean containsStashedValue(Object key) { - if (key == null || false == key instanceof CharSequence) { + if (false == key instanceof CharSequence) { return false; } String stashKey = key.toString(); - if (false == Strings.hasLength(stashKey)) { - return false; - } if (stashKey.startsWith("$")) { return true; } return EXTENDED_KEY.matcher(stashKey).find(); } + /** + * Tells whether a particular key represents exactly a stashed value reference. + * Returns true if the string representation of the key either starts with {@code $} or consists only of a {@code ${...}} reference. + * Unlike {@link #containsStashedValue}, returns false if the key contains an a {@code ${...}} reference within a longer string. + */ + public boolean isStashedValue(Object key) { + if (false == key instanceof CharSequence) { + return false; + } + String stashKey = key.toString(); + if (stashKey.startsWith("$")) { + return true; + } + return EXTENDED_KEY.matcher(stashKey).matches(); + } + /** * Retrieves a value from the current stash. * The stash contains fields eventually extracted from previous responses that can be reused diff --git a/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/ESClientYamlSuiteTestCase.java b/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/ESClientYamlSuiteTestCase.java index 15ebcf3d1feb..087feadfb84a 100644 --- a/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/ESClientYamlSuiteTestCase.java +++ b/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/ESClientYamlSuiteTestCase.java @@ -476,6 +476,8 @@ public abstract class ESClientYamlSuiteTestCase extends ESRestTestCase { } restTestExecutionContext.clear(); + // Prepare the stash so that ${_project_id_prefix_} is expanded as needed in some assertions: + restTestExecutionContext.stash().stashValue("_project_id_prefix_", activeProjectPrefix()); try { for (ExecutableSection executableSection : testCandidate.getTestSection().getExecutableSections()) { diff --git a/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/Assertion.java b/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/Assertion.java index 074fced33ba8..582543d15503 100644 --- a/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/Assertion.java +++ b/test/yaml-rest-runner/src/main/java/org/elasticsearch/test/rest/yaml/section/Assertion.java @@ -50,9 +50,11 @@ public abstract class Assertion implements ExecutableSection { } protected final Object getActualValue(ClientYamlTestExecutionContext executionContext) throws IOException { - if (executionContext.stash().containsStashedValue(field)) { + // If the "field" name contains only a simple stashed value reference, such as "$body", just get that value from the stash. + if (executionContext.stash().isStashedValue(field)) { return executionContext.stash().getValue(field); } + // Otherwise, get the value from the response. The field name will be subject to expansion of embedded ${...} references. return executionContext.response(field); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datatiers/DataTierUsageFixtures.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datatiers/DataTierUsageFixtures.java index 63cc6e4d7914..b7b6338ce970 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datatiers/DataTierUsageFixtures.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/datatiers/DataTierUsageFixtures.java @@ -79,7 +79,14 @@ class DataTierUsageFixtures extends ESTestCase { IndexShardStats shardStats = new IndexShardStats(shardId, new ShardStats[] { shardStat }); indexStats.computeIfAbsent(shardId.getIndex(), k -> new ArrayList<>()).add(shardStats); } - return new NodeIndicesStats(COMMON_STATS, Map.of(), indexStats, true); + return new NodeIndicesStats( + COMMON_STATS, + Map.of(), + indexStats, + // projectsByIndex is not needed as it is only used for rendering as XContent: + Map.of(), + true + ); } private static ShardStats shardStat(long byteCount, long docCount, ShardRouting routing) { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java index 81a1b55009fc..1f58d2522a15 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ml.action; import org.elasticsearch.TransportVersion; import org.elasticsearch.TransportVersions; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.ingest.IngestStats; import org.elasticsearch.xpack.core.action.util.QueryPage; @@ -20,6 +21,7 @@ import org.elasticsearch.xpack.core.ml.inference.trainedmodel.InferenceStatsTest import org.elasticsearch.xpack.core.ml.inference.trainedmodel.TrainedModelSizeStatsTests; import java.util.List; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -60,8 +62,15 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer List pipelineIds = Stream.generate(() -> randomAlphaOfLength(10)).limit(randomIntBetween(0, 10)).toList(); return new IngestStats( new IngestStats.Stats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong()), - pipelineIds.stream().map(id -> new IngestStats.PipelineStat(id, randomStats(), randomByteStats())).collect(Collectors.toList()), - pipelineIds.stream().collect(Collectors.toMap(Function.identity(), (v) -> randomProcessorStats())) + pipelineIds.stream() + .map(id -> new IngestStats.PipelineStat(ProjectId.DEFAULT, id, randomStats(), randomByteStats())) + .collect(Collectors.toList()), + pipelineIds.isEmpty() + ? Map.of() + : Map.of( + ProjectId.DEFAULT, + pipelineIds.stream().collect(Collectors.toMap(Function.identity(), v -> randomProcessorStats())) + ) ); } @@ -104,6 +113,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -139,6 +149,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -212,6 +223,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -285,6 +297,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -358,6 +371,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -432,6 +446,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -506,6 +521,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY @@ -580,6 +596,7 @@ public class GetTrainedModelsStatsActionResponseTests extends AbstractBWCWireSer .stream() .map( pipelineStat -> new IngestStats.PipelineStat( + ProjectId.DEFAULT, pipelineStat.pipelineId(), pipelineStat.stats(), IngestStats.ByteStats.IDENTITY diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java index bed0cbe3b867..00c8f2d04ddc 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningUsageTransportAction.java @@ -13,9 +13,11 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.OriginSettingClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.Maps; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.env.Environment; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.license.XPackLicenseState; @@ -586,6 +588,7 @@ public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransp } // TODO separate out ours and users models possibly regression vs classification + @FixForMultiProject // do not use default project private static void addInferenceIngestUsage(GetTrainedModelsStatsAction.Response statsResponse, Map inferenceUsage) { int pipelineCount = 0; StatsAccumulator docCountStats = new StatsAccumulator(); @@ -594,13 +597,19 @@ public class MachineLearningUsageTransportAction extends XPackUsageFeatureTransp for (GetTrainedModelsStatsAction.Response.TrainedModelStats modelStats : statsResponse.getResources().results()) { pipelineCount += modelStats.getPipelineCount(); - modelStats.getIngestStats().processorStats().values().stream().flatMap(List::stream).forEach(processorStat -> { - if (processorStat.name().equals(InferenceProcessor.TYPE)) { - docCountStats.add(processorStat.stats().ingestCount()); - timeStats.add(processorStat.stats().ingestTimeInMillis()); - failureStats.add(processorStat.stats().ingestFailedCount()); - } - }); + modelStats.getIngestStats() + .processorStats() + .getOrDefault(ProjectId.DEFAULT, Map.of()) + .values() + .stream() + .flatMap(List::stream) + .forEach(processorStat -> { + if (processorStat.name().equals(InferenceProcessor.TYPE)) { + docCountStats.add(processorStat.stats().ingestCount()); + timeStats.add(processorStat.stats().ingestTimeInMillis()); + failureStats.add(processorStat.stats().ingestFailedCount()); + } + }); } Map ingestUsage = Maps.newMapWithExpectedSize(6); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java index 30b3943726c5..34e44c380fb5 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsAction.java @@ -21,12 +21,15 @@ import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -420,21 +423,26 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction< return nodesStatsRequest; } + @FixForMultiProject // do not use default project static IngestStats ingestStatsForPipelineIds(NodeStats nodeStats, Set pipelineIds) { IngestStats fullNodeStats = nodeStats.getIngestStats(); - Map> filteredProcessorStats = new HashMap<>(fullNodeStats.processorStats()); + Map> filteredProcessorStats = new HashMap<>( + fullNodeStats.processorStats().getOrDefault(ProjectId.DEFAULT, Map.of()) + ); filteredProcessorStats.keySet().retainAll(pipelineIds); List filteredPipelineStats = fullNodeStats.pipelineStats() .stream() + .filter(pipelineStat -> pipelineStat.projectId().equals(ProjectId.DEFAULT)) .filter(pipelineStat -> pipelineIds.contains(pipelineStat.pipelineId())) .collect(Collectors.toList()); IngestStatsAccumulator accumulator = new IngestStatsAccumulator(); filteredPipelineStats.forEach(pipelineStat -> accumulator.inc(pipelineStat.stats())); - return new IngestStats(accumulator.build(), filteredPipelineStats, filteredProcessorStats); + return new IngestStats(accumulator.build(), filteredPipelineStats, Map.of(ProjectId.DEFAULT, filteredProcessorStats)); } + @FixForMultiProject // don't use default project private static IngestStats mergeStats(List ingestStatsList) { Map pipelineStatsAcc = Maps.newLinkedHashMapWithExpectedSize(ingestStatsList.size()); @@ -448,7 +456,7 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction< .inc(pipelineStat) ); - ingestStats.processorStats().forEach((pipelineId, processorStat) -> { + ingestStats.processorStats().getOrDefault(ProjectId.DEFAULT, Map.of()).forEach((pipelineId, processorStat) -> { Map processorAcc = processorStatsAcc.computeIfAbsent( pipelineId, k -> new LinkedHashMap<>() @@ -464,7 +472,12 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction< List pipelineStatList = new ArrayList<>(pipelineStatsAcc.size()); pipelineStatsAcc.forEach( (pipelineId, accumulator) -> pipelineStatList.add( - new IngestStats.PipelineStat(pipelineId, accumulator.buildStats(), accumulator.buildByteStats()) + new IngestStats.PipelineStat( + Metadata.DEFAULT_PROJECT_ID, + pipelineId, + accumulator.buildStats(), + accumulator.buildByteStats() + ) ) ); @@ -477,7 +490,7 @@ public class TransportGetTrainedModelsStatsAction extends TransportAction< processorStatList.put(pipelineId, processorStats); }); - return new IngestStats(totalStats.build(), pipelineStatList, processorStatList); + return new IngestStats(totalStats.build(), pipelineStatList, Map.of(ProjectId.DEFAULT, processorStatList)); } private static class IngestStatsAccumulator { diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java index 4a3b29904216..df923228b27f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningInfoTransportActionTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -987,20 +988,23 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase { new IngestStats.Stats(0, 0, 0, 0), List.of(), Map.of( - "pipeline_1", - List.of( - new IngestStats.ProcessorStat( - InferenceProcessor.TYPE, - InferenceProcessor.TYPE, - new IngestStats.Stats(10, 1, 1000, 100) - ), - new IngestStats.ProcessorStat( - InferenceProcessor.TYPE, - InferenceProcessor.TYPE, - new IngestStats.Stats(20, 2, 2000, 200) - ), - // Adding a non inference processor that should be ignored - new IngestStats.ProcessorStat("grok", "grok", new IngestStats.Stats(100, 100, 100, 100)) + ProjectId.DEFAULT, + Map.of( + "pipeline_1", + List.of( + new IngestStats.ProcessorStat( + InferenceProcessor.TYPE, + InferenceProcessor.TYPE, + new IngestStats.Stats(10, 1, 1000, 100) + ), + new IngestStats.ProcessorStat( + InferenceProcessor.TYPE, + InferenceProcessor.TYPE, + new IngestStats.Stats(20, 2, 2000, 200) + ), + // Adding a non inference processor that should be ignored + new IngestStats.ProcessorStat("grok", "grok", new IngestStats.Stats(100, 100, 100, 100)) + ) ) ) ), @@ -1015,12 +1019,15 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase { new IngestStats.Stats(0, 0, 0, 0), List.of(), Map.of( - "pipeline_1", - List.of( - new IngestStats.ProcessorStat( - InferenceProcessor.TYPE, - InferenceProcessor.TYPE, - new IngestStats.Stats(30, 3, 3000, 300) + ProjectId.DEFAULT, + Map.of( + "pipeline_1", + List.of( + new IngestStats.ProcessorStat( + InferenceProcessor.TYPE, + InferenceProcessor.TYPE, + new IngestStats.Stats(30, 3, 3000, 300) + ) ) ) ) @@ -1036,12 +1043,15 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase { new IngestStats.Stats(0, 0, 0, 0), List.of(), Map.of( - "pipeline_2", - List.of( - new IngestStats.ProcessorStat( - InferenceProcessor.TYPE, - InferenceProcessor.TYPE, - new IngestStats.Stats(40, 4, 4000, 400) + ProjectId.DEFAULT, + Map.of( + "pipeline_2", + List.of( + new IngestStats.ProcessorStat( + InferenceProcessor.TYPE, + InferenceProcessor.TYPE, + new IngestStats.Stats(40, 4, 4000, 400) + ) ) ) ) @@ -1088,12 +1098,15 @@ public class MachineLearningInfoTransportActionTests extends ESTestCase { new IngestStats.Stats(0, 0, 0, 0), List.of(), Map.of( - "pipeline_3", - List.of( - new IngestStats.ProcessorStat( - InferenceProcessor.TYPE, - InferenceProcessor.TYPE, - new IngestStats.Stats(50, 5, 5000, 500) + ProjectId.DEFAULT, + Map.of( + "pipeline_3", + List.of( + new IngestStats.ProcessorStat( + InferenceProcessor.TYPE, + InferenceProcessor.TYPE, + new IngestStats.Stats(50, 5, 5000, 500) + ) ) ) ) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index c4ec1fdc9eb5..06ba7ba113d4 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -168,9 +168,24 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { buildNodeStats( new IngestStats.Stats(2, 2, 3, 4), Arrays.asList( - new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(0, 0, 3, 1), new IngestStats.ByteStats(789, 0)), - new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1), new IngestStats.ByteStats(123, 123)), - new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1), new IngestStats.ByteStats(1234, 5678)) + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline1", + new IngestStats.Stats(0, 0, 3, 1), + new IngestStats.ByteStats(789, 0) + ), + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline2", + new IngestStats.Stats(1, 1, 0, 1), + new IngestStats.ByteStats(123, 123) + ), + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline3", + new IngestStats.Stats(2, 1, 1, 1), + new IngestStats.ByteStats(1234, 5678) + ) ), Arrays.asList( Arrays.asList( @@ -192,9 +207,24 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { buildNodeStats( new IngestStats.Stats(15, 5, 3, 4), Arrays.asList( - new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 3, 1), new IngestStats.ByteStats(5678, 123456)), - new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(1, 1, 0, 1), new IngestStats.ByteStats(111, 222)), - new IngestStats.PipelineStat("pipeline3", new IngestStats.Stats(2, 1, 1, 1), new IngestStats.ByteStats(555, 777)) + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline1", + new IngestStats.Stats(10, 1, 3, 1), + new IngestStats.ByteStats(5678, 123456) + ), + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline2", + new IngestStats.Stats(1, 1, 0, 1), + new IngestStats.ByteStats(111, 222) + ), + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline3", + new IngestStats.Stats(2, 1, 1, 1), + new IngestStats.ByteStats(555, 777) + ) ), Arrays.asList( Arrays.asList( @@ -230,13 +260,21 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { IngestStats expectedStatsModel1 = new IngestStats( new IngestStats.Stats(10, 1, 6, 2), Collections.singletonList( - new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2), new IngestStats.ByteStats(6467, 123456)) + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline1", + new IngestStats.Stats(10, 1, 6, 2), + new IngestStats.ByteStats(6467, 123456) + ) ), - Collections.singletonMap( - "pipeline1", - Arrays.asList( - new IngestStats.ProcessorStat("inference", "inference", new IngestStats.Stats(120, 12, 0, 1)), - new IngestStats.ProcessorStat("grok", "grok", new IngestStats.Stats(10, 1, 0, 0)) + Map.of( + ProjectId.DEFAULT, + Collections.singletonMap( + "pipeline1", + Arrays.asList( + new IngestStats.ProcessorStat("inference", "inference", new IngestStats.Stats(120, 12, 0, 1)), + new IngestStats.ProcessorStat("grok", "grok", new IngestStats.Stats(10, 1, 0, 0)) + ) ) ) ); @@ -244,10 +282,20 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { IngestStats expectedStatsModel2 = new IngestStats( new IngestStats.Stats(12, 3, 6, 4), Arrays.asList( - new IngestStats.PipelineStat("pipeline1", new IngestStats.Stats(10, 1, 6, 2), new IngestStats.ByteStats(6467, 123456)), - new IngestStats.PipelineStat("pipeline2", new IngestStats.Stats(2, 2, 0, 2), new IngestStats.ByteStats(234, 345)) + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline1", + new IngestStats.Stats(10, 1, 6, 2), + new IngestStats.ByteStats(6467, 123456) + ), + new IngestStats.PipelineStat( + ProjectId.DEFAULT, + "pipeline2", + new IngestStats.Stats(2, 2, 0, 2), + new IngestStats.ByteStats(234, 345) + ) ), - new HashMap<>() { + Map.of(ProjectId.DEFAULT, new HashMap<>() { { put( "pipeline2", @@ -264,7 +312,7 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { ) ); } - } + }) ); assertThat(ingestStatsMap, hasEntry("trained_model_1", expectedStatsModel1)); @@ -280,7 +328,10 @@ public class TransportGetTrainedModelsStatsActionTests extends ESTestCase { IngestStats ingestStats = new IngestStats( overallStats, pipelineNames, - IntStream.range(0, pipelineids.size()).boxed().collect(Collectors.toMap(pipelineids::get, processorStats::get)) + Map.of( + ProjectId.DEFAULT, + IntStream.range(0, pipelineids.size()).boxed().collect(Collectors.toMap(pipelineids::get, processorStats::get)) + ) ); return new NodeStats( mock(DiscoveryNode.class), diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index 867bf38c3f9a..f58059c288d3 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -367,7 +367,7 @@ public class NodeStatsMonitoringDocTests extends BaseFilteredMonitoringDocTestCa segmentsStats.addBitsetMemoryInBytes(++iota); indicesCommonStats.getSegments().add(segmentsStats); - final NodeIndicesStats indices = new NodeIndicesStats(indicesCommonStats, emptyMap(), emptyMap(), randomBoolean()); + final NodeIndicesStats indices = new NodeIndicesStats(indicesCommonStats, emptyMap(), emptyMap(), emptyMap(), randomBoolean()); // Filesystem final FsInfo.DeviceStats ioStatsOne = new FsInfo.DeviceStats( diff --git a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle index 9976fef0fe44..8e5462059792 100644 --- a/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/core-rest-tests-with-multiple-projects/build.gradle @@ -40,15 +40,11 @@ tasks.named("yamlRestTest").configure { '^cluster.desired_balance/10_basic/*', '^data_stream/40_supported_apis/Verify shard stores api', // uses _shard_stores API '^health/10_basic/*', - '^health/40_diagnosis/*', '^indices.get_alias/10_basic/Get alias against closed indices', // Does NOT work with security enabled, see also core-rest-tests-with-security '^indices.recovery/*/*', '^indices.resolve_cluster/*/*', '^indices.resolve_cluster/*/*/*', '^indices.shard_stores/*/*', - '^test/ingest/15_info_ingest/*', // uses cluster info API - '^test/ingest/70_bulk/Test bulk request with default pipeline', // uses stats API - '^test/ingest/70_bulk/Test bulk request without default pipeline', // uses stats API '^migration/*/*', '^snapshot.clone/*/*', '^snapshot.create/*/*', @@ -76,7 +72,7 @@ tasks.named("yamlRestTest").configure { // Reindex from remote is not supported on Serverless and required additional testing setup '^reindex/60_wait_for_active_shards/can override wait_for_active_shards', // <- Requires a single shard '^reindex/90_remote/*', - '^reindex/95_parent_join/Reindex from remote*' + '^reindex/95_parent_join/Reindex from remote*', ]; if (buildParams.snapshotBuild == false) { blacklist += [];