From 24329bbe646bcd687f6d4b36c5b8690f20641f46 Mon Sep 17 00:00:00 2001 From: Niels Bauman Date: Wed, 25 Jun 2025 11:25:17 -0300 Subject: [PATCH] Make ESQL join operators project-aware --- .../test/ClusterServiceUtils.java | 22 +++++++++++++--- .../esql/enrich/AbstractLookupService.java | 17 +++++++------ .../esql/enrich/EnrichLookupService.java | 7 ++++-- .../esql/enrich/EnrichPolicyResolver.java | 13 ++++++++-- .../esql/enrich/LookupFromIndexService.java | 7 ++++-- .../esql/plugin/TransportEsqlQueryAction.java | 13 +++++++--- .../enrich/EnrichPolicyResolverTests.java | 25 +++++++++++-------- .../enrich/LookupFromIndexOperatorTests.java | 7 ++++-- .../build.gradle | 9 ------- 9 files changed, 79 insertions(+), 41 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java index e0e7505191da..e0936702c048 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.ClusterStatePublisher; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -138,6 +140,16 @@ public class ClusterServiceUtils { DiscoveryNode localNode, Settings providedSettings, ClusterSettings clusterSettings + ) { + return createClusterService(threadPool, localNode, providedSettings, clusterSettings, null); + } + + public static ClusterService createClusterService( + ThreadPool threadPool, + DiscoveryNode localNode, + Settings providedSettings, + ClusterSettings clusterSettings, + ProjectId projectId ) { Settings settings = Settings.builder() .put("node.name", "test") @@ -151,12 +163,14 @@ public class ClusterServiceUtils { new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP) ); clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService()); - ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) + ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName())) .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId())) .putCompatibilityVersions(localNode.getId(), CompatibilityVersionsUtils.staticCurrent()) - .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK) - .build(); - clusterService.getClusterApplierService().setInitialState(initialClusterState); + .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK); + if (projectId != null) { + builder.putProjectMetadata(ProjectMetadata.builder(projectId)); + } + clusterService.getClusterApplierService().setInitialState(builder.build()); clusterService.getMasterService().setClusterStatePublisher(createClusterStatePublisher(clusterService.getClusterApplierService())); clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state); clusterService.start(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index ea78252197e3..f9d768fa3426 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -15,8 +15,8 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; @@ -134,6 +134,7 @@ public abstract class AbstractLookupService readRequest + CheckedBiFunction readRequest, + ProjectResolver projectResolver ) { this.actionName = actionName; this.clusterService = clusterService; @@ -167,6 +169,7 @@ public abstract class AbstractLookupService> outListener) { ClusterState clusterState = clusterService.state(); + var projectState = projectResolver.getProjectState(clusterState); List shardIterators = clusterService.operationRouting() - .searchShards(clusterState.projectState(), new String[] { request.index }, Map.of(), "_local"); + .searchShards(projectState, new String[] { request.index }, Map.of(), "_local"); if (shardIterators.size() != 1) { outListener.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", request.index)); return; @@ -278,12 +282,11 @@ public abstract class AbstractLookupService releasables = new ArrayList<>(6); boolean started = false; try { - - ProjectMetadata projMeta = clusterService.state().metadata().getProject(); + var projectState = projectResolver.getProjectState(clusterService.state()); AliasFilter aliasFilter = indicesService.buildAliasFilter( - clusterService.state().projectState(), + projectState, request.shardId.getIndex().getName(), - indexNameExpressionResolver.resolveExpressions(projMeta, request.indexPattern) + indexNameExpressionResolver.resolveExpressions(projectState.metadata(), request.indexPattern) ); LookupShardContext shardContext = lookupShardContextFactory.create(request.shardId); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java index 00cdb7b753c6..663698d77c6d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupService.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -75,7 +76,8 @@ public class EnrichLookupService extends AbstractLookupService availablePolicies() { - final EnrichMetadata metadata = clusterService.state().metadata().getProject().custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY); + final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state()) + .custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY); return metadata.getPolicies(); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java index 72859b621087..972a952a7b1f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexService.java @@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.TransportVersions; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; @@ -55,7 +56,8 @@ public class LookupFromIndexService extends AbstractLookupService( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java index b09195319c53..241be34c96a5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichPolicyResolverTests.java @@ -22,8 +22,10 @@ import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.internal.FilterClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.VersionInformation; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -95,9 +97,10 @@ public class EnrichPolicyResolverTests extends ESTestCase { } AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_a").getLocalNode()); AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_b").getLocalNode()); - localCluster = newEnrichPolicyResolver(LOCAL_CLUSTER_GROUP_KEY); - clusterA = newEnrichPolicyResolver("cluster_a"); - clusterB = newEnrichPolicyResolver("cluster_b"); + final var projectId = randomProjectIdOrDefault(); + localCluster = newEnrichPolicyResolver(projectId, LOCAL_CLUSTER_GROUP_KEY); + clusterA = newEnrichPolicyResolver(projectId, "cluster_a"); + clusterB = newEnrichPolicyResolver(projectId, "cluster_b"); // hosts policies are the same across clusters var hostsPolicy = new EnrichPolicy("match", null, List.of(), "ip", List.of("region", "cost")); @@ -401,8 +404,8 @@ public class EnrichPolicyResolverTests extends ESTestCase { } } - TestEnrichPolicyResolver newEnrichPolicyResolver(String cluster) { - return new TestEnrichPolicyResolver(cluster, new HashMap<>(), new HashMap<>(), new HashMap<>()); + TestEnrichPolicyResolver newEnrichPolicyResolver(ProjectId projectId, String cluster) { + return new TestEnrichPolicyResolver(projectId, cluster, new HashMap<>(), new HashMap<>(), new HashMap<>()); } class TestEnrichPolicyResolver extends EnrichPolicyResolver { @@ -412,15 +415,17 @@ public class EnrichPolicyResolverTests extends ESTestCase { final Map> mappings; TestEnrichPolicyResolver( + ProjectId projectId, String cluster, Map policies, Map aliases, Map> mappings ) { super( - mockClusterService(policies), + mockClusterService(projectId, policies), transports.get(cluster), - new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings)) + new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings)), + TestProjectResolvers.singleProject(projectId) ); this.policies = policies; this.cluster = cluster; @@ -457,11 +462,11 @@ public class EnrichPolicyResolverTests extends ESTestCase { listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode())); } - static ClusterService mockClusterService(Map policies) { + static ClusterService mockClusterService(ProjectId projectId, Map policies) { ClusterService clusterService = mock(ClusterService.class); EnrichMetadata enrichMetadata = new EnrichMetadata(policies); ClusterState state = ClusterState.builder(new ClusterName("test")) - .metadata(Metadata.builder().projectCustoms(Map.of(EnrichMetadata.TYPE, enrichMetadata))) + .putProjectMetadata(ProjectMetadata.builder(projectId).customs(Map.of(EnrichMetadata.TYPE, enrichMetadata))) .build(); when(clusterService.state()).thenReturn(state); return clusterService; diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java index dabcc6cbce89..a4a6ee28c84b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/LookupFromIndexOperatorTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -183,7 +184,8 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase { IndicesService indicesService = mock(IndicesService.class); IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(); releasables.add(clusterService::stop); - ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state("idx", 1, 1)); + final var projectId = randomProjectIdOrDefault(); + ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state(projectId, "idx", 1, 1)); if (beCranky) { logger.info("building a cranky lookup"); } @@ -197,7 +199,8 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase { transportService(clusterService), indexNameExpressionResolver, bigArrays, - blockFactory + blockFactory, + TestProjectResolvers.singleProject(projectId) ); } diff --git a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle index 3776645f25a8..93cf42b9119a 100644 --- a/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle +++ b/x-pack/qa/multi-project/xpack-rest-tests-with-multiple-projects/build.gradle @@ -34,17 +34,8 @@ tasks.named("yamlRestTest").configure { '^data_streams/10_data_stream_resolvability/*', '^deprecation/10_basic/*', '^dlm/10_usage/*', - '^esql/60_enrich/*', '^esql/60_usage/*', - '^esql/61_enrich_ip/*', - '^esql/62_extra_enrich/*', - '^esql/63_enrich_int_range/*', - '^esql/64_enrich_int_match/*', '^esql/180_match_operator/*', - '^esql/190_lookup_join/*', - '^esql/191_lookup_join_on_datastreams/*', - '^esql/191_lookup_join_text/*', - '^esql/192_lookup_join_on_aliases/*', '^health/10_usage/*', '^ilm/60_operation_mode/*', '^ilm/80_health/*',