From 2bc62848e8b7768bbbd01bfdc707a1e9c18ff511 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 25 Jun 2025 11:48:15 -0700 Subject: [PATCH] Avoid dropping aggregate groupings in local plans (#129370) The local plan optimizer should not change the layout, as it has already been agreed upon. However, CombineProjections can violate this when some grouping elements refer to the same attribute. This occurs when ReplaceFieldWithConstantOrNull replaces missing fields with the same reference for a given data type. Closes #128054 Closes #129811 --- docs/changelog/129370.yaml | 7 + .../xpack/esql/action/EsqlActionIT.java | 35 +++++ .../optimizer/LocalLogicalPlanOptimizer.java | 7 +- .../esql/optimizer/LogicalPlanOptimizer.java | 10 +- .../rules/logical/CombineProjections.java | 141 +++++++++++------- .../LocalLogicalPlanOptimizerTests.java | 30 ++++ 6 files changed, 170 insertions(+), 60 deletions(-) create mode 100644 docs/changelog/129370.yaml diff --git a/docs/changelog/129370.yaml b/docs/changelog/129370.yaml new file mode 100644 index 000000000000..73d1c25f4b34 --- /dev/null +++ b/docs/changelog/129370.yaml @@ -0,0 +1,7 @@ +pr: 129370 +summary: Avoid dropping aggregate groupings in local plans +area: ES|QL +type: bug +issues: + - 129811 + - 128054 diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java index c1a1e04d6385..488c90f77d3e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionIT.java @@ -1679,6 +1679,39 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase { } } + public void testGroupingStatsOnMissingFields() { + assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long")); + long oneValue = between(1, 1000); + indexDoc("missing_field_index", "1", "data", oneValue); + refresh("missing_field_index"); + QueryPragmas pragmas = randomPragmas(); + pragmas = new QueryPragmas( + Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build() + ); + EsqlQueryRequest request = new EsqlQueryRequest(); + request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color"); + request.pragmas(pragmas); + try (var r = run(request)) { + var rows = getValuesList(r); + assertThat(rows, hasSize(4)); + for (List row : rows) { + assertThat(row, hasSize(3)); + } + assertThat(rows.get(0).get(0), equalTo(20L)); + assertThat(rows.get(0).get(1), equalTo("blue")); + assertNull(rows.get(0).get(2)); + assertThat(rows.get(1).get(0), equalTo(10L)); + assertThat(rows.get(1).get(1), equalTo("green")); + assertNull(rows.get(1).get(2)); + assertThat(rows.get(2).get(0), equalTo(30L)); + assertThat(rows.get(2).get(1), equalTo("red")); + assertNull(rows.get(2).get(2)); + assertThat(rows.get(3).get(0), equalTo(oneValue)); + assertNull(rows.get(3).get(1)); + assertNull(rows.get(3).get(2)); + } + } + private void assertEmptyIndexQueries(String from) { try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) { assertFalse(resp.values().hasNext()); @@ -1816,6 +1849,8 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase { "time", "type=long", "color", + "type=keyword", + "tag", "type=keyword" ) ); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java index 5f675adbcd50..dab2b35025aa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizer.java @@ -30,7 +30,8 @@ import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.operat * This class is part of the planner. Data node level logical optimizations. At this point we have access to * {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index. * - *

NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} + *

NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)} + * and {@link LogicalPlanOptimizer#cleanup()} */ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor { @@ -58,8 +59,8 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor localOperators() { - var operators = operators(); - var rules = operators().rules(); + var operators = operators(true); + var rules = operators.rules(); List> newRules = new ArrayList<>(rules.length); // apply updates to existing rules that have different applicability locally diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java index f64f4fe38ac0..14a858f85fd2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/LogicalPlanOptimizer.java @@ -82,7 +82,7 @@ import java.util.List; *

  • The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example, * a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase * also applies surrogates, such as replacing an average with a sum divided by a count.
  • - *
  • {@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different + *
  • {@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different * things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse * time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever * possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off @@ -90,14 +90,14 @@ import java.util.List; *
  • {@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN
  • * * - *

    Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the + *

    Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the * {@link LocalLogicalPlanOptimizer} layer.

    */ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor { private static final List> RULES = List.of( substitutions(), - operators(), + operators(false), new Batch<>("Skip Compute", new SkipQueryOnLimitZero()), cleanup(), new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized()) @@ -160,10 +160,10 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor operators() { + protected static Batch operators(boolean local) { return new Batch<>( "Operator Optimization", - new CombineProjections(), + new CombineProjections(local), new CombineEvals(), new PruneEmptyPlans(), new PropagateEmptyRelation(), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java index e9ca958c5e97..0a3f0cca0d9e 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/CombineProjections.java @@ -18,18 +18,24 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression; import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute; import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; +import org.elasticsearch.xpack.esql.plan.logical.Eval; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import org.elasticsearch.xpack.esql.plan.logical.Project; import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Set; public final class CombineProjections extends OptimizerRules.OptimizerRule { + // don't drop groupings from a local plan, as the layout has already been agreed upon + private final boolean local; - public CombineProjections() { + public CombineProjections(boolean local) { super(OptimizerRules.TransformDirection.UP); + this.local = local; } @Override @@ -60,27 +66,91 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule groupingAttrs = new ArrayList<>(a.groupings().size()); - for (Expression grouping : groupings) { - if (grouping instanceof Attribute attribute) { - groupingAttrs.add(attribute); - } else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) { - groupingAttrs.add(as); + if (plan instanceof Aggregate a && child instanceof Project p) { + var groupings = a.groupings(); + + // sanity checks + for (Expression grouping : groupings) { + if ((grouping instanceof Attribute + || grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) == false) { + // After applying ReplaceAggregateNestedExpressionWithEval, + // evaluatable groupings can only contain attributes. + throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping); + } + } + assert groupings.size() <= 1 + || groupings.stream() + .anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false + : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; + + // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) + AttributeMap.Builder aliasesBuilder = AttributeMap.builder(); + for (NamedExpression ne : p.projections()) { + // Record the aliases. + // Projections are just aliases for attributes, so casting is safe. + aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); + } + var aliases = aliasesBuilder.build(); + + // Propagate any renames from the lower projection into the upper groupings. + List resolvedGroupings = new ArrayList<>(); + for (Expression grouping : groupings) { + Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as)); + resolvedGroupings.add(transformed); + } + + // This can lead to duplicates in the groupings: e.g. + // | EVAL x = y | STATS ... BY x, y + if (local) { + // On the data node, the groupings must be preserved because they affect the physical output (see + // AbstractPhysicalOperationProviders#intermediateAttributes). + // In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place + // of the original projection to create new attributes for the duplicate groups. + Set seenResolvedGroupings = new HashSet<>(resolvedGroupings.size()); + List newGroupings = new ArrayList<>(); + List aliasesAgainstDuplication = new ArrayList<>(); + + for (int i = 0; i < groupings.size(); i++) { + Expression resolvedGrouping = resolvedGroupings.get(i); + if (seenResolvedGroupings.add(resolvedGrouping)) { + newGroupings.add(resolvedGrouping); } else { - // After applying ReplaceAggregateNestedExpressionWithEval, - // evaluatable groupings can only contain attributes. - throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping); + // resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to. + // should really only be 1 attribute, anyway, but going via .references() includes the case of a + // GroupingFunction.NonEvaluatableGroupingFunction. + Attribute coreAttribute = resolvedGrouping.references().iterator().next(); + + Alias renameAgainstDuplication = new Alias( + coreAttribute.source(), + TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name()), + coreAttribute + ); + aliasesAgainstDuplication.add(renameAgainstDuplication); + + // propagate the new alias into the new grouping + AttributeMap.Builder resolverBuilder = AttributeMap.builder(); + resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute()); + AttributeMap resolver = resolverBuilder.build(); + + newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr))); } } - plan = a.with( - p.child(), - combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()), - combineProjections(a.aggregates(), p.projections()) - ); + + LogicalPlan newChild = aliasesAgainstDuplication.isEmpty() + ? p.child() + : new Eval(p.source(), p.child(), aliasesAgainstDuplication); + plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections())); + } else { + // On the coordinator, we can just discard the duplicates. + // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which + // will be an alias like `c = CATEGORIZE(attribute)`. + // Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on + // regular equality (i.e. based on names) instead of name ids. + // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. + // for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also + // applies in the local case below. + List newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings)); + plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections())); } } @@ -143,39 +213,6 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule combineUpperGroupingsAndLowerProjections( - List upperGroupings, - List lowerProjections - ) { - assert upperGroupings.size() <= 1 - || upperGroupings.stream() - .anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false - : "CombineProjections only tested with a single CATEGORIZE with no additional groups"; - // Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..) - AttributeMap.Builder aliasesBuilder = AttributeMap.builder(); - for (NamedExpression ne : lowerProjections) { - // Record the aliases. - // Projections are just aliases for attributes, so casting is safe. - aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne)); - } - var aliases = aliasesBuilder.build(); - - // Propagate any renames from the lower projection into the upper groupings. - // This can lead to duplicates: e.g. - // | EVAL x = y | STATS ... BY x, y - // All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be - // an alias like `c = CATEGORIZE(attribute)`. - // Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet). - // TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for - // `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. - LinkedHashSet resolvedGroupings = new LinkedHashSet<>(); - for (NamedExpression ne : upperGroupings) { - NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a)); - resolvedGroupings.add(transformed); - } - return new ArrayList<>(resolvedGroupings); - } - /** * Replace grouping alias previously contained in the aggregations that might have been projected away. */ diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java index 8251611dcfe4..f2b70c99253b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalLogicalPlanOptimizerTests.java @@ -744,6 +744,36 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase { assertEquals("integer_long_field", unionTypeField.fieldName().string()); } + /** + * \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam + * e{r}#7, first_name{r}#7 AS last_name#10]] + * \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]] + * \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..] + */ + public void testGroupingByMissingFields() { + var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name"); + var testStats = statsForMissingField("first_name", "last_name"); + var localPlan = localPlan(plan, testStats); + Limit limit = as(localPlan, Limit.class); + Aggregate aggregate = as(limit.child(), Aggregate.class); + assertThat(aggregate.groupings(), hasSize(2)); + ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class); + ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class); + Eval eval = as(aggregate.child(), Eval.class); + assertThat(eval.fields(), hasSize(2)); + Alias eval1 = eval.fields().get(0); + Literal literal1 = as(eval1.child(), Literal.class); + assertNull(literal1.value()); + assertThat(literal1.dataType(), is(DataType.KEYWORD)); + Alias eval2 = eval.fields().get(1); + Literal literal2 = as(eval2.child(), Literal.class); + assertNull(literal2.value()); + assertThat(literal2.dataType(), is(DataType.KEYWORD)); + assertThat(grouping1.id(), equalTo(eval1.id())); + assertThat(grouping2.id(), equalTo(eval2.id())); + as(eval.child(), EsRelation.class); + } + private IsNotNull isNotNull(Expression field) { return new IsNotNull(EMPTY, field); }