ESQL: drop RowExec (#117133)

Drop `RowExec` physical node: `Row` is now optimised away into a
`LocalRelation`, which has its own physical mapping. `Row` is kept
around as a container for the logical optimisations/folding of the
expressions supported by the `ROW` command (which makes it in fact a
source _plus_ `EVAL`), `LocalRelation` only being a container for the
schema and end results (it doesn't actually go through transformations).

Fixes #104960
This commit is contained in:
Bogdan Pintea 2024-11-22 14:48:47 +01:00 committed by GitHub
parent f9223531ac
commit 09a53388cc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 38 additions and 293 deletions

View file

@ -1,47 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.operator;
import org.elasticsearch.compute.data.BlockFactory;
import java.util.List;
import java.util.Objects;
import static java.util.stream.Collectors.joining;
public class RowOperator extends LocalSourceOperator {
private final List<Object> objects;
public record RowOperatorFactory(List<Object> objects) implements SourceOperatorFactory {
@Override
public SourceOperator get(DriverContext driverContext) {
return new RowOperator(driverContext.blockFactory(), objects);
}
@Override
public String describe() {
return "RowOperator[objects = " + objects.stream().map(Objects::toString).collect(joining(",")) + "]";
}
}
public RowOperator(BlockFactory blockFactory, List<Object> objects) {
super(blockFactory, () -> objects);
this.objects = objects;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClass().getSimpleName()).append("[");
sb.append("objects=").append(objects);
sb.append("]");
return sb.toString();
}
}

View file

@ -1,81 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.operator;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.TestBlockFactory;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class RowOperatorTests extends ESTestCase {
final DriverContext driverContext = new DriverContext(
new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, new NoneCircuitBreakerService()).withCircuitBreaking(),
TestBlockFactory.getNonBreakingInstance()
);
public void testBoolean() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(List.of(false));
assertThat(factory.describe(), equalTo("RowOperator[objects = false]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[false]]"));
BooleanBlock block = factory.get(driverContext).getOutput().getBlock(0);
assertThat(block.getBoolean(0), equalTo(false));
}
public void testInt() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(List.of(213));
assertThat(factory.describe(), equalTo("RowOperator[objects = 213]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[213]]"));
IntBlock block = factory.get(driverContext).getOutput().getBlock(0);
assertThat(block.getInt(0), equalTo(213));
}
public void testLong() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(List.of(21321343214L));
assertThat(factory.describe(), equalTo("RowOperator[objects = 21321343214]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[21321343214]]"));
LongBlock block = factory.get(driverContext).getOutput().getBlock(0);
assertThat(block.getLong(0), equalTo(21321343214L));
}
public void testDouble() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(List.of(2.0));
assertThat(factory.describe(), equalTo("RowOperator[objects = 2.0]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[2.0]]"));
DoubleBlock block = factory.get(driverContext).getOutput().getBlock(0);
assertThat(block.getDouble(0), equalTo(2.0));
}
public void testString() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(List.of(new BytesRef("cat")));
assertThat(factory.describe(), equalTo("RowOperator[objects = [63 61 74]]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[[63 61 74]]]"));
BytesRefBlock block = factory.get(driverContext).getOutput().getBlock(0);
assertThat(block.getBytesRef(0, new BytesRef()), equalTo(new BytesRef("cat")));
}
public void testNull() {
RowOperator.RowOperatorFactory factory = new RowOperator.RowOperatorFactory(Arrays.asList(new Object[] { null }));
assertThat(factory.describe(), equalTo("RowOperator[objects = null]"));
assertThat(factory.get(driverContext).toString(), equalTo("RowOperator[objects=[null]]"));
Block block = factory.get(driverContext).getOutput().getBlock(0);
assertTrue(block.isNull(0));
}
}

View file

@ -47,6 +47,7 @@ import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceAliasingEvalW
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceLimitAndSortAsTopN;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceOrderByExpressionWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRegexMatch;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceRowAsLocalRelation;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceStatsFilteredAggWithEval;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.ReplaceTrivialTypeConversions;
import org.elasticsearch.xpack.esql.optimizer.rules.logical.SetAsOptimized;
@ -192,6 +193,6 @@ public class LogicalPlanOptimizer extends ParameterizedRuleExecutor<LogicalPlan,
}
protected static Batch<LogicalPlan> cleanup() {
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN());
return new Batch<>("Clean Up", new ReplaceLimitAndSortAsTopN(), new ReplaceRowAsLocalRelation());
}
}

View file

@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.optimizer.rules.logical;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import java.util.ArrayList;
import java.util.List;
public final class ReplaceRowAsLocalRelation extends OptimizerRules.OptimizerRule<Row> {
@Override
protected LogicalPlan rule(Row row) {
var fields = row.fields();
List<Object> values = new ArrayList<>(fields.size());
fields.forEach(f -> values.add(f.child().fold()));
var blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
return new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks));
}
}

View file

@ -45,7 +45,6 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.OrderExec;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.SubqueryExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@ -106,7 +105,6 @@ public class PlanWritables {
MvExpandExec.ENTRY,
OrderExec.ENTRY,
ProjectExec.ENTRY,
RowExec.ENTRY,
ShowExec.ENTRY,
SubqueryExec.ENTRY,
TopNExec.ENTRY

View file

@ -1,75 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.physical;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class RowExec extends LeafExec {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(PhysicalPlan.class, "RowExec", RowExec::new);
private final List<Alias> fields;
public RowExec(Source source, List<Alias> fields) {
super(source);
this.fields = fields;
}
private RowExec(StreamInput in) throws IOException {
this(Source.readFrom((PlanStreamInput) in), in.readCollectionAsList(Alias::new));
}
@Override
public void writeTo(StreamOutput out) throws IOException {
Source.EMPTY.writeTo(out);
out.writeCollection(fields());
}
@Override
public String getWriteableName() {
return ENTRY.name;
}
public List<Alias> fields() {
return fields;
}
@Override
public List<Attribute> output() {
return Expressions.asAttributes(fields);
}
@Override
protected NodeInfo<? extends PhysicalPlan> info() {
return NodeInfo.create(this, RowExec::new, fields);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RowExec constant = (RowExec) o;
return Objects.equals(fields, constant.fields);
}
@Override
public int hashCode() {
return Objects.hash(fields);
}
}

View file

@ -31,7 +31,6 @@ import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.RowInTableLookupOperator;
import org.elasticsearch.compute.operator.RowOperator.RowOperatorFactory;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
import org.elasticsearch.compute.operator.SinkOperator.SinkOperatorFactory;
@ -89,7 +88,6 @@ import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
@ -220,8 +218,6 @@ public class LocalExecutionPlanner {
return planEsQueryNode(esQuery, context);
} else if (node instanceof EsStatsQueryExec statsQuery) {
return planEsStats(statsQuery, context);
} else if (node instanceof RowExec row) {
return planRow(row, context);
} else if (node instanceof LocalSourceExec localSource) {
return planLocal(localSource, context);
} else if (node instanceof ShowExec show) {
@ -620,13 +616,6 @@ public class LocalExecutionPlanner {
return EvalMapper.toEvaluator(exp, layout);
}
private PhysicalOperation planRow(RowExec row, LocalExecutionPlannerContext context) {
List<Object> obj = row.fields().stream().map(f -> f.child().fold()).toList();
Layout.Builder layout = new Layout.Builder();
layout.append(row.output());
return PhysicalOperation.fromSource(new RowOperatorFactory(obj), layout.build());
}
private PhysicalOperation planLocal(LocalSourceExec localSourceExec, LocalExecutionPlannerContext context) {
Layout.Builder layout = new Layout.Builder();
layout.append(localSourceExec.output());

View file

@ -9,10 +9,7 @@ package org.elasticsearch.xpack.esql.planner.mapper;
import org.elasticsearch.common.lucene.BytesRefs;
import org.elasticsearch.compute.aggregation.AggregatorMode;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.tree.Source;
@ -27,10 +24,8 @@ import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalSupplier;
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
@ -45,9 +40,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import java.util.ArrayList;
import java.util.List;
/**
@ -57,18 +50,6 @@ class MapperUtils {
private MapperUtils() {}
static PhysicalPlan mapLeaf(LeafPlan p) {
if (p instanceof Row row) {
// return new RowExec(row.source(), row.fields());
// convert row into local relation
List<Alias> fields = row.fields();
List<Object> values = new ArrayList<>(fields.size());
for (Alias field : fields) {
values.add(field.child().fold());
}
Block[] blocks = BlockUtils.fromListRow(PlannerUtils.NON_BREAKING_BLOCK_FACTORY, values);
p = new LocalRelation(row.source(), row.output(), LocalSupplier.of(blocks));
}
if (p instanceof LocalRelation local) {
return new LocalSourceExec(local.source(), local.output(), local.supplier());
}

View file

@ -2141,7 +2141,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
mvExpand = as(topN.child(), MvExpand.class);
var limit = as(mvExpand.child(), Limit.class);
assertThat(limit.limit().fold(), equalTo(7300));
as(limit.child(), Row.class);
as(limit.child(), LocalRelation.class);
}
/**
@ -2286,7 +2286,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var expand = as(plan, MvExpand.class);
assertThat(expand.limit(), equalTo(1000));
var topN = as(expand.child(), TopN.class);
var row = as(topN.child(), Row.class);
var row = as(topN.child(), LocalRelation.class);
}
/**
@ -2327,7 +2327,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
assertThat(expand.limit(), equalTo(1000));
var limit2 = as(expand.child(), Limit.class);
assertThat(limit2.limit().fold(), is(1000));
var row = as(limit2.child(), Row.class);
var row = as(limit2.child(), LocalRelation.class);
}
private static List<String> orderNames(TopN topN) {
@ -3545,7 +3545,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var filterProp = ((GreaterThan) filter.condition()).left();
assertTrue(expand.expanded().semanticEquals(filterProp));
assertFalse(expand.target().semanticEquals(filterProp));
var row = as(expand.child(), Row.class);
var row = as(expand.child(), LocalRelation.class);
}
/**
@ -3564,7 +3564,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var limit = as(plan, Limit.class);
var agg = as(limit.child(), Aggregate.class);
assertThat(Expressions.names(agg.groupings()), contains("a"));
var row = as(agg.child(), Row.class);
var row = as(agg.child(), LocalRelation.class);
}
/**
@ -3583,7 +3583,7 @@ public class LogicalPlanOptimizerTests extends ESTestCase {
var limit = as(plan, Limit.class);
var agg = as(limit.child(), Aggregate.class);
assertThat(Expressions.names(agg.groupings()), contains("a", "b"));
var row = as(agg.child(), Row.class);
var row = as(agg.child(), LocalRelation.class);
}
/**

View file

@ -1,51 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.physical;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.LiteralTests;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.tree.SourceTests;
import java.io.IOException;
import java.util.List;
public class RowExecSerializationTests extends AbstractPhysicalPlanSerializationTests<RowExec> {
public static RowExec randomRowExec() {
Source source = randomSource();
List<Alias> fields = randomList(1, 10, RowExecSerializationTests::randomAlias);
return new RowExec(source, fields);
}
private static Alias randomAlias() {
Source source = SourceTests.randomSource();
String name = randomAlphaOfLength(5);
Expression child = LiteralTests.randomLiteral();
boolean synthetic = randomBoolean();
return new Alias(source, name, child, new NameId(), synthetic);
}
@Override
protected RowExec createTestInstance() {
return randomRowExec();
}
@Override
protected RowExec mutateInstance(RowExec instance) throws IOException {
List<Alias> fields = instance.fields();
fields = randomValueOtherThan(fields, () -> randomList(1, 10, RowExecSerializationTests::randomAlias));
return new RowExec(instance.source(), fields);
}
@Override
protected boolean alwaysEmptySource() {
return true;
}
}