From 49f8e5c0aed576172ebb1d7d38e4992aa7c812a5 Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Fri, 6 Jun 2025 16:49:39 +0200 Subject: [PATCH] ES|QL: Support STATS after FORK (#128745) --- .../xpack/esql/ccq/MultiClusterSpecIT.java | 4 +- .../src/main/resources/fork.csv-spec | 89 ++++++++++++++++--- .../xpack/esql/action/ForkIT.java | 33 +++++++ .../xpack/esql/action/EsqlCapabilities.java | 2 +- .../elasticsearch/xpack/esql/CsvTests.java | 2 +- 5 files changed, 112 insertions(+), 18 deletions(-) diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 75372792c28d..fec351eb7784 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -46,7 +46,7 @@ import static org.elasticsearch.xpack.esql.CsvSpecReader.specParser; import static org.elasticsearch.xpack.esql.CsvTestUtils.isEnabled; import static org.elasticsearch.xpack.esql.CsvTestsDataLoader.ENRICH_SOURCE_INDICES; import static org.elasticsearch.xpack.esql.EsqlTestUtils.classpathResources; -import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V6; +import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.FORK_V7; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V2; import static org.elasticsearch.xpack.esql.action.EsqlCapabilities.Cap.INLINESTATS_V7; @@ -132,7 +132,7 @@ public class MultiClusterSpecIT extends EsqlSpecTestCase { assumeFalse("LOOKUP JOIN not yet supported in CCS", 
testCase.requiredCapabilities.contains(JOIN_LOOKUP_V12.capabilityName())); // Unmapped fields require a coorect capability response from every cluster, which isn't currently implemented. assumeFalse("UNMAPPED FIELDS not yet supported in CCS", testCase.requiredCapabilities.contains(UNMAPPED_FIELDS.capabilityName())); - assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V6.capabilityName())); + assumeFalse("FORK not yet supported in CCS", testCase.requiredCapabilities.contains(FORK_V7.capabilityName())); } @Override diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec index 3e2d2ce6ae3b..925c02ba3069 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/fork.csv-spec @@ -3,7 +3,7 @@ // simpleFork -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK ( WHERE emp_no == 10001 ) @@ -18,7 +18,7 @@ emp_no:integer | _fork:keyword ; forkWithWhereSortAndLimit -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name | LIMIT 5 ) @@ -38,7 +38,7 @@ emp_no:integer | first_name:keyword | _fork:keyword ; fiveFork -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK ( WHERE emp_no == 10005 ) @@ -59,7 +59,7 @@ fork5 | 10001 ; forkWithWhereSortDescAndLimit -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK ( WHERE hire_date < "1985-03-01T00:00:00Z" | SORT first_name DESC | LIMIT 2 ) @@ -76,7 +76,7 @@ fork2 | 10087 | Xinglin ; forkWithCommonPrefilter -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | WHERE emp_no > 10050 @@ -94,7 +94,7 @@ fork2 | 10100 ; forkWithSemanticSearchAndScore -required_capability: fork_v6 +required_capability: fork_v7 required_capability: 
semantic_text_field_caps required_capability: metadata_score @@ -114,7 +114,7 @@ fork2 | 6.093784261960139E18 | 2 | all we have to decide is w ; forkWithEvals -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK (WHERE emp_no == 10048 OR emp_no == 10081 | EVAL x = "abc" | EVAL y = 1) @@ -131,7 +131,7 @@ fork2 | 10087 | def | null | 2 ; forkWithStats -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | FORK (WHERE emp_no == 10048 OR emp_no == 10081) @@ -152,7 +152,7 @@ fork4 | null | 100 | 10001 | null ; forkWithDissect -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | WHERE emp_no == 10048 OR emp_no == 10081 @@ -172,7 +172,7 @@ fork2 | 10081 | Rosen | 10081 | null | Zhongwei ; forkWithMixOfCommands -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | WHERE emp_no == 10048 OR emp_no == 10081 @@ -197,7 +197,7 @@ fork4 | 10081 | abc | aaa | null | null ; forkWithFiltersOnConstantValues -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | EVAL z = 1 @@ -218,7 +218,7 @@ fork3 | null | 100 | 10100 | 10001 ; forkWithUnsupportedAttributes -required_capability: fork_v6 +required_capability: fork_v7 FROM heights | FORK (SORT description DESC | LIMIT 1 | EVAL x = length(description) ) @@ -232,7 +232,7 @@ Medium Height | null | null | fork2 ; forkAfterLookupJoin -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | EVAL language_code = languages @@ -253,7 +253,7 @@ fork3 | 10081 | 2 | Klingon ; forkBeforeLookupJoin -required_capability: fork_v6 +required_capability: fork_v7 FROM employees | EVAL language_code = languages @@ -272,3 +272,64 @@ fork2 | 10081 | 2 | French fork2 | 10087 | 5 | null fork3 | 10081 | 2 | French ; + + +forkBeforeStats +required_capability: fork_v7 + +FROM employees +| WHERE emp_no == 10048 OR emp_no == 10081 +| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name) + | DISSECT a "%{x} 
%{y} %{z}" + | EVAL y = y::keyword ) + ( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword ) + ( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name ) + ( EVAL x = "abc" | EVAL y = "aaa" ) +| STATS c = count(*), m = max(_fork) +; + +c:long | m:keyword +7 | fork4 +; + +forkBeforeStatsWithWhere +required_capability: fork_v7 + +FROM employees +| WHERE emp_no == 10048 OR emp_no == 10081 +| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name) + | DISSECT a "%{x} %{y} %{z}" + | EVAL y = y::keyword ) + ( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword ) + ( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name ) + ( EVAL x = "abc" | EVAL y = "aaa" ) +| STATS a = count(*) WHERE _fork == "fork1", + b = max(_fork) +; + +a:long | b:keyword +2 | fork4 +; + +forkBeforeStatsByWithWhere +required_capability: fork_v7 + +FROM employees +| WHERE emp_no == 10048 OR emp_no == 10081 +| FORK ( EVAL a = CONCAT(first_name, " ", emp_no::keyword, " ", last_name) + | DISSECT a "%{x} %{y} %{z}" + | EVAL y = y::keyword ) + ( STATS x = COUNT(*)::keyword, y = MAX(emp_no)::keyword, z = MIN(emp_no)::keyword ) + ( SORT emp_no ASC | LIMIT 2 | EVAL x = last_name ) + ( EVAL x = "abc" | EVAL y = "aaa" ) +| STATS a = count(*) WHERE emp_no > 10000, + b = max(x) WHERE _fork == "fork1" BY _fork +| SORT _fork +; + +a:long | b:keyword | _fork:keyword +2 | Zhongwei | fork1 +0 | null | fork2 +2 | null | fork3 +2 | null | fork4 +; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java index 046c583be392..86051e7e4164 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java @@ -588,6 +588,39 @@ public class ForkIT extends 
AbstractEsqlIntegTestCase { } } + public void testWithStatsAfterFork() { + var query = """ + FROM test + | FORK ( WHERE content:"fox" | EVAL a = 1) + ( WHERE content:"cat" | EVAL b = 2 ) + ( WHERE content:"dog" | EVAL c = 3 ) + | STATS c = count(*) + """; + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("c")); + assertColumnTypes(resp.columns(), List.of("long")); + Iterable<Iterable<Object>> expectedValues = List.of(List.of(7L)); + assertValues(resp.values(), expectedValues); + } + } + + public void testWithStatsWithWhereAfterFork() { + var query = """ + FROM test + | FORK ( WHERE content:"fox" | EVAL a = 1) + ( WHERE content:"cat" | EVAL b = 2 ) + ( WHERE content:"dog" | EVAL c = 3 ) + | STATS c = count(*) WHERE _fork == "fork1" + """; + try (var resp = run(query)) { + assertColumnNames(resp.columns(), List.of("c")); + assertColumnTypes(resp.columns(), List.of("long")); + + Iterable<Iterable<Object>> expectedValues = List.of(List.of(2L)); + assertValues(resp.values(), expectedValues); + } + } + public void testWithConditionOnForkField() { var query = """ FROM test diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 2b86b1726d05..8396965096e0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1035,7 +1035,7 @@ public class EsqlCapabilities { /** * Support streaming of sub plan results */ - FORK_V6(Build.current().isSnapshot()), + FORK_V7(Build.current().isSnapshot()), /** * Support for the {@code leading_zeros} named parameter. 
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index e3b1a71d080c..58860a163d94 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -318,7 +318,7 @@ public class CsvTests extends ESTestCase { ); assumeFalse( "CSV tests cannot currently handle FORK", - testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V6.capabilityName()) + testCase.requiredCapabilities.contains(EsqlCapabilities.Cap.FORK_V7.capabilityName()) ); assumeFalse( "CSV tests cannot currently handle multi_match function that depends on Lucene",