Avoid dropping aggregate groupings in local plans (#129370)

The local plan optimizer should not change the layout, as it has already 
been agreed upon. However, CombineProjections can violate this when some
grouping elements refer to the same attribute. This occurs when
ReplaceFieldWithConstantOrNull replaces missing fields with the same
reference for a given data type.

Closes #128054
Closes #129811
This commit is contained in:
Nhat Nguyen 2025-06-25 11:48:15 -07:00 committed by GitHub
parent 72b3343ed3
commit 2bc62848e8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 170 additions and 60 deletions

View file

@ -0,0 +1,7 @@
pr: 129370
summary: Avoid dropping aggregate groupings in local plans
area: ES|QL
type: bug
issues:
- 129811
- 128054

View file

@ -1679,6 +1679,39 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
}
}
public void testGroupingStatsOnMissingFields() {
assertAcked(client().admin().indices().prepareCreate("missing_field_index").setMapping("data", "type=long"));
long oneValue = between(1, 1000);
indexDoc("missing_field_index", "1", "data", oneValue);
refresh("missing_field_index");
QueryPragmas pragmas = randomPragmas();
pragmas = new QueryPragmas(
Settings.builder().put(pragmas.getSettings()).put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()
);
EsqlQueryRequest request = new EsqlQueryRequest();
request.query("FROM missing_field_index,test | STATS s = sum(data) BY color, tag | SORT color");
request.pragmas(pragmas);
try (var r = run(request)) {
var rows = getValuesList(r);
assertThat(rows, hasSize(4));
for (List<Object> row : rows) {
assertThat(row, hasSize(3));
}
assertThat(rows.get(0).get(0), equalTo(20L));
assertThat(rows.get(0).get(1), equalTo("blue"));
assertNull(rows.get(0).get(2));
assertThat(rows.get(1).get(0), equalTo(10L));
assertThat(rows.get(1).get(1), equalTo("green"));
assertNull(rows.get(1).get(2));
assertThat(rows.get(2).get(0), equalTo(30L));
assertThat(rows.get(2).get(1), equalTo("red"));
assertNull(rows.get(2).get(2));
assertThat(rows.get(3).get(0), equalTo(oneValue));
assertNull(rows.get(3).get(1));
assertNull(rows.get(3).get(2));
}
}
private void assertEmptyIndexQueries(String from) {
try (EsqlQueryResponse resp = run(from + "METADATA _source | KEEP _source | LIMIT 1")) {
assertFalse(resp.values().hasNext());
@ -1816,6 +1849,8 @@ public class EsqlActionIT extends AbstractEsqlIntegTestCase {
"time",
"type=long",
"color",
"type=keyword",
"tag",
"type=keyword"
)
);

View file

@ -30,7 +30,8 @@ import static org.elasticsearch.xpack.esql.optimizer.LogicalPlanOptimizer.operat
* This class is part of the planner. Data node level logical optimizations. At this point we have access to
* {@link org.elasticsearch.xpack.esql.stats.SearchStats} which provides access to metadata about the index.
*
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()}
* <p>NB: This class also reapplies all the rules from {@link LogicalPlanOptimizer#operators(boolean)}
* and {@link LogicalPlanOptimizer#cleanup()}
*/
public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LocalLogicalOptimizerContext> {
@ -58,8 +59,8 @@ public class LocalLogicalPlanOptimizer extends ParameterizedRuleExecutor<Logical
@SuppressWarnings("unchecked")
private static Batch<LogicalPlan> localOperators() {
var operators = operators();
var rules = operators().rules();
var operators = operators(true);
var rules = operators.rules();
List<Rule<?, LogicalPlan>> newRules = new ArrayList<>(rules.length);
// apply updates to existing rules that have different applicability locally

View file

@ -82,7 +82,7 @@ import java.util.List;
* <li>The {@link LogicalPlanOptimizer#substitutions()} phase rewrites things to expand out shorthand in the syntax. For example,
* a nested expression embedded in a stats gets replaced with an eval followed by a stats, followed by another eval. This phase
* also applies surrogates, such as replacing an average with a sum divided by a count.</li>
* <li>{@link LogicalPlanOptimizer#operators()} (NB: The word "operator" is extremely overloaded and referrers to many different
* <li>{@link LogicalPlanOptimizer#operators(boolean)} (NB: The word "operator" is extremely overloaded and referrers to many different
* things.) transform the tree in various different ways. This includes folding (i.e. computing constant expressions at parse
* time), combining expressions, dropping redundant clauses, and some normalization such as putting literals on the right whenever
* possible. These rules are run in a loop until none of the rules make any changes to the plan (there is also a safety shut off
@ -90,14 +90,14 @@ import java.util.List;
* <li>{@link LogicalPlanOptimizer#cleanup()} Which can replace sorts+limit with a TopN</li>
* </ul>
*
* <p>Note that the {@link LogicalPlanOptimizer#operators()} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
* <p>Note that the {@link LogicalPlanOptimizer#operators(boolean)} and {@link LogicalPlanOptimizer#cleanup()} steps are reapplied at the
* {@link LocalLogicalPlanOptimizer} layer.</p>
*/
public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan, LogicalOptimizerContext> {
private static final List<RuleExecutor.Batch<LogicalPlan>> RULES = List.of(
substitutions(),
operators(),
operators(false),
new Batch<>("Skip Compute", new SkipQueryOnLimitZero()),
cleanup(),
new Batch<>("Set as Optimized", Limiter.ONCE, new SetAsOptimized())
@ -160,10 +160,10 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
);
}
protected static Batch<LogicalPlan> operators() {
protected static Batch<LogicalPlan> operators(boolean local) {
return new Batch<>(
"Operator Optimization",
new CombineProjections(),
new CombineProjections(local),
new CombineEvals(),
new PruneEmptyPlans(),
new PropagateEmptyRelation(),

View file

@ -18,18 +18,24 @@ import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.expression.function.grouping.GroupingFunction;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
public final class CombineProjections extends OptimizerRules.OptimizerRule<UnaryPlan> {
// don't drop groupings from a local plan, as the layout has already been agreed upon
private final boolean local;
public CombineProjections() {
public CombineProjections(boolean local) {
super(OptimizerRules.TransformDirection.UP);
this.local = local;
}
@Override
@ -60,27 +66,91 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule<Unary
return plan;
}
// Agg with underlying Project (group by on sub-queries)
if (plan instanceof Aggregate a) {
if (child instanceof Project p) {
var groupings = a.groupings();
List<NamedExpression> groupingAttrs = new ArrayList<>(a.groupings().size());
for (Expression grouping : groupings) {
if (grouping instanceof Attribute attribute) {
groupingAttrs.add(attribute);
} else if (grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) {
groupingAttrs.add(as);
if (plan instanceof Aggregate a && child instanceof Project p) {
var groupings = a.groupings();
// sanity checks
for (Expression grouping : groupings) {
if ((grouping instanceof Attribute
|| grouping instanceof Alias as && as.child() instanceof GroupingFunction.NonEvaluatableGroupingFunction) == false) {
// After applying ReplaceAggregateNestedExpressionWithEval,
// evaluatable groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an attribute or grouping function, got {}", grouping);
}
}
assert groupings.size() <= 1
|| groupings.stream()
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
for (NamedExpression ne : p.projections()) {
// Record the aliases.
// Projections are just aliases for attributes, so casting is safe.
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
}
var aliases = aliasesBuilder.build();
// Propagate any renames from the lower projection into the upper groupings.
List<Expression> resolvedGroupings = new ArrayList<>();
for (Expression grouping : groupings) {
Expression transformed = grouping.transformUp(Attribute.class, as -> aliases.resolve(as, as));
resolvedGroupings.add(transformed);
}
// This can lead to duplicates in the groupings: e.g.
// | EVAL x = y | STATS ... BY x, y
if (local) {
// On the data node, the groupings must be preserved because they affect the physical output (see
// AbstractPhysicalOperationProviders#intermediateAttributes).
// In case that propagating the lower projection leads to duplicates in the resolved groupings, we'll leave an Eval in place
// of the original projection to create new attributes for the duplicate groups.
Set<Expression> seenResolvedGroupings = new HashSet<>(resolvedGroupings.size());
List<Expression> newGroupings = new ArrayList<>();
List<Alias> aliasesAgainstDuplication = new ArrayList<>();
for (int i = 0; i < groupings.size(); i++) {
Expression resolvedGrouping = resolvedGroupings.get(i);
if (seenResolvedGroupings.add(resolvedGrouping)) {
newGroupings.add(resolvedGrouping);
} else {
// After applying ReplaceAggregateNestedExpressionWithEval,
// evaluatable groupings can only contain attributes.
throw new EsqlIllegalArgumentException("Expected an Attribute, got {}", grouping);
// resolving the renames leads to a duplicate here - we need to alias the underlying attribute this refers to.
// should really only be 1 attribute, anyway, but going via .references() includes the case of a
// GroupingFunction.NonEvaluatableGroupingFunction.
Attribute coreAttribute = resolvedGrouping.references().iterator().next();
Alias renameAgainstDuplication = new Alias(
coreAttribute.source(),
TemporaryNameUtils.locallyUniqueTemporaryName(coreAttribute.name()),
coreAttribute
);
aliasesAgainstDuplication.add(renameAgainstDuplication);
// propagate the new alias into the new grouping
AttributeMap.Builder<Attribute> resolverBuilder = AttributeMap.builder();
resolverBuilder.put(coreAttribute, renameAgainstDuplication.toAttribute());
AttributeMap<Attribute> resolver = resolverBuilder.build();
newGroupings.add(resolvedGrouping.transformUp(Attribute.class, attr -> resolver.resolve(attr, attr)));
}
}
plan = a.with(
p.child(),
combineUpperGroupingsAndLowerProjections(groupingAttrs, p.projections()),
combineProjections(a.aggregates(), p.projections())
);
LogicalPlan newChild = aliasesAgainstDuplication.isEmpty()
? p.child()
: new Eval(p.source(), p.child(), aliasesAgainstDuplication);
plan = a.with(newChild, newGroupings, combineProjections(a.aggregates(), p.projections()));
} else {
// On the coordinator, we can just discard the duplicates.
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which
// will be an alias like `c = CATEGORIZE(attribute)`.
// Due to such aliases, we can't use an AttributeSet to deduplicate. But we can use a regular set to deduplicate based on
// regular equality (i.e. based on names) instead of name ids.
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g.
// for `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead. Also
// applies in the local case below.
List<Expression> newGroupings = new ArrayList<>(new LinkedHashSet<>(resolvedGroupings));
plan = a.with(p.child(), newGroupings, combineProjections(a.aggregates(), p.projections()));
}
}
@ -143,39 +213,6 @@ public final class CombineProjections extends OptimizerRules.OptimizerRule<Unary
return replaced;
}
private static List<Expression> combineUpperGroupingsAndLowerProjections(
List<? extends NamedExpression> upperGroupings,
List<? extends NamedExpression> lowerProjections
) {
assert upperGroupings.size() <= 1
|| upperGroupings.stream()
.anyMatch(group -> group.anyMatch(expr -> expr instanceof GroupingFunction.NonEvaluatableGroupingFunction)) == false
: "CombineProjections only tested with a single CATEGORIZE with no additional groups";
// Collect the alias map for resolving the source (f1 = 1, f2 = f1, etc..)
AttributeMap.Builder<Attribute> aliasesBuilder = AttributeMap.builder();
for (NamedExpression ne : lowerProjections) {
// Record the aliases.
// Projections are just aliases for attributes, so casting is safe.
aliasesBuilder.put(ne.toAttribute(), (Attribute) Alias.unwrap(ne));
}
var aliases = aliasesBuilder.build();
// Propagate any renames from the lower projection into the upper groupings.
// This can lead to duplicates: e.g.
// | EVAL x = y | STATS ... BY x, y
// All substitutions happen before; groupings must be attributes at this point except for non-evaluatable groupings which will be
// an alias like `c = CATEGORIZE(attribute)`.
// Therefore, it is correct to deduplicate based on simple equality (based on names) instead of name ids (Set vs. AttributeSet).
// TODO: The deduplication based on simple equality will be insufficient in case of multiple non-evaluatable groupings, e.g. for
// `| EVAL x = y | STATS ... BY CATEGORIZE(x), CATEGORIZE(y)`. That will require semantic equality instead.
LinkedHashSet<NamedExpression> resolvedGroupings = new LinkedHashSet<>();
for (NamedExpression ne : upperGroupings) {
NamedExpression transformed = (NamedExpression) ne.transformUp(Attribute.class, a -> aliases.resolve(a, a));
resolvedGroupings.add(transformed);
}
return new ArrayList<>(resolvedGroupings);
}
/**
* Replace grouping alias previously contained in the aggregations that might have been projected away.
*/

View file

@ -744,6 +744,36 @@ public class LocalLogicalPlanOptimizerTests extends ESTestCase {
assertEquals("integer_long_field", unionTypeField.fieldName().string());
}
/**
* \_Aggregate[[first_name{r}#7, $$first_name$temp_name$17{r}#18],[SUM(salary{f}#11,true[BOOLEAN]) AS SUM(salary)#5, first_nam
* e{r}#7, first_name{r}#7 AS last_name#10]]
* \_Eval[[null[KEYWORD] AS first_name#7, null[KEYWORD] AS $$first_name$temp_name$17#18]]
* \_EsRelation[test][_meta_field{f}#12, emp_no{f}#6, first_name{f}#7, ge..]
*/
public void testGroupingByMissingFields() {
var plan = plan("FROM test | STATS SUM(salary) BY first_name, last_name");
var testStats = statsForMissingField("first_name", "last_name");
var localPlan = localPlan(plan, testStats);
Limit limit = as(localPlan, Limit.class);
Aggregate aggregate = as(limit.child(), Aggregate.class);
assertThat(aggregate.groupings(), hasSize(2));
ReferenceAttribute grouping1 = as(aggregate.groupings().get(0), ReferenceAttribute.class);
ReferenceAttribute grouping2 = as(aggregate.groupings().get(1), ReferenceAttribute.class);
Eval eval = as(aggregate.child(), Eval.class);
assertThat(eval.fields(), hasSize(2));
Alias eval1 = eval.fields().get(0);
Literal literal1 = as(eval1.child(), Literal.class);
assertNull(literal1.value());
assertThat(literal1.dataType(), is(DataType.KEYWORD));
Alias eval2 = eval.fields().get(1);
Literal literal2 = as(eval2.child(), Literal.class);
assertNull(literal2.value());
assertThat(literal2.dataType(), is(DataType.KEYWORD));
assertThat(grouping1.id(), equalTo(eval1.id()));
assertThat(grouping2.id(), equalTo(eval2.id()));
as(eval.child(), EsRelation.class);
}
private IsNotNull isNotNull(Expression field) {
return new IsNotNull(EMPTY, field);
}