Make ESQL join operators project-aware

This commit is contained in:
Niels Bauman 2025-06-25 11:25:17 -03:00
parent bc515c4070
commit 24329bbe64
No known key found for this signature in database
GPG key ID: 1E23BD8DDAC3C49C
9 changed files with 79 additions and 41 deletions

View file

@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.ClusterStatePublisher;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -138,6 +140,16 @@ public class ClusterServiceUtils {
DiscoveryNode localNode,
Settings providedSettings,
ClusterSettings clusterSettings
) {
return createClusterService(threadPool, localNode, providedSettings, clusterSettings, null);
}
public static ClusterService createClusterService(
ThreadPool threadPool,
DiscoveryNode localNode,
Settings providedSettings,
ClusterSettings clusterSettings,
ProjectId projectId
) {
Settings settings = Settings.builder()
.put("node.name", "test")
@ -151,12 +163,14 @@ public class ClusterServiceUtils {
new TaskManager(settings, threadPool, Collections.emptySet(), Tracer.NOOP)
);
clusterService.setNodeConnectionsService(createNoOpNodeConnectionsService());
ClusterState initialClusterState = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
ClusterState.Builder builder = ClusterState.builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.putCompatibilityVersions(localNode.getId(), CompatibilityVersionsUtils.staticCurrent())
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();
clusterService.getClusterApplierService().setInitialState(initialClusterState);
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK);
if (projectId != null) {
builder.putProjectMetadata(ProjectMetadata.builder(projectId));
}
clusterService.getClusterApplierService().setInitialState(builder.build());
clusterService.getMasterService().setClusterStatePublisher(createClusterStatePublisher(clusterService.getClusterApplierService()));
clusterService.getMasterService().setClusterStateSupplier(clusterService.getClusterApplierService()::state);
clusterService.start();

View file

@ -15,8 +15,8 @@ import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
@ -134,6 +134,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
private final BigArrays bigArrays;
private final BlockFactory blockFactory;
private final LocalCircuitBreaker.SizeSettings localBreakerSettings;
private final ProjectResolver projectResolver;
/**
* Should output {@link Page pages} be combined into a single resulting page?
* If this is {@code true} we'll run a {@link MergePositionsOperator} to merge
@ -154,7 +155,8 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
BigArrays bigArrays,
BlockFactory blockFactory,
boolean mergePages,
CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest
CheckedBiFunction<StreamInput, BlockFactory, T, IOException> readRequest,
ProjectResolver projectResolver
) {
this.actionName = actionName;
this.clusterService = clusterService;
@ -167,6 +169,7 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
this.blockFactory = blockFactory;
this.localBreakerSettings = new LocalCircuitBreaker.SizeSettings(clusterService.getSettings());
this.mergePages = mergePages;
this.projectResolver = projectResolver;
transportService.registerRequestHandler(
actionName,
transportService.getThreadPool().executor(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME),
@ -227,8 +230,9 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
*/
public final void lookupAsync(R request, CancellableTask parentTask, ActionListener<List<Page>> outListener) {
ClusterState clusterState = clusterService.state();
var projectState = projectResolver.getProjectState(clusterState);
List<ShardIterator> shardIterators = clusterService.operationRouting()
.searchShards(clusterState.projectState(), new String[] { request.index }, Map.of(), "_local");
.searchShards(projectState, new String[] { request.index }, Map.of(), "_local");
if (shardIterators.size() != 1) {
outListener.onFailure(new EsqlIllegalArgumentException("target index {} has more than one shard", request.index));
return;
@ -278,12 +282,11 @@ public abstract class AbstractLookupService<R extends AbstractLookupService.Requ
final List<Releasable> releasables = new ArrayList<>(6);
boolean started = false;
try {
ProjectMetadata projMeta = clusterService.state().metadata().getProject();
var projectState = projectResolver.getProjectState(clusterService.state());
AliasFilter aliasFilter = indicesService.buildAliasFilter(
clusterService.state().projectState(),
projectState,
request.shardId.getIndex().getName(),
indexNameExpressionResolver.resolveExpressions(projMeta, request.indexPattern)
indexNameExpressionResolver.resolveExpressions(projectState.metadata(), request.indexPattern)
);
LookupShardContext shardContext = lookupShardContextFactory.create(request.shardId);

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -75,7 +76,8 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
TransportService transportService,
IndexNameExpressionResolver indexNameExpressionResolver,
BigArrays bigArrays,
BlockFactory blockFactory
BlockFactory blockFactory,
ProjectResolver projectResolver
) {
super(
LOOKUP_ACTION_NAME,
@ -87,7 +89,8 @@ public class EnrichLookupService extends AbstractLookupService<EnrichLookupServi
bigArrays,
blockFactory,
true,
TransportRequest::readFrom
TransportRequest::readFrom,
projectResolver
);
}

View file

@ -13,6 +13,7 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -77,13 +78,20 @@ public class EnrichPolicyResolver {
private final TransportService transportService;
private final ThreadPool threadPool;
private final RemoteClusterService remoteClusterService;
private final ProjectResolver projectResolver;
public EnrichPolicyResolver(ClusterService clusterService, TransportService transportService, IndexResolver indexResolver) {
public EnrichPolicyResolver(
ClusterService clusterService,
TransportService transportService,
IndexResolver indexResolver,
ProjectResolver projectResolver
) {
this.clusterService = clusterService;
this.transportService = transportService;
this.indexResolver = indexResolver;
this.threadPool = transportService.getThreadPool();
this.remoteClusterService = transportService.getRemoteClusterService();
this.projectResolver = projectResolver;
transportService.registerRequestHandler(
RESOLVE_ACTION_NAME,
threadPool.executor(ThreadPool.Names.SEARCH),
@ -445,7 +453,8 @@ public class EnrichPolicyResolver {
}
protected Map<String, EnrichPolicy> availablePolicies() {
final EnrichMetadata metadata = clusterService.state().metadata().getProject().custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
final EnrichMetadata metadata = projectResolver.getProjectMetadata(clusterService.state())
.custom(EnrichMetadata.TYPE, EnrichMetadata.EMPTY);
return metadata.getPolicies();
}

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.enrich;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
@ -55,7 +56,8 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
TransportService transportService,
IndexNameExpressionResolver indexNameExpressionResolver,
BigArrays bigArrays,
BlockFactory blockFactory
BlockFactory blockFactory,
ProjectResolver projectResolver
) {
super(
LOOKUP_ACTION_NAME,
@ -67,7 +69,8 @@ public class LookupFromIndexService extends AbstractLookupService<LookupFromInde
bigArrays,
blockFactory,
false,
TransportRequest::readFrom
TransportRequest::readFrom,
projectResolver
);
}

View file

@ -113,7 +113,12 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
this.requestExecutor = threadPool.executor(ThreadPool.Names.SEARCH);
exchangeService.registerTransportHandler(transportService);
this.exchangeService = exchangeService;
this.enrichPolicyResolver = new EnrichPolicyResolver(clusterService, transportService, planExecutor.indexResolver());
this.enrichPolicyResolver = new EnrichPolicyResolver(
clusterService,
transportService,
planExecutor.indexResolver(),
projectResolver
);
AbstractLookupService.LookupShardContextFactory lookupLookupShardContextFactory = AbstractLookupService.LookupShardContextFactory
.fromSearchService(searchService);
this.enrichLookupService = new EnrichLookupService(
@ -123,7 +128,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
transportService,
indexNameExpressionResolver,
bigArrays,
blockFactoryProvider.blockFactory()
blockFactoryProvider.blockFactory(),
projectResolver
);
this.lookupFromIndexService = new LookupFromIndexService(
clusterService,
@ -132,7 +138,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
transportService,
indexNameExpressionResolver,
bigArrays,
blockFactoryProvider.blockFactory()
blockFactoryProvider.blockFactory(),
projectResolver
);
this.asyncTaskManagementService = new AsyncTaskManagementService<>(

View file

@ -22,8 +22,10 @@ import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.VersionInformation;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
@ -95,9 +97,10 @@ public class EnrichPolicyResolverTests extends ESTestCase {
}
AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_a").getLocalNode());
AbstractSimpleTransportTestCase.connectToNode(transports.get(""), transports.get("cluster_b").getLocalNode());
localCluster = newEnrichPolicyResolver(LOCAL_CLUSTER_GROUP_KEY);
clusterA = newEnrichPolicyResolver("cluster_a");
clusterB = newEnrichPolicyResolver("cluster_b");
final var projectId = randomProjectIdOrDefault();
localCluster = newEnrichPolicyResolver(projectId, LOCAL_CLUSTER_GROUP_KEY);
clusterA = newEnrichPolicyResolver(projectId, "cluster_a");
clusterB = newEnrichPolicyResolver(projectId, "cluster_b");
// hosts policies are the same across clusters
var hostsPolicy = new EnrichPolicy("match", null, List.of(), "ip", List.of("region", "cost"));
@ -401,8 +404,8 @@ public class EnrichPolicyResolverTests extends ESTestCase {
}
}
TestEnrichPolicyResolver newEnrichPolicyResolver(String cluster) {
return new TestEnrichPolicyResolver(cluster, new HashMap<>(), new HashMap<>(), new HashMap<>());
TestEnrichPolicyResolver newEnrichPolicyResolver(ProjectId projectId, String cluster) {
return new TestEnrichPolicyResolver(projectId, cluster, new HashMap<>(), new HashMap<>(), new HashMap<>());
}
class TestEnrichPolicyResolver extends EnrichPolicyResolver {
@ -412,15 +415,17 @@ public class EnrichPolicyResolverTests extends ESTestCase {
final Map<String, Map<String, String>> mappings;
TestEnrichPolicyResolver(
ProjectId projectId,
String cluster,
Map<String, EnrichPolicy> policies,
Map<String, String> aliases,
Map<String, Map<String, String>> mappings
) {
super(
mockClusterService(policies),
mockClusterService(projectId, policies),
transports.get(cluster),
new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings))
new IndexResolver(new FieldCapsClient(threadPool, aliases, mappings)),
TestProjectResolvers.singleProject(projectId)
);
this.policies = policies;
this.cluster = cluster;
@ -457,11 +462,11 @@ public class EnrichPolicyResolverTests extends ESTestCase {
listener.onResponse(transports.get("").getConnection(transports.get(remoteCluster).getLocalNode()));
}
static ClusterService mockClusterService(Map<String, EnrichPolicy> policies) {
static ClusterService mockClusterService(ProjectId projectId, Map<String, EnrichPolicy> policies) {
ClusterService clusterService = mock(ClusterService.class);
EnrichMetadata enrichMetadata = new EnrichMetadata(policies);
ClusterState state = ClusterState.builder(new ClusterName("test"))
.metadata(Metadata.builder().projectCustoms(Map.of(EnrichMetadata.TYPE, enrichMetadata)))
.putProjectMetadata(ProjectMetadata.builder(projectId).customs(Map.of(EnrichMetadata.TYPE, enrichMetadata)))
.build();
when(clusterService.state()).thenReturn(state);
return clusterService;

View file

@ -18,6 +18,7 @@ import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
@ -183,7 +184,8 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
IndicesService indicesService = mock(IndicesService.class);
IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
releasables.add(clusterService::stop);
ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state("idx", 1, 1));
final var projectId = randomProjectIdOrDefault();
ClusterServiceUtils.setState(clusterService, ClusterStateCreationUtils.state(projectId, "idx", 1, 1));
if (beCranky) {
logger.info("building a cranky lookup");
}
@ -197,7 +199,8 @@ public class LookupFromIndexOperatorTests extends OperatorTestCase {
transportService(clusterService),
indexNameExpressionResolver,
bigArrays,
blockFactory
blockFactory,
TestProjectResolvers.singleProject(projectId)
);
}

View file

@ -34,17 +34,8 @@ tasks.named("yamlRestTest").configure {
'^data_streams/10_data_stream_resolvability/*',
'^deprecation/10_basic/*',
'^dlm/10_usage/*',
'^esql/60_enrich/*',
'^esql/60_usage/*',
'^esql/61_enrich_ip/*',
'^esql/62_extra_enrich/*',
'^esql/63_enrich_int_range/*',
'^esql/64_enrich_int_match/*',
'^esql/180_match_operator/*',
'^esql/190_lookup_join/*',
'^esql/191_lookup_join_on_datastreams/*',
'^esql/191_lookup_join_text/*',
'^esql/192_lookup_join_on_aliases/*',
'^health/10_usage/*',
'^ilm/60_operation_mode/*',
'^ilm/80_health/*',