different operator categories wrt sampling. Remove SampleBreaking interface

This commit is contained in:
Jan Kuipers 2025-04-15 15:50:04 +02:00
parent aca72bf903
commit b481aaa63f
6 changed files with 52 additions and 94 deletions

View file

@ -12,9 +12,9 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.expression.function.aggregate.HasSampleCorrection;
import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Limit;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.SampleBreaking;
import org.elasticsearch.xpack.esql.rule.Rule;
import java.util.ArrayList;
@ -36,7 +36,9 @@ public class ApplySampleCorrections extends Rule<LogicalPlan, LogicalPlan> {
: e
);
}
if (plan instanceof SampleBreaking) {
// Operations that map many to many rows break/reset sampling.
// Therefore, the sample probabilities are cleared.
if (plan instanceof Aggregate || plan instanceof Limit) {
sampleProbabilities.clear();
}
return plan;

View file

@ -11,11 +11,45 @@ import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Foldables;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.optimizer.LogicalOptimizerContext;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
import org.elasticsearch.xpack.esql.plan.logical.Filter;
import org.elasticsearch.xpack.esql.plan.logical.Insist;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RegexExtract;
import org.elasticsearch.xpack.esql.plan.logical.Sample;
import org.elasticsearch.xpack.esql.plan.logical.SampleBreaking;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
/**
* Pushes down the SAMPLE operator. SAMPLE can be pushed down through an
* operator if
* <p>
* <code>| SAMPLE p | OPERATOR</code>
* <p>
* is equivalent to
* <p>
* <code>| OPERATOR | SAMPLE p</code>
* <p>
* statistically (i.e. same possible output with same probabilities).
* In that case, we push down sampling to Lucene for efficiency.
* <p>
*
* As a rule of thumb, an operator can be swapped with sampling if it:
* <ul>
* <li>
* maps one row to one row (e.g. <code>DISSECT</code>, <code>DROP</code>, <code>ENRICH</code>,
* <code>EVAL</code>, <code>GROK</code>, <code>KEEP</code>, <code>RENAME</code>)
* </li>
* <li>
* maps one row to zero or one row (<code>WHERE</code>)
* </li>
* <li>
* reorders the rows (<code>SORT</code>)
* </li>
* </ul>
*/
public class PushDownAndCombineSample extends OptimizerRules.ParameterizedOptimizerRule<Sample, LogicalOptimizerContext> {
public PushDownAndCombineSample() {
@ -30,9 +64,16 @@ public class PushDownAndCombineSample extends OptimizerRules.ParameterizedOptimi
var probability = combinedProbability(context, sample, sampleChild);
var seed = combinedSeed(context, sample, sampleChild);
plan = new Sample(sample.source(), probability, seed, sampleChild.child());
} else if (child instanceof UnaryPlan unaryChild && child instanceof SampleBreaking == false) {
plan = unaryChild.replaceChild(sample.replaceChild(unaryChild.child()));
}
} else if (child instanceof Enrich
|| child instanceof Eval
|| child instanceof Filter
|| child instanceof Insist
|| child instanceof OrderBy
|| child instanceof Project
|| child instanceof RegexExtract) {
var unaryChild = (UnaryPlan) child;
plan = unaryChild.replaceChild(sample.replaceChild(unaryChild.child()));
}
return plan;
}

View file

@ -42,7 +42,7 @@ import static org.elasticsearch.xpack.esql.common.Failure.fail;
import static org.elasticsearch.xpack.esql.expression.NamedExpressions.mergeOutputAttributes;
import static org.elasticsearch.xpack.esql.plan.logical.Filter.checkFilterConditionDataType;
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic, SampleBreaking {
public class Aggregate extends UnaryPlan implements PostAnalysisVerificationAware, TelemetryAware, SortAgnostic {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
LogicalPlan.class,
"Aggregate",

View file

@ -18,7 +18,7 @@ import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import java.io.IOException;
import java.util.Objects;
public class Limit extends UnaryPlan implements TelemetryAware, SampleBreaking {
public class Limit extends UnaryPlan implements TelemetryAware {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "Limit", Limit::new);
private final Expression limit;

View file

@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class MvExpand extends UnaryPlan implements TelemetryAware, SortAgnostic, SampleBreaking {
public class MvExpand extends UnaryPlan implements TelemetryAware, SortAgnostic {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(LogicalPlan.class, "MvExpand", MvExpand::new);
private final NamedExpression target;

View file

@ -1,85 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.logical;
/**
* Marker interface used to check whether a plan breaks the random sampling context.
* <p>
*
* Random sampling aims to correct any <code>STATS</code> that follows it, e.g.
* <p>
* <code>
* | SAMPLE 0.1 | STATS SUM(value)
* </code>
* <p>
* gives an estimate of the sum of the values, and not 10% of the sum.
* <p>
*
* For many commands in between this works fine, because they can be swapped
* with <code>SAMPLE</code>. For example,
* <p>
* <code>
* | SAMPLE 0.1 | SORT value
* </code>
* <p>
* <code>
* | SAMPLE 0.1 | WHERE value &gt; 10
* </code>
* <p>
* are equivalent to
* <p>
* <code>
* | SORT value | SAMPLE 0.1
* </code>
* <p>
* <code>
* | WHERE value &gt; 10 | SAMPLE 0.1
* </code>
* <p>
* (statistically equivalent, not necessarily identical), and therefore a succeeding
* <code>STATS</code> can be adjusted for the sample size.
* <p>
*
* In other cases, commands cannot be swapped with <code>SAMPLE</code>, e.g.
* <p>
* <code>
* | SAMPLE 0.1 | MV_EXPAND value
* </code>
* <p>
* <code>
* | SAMPLE 0.1 | LIMIT 100
* </code>
* <p>
* In those cases, it also makes no sense to correct any succeeding <code>STATS</code>.
* <p>
*
* As a rule of thumb, an operator can be swapped with random sampling if it:
* <ul>
* <li>
* maps one row to one row (e.g. <code>DISSECT</code>, <code>DROP</code>, <code>ENRICH</code>,
* <code>EVAL</code>, <code>GROK</code>, <code>KEEP</code>, <code>RENAME</code>)
* </li>
* <li>
* maps one row to zero or one row (<code>WHERE</code>)
* </li>
* <li>
* reorders the rows (<code>SORT</code>)
* </li>
* </ul>
* Conversely, an operator is sampling breaking (and should implement this interface) if it maps:
* <ul>
* <li>
* one row to many rows (<code>MV_EXPAND</code>)
* </li>
* <li>
* many rows to many rows (<code>LIMIT</code>, <code>STATS</code>)
* </li>
* </ul>
*
*/
public interface SampleBreaking {}