mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Fixing remote ENRICH by pushing the Enrich inside FragmentExec (#114665)
* Fixing remote ENRICH by pushing the Enrich inside FragmentExec * Improve handling of more complex cases such as several enriches
This commit is contained in:
parent
b4edc3ddab
commit
e789039dfa
5 changed files with 195 additions and 25 deletions
6
docs/changelog/114665.yaml
Normal file
6
docs/changelog/114665.yaml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 114665
|
||||||
|
summary: Fixing remote ENRICH by pushing the Enrich inside `FragmentExec`
|
||||||
|
area: ES|QL
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 105095
|
|
@ -47,6 +47,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -469,27 +470,112 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testEnrichRemoteWithVendorNoSort() {
|
||||||
|
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
|
||||||
|
Boolean requestIncludeMeta = includeCCSMetadata.v1();
|
||||||
|
boolean responseExpectMeta = includeCCSMetadata.v2();
|
||||||
|
|
||||||
|
for (Enrich.Mode hostMode : List.of(Enrich.Mode.ANY, Enrich.Mode.REMOTE)) {
|
||||||
|
var query = String.format(Locale.ROOT, """
|
||||||
|
FROM *:events,events
|
||||||
|
| LIMIT 100
|
||||||
|
| eval ip= TO_STR(host)
|
||||||
|
| %s
|
||||||
|
| %s
|
||||||
|
| stats c = COUNT(*) by vendor
|
||||||
|
""", enrichHosts(hostMode), enrichVendors(Enrich.Mode.REMOTE));
|
||||||
|
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
|
||||||
|
var values = getValuesList(resp);
|
||||||
|
values.sort(Comparator.comparing(o -> (String) o.get(1), Comparator.nullsLast(Comparator.naturalOrder())));
|
||||||
|
assertThat(
|
||||||
|
values,
|
||||||
|
equalTo(
|
||||||
|
List.of(
|
||||||
|
List.of(6L, "Apple"),
|
||||||
|
List.of(7L, "Microsoft"),
|
||||||
|
List.of(1L, "Redhat"),
|
||||||
|
List.of(2L, "Samsung"),
|
||||||
|
List.of(1L, "Sony"),
|
||||||
|
List.of(2L, "Suse"),
|
||||||
|
Arrays.asList(3L, (String) null)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
|
||||||
|
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
|
||||||
|
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
|
||||||
|
assertCCSExecutionInfoDetails(executionInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void testTopNThenEnrichRemote() {
|
public void testTopNThenEnrichRemote() {
|
||||||
|
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
|
||||||
|
Boolean requestIncludeMeta = includeCCSMetadata.v1();
|
||||||
|
boolean responseExpectMeta = includeCCSMetadata.v2();
|
||||||
|
|
||||||
String query = String.format(Locale.ROOT, """
|
String query = String.format(Locale.ROOT, """
|
||||||
FROM *:events,events
|
FROM *:events,events
|
||||||
| eval ip= TO_STR(host)
|
| eval ip= TO_STR(host)
|
||||||
| SORT ip
|
| SORT timestamp, user, ip
|
||||||
| LIMIT 5
|
| LIMIT 5
|
||||||
| %s
|
| %s | KEEP host, timestamp, user, os
|
||||||
""", enrichHosts(Enrich.Mode.REMOTE));
|
""", enrichHosts(Enrich.Mode.REMOTE));
|
||||||
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
|
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
|
||||||
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
|
assertThat(
|
||||||
|
getValuesList(resp),
|
||||||
|
equalTo(
|
||||||
|
List.of(
|
||||||
|
List.of("192.168.1.2", 1L, "andres", "Windows"),
|
||||||
|
List.of("192.168.1.3", 1L, "matthew", "MacOS"),
|
||||||
|
Arrays.asList("192.168.1.25", 1L, "park", (String) null),
|
||||||
|
List.of("192.168.1.5", 2L, "akio", "Android"),
|
||||||
|
List.of("192.168.1.6", 2L, "sergio", "iOS")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
|
||||||
|
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
|
||||||
|
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
|
||||||
|
assertCCSExecutionInfoDetails(executionInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testLimitThenEnrichRemote() {
|
public void testLimitThenEnrichRemote() {
|
||||||
|
Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
|
||||||
|
Boolean requestIncludeMeta = includeCCSMetadata.v1();
|
||||||
|
boolean responseExpectMeta = includeCCSMetadata.v2();
|
||||||
|
|
||||||
String query = String.format(Locale.ROOT, """
|
String query = String.format(Locale.ROOT, """
|
||||||
FROM *:events,events
|
FROM *:events,events
|
||||||
| LIMIT 10
|
| LIMIT 25
|
||||||
| eval ip= TO_STR(host)
|
| eval ip= TO_STR(host)
|
||||||
| %s
|
| %s | KEEP host, timestamp, user, os
|
||||||
""", enrichHosts(Enrich.Mode.REMOTE));
|
""", enrichHosts(Enrich.Mode.REMOTE));
|
||||||
var error = expectThrows(VerificationException.class, () -> runQuery(query, randomBoolean()).close());
|
try (EsqlQueryResponse resp = runQuery(query, requestIncludeMeta)) {
|
||||||
assertThat(error.getMessage(), containsString("ENRICH with remote policy can't be executed after LIMIT"));
|
var values = getValuesList(resp);
|
||||||
|
values.sort(
|
||||||
|
Comparator.comparingLong((List<Object> o) -> (Long) o.get(1))
|
||||||
|
.thenComparing(o -> (String) o.get(0))
|
||||||
|
.thenComparing(o -> (String) o.get(2))
|
||||||
|
);
|
||||||
|
assertThat(
|
||||||
|
values.subList(0, 5),
|
||||||
|
equalTo(
|
||||||
|
List.of(
|
||||||
|
List.of("192.168.1.2", 1L, "andres", "Windows"),
|
||||||
|
Arrays.asList("192.168.1.25", 1L, "park", (String) null),
|
||||||
|
List.of("192.168.1.3", 1L, "matthew", "MacOS"),
|
||||||
|
List.of("192.168.1.5", 2L, "akio", "Android"),
|
||||||
|
List.of("192.168.1.5", 2L, "simon", "Android")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
EsqlExecutionInfo executionInfo = resp.getExecutionInfo();
|
||||||
|
assertThat(executionInfo.includeCCSMetadata(), equalTo(responseExpectMeta));
|
||||||
|
assertThat(executionInfo.clusterAliases(), equalTo(Set.of("", "c1", "c2")));
|
||||||
|
assertCCSExecutionInfoDetails(executionInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAggThenEnrichRemote() {
|
public void testAggThenEnrichRemote() {
|
||||||
|
|
|
@ -609,22 +609,15 @@ public class Verifier {
|
||||||
*/
|
*/
|
||||||
private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
|
private static void checkRemoteEnrich(LogicalPlan plan, Set<Failure> failures) {
|
||||||
boolean[] agg = { false };
|
boolean[] agg = { false };
|
||||||
boolean[] limit = { false };
|
|
||||||
boolean[] enrichCoord = { false };
|
boolean[] enrichCoord = { false };
|
||||||
|
|
||||||
plan.forEachUp(UnaryPlan.class, u -> {
|
plan.forEachUp(UnaryPlan.class, u -> {
|
||||||
if (u instanceof Limit) {
|
|
||||||
limit[0] = true; // TODO: Make Limit then enrich_remote work
|
|
||||||
}
|
|
||||||
if (u instanceof Aggregate) {
|
if (u instanceof Aggregate) {
|
||||||
agg[0] = true;
|
agg[0] = true;
|
||||||
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
|
} else if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.COORDINATOR) {
|
||||||
enrichCoord[0] = true;
|
enrichCoord[0] = true;
|
||||||
}
|
}
|
||||||
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
|
if (u instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
|
||||||
if (limit[0]) {
|
|
||||||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after LIMIT"));
|
|
||||||
}
|
|
||||||
if (agg[0]) {
|
if (agg[0]) {
|
||||||
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
|
failures.add(fail(enrich, "ENRICH with remote policy can't be executed after STATS"));
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,8 +52,10 @@ import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
|
||||||
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
|
import org.elasticsearch.xpack.esql.plan.physical.RowExec;
|
||||||
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
|
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
|
||||||
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
|
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
|
||||||
|
import org.elasticsearch.xpack.esql.plan.physical.UnaryExec;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>This class is part of the planner</p>
|
* <p>This class is part of the planner</p>
|
||||||
|
@ -104,6 +106,46 @@ public class Mapper {
|
||||||
//
|
//
|
||||||
// Unary Plan
|
// Unary Plan
|
||||||
//
|
//
|
||||||
|
if (localMode == false && p instanceof Enrich enrich && enrich.mode() == Enrich.Mode.REMOTE) {
|
||||||
|
// When we have remote enrich, we want to put it under FragmentExec, so it would be executed remotely.
|
||||||
|
// We're only going to do it on the coordinator node.
|
||||||
|
// The way we're going to do it is as follows:
|
||||||
|
// 1. Locate FragmentExec in the tree. If we have no FragmentExec, we won't do anything.
|
||||||
|
// 2. Put this Enrich under it, removing everything that was below it previously.
|
||||||
|
// 3. Above FragmentExec, we should deal with pipeline breakers, since pipeline ops already are supposed to go under
|
||||||
|
// FragmentExec.
|
||||||
|
// 4. Aggregates can't appear here since the plan should have errored out if we have aggregate inside remote Enrich.
|
||||||
|
// 5. So we should be keeping: LimitExec, ExchangeExec, OrderExec, TopNExec (actually OrderExec probably can't happen anyway).
|
||||||
|
|
||||||
|
var child = map(enrich.child());
|
||||||
|
AtomicBoolean hasFragment = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
var childTransformed = child.transformUp((f) -> {
|
||||||
|
// Once we reached FragmentExec, we stuff our Enrich under it
|
||||||
|
if (f instanceof FragmentExec) {
|
||||||
|
hasFragment.set(true);
|
||||||
|
return new FragmentExec(p);
|
||||||
|
}
|
||||||
|
if (f instanceof EnrichExec enrichExec) {
|
||||||
|
// It can only be ANY because COORDINATOR would have errored out earlier, and REMOTE should be under FragmentExec
|
||||||
|
assert enrichExec.mode() == Enrich.Mode.ANY : "enrich must be in ANY mode here";
|
||||||
|
return enrichExec.child();
|
||||||
|
}
|
||||||
|
if (f instanceof UnaryExec unaryExec) {
|
||||||
|
if (f instanceof LimitExec || f instanceof ExchangeExec || f instanceof OrderExec || f instanceof TopNExec) {
|
||||||
|
return f;
|
||||||
|
} else {
|
||||||
|
return unaryExec.child();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Currently, it's either UnaryExec or LeafExec. Leaf will either resolve to FragmentExec or we'll ignore it.
|
||||||
|
return f;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (hasFragment.get()) {
|
||||||
|
return childTransformed;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (p instanceof UnaryPlan ua) {
|
if (p instanceof UnaryPlan ua) {
|
||||||
var child = map(ua.child());
|
var child = map(ua.child());
|
||||||
|
|
|
@ -172,7 +172,7 @@ import static org.hamcrest.Matchers.not;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.hamcrest.Matchers.startsWith;
|
import static org.hamcrest.Matchers.startsWith;
|
||||||
|
|
||||||
// @TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
|
// @TestLogging(value = "org.elasticsearch.xpack.esql:DEBUG", reason = "debug")
|
||||||
public class PhysicalPlanOptimizerTests extends ESTestCase {
|
public class PhysicalPlanOptimizerTests extends ESTestCase {
|
||||||
|
|
||||||
private static final String PARAM_FORMATTING = "%1$s";
|
private static final String PARAM_FORMATTING = "%1$s";
|
||||||
|
@ -5851,14 +5851,14 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
|
||||||
| EVAL employee_id = to_str(emp_no)
|
| EVAL employee_id = to_str(emp_no)
|
||||||
| ENRICH _remote:departments
|
| ENRICH _remote:departments
|
||||||
| LIMIT 10""");
|
| LIMIT 10""");
|
||||||
var enrich = as(plan, EnrichExec.class);
|
var finalLimit = as(plan, LimitExec.class);
|
||||||
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
|
|
||||||
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
|
|
||||||
var eval = as(enrich.child(), EvalExec.class);
|
|
||||||
var finalLimit = as(eval.child(), LimitExec.class);
|
|
||||||
var exchange = as(finalLimit.child(), ExchangeExec.class);
|
var exchange = as(finalLimit.child(), ExchangeExec.class);
|
||||||
var fragment = as(exchange.child(), FragmentExec.class);
|
var fragment = as(exchange.child(), FragmentExec.class);
|
||||||
var partialLimit = as(fragment.fragment(), Limit.class);
|
var enrich = as(fragment.fragment(), Enrich.class);
|
||||||
|
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
|
||||||
|
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
|
||||||
|
var evalFragment = as(enrich.child(), Eval.class);
|
||||||
|
var partialLimit = as(evalFragment.child(), Limit.class);
|
||||||
as(partialLimit.child(), EsRelation.class);
|
as(partialLimit.child(), EsRelation.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5901,13 +5901,21 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testLimitThenEnrichRemote() {
|
public void testLimitThenEnrichRemote() {
|
||||||
var error = expectThrows(VerificationException.class, () -> physicalPlan("""
|
var plan = physicalPlan("""
|
||||||
FROM test
|
FROM test
|
||||||
| LIMIT 10
|
| LIMIT 10
|
||||||
| EVAL employee_id = to_str(emp_no)
|
| EVAL employee_id = to_str(emp_no)
|
||||||
| ENRICH _remote:departments
|
| ENRICH _remote:departments
|
||||||
"""));
|
""");
|
||||||
assertThat(error.getMessage(), containsString("line 4:3: ENRICH with remote policy can't be executed after LIMIT"));
|
var finalLimit = as(plan, LimitExec.class);
|
||||||
|
var exchange = as(finalLimit.child(), ExchangeExec.class);
|
||||||
|
var fragment = as(exchange.child(), FragmentExec.class);
|
||||||
|
var enrich = as(fragment.fragment(), Enrich.class);
|
||||||
|
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
|
||||||
|
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
|
||||||
|
var evalFragment = as(enrich.child(), Eval.class);
|
||||||
|
var partialLimit = as(evalFragment.child(), Limit.class);
|
||||||
|
as(partialLimit.child(), EsRelation.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEnrichBeforeTopN() {
|
public void testEnrichBeforeTopN() {
|
||||||
|
@ -5961,6 +5969,23 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
|
||||||
var eval = as(enrich.child(), Eval.class);
|
var eval = as(enrich.child(), Eval.class);
|
||||||
as(eval.child(), EsRelation.class);
|
as(eval.child(), EsRelation.class);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
var plan = physicalPlan("""
|
||||||
|
FROM test
|
||||||
|
| EVAL employee_id = to_str(emp_no)
|
||||||
|
| ENRICH _remote:departments
|
||||||
|
| SORT department
|
||||||
|
| LIMIT 10""");
|
||||||
|
var topN = as(plan, TopNExec.class);
|
||||||
|
var exchange = as(topN.child(), ExchangeExec.class);
|
||||||
|
var fragment = as(exchange.child(), FragmentExec.class);
|
||||||
|
var partialTopN = as(fragment.fragment(), TopN.class);
|
||||||
|
var enrich = as(partialTopN.child(), Enrich.class);
|
||||||
|
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
|
||||||
|
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
|
||||||
|
var eval = as(enrich.child(), Eval.class);
|
||||||
|
as(eval.child(), EsRelation.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testEnrichAfterTopN() {
|
public void testEnrichAfterTopN() {
|
||||||
|
@ -6000,6 +6025,24 @@ public class PhysicalPlanOptimizerTests extends ESTestCase {
|
||||||
var partialTopN = as(fragment.fragment(), TopN.class);
|
var partialTopN = as(fragment.fragment(), TopN.class);
|
||||||
as(partialTopN.child(), EsRelation.class);
|
as(partialTopN.child(), EsRelation.class);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
var plan = physicalPlan("""
|
||||||
|
FROM test
|
||||||
|
| SORT emp_no
|
||||||
|
| LIMIT 10
|
||||||
|
| EVAL employee_id = to_str(emp_no)
|
||||||
|
| ENRICH _remote:departments
|
||||||
|
""");
|
||||||
|
var topN = as(plan, TopNExec.class);
|
||||||
|
var exchange = as(topN.child(), ExchangeExec.class);
|
||||||
|
var fragment = as(exchange.child(), FragmentExec.class);
|
||||||
|
var enrich = as(fragment.fragment(), Enrich.class);
|
||||||
|
assertThat(enrich.mode(), equalTo(Enrich.Mode.REMOTE));
|
||||||
|
assertThat(enrich.concreteIndices(), equalTo(Map.of("cluster_1", ".enrich-departments-2")));
|
||||||
|
var evalFragment = as(enrich.child(), Eval.class);
|
||||||
|
var partialTopN = as(evalFragment.child(), TopN.class);
|
||||||
|
as(partialTopN.child(), EsRelation.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testManyEnrich() {
|
public void testManyEnrich() {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue