ES|QL: Add initial grammar and planning for RRF (snapshot) (#123396)

This commit is contained in:
Ioana Tagirta 2025-03-11 10:18:11 +01:00 committed by GitHub
parent f1f2df77ba
commit cda82554aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 3247 additions and 2363 deletions

View file

@ -0,0 +1,5 @@
pr: 123396
summary: Add initial grammar and planning for RRF (snapshot)
area: ES|QL
type: feature
issues: []

View file

@ -0,0 +1,79 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.operator;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.Page;
import java.util.HashMap;
/**
 * Updates the score column with new scores using the RRF formula.
 * Receives the position of the score and fork columns.
 * The new score we assign to each row is equal to {@code 1 / (rank_constant + row_number)}.
 * We use the fork discriminator column to determine the {@code row_number} for each row.
 * <p>
 * The per-fork row counters live on the operator instance and are never reset, so the row number of a
 * fork keeps increasing across every page processed by the same operator.
 */
public class RrfScoreEvalOperator extends AbstractPageMappingOperator {

    /** The {@code rank_constant} term of the RRF formula. */
    private static final int RANK_CONSTANT = 60;

    /**
     * Creates {@link RrfScoreEvalOperator} instances.
     *
     * @param forkPosition  index of the fork discriminator block inside each page
     * @param scorePosition index of the score block inside each page
     */
    public record Factory(int forkPosition, int scorePosition) implements OperatorFactory {
        @Override
        public Operator get(DriverContext driverContext) {
            return new RrfScoreEvalOperator(forkPosition, scorePosition);
        }

        @Override
        public String describe() {
            return "RrfScoreEvalOperator";
        }
    }

    private final int scorePosition;
    private final int forkPosition;

    /** Number of rows seen so far for each fork value, i.e. the rank of the last row of that fork. */
    private final HashMap<String, Integer> counters = new HashMap<>();

    /**
     * @param forkPosition  index of the fork discriminator block inside each page
     * @param scorePosition index of the score block inside each page
     */
    public RrfScoreEvalOperator(int forkPosition, int scorePosition) {
        this.scorePosition = scorePosition;
        this.forkPosition = forkPosition;
    }

    /**
     * Builds a new score block for the page and returns a page in which that block takes the place of the
     * block at {@code scorePosition}; every other block keeps its index.
     */
    @Override
    protected Page process(Page page) {
        BytesRefBlock forkBlock = (BytesRefBlock) page.getBlock(forkPosition);
        DoubleVector.Builder scores = forkBlock.blockFactory().newDoubleVectorBuilder(forkBlock.getPositionCount());

        // One scratch instance is enough: the bytes are copied into a String before the next read.
        BytesRef scratch = new BytesRef();
        for (int i = 0; i < page.getPositionCount(); i++) {
            String fork = forkBlock.getBytesRef(i, scratch).utf8ToString();
            // First row of a fork gets rank 1, the next one rank 2, and so on.
            int rank = counters.merge(fork, 1, Integer::sum);
            scores.appendDouble(1.0 / (RANK_CONSTANT + rank));
        }

        Block scoreBlock = scores.build().asBlock();
        page = page.appendBlock(scoreBlock);

        // The freshly appended score block is the last one; project it into the score slot and drop the tail.
        int appendedIndex = page.getBlockCount() - 1;
        int[] projections = new int[appendedIndex];
        for (int i = 0; i < appendedIndex; i++) {
            projections[i] = i == scorePosition ? appendedIndex : i;
        }
        return page.projectBlocks(projections);
    }

    @Override
    public String toString() {
        return "RrfScoreEvalOperator";
    }
}

View file

@ -0,0 +1,111 @@
//
// CSV spec for RRF command
//
simpleRrf
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon
FROM employees METADATA _id, _index, _score
| FORK ( WHERE emp_no:10001 )
( WHERE emp_no:10002 )
| RRF
| EVAL _score = round(_score, 4)
| KEEP _score, _fork, emp_no
| SORT _score, _fork, emp_no
;
_score:double | _fork:keyword | emp_no:integer
0.0164 | fork1 | 10001
0.0164 | fork2 | 10002
;
rrfWithMatchAndScore
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon
FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 5)
| KEEP _score, _fork, _id
;
_score:double | _fork:keyword | _id:keyword
0.03279 | [fork1, fork2] | 4
0.01613 | fork1 | 56
0.01613 | fork2 | 60
0.01587 | fork2 | 1
0.01587 | fork1 | 26
;
rrfWithDisjunctionAndPostFilter
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon
FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" OR author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 5)
| KEEP _score, _fork, _id
| WHERE _score > 0.014
;
_score:double | _fork:keyword | _id:keyword
0.03252 | [fork1, fork2] | 60
0.032 | [fork1, fork2] | 1
0.01639 | fork2 | 4
0.01587 | fork1 | 40
;
rrfWithStats
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon
FROM books METADATA _id, _index, _score
| FORK ( WHERE title:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Tolkien" | SORT _score, _id DESC | LIMIT 3 )
( WHERE author:"Ursula K. Le Guin" AND title:"short stories" | SORT _score, _id DESC | LIMIT 3)
| RRF
| STATS count_fork=COUNT(*) BY _fork
;
count_fork:long | _fork:keyword
3 | fork1
3 | fork2
1 | fork3
;
rrfWithMultipleForkBranches
required_capability: fork
required_capability: rrf
required_capability: match_operator_colon
FROM books METADATA _id, _index, _score
| FORK (WHERE author:"Keith Faulkner" AND qstr("author:Rory or author:Beverlie") | SORT _score, _id DESC | LIMIT 3)
(WHERE author:"Ursula K. Le Guin" | SORT _score, _id DESC | LIMIT 3)
(WHERE title:"Tolkien" AND author:"Tolkien" AND year > 2000 AND mv_count(author) == 1 | SORT _score, _id DESC | LIMIT 3)
(WHERE match(author, "Keith Faulkner") AND match(author, "Rory Tyger") | SORT _score, _id DESC | LIMIT 3)
| RRF
| EVAL _fork = mv_sort(_fork)
| EVAL _score = round(_score, 4)
| EVAL title = trim(substring(title, 1, 20))
| KEEP _score, author, title, _fork
;
_score:double | author:keyword | title:keyword | _fork:keyword
0.0328 | [Keith Faulkner, Rory Tyger] | Pop! Went Another Ba | [fork1, fork4]
0.0164 | J.R.R. Tolkien | Letters of J R R Tol | fork3
0.0164 | Ursula K. Le Guin | The wind's twelve qu | fork2
0.0161 | [Beverlie Manson, Keith Faulkner] | Rainbow's End: A Mag | fork1
0.0161 | Ursula K. Le Guin | The Word For World i | fork2
0.0159 | Ursula K. Le Guin | The Dispossessed | fork2
;

View file

@ -350,6 +350,32 @@ public class ForkIT extends AbstractEsqlIntegTestCase {
}
}
/**
 * Runs RRF on top of two FORK branches and checks the returned column names, column types and the
 * three expected rows.
 */
public void testRrf() {
    assumeTrue("requires RRF capability", EsqlCapabilities.Cap.RRF.isEnabled());

    var rrfQuery = """
        FROM test METADATA _score, _id, _index
        | WHERE id > 2
        | FORK
        ( WHERE content:"fox" | SORT _score, _id DESC )
        ( WHERE content:"dog" | SORT _score, _id DESC )
        | RRF
        | EVAL _score = round(_score, 4)
        | KEEP id, content, _score, _fork
        """;

    try (var response = run(rrfQuery)) {
        List<String> expectedNames = List.of("id", "content", "_score", "_fork");
        assertColumnNames(response.columns(), expectedNames);

        List<String> expectedTypes = List.of("integer", "keyword", "double", "keyword");
        assertColumnTypes(response.columns(), expectedTypes);

        var rows = getValuesList(response.values());
        assertThat(rows.size(), equalTo(3));

        // The document matched by both branches carries both fork names.
        List<Object> matchedByBoth = List.of(6, "The quick brown fox jumps over the lazy dog", 0.0325, List.of("fork1", "fork2"));
        List<Object> firstDogOnly = List.of(4, "The dog is brown but this document is very very long", 0.0164, "fork2");
        List<Object> secondDogOnly = List.of(3, "This dog is really brown", 0.0159, "fork2");
        Iterable<Iterable<Object>> expectedRows = List.of(matchedByBoth, firstDogOnly, secondDogOnly);
        assertValues(response.values(), expectedRows);
    }
}
public void testThreeSubQueries() {
var query = """
FROM test

View file

@ -65,6 +65,7 @@ import ChangePoint,
Metrics,
MvExpand,
Project,
Rrf,
Rename,
Show,
UnknownCommand;

View file

@ -25,118 +25,119 @@ MV_EXPAND=24
DROP=25
KEEP=26
DEV_INSIST=27
RENAME=28
SHOW=29
UNKNOWN_CMD=30
CHANGE_POINT_LINE_COMMENT=31
CHANGE_POINT_MULTILINE_COMMENT=32
CHANGE_POINT_WS=33
ON=34
WITH=35
ENRICH_POLICY_NAME=36
ENRICH_LINE_COMMENT=37
ENRICH_MULTILINE_COMMENT=38
ENRICH_WS=39
ENRICH_FIELD_LINE_COMMENT=40
ENRICH_FIELD_MULTILINE_COMMENT=41
ENRICH_FIELD_WS=42
SETTING=43
SETTING_LINE_COMMENT=44
SETTTING_MULTILINE_COMMENT=45
SETTING_WS=46
EXPLAIN_WS=47
EXPLAIN_LINE_COMMENT=48
EXPLAIN_MULTILINE_COMMENT=49
PIPE=50
QUOTED_STRING=51
INTEGER_LITERAL=52
DECIMAL_LITERAL=53
BY=54
AND=55
ASC=56
ASSIGN=57
CAST_OP=58
COLON=59
COMMA=60
DESC=61
DOT=62
FALSE=63
FIRST=64
IN=65
IS=66
LAST=67
LIKE=68
NOT=69
NULL=70
NULLS=71
OR=72
PARAM=73
RLIKE=74
TRUE=75
EQ=76
CIEQ=77
NEQ=78
LT=79
LTE=80
GT=81
GTE=82
PLUS=83
MINUS=84
ASTERISK=85
SLASH=86
PERCENT=87
LEFT_BRACES=88
RIGHT_BRACES=89
NAMED_OR_POSITIONAL_PARAM=90
OPENING_BRACKET=91
CLOSING_BRACKET=92
LP=93
RP=94
UNQUOTED_IDENTIFIER=95
QUOTED_IDENTIFIER=96
EXPR_LINE_COMMENT=97
EXPR_MULTILINE_COMMENT=98
EXPR_WS=99
METADATA=100
UNQUOTED_SOURCE=101
FROM_LINE_COMMENT=102
FROM_MULTILINE_COMMENT=103
FROM_WS=104
FORK_WS=105
FORK_LINE_COMMENT=106
FORK_MULTILINE_COMMENT=107
JOIN=108
USING=109
JOIN_LINE_COMMENT=110
JOIN_MULTILINE_COMMENT=111
JOIN_WS=112
LOOKUP_LINE_COMMENT=113
LOOKUP_MULTILINE_COMMENT=114
LOOKUP_WS=115
LOOKUP_FIELD_LINE_COMMENT=116
LOOKUP_FIELD_MULTILINE_COMMENT=117
LOOKUP_FIELD_WS=118
METRICS_LINE_COMMENT=119
METRICS_MULTILINE_COMMENT=120
METRICS_WS=121
CLOSING_METRICS_LINE_COMMENT=122
CLOSING_METRICS_MULTILINE_COMMENT=123
CLOSING_METRICS_WS=124
MVEXPAND_LINE_COMMENT=125
MVEXPAND_MULTILINE_COMMENT=126
MVEXPAND_WS=127
ID_PATTERN=128
PROJECT_LINE_COMMENT=129
PROJECT_MULTILINE_COMMENT=130
PROJECT_WS=131
AS=132
RENAME_LINE_COMMENT=133
RENAME_MULTILINE_COMMENT=134
RENAME_WS=135
INFO=136
SHOW_LINE_COMMENT=137
SHOW_MULTILINE_COMMENT=138
SHOW_WS=139
DEV_RRF=28
RENAME=29
SHOW=30
UNKNOWN_CMD=31
CHANGE_POINT_LINE_COMMENT=32
CHANGE_POINT_MULTILINE_COMMENT=33
CHANGE_POINT_WS=34
ON=35
WITH=36
ENRICH_POLICY_NAME=37
ENRICH_LINE_COMMENT=38
ENRICH_MULTILINE_COMMENT=39
ENRICH_WS=40
ENRICH_FIELD_LINE_COMMENT=41
ENRICH_FIELD_MULTILINE_COMMENT=42
ENRICH_FIELD_WS=43
SETTING=44
SETTING_LINE_COMMENT=45
SETTTING_MULTILINE_COMMENT=46
SETTING_WS=47
EXPLAIN_WS=48
EXPLAIN_LINE_COMMENT=49
EXPLAIN_MULTILINE_COMMENT=50
PIPE=51
QUOTED_STRING=52
INTEGER_LITERAL=53
DECIMAL_LITERAL=54
BY=55
AND=56
ASC=57
ASSIGN=58
CAST_OP=59
COLON=60
COMMA=61
DESC=62
DOT=63
FALSE=64
FIRST=65
IN=66
IS=67
LAST=68
LIKE=69
NOT=70
NULL=71
NULLS=72
OR=73
PARAM=74
RLIKE=75
TRUE=76
EQ=77
CIEQ=78
NEQ=79
LT=80
LTE=81
GT=82
GTE=83
PLUS=84
MINUS=85
ASTERISK=86
SLASH=87
PERCENT=88
LEFT_BRACES=89
RIGHT_BRACES=90
NAMED_OR_POSITIONAL_PARAM=91
OPENING_BRACKET=92
CLOSING_BRACKET=93
LP=94
RP=95
UNQUOTED_IDENTIFIER=96
QUOTED_IDENTIFIER=97
EXPR_LINE_COMMENT=98
EXPR_MULTILINE_COMMENT=99
EXPR_WS=100
METADATA=101
UNQUOTED_SOURCE=102
FROM_LINE_COMMENT=103
FROM_MULTILINE_COMMENT=104
FROM_WS=105
FORK_WS=106
FORK_LINE_COMMENT=107
FORK_MULTILINE_COMMENT=108
JOIN=109
USING=110
JOIN_LINE_COMMENT=111
JOIN_MULTILINE_COMMENT=112
JOIN_WS=113
LOOKUP_LINE_COMMENT=114
LOOKUP_MULTILINE_COMMENT=115
LOOKUP_WS=116
LOOKUP_FIELD_LINE_COMMENT=117
LOOKUP_FIELD_MULTILINE_COMMENT=118
LOOKUP_FIELD_WS=119
METRICS_LINE_COMMENT=120
METRICS_MULTILINE_COMMENT=121
METRICS_WS=122
CLOSING_METRICS_LINE_COMMENT=123
CLOSING_METRICS_MULTILINE_COMMENT=124
CLOSING_METRICS_WS=125
MVEXPAND_LINE_COMMENT=126
MVEXPAND_MULTILINE_COMMENT=127
MVEXPAND_WS=128
ID_PATTERN=129
PROJECT_LINE_COMMENT=130
PROJECT_MULTILINE_COMMENT=131
PROJECT_WS=132
AS=133
RENAME_LINE_COMMENT=134
RENAME_MULTILINE_COMMENT=135
RENAME_WS=136
INFO=137
SHOW_LINE_COMMENT=138
SHOW_MULTILINE_COMMENT=139
SHOW_WS=140
'enrich'=5
'explain'=6
'dissect'=7
@ -152,51 +153,51 @@ SHOW_WS=139
'mv_expand'=24
'drop'=25
'keep'=26
'rename'=28
'show'=29
'on'=34
'with'=35
'|'=50
'by'=54
'and'=55
'asc'=56
'='=57
'::'=58
':'=59
','=60
'desc'=61
'.'=62
'false'=63
'first'=64
'in'=65
'is'=66
'last'=67
'like'=68
'not'=69
'null'=70
'nulls'=71
'or'=72
'?'=73
'rlike'=74
'true'=75
'=='=76
'=~'=77
'!='=78
'<'=79
'<='=80
'>'=81
'>='=82
'+'=83
'-'=84
'*'=85
'/'=86
'%'=87
'{'=88
'}'=89
']'=92
')'=94
'metadata'=100
'join'=108
'USING'=109
'as'=132
'info'=136
'rename'=29
'show'=30
'on'=35
'with'=36
'|'=51
'by'=55
'and'=56
'asc'=57
'='=58
'::'=59
':'=60
','=61
'desc'=62
'.'=63
'false'=64
'first'=65
'in'=66
'is'=67
'last'=68
'like'=69
'not'=70
'null'=71
'nulls'=72
'or'=73
'?'=74
'rlike'=75
'true'=76
'=='=77
'=~'=78
'!='=79
'<'=80
'<='=81
'>'=82
'>='=83
'+'=84
'-'=85
'*'=86
'/'=87
'%'=88
'{'=89
'}'=90
']'=93
')'=95
'metadata'=101
'join'=109
'USING'=110
'as'=133
'info'=137

View file

@ -61,6 +61,7 @@ processingCommand
| {this.isDevVersion()}? changePointCommand
| {this.isDevVersion()}? insistCommand
| {this.isDevVersion()}? forkCommand
| {this.isDevVersion()}? rrfCommand
;
whereCommand
@ -266,3 +267,7 @@ forkSubQueryProcessingCommand
| sortCommand
| limitCommand
;
// RRF command: consists of the DEV_RRF keyword token alone and takes no arguments.
rrfCommand
: DEV_RRF
;

View file

@ -25,118 +25,119 @@ MV_EXPAND=24
DROP=25
KEEP=26
DEV_INSIST=27
RENAME=28
SHOW=29
UNKNOWN_CMD=30
CHANGE_POINT_LINE_COMMENT=31
CHANGE_POINT_MULTILINE_COMMENT=32
CHANGE_POINT_WS=33
ON=34
WITH=35
ENRICH_POLICY_NAME=36
ENRICH_LINE_COMMENT=37
ENRICH_MULTILINE_COMMENT=38
ENRICH_WS=39
ENRICH_FIELD_LINE_COMMENT=40
ENRICH_FIELD_MULTILINE_COMMENT=41
ENRICH_FIELD_WS=42
SETTING=43
SETTING_LINE_COMMENT=44
SETTTING_MULTILINE_COMMENT=45
SETTING_WS=46
EXPLAIN_WS=47
EXPLAIN_LINE_COMMENT=48
EXPLAIN_MULTILINE_COMMENT=49
PIPE=50
QUOTED_STRING=51
INTEGER_LITERAL=52
DECIMAL_LITERAL=53
BY=54
AND=55
ASC=56
ASSIGN=57
CAST_OP=58
COLON=59
COMMA=60
DESC=61
DOT=62
FALSE=63
FIRST=64
IN=65
IS=66
LAST=67
LIKE=68
NOT=69
NULL=70
NULLS=71
OR=72
PARAM=73
RLIKE=74
TRUE=75
EQ=76
CIEQ=77
NEQ=78
LT=79
LTE=80
GT=81
GTE=82
PLUS=83
MINUS=84
ASTERISK=85
SLASH=86
PERCENT=87
LEFT_BRACES=88
RIGHT_BRACES=89
NAMED_OR_POSITIONAL_PARAM=90
OPENING_BRACKET=91
CLOSING_BRACKET=92
LP=93
RP=94
UNQUOTED_IDENTIFIER=95
QUOTED_IDENTIFIER=96
EXPR_LINE_COMMENT=97
EXPR_MULTILINE_COMMENT=98
EXPR_WS=99
METADATA=100
UNQUOTED_SOURCE=101
FROM_LINE_COMMENT=102
FROM_MULTILINE_COMMENT=103
FROM_WS=104
FORK_WS=105
FORK_LINE_COMMENT=106
FORK_MULTILINE_COMMENT=107
JOIN=108
USING=109
JOIN_LINE_COMMENT=110
JOIN_MULTILINE_COMMENT=111
JOIN_WS=112
LOOKUP_LINE_COMMENT=113
LOOKUP_MULTILINE_COMMENT=114
LOOKUP_WS=115
LOOKUP_FIELD_LINE_COMMENT=116
LOOKUP_FIELD_MULTILINE_COMMENT=117
LOOKUP_FIELD_WS=118
METRICS_LINE_COMMENT=119
METRICS_MULTILINE_COMMENT=120
METRICS_WS=121
CLOSING_METRICS_LINE_COMMENT=122
CLOSING_METRICS_MULTILINE_COMMENT=123
CLOSING_METRICS_WS=124
MVEXPAND_LINE_COMMENT=125
MVEXPAND_MULTILINE_COMMENT=126
MVEXPAND_WS=127
ID_PATTERN=128
PROJECT_LINE_COMMENT=129
PROJECT_MULTILINE_COMMENT=130
PROJECT_WS=131
AS=132
RENAME_LINE_COMMENT=133
RENAME_MULTILINE_COMMENT=134
RENAME_WS=135
INFO=136
SHOW_LINE_COMMENT=137
SHOW_MULTILINE_COMMENT=138
SHOW_WS=139
DEV_RRF=28
RENAME=29
SHOW=30
UNKNOWN_CMD=31
CHANGE_POINT_LINE_COMMENT=32
CHANGE_POINT_MULTILINE_COMMENT=33
CHANGE_POINT_WS=34
ON=35
WITH=36
ENRICH_POLICY_NAME=37
ENRICH_LINE_COMMENT=38
ENRICH_MULTILINE_COMMENT=39
ENRICH_WS=40
ENRICH_FIELD_LINE_COMMENT=41
ENRICH_FIELD_MULTILINE_COMMENT=42
ENRICH_FIELD_WS=43
SETTING=44
SETTING_LINE_COMMENT=45
SETTTING_MULTILINE_COMMENT=46
SETTING_WS=47
EXPLAIN_WS=48
EXPLAIN_LINE_COMMENT=49
EXPLAIN_MULTILINE_COMMENT=50
PIPE=51
QUOTED_STRING=52
INTEGER_LITERAL=53
DECIMAL_LITERAL=54
BY=55
AND=56
ASC=57
ASSIGN=58
CAST_OP=59
COLON=60
COMMA=61
DESC=62
DOT=63
FALSE=64
FIRST=65
IN=66
IS=67
LAST=68
LIKE=69
NOT=70
NULL=71
NULLS=72
OR=73
PARAM=74
RLIKE=75
TRUE=76
EQ=77
CIEQ=78
NEQ=79
LT=80
LTE=81
GT=82
GTE=83
PLUS=84
MINUS=85
ASTERISK=86
SLASH=87
PERCENT=88
LEFT_BRACES=89
RIGHT_BRACES=90
NAMED_OR_POSITIONAL_PARAM=91
OPENING_BRACKET=92
CLOSING_BRACKET=93
LP=94
RP=95
UNQUOTED_IDENTIFIER=96
QUOTED_IDENTIFIER=97
EXPR_LINE_COMMENT=98
EXPR_MULTILINE_COMMENT=99
EXPR_WS=100
METADATA=101
UNQUOTED_SOURCE=102
FROM_LINE_COMMENT=103
FROM_MULTILINE_COMMENT=104
FROM_WS=105
FORK_WS=106
FORK_LINE_COMMENT=107
FORK_MULTILINE_COMMENT=108
JOIN=109
USING=110
JOIN_LINE_COMMENT=111
JOIN_MULTILINE_COMMENT=112
JOIN_WS=113
LOOKUP_LINE_COMMENT=114
LOOKUP_MULTILINE_COMMENT=115
LOOKUP_WS=116
LOOKUP_FIELD_LINE_COMMENT=117
LOOKUP_FIELD_MULTILINE_COMMENT=118
LOOKUP_FIELD_WS=119
METRICS_LINE_COMMENT=120
METRICS_MULTILINE_COMMENT=121
METRICS_WS=122
CLOSING_METRICS_LINE_COMMENT=123
CLOSING_METRICS_MULTILINE_COMMENT=124
CLOSING_METRICS_WS=125
MVEXPAND_LINE_COMMENT=126
MVEXPAND_MULTILINE_COMMENT=127
MVEXPAND_WS=128
ID_PATTERN=129
PROJECT_LINE_COMMENT=130
PROJECT_MULTILINE_COMMENT=131
PROJECT_WS=132
AS=133
RENAME_LINE_COMMENT=134
RENAME_MULTILINE_COMMENT=135
RENAME_WS=136
INFO=137
SHOW_LINE_COMMENT=138
SHOW_MULTILINE_COMMENT=139
SHOW_WS=140
'enrich'=5
'explain'=6
'dissect'=7
@ -152,51 +153,51 @@ SHOW_WS=139
'mv_expand'=24
'drop'=25
'keep'=26
'rename'=28
'show'=29
'on'=34
'with'=35
'|'=50
'by'=54
'and'=55
'asc'=56
'='=57
'::'=58
':'=59
','=60
'desc'=61
'.'=62
'false'=63
'first'=64
'in'=65
'is'=66
'last'=67
'like'=68
'not'=69
'null'=70
'nulls'=71
'or'=72
'?'=73
'rlike'=74
'true'=75
'=='=76
'=~'=77
'!='=78
'<'=79
'<='=80
'>'=81
'>='=82
'+'=83
'-'=84
'*'=85
'/'=86
'%'=87
'{'=88
'}'=89
']'=92
')'=94
'metadata'=100
'join'=108
'USING'=109
'as'=132
'info'=136
'rename'=29
'show'=30
'on'=35
'with'=36
'|'=51
'by'=55
'and'=56
'asc'=57
'='=58
'::'=59
':'=60
','=61
'desc'=62
'.'=63
'false'=64
'first'=65
'in'=66
'is'=67
'last'=68
'like'=69
'not'=70
'null'=71
'nulls'=72
'or'=73
'?'=74
'rlike'=75
'true'=76
'=='=77
'=~'=78
'!='=79
'<'=80
'<='=81
'>'=82
'>='=83
'+'=84
'-'=85
'*'=86
'/'=87
'%'=88
'{'=89
'}'=90
']'=93
')'=95
'metadata'=101
'join'=109
'USING'=110
'as'=133
'info'=137

View file

@ -0,0 +1,10 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
lexer grammar Rrf;
// Keyword token for the RRF command: matches 'rrf' only while the isDevVersion() predicate holds,
// then pushes EXPRESSION_MODE.
DEV_RRF : {this.isDevVersion()}? 'rrf' -> pushMode(EXPRESSION_MODE);

View file

@ -860,7 +860,12 @@ public class EsqlCapabilities {
/**
* Allow mixed numeric types in conditional functions - case, greatest and least
*/
MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST;
MIXED_NUMERIC_TYPES_IN_CASE_GREATEST_LEAST,
/**
* Support for RRF command
*/
RRF(Build.current().isSnapshot());
private final boolean enabled;

View file

@ -70,6 +70,7 @@ import org.elasticsearch.xpack.esql.index.IndexResolution;
import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
@ -83,6 +84,7 @@ import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.join.Join;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinConfig;
@ -501,6 +503,14 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
return resolveFork(f, context);
}
if (plan instanceof Dedup dedup) {
return resolveDedup(dedup, childrenOutput);
}
if (plan instanceof RrfScoreEval rrf) {
return resolveRrfScoreEval(rrf, childrenOutput);
}
return plan.transformExpressionsOnly(UnresolvedAttribute.class, ua -> maybeResolveAttribute(ua, childrenOutput));
}
@ -753,6 +763,54 @@ public class Analyzer extends ParameterizedRuleExecutor<LogicalPlan, AnalyzerCon
return new FieldAttribute(attribute.source(), attribute.name(), new PotentiallyUnmappedKeywordEsField(attribute.name()));
}
/**
 * Resolves the attributes referenced by a {@link Dedup} against the output of its children.
 * Unresolved attributes inside the final aggregates are swapped for the match found by
 * {@code maybeResolveAttribute} and left as they are when it returns {@code null}; unresolved groupings go
 * through {@code resolveAttribute}. A new {@link Dedup} over the same child is always returned.
 */
private LogicalPlan resolveDedup(Dedup dedup, List<Attribute> childrenOutput) {
    List<NamedExpression> resolvedAggs = new ArrayList<>();
    for (NamedExpression aggregate : dedup.finalAggs()) {
        Expression rewritten = aggregate.transformUp(UnresolvedAttribute.class, unresolved -> {
            Attribute candidate = maybeResolveAttribute(unresolved, childrenOutput);
            return candidate != null ? candidate : unresolved;
        });
        resolvedAggs.add((NamedExpression) rewritten);
    }

    List<Attribute> resolvedGroupings = new ArrayList<>();
    for (Attribute grouping : dedup.groupings()) {
        Attribute resolved = grouping instanceof UnresolvedAttribute unresolved
            ? resolveAttribute(unresolved, childrenOutput)
            : grouping;
        resolvedGroupings.add(resolved);
    }

    return new Dedup(dedup.source(), dedup.child(), resolvedAggs, resolvedGroupings);
}
/**
 * Resolves the score and fork attributes of a {@link RrfScoreEval} against the output of its children.
 * Returns the node itself when neither attribute was replaced, otherwise a copy carrying the resolved ones.
 */
private LogicalPlan resolveRrfScoreEval(RrfScoreEval rrf, List<Attribute> childrenOutput) {
    Attribute originalScore = rrf.scoreAttribute();
    Attribute originalFork = rrf.forkAttribute();

    Attribute resolvedScore = originalScore instanceof UnresolvedAttribute unresolvedScore
        ? resolveAttribute(unresolvedScore, childrenOutput)
        : originalScore;
    Attribute resolvedFork = originalFork instanceof UnresolvedAttribute unresolvedFork
        ? resolveAttribute(unresolvedFork, childrenOutput)
        : originalFork;

    // Identity comparison: only rebuild the node when resolution actually produced a different attribute.
    boolean untouched = resolvedFork == originalFork && resolvedScore == originalScore;
    if (untouched) {
        return rrf;
    }
    return new RrfScoreEval(rrf.source(), rrf.child(), resolvedScore, resolvedFork);
}
/**
 * Convenience overload that forwards to {@code maybeResolveAttribute(ua, childrenOutput, log)},
 * supplying the {@code log} in scope as the third argument.
 */
private Attribute maybeResolveAttribute(UnresolvedAttribute ua, List<Attribute> childrenOutput) {
return maybeResolveAttribute(ua, childrenOutput, log);
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -680,6 +680,18 @@ public class EsqlBaseParserBaseListener implements EsqlBaseParserListener {
* <p>The default implementation does nothing.</p>
*/
@Override public void exitForkSubQueryProcessingCommand(EsqlBaseParser.ForkSubQueryProcessingCommandContext ctx) { }
/**
* {@inheritDoc}
*
* <p>The default implementation does nothing.</p>
*/
@Override public void enterRrfCommand(EsqlBaseParser.RrfCommandContext ctx) { }
/**
* {@inheritDoc}
*
* <p>The default implementation does nothing.</p>
*/
@Override public void exitRrfCommand(EsqlBaseParser.RrfCommandContext ctx) { }
/**
* {@inheritDoc}
*

View file

@ -405,6 +405,13 @@ public class EsqlBaseParserBaseVisitor<T> extends AbstractParseTreeVisitor<T> im
* {@link #visitChildren} on {@code ctx}.</p>
*/
@Override public T visitForkSubQueryProcessingCommand(EsqlBaseParser.ForkSubQueryProcessingCommandContext ctx) { return visitChildren(ctx); }
/**
* {@inheritDoc}
*
* <p>The default implementation returns the result of calling
* {@link #visitChildren} on {@code ctx}.</p>
*/
@Override public T visitRrfCommand(EsqlBaseParser.RrfCommandContext ctx) { return visitChildren(ctx); }
/**
* {@inheritDoc}
*

View file

@ -581,6 +581,16 @@ public interface EsqlBaseParserListener extends ParseTreeListener {
* @param ctx the parse tree
*/
void exitForkSubQueryProcessingCommand(EsqlBaseParser.ForkSubQueryProcessingCommandContext ctx);
/**
* Enter a parse tree produced by {@link EsqlBaseParser#rrfCommand}.
* @param ctx the parse tree
*/
void enterRrfCommand(EsqlBaseParser.RrfCommandContext ctx);
/**
* Exit a parse tree produced by {@link EsqlBaseParser#rrfCommand}.
* @param ctx the parse tree
*/
void exitRrfCommand(EsqlBaseParser.RrfCommandContext ctx);
/**
* Enter a parse tree produced by the {@code matchExpression}
* labeled alternative in {@link EsqlBaseParser#booleanExpression}.

View file

@ -356,6 +356,12 @@ public interface EsqlBaseParserVisitor<T> extends ParseTreeVisitor<T> {
* @return the visitor result
*/
T visitForkSubQueryProcessingCommand(EsqlBaseParser.ForkSubQueryProcessingCommandContext ctx);
/**
* Visit a parse tree produced by {@link EsqlBaseParser#rrfCommand}.
* @param ctx the parse tree
* @return the visitor result
*/
T visitRrfCommand(EsqlBaseParser.RrfCommandContext ctx);
/**
* Visit a parse tree produced by the {@code matchExpression}
* labeled alternative in {@link EsqlBaseParser#booleanExpression}.

View file

@ -16,6 +16,7 @@ import org.elasticsearch.core.Tuple;
import org.elasticsearch.dissect.DissectException;
import org.elasticsearch.dissect.DissectParser;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
@ -40,9 +41,11 @@ import org.elasticsearch.xpack.esql.core.util.Holder;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.UnresolvedNamePattern;
import org.elasticsearch.xpack.esql.expression.function.UnresolvedFunction;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Sum;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.ChangePoint;
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@ -61,6 +64,7 @@ import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
@ -655,10 +659,10 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
// align _fork id across all fork branches
Alias alias = null;
if (firstForkNameId == null) {
alias = new Alias(source(ctx), "_fork", literal);
alias = new Alias(source(ctx), Fork.FORK_FIELD, literal);
firstForkNameId = alias.id();
} else {
alias = new Alias(source(ctx), "_fork", literal, firstForkNameId);
alias = new Alias(source(ctx), Fork.FORK_FIELD, literal, firstForkNameId);
}
var finalAlias = alias;
@ -686,4 +690,29 @@ public class LogicalPlanBuilder extends ExpressionBuilder {
PlanFactory makePlan = typedParsing(this, ctx.forkSubQueryProcessingCommand(), PlanFactory.class);
return input -> makePlan.apply(lowerPlan.apply(input));
}
/**
 * Plans the RRF command as three stacked nodes: a {@link RrfScoreEval} over the input, a {@link Dedup}
 * grouped on the id and index attributes that sums the score, and an {@link OrderBy} on score
 * (descending), then id and index (ascending).
 */
@Override
public PlanFactory visitRrfCommand(EsqlBaseParser.RrfCommandContext ctx) {
    return input -> {
        Source src = source(ctx);

        // All attributes start unresolved.
        Attribute score = new UnresolvedAttribute(src, MetadataAttribute.SCORE);
        Attribute fork = new UnresolvedAttribute(src, Fork.FORK_FIELD);
        Attribute id = new UnresolvedAttribute(src, IdFieldMapper.NAME);
        Attribute index = new UnresolvedAttribute(src, MetadataAttribute.INDEX);

        // Rows sharing the same id and index are collapsed, adding their scores together.
        Literal noFilter = new Literal(src, true, DataType.BOOLEAN);
        NamedExpression summedScore = new Alias(src, MetadataAttribute.SCORE, new Sum(src, score, noFilter));
        List<NamedExpression> dedupAggregates = List.of(summedScore);
        List<Attribute> dedupKeys = List.of(id, index);

        LogicalPlan rescored = new RrfScoreEval(src, input, score, fork);
        LogicalPlan deduplicated = new Dedup(src, rescored, dedupAggregates, dedupKeys);

        Order byScore = new Order(src, score, Order.OrderDirection.DESC, Order.NullsPosition.LAST);
        Order byId = new Order(src, id, Order.OrderDirection.ASC, Order.NullsPosition.LAST);
        Order byIndex = new Order(src, index, Order.OrderDirection.ASC, Order.NullsPosition.LAST);

        return new OrderBy(src, deduplicated, List.of(byScore, byId, byIndex));
    };
}
}

View file

@ -0,0 +1,111 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.logical;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.capabilities.Resolvables;
import org.elasticsearch.xpack.esql.core.expression.Alias;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.NamedExpressions;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Values;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
 * Removes the rows that contain the same values for a list of specified fields.
 * Dedup also receives a list of aggregates similar to {@link Aggregate STATS}.
 * In the current implementation Dedup implements {@link SurrogateLogicalPlan} and actually expands to {@link Aggregate STATS}.
 * At the moment this is only used in the planning of the RRF command, but could evolve as a standalone command.
 */
public class Dedup extends UnaryPlan implements SurrogateLogicalPlan {
    private final List<NamedExpression> aggregates;
    private final List<Attribute> groupings;

    // Both are computed on first use and then cached.
    private List<Attribute> lazyOutput;
    private List<NamedExpression> lazyFinalAggs;

    /**
     * @param source     source location of the command
     * @param child      plan whose rows are deduplicated
     * @param aggregates explicitly requested aggregates
     * @param groupings  attributes whose values identify duplicate rows
     */
    public Dedup(Source source, LogicalPlan child, List<NamedExpression> aggregates, List<Attribute> groupings) {
        super(source, child);
        this.aggregates = aggregates;
        this.groupings = groupings;
    }

    @Override
    public String getWriteableName() {
        throw new UnsupportedOperationException("not serialized");
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        throw new UnsupportedOperationException("not serialized");
    }

    @Override
    protected NodeInfo<? extends LogicalPlan> info() {
        return NodeInfo.create(this, Dedup::new, child(), aggregates, groupings);
    }

    @Override
    public boolean expressionsResolved() {
        return Resolvables.resolved(aggregates) && Resolvables.resolved(groupings);
    }

    @Override
    public UnaryPlan replaceChild(LogicalPlan newChild) {
        return new Dedup(source(), newChild, aggregates, groupings);
    }

    /**
     * Expands this node into a standard {@link Aggregate} over the same child, grouped by
     * {@link #groupings()} and computing {@link #finalAggs()}.
     */
    @Override
    public LogicalPlan surrogate() {
        return new Aggregate(source(), child(), Aggregate.AggregateType.STANDARD, new ArrayList<>(groupings), finalAggs());
    }

    public List<NamedExpression> aggregates() {
        return aggregates;
    }

    public List<Attribute> groupings() {
        return groupings;
    }

    /**
     * Returns the explicit aggregates followed by one {@link Values} aggregate, aliased to the attribute's
     * own name, for every child output attribute whose name is not already produced by an explicit aggregate.
     * The list is computed once and cached.
     */
    public List<NamedExpression> finalAggs() {
        if (lazyFinalAggs == null) {
            lazyFinalAggs = new ArrayList<>(aggregates);

            Set<String> names = new HashSet<>();
            for (NamedExpression aggregate : aggregates) {
                names.add(aggregate.name());
            }

            Expression aggFilter = new Literal(source(), true, DataType.BOOLEAN);
            for (Attribute attr : child().output()) {
                if (names.contains(attr.name())) {
                    continue;
                }
                lazyFinalAggs.add(new Alias(source(), attr.name(), new Values(source(), attr, aggFilter)));
            }
        }
        return lazyFinalAggs;
    }

    @Override
    public List<Attribute> output() {
        if (lazyOutput == null) {
            lazyOutput = NamedExpressions.mergeOutputAttributes(finalAggs(), child().output());
        }
        return lazyOutput;
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), aggregates, groupings);
    }

    /**
     * Takes the aggregates and groupings into account, so that two nodes over the same child with different
     * expressions are not considered equal.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        Dedup other = (Dedup) obj;
        return Objects.equals(child(), other.child())
            && Objects.equals(aggregates, other.aggregates)
            && Objects.equals(groupings, other.groupings);
    }
}

View file

@ -23,6 +23,7 @@ import java.util.stream.Stream;
* {@code FORK [WHERE content:"fox" ] [WHERE content:"dog"] }
*/
public class Fork extends UnaryPlan implements SurrogateLogicalPlan {
public static final String FORK_FIELD = "_fork";
private final List<LogicalPlan> subPlans;
List<Attribute> lazyOutput;

View file

@ -0,0 +1,96 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.logical;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.capabilities.PostAnalysisVerificationAware;
import org.elasticsearch.xpack.esql.common.Failures;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.xpack.esql.common.Failure.fail;
/**
 * Logical plan node that carries the score and fork attributes used by the RRF command.
 * The node is not serialized, and post-analysis verification requires its child to be a {@link Fork}.
 */
public class RrfScoreEval extends UnaryPlan implements PostAnalysisVerificationAware {
    private final Attribute forkAttr;
    private final Attribute scoreAttr;

    /**
     * @param source    source location of the command
     * @param child     plan this node sits on top of
     * @param scoreAttr attribute holding the score
     * @param forkAttr  attribute holding the fork discriminator
     */
    public RrfScoreEval(Source source, LogicalPlan child, Attribute scoreAttr, Attribute forkAttr) {
        super(source, child);
        this.scoreAttr = scoreAttr;
        this.forkAttr = forkAttr;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        throw new UnsupportedOperationException("not serialized");
    }

    @Override
    public String getWriteableName() {
        throw new UnsupportedOperationException("not serialized");
    }

    @Override
    protected NodeInfo<? extends LogicalPlan> info() {
        return NodeInfo.create(this, RrfScoreEval::new, child(), scoreAttr, forkAttr);
    }

    @Override
    public boolean expressionsResolved() {
        return scoreAttr.resolved() && forkAttr.resolved();
    }

    @Override
    public UnaryPlan replaceChild(LogicalPlan newChild) {
        return new RrfScoreEval(source(), newChild, scoreAttr, forkAttr);
    }

    public Attribute scoreAttribute() {
        return scoreAttr;
    }

    public Attribute forkAttribute() {
        return forkAttr;
    }

    /**
     * Reports a failure when the child is not a {@link Fork}. The message names the offending command using
     * the first space-separated word of the child's source text, upper-cased.
     */
    @Override
    public void postAnalysisVerification(Failures failures) {
        if (this.child() instanceof Fork == false) {
            failures.add(
                fail(
                    this,
                    "Invalid use of RRF. RRF can only be used after FORK, but found {}",
                    child().sourceText().split(" ")[0].toUpperCase(Locale.ROOT)
                )
            );
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), scoreAttr, forkAttr);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        RrfScoreEval rrf = (RrfScoreEval) obj;
        // Compare against the OTHER node's fork attribute; comparing with our own would always be true.
        return child().equals(rrf.child()) && scoreAttr.equals(rrf.scoreAttribute()) && forkAttr.equals(rrf.forkAttribute());
    }
}

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.plan.physical;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.AttributeSet;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import java.io.IOException;
import java.util.List;
/**
 * Physical plan node that carries the score and fork attributes for RRF score evaluation.
 * <p>
 * The node is never serialized: both {@link #getWriteableName()} and
 * {@link #writeTo(StreamOutput)} throw {@link UnsupportedOperationException}.
 */
public class RrfScoreEvalExec extends UnaryExec {
    private final Attribute scoreAttr;
    private final Attribute forkAttr;

    /**
     * @param source    source location of the command
     * @param child     the physical plan this node is applied to
     * @param scoreAttr attribute holding the score
     * @param forkAttr  attribute holding the fork discriminator
     */
    public RrfScoreEvalExec(Source source, PhysicalPlan child, Attribute scoreAttr, Attribute forkAttr) {
        super(source, child);
        this.scoreAttr = scoreAttr;
        this.forkAttr = forkAttr;
    }

    /** Always throws: this node is not serialized. */
    @Override
    public String getWriteableName() {
        throw new UnsupportedOperationException("not serialized");
    }

    /** Always throws: this node is not serialized. */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        throw new UnsupportedOperationException("not serialized");
    }

    @Override
    protected NodeInfo<? extends PhysicalPlan> info() {
        return NodeInfo.create(this, RrfScoreEvalExec::new, child(), scoreAttr, forkAttr);
    }

    @Override
    public UnaryExec replaceChild(PhysicalPlan newChild) {
        return new RrfScoreEvalExec(source(), newChild, scoreAttr, forkAttr);
    }

    /** The node references exactly the score and the fork attribute. */
    @Override
    protected AttributeSet computeReferences() {
        return new AttributeSet(List.of(scoreAttr, forkAttr));
    }

    // equals/hashCode include the two attributes so that nodes differing only in
    // scoreAttr/forkAttr are not considered equal (mirrors the logical RrfScoreEval node).
    @Override
    public int hashCode() {
        return java.util.Objects.hash(child(), scoreAttr, forkAttr);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        RrfScoreEvalExec other = (RrfScoreEvalExec) obj;
        return java.util.Objects.equals(child(), other.child())
            && java.util.Objects.equals(scoreAttr, other.scoreAttr)
            && java.util.Objects.equals(forkAttr, other.forkAttr);
    }
}

View file

@ -34,6 +34,7 @@ import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.Operator.OperatorFactory;
import org.elasticsearch.compute.operator.OutputOperator.OutputOperatorFactory;
import org.elasticsearch.compute.operator.RowInTableLookupOperator;
import org.elasticsearch.compute.operator.RrfScoreEvalOperator;
import org.elasticsearch.compute.operator.ShowOperator;
import org.elasticsearch.compute.operator.SinkOperator;
import org.elasticsearch.compute.operator.SinkOperator.SinkOperatorFactory;
@ -62,6 +63,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.TypedAttribute;
@ -75,6 +77,7 @@ import org.elasticsearch.xpack.esql.enrich.LookupFromIndexService;
import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
import org.elasticsearch.xpack.esql.evaluator.command.GrokEvaluatorExtracter;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.plan.logical.Fork;
import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
import org.elasticsearch.xpack.esql.plan.physical.ChangePointExec;
import org.elasticsearch.xpack.esql.plan.physical.DissectExec;
@ -97,6 +100,7 @@ import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
import org.elasticsearch.xpack.esql.planner.EsPhysicalOperationProviders.ShardContext;
@ -266,11 +270,40 @@ public class LocalExecutionPlanner {
return planExchangeSink(exchangeSink, context);
} else if (node instanceof MergeExec mergeExec) {
return planMerge(mergeExec, context);
} else if (node instanceof RrfScoreEvalExec rrf) {
return planRrfScoreEvalExec(rrf, context);
}
throw new EsqlIllegalArgumentException("unknown physical plan node [" + node.nodeName() + "]");
}
/**
 * Plans the operator for an {@link RrfScoreEvalExec} node.
 * <p>
 * Plans the child first, then scans the child's output attributes by name to find the
 * positions of the {@code _fork} and {@code _score} columns, and appends an
 * {@link RrfScoreEvalOperator.Factory} configured with those positions. The layout of the
 * child operation is reused unchanged.
 *
 * @param rrf     the physical node to plan
 * @param context the local execution planner context
 * @return the child operation extended with the RRF score evaluation operator
 * @throws IllegalStateException if the child's output has no {@code _score} or no {@code _fork} attribute
 */
private PhysicalOperation planRrfScoreEvalExec(RrfScoreEvalExec rrf, LocalExecutionPlannerContext context) {
    PhysicalOperation source = plan(rrf.child(), context);

    int scorePosition = -1;
    int forkPosition = -1;
    int pos = 0;
    // NOTE(review): positions are indices into child().output(); this assumes the output order
    // matches the column order of the pages the operator receives - confirm.
    for (Attribute attr : rrf.child().output()) {
        if (attr.name().equals(Fork.FORK_FIELD)) {
            forkPosition = pos;
        }
        if (attr.name().equals(MetadataAttribute.SCORE)) {
            scorePosition = pos;
        }
        pos += 1;
    }

    if (scorePosition == -1) {
        throw new IllegalStateException("can't find _score attribute position");
    }
    if (forkPosition == -1) {
        throw new IllegalStateException("can't find _fork attribute position");
    }

    return source.with(new RrfScoreEvalOperator.Factory(forkPosition, scorePosition), source.layout);
}
private PhysicalOperation planAggregation(AggregateExec aggregate, LocalExecutionPlannerContext context) {
var source = plan(aggregate.child(), context);
return physicalOperationProviders.groupingPhysicalOperation(aggregate, source, context);

View file

@ -23,6 +23,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LeafPlan;
import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.MvExpand;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.UnaryPlan;
import org.elasticsearch.xpack.esql.plan.logical.local.LocalRelation;
import org.elasticsearch.xpack.esql.plan.logical.show.ShowInfo;
@ -37,6 +38,7 @@ import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
import org.elasticsearch.xpack.esql.plan.physical.ShowExec;
import org.elasticsearch.xpack.esql.planner.AbstractPhysicalOperationProviders;
@ -111,6 +113,10 @@ class MapperUtils {
);
}
if (p instanceof RrfScoreEval rrf) {
return new RrfScoreEvalExec(rrf.source(), child, rrf.scoreAttribute(), rrf.forkAttribute());
}
return unsupported(p);
}

View file

@ -28,6 +28,7 @@ import org.elasticsearch.xpack.esql.core.expression.Expressions;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.MapExpression;
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
@ -51,6 +52,7 @@ import org.elasticsearch.xpack.esql.parser.ParsingException;
import org.elasticsearch.xpack.esql.parser.QueryParams;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
import org.elasticsearch.xpack.esql.plan.logical.EsRelation;
import org.elasticsearch.xpack.esql.plan.logical.Eval;
@ -62,6 +64,7 @@ import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan;
import org.elasticsearch.xpack.esql.plan.logical.Lookup;
import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.join.StubRelation;
import org.elasticsearch.xpack.esql.plan.logical.local.EsqlProject;
@ -2908,6 +2911,91 @@ public class AnalyzerTests extends ESTestCase {
assertThat(pe.getMessage(), containsString("mismatched input 'me' expecting INTEGER_LITERAL"));
}
/**
 * Analyzes a FORK followed by RRF and checks the resulting plan shape:
 * Limit -> OrderBy(_score, _id, _index) -> Dedup -> RrfScoreEval -> Fork.
 */
public void testValidRrf() {
    assumeTrue("requires RRF capability", EsqlCapabilities.Cap.RRF.isEnabled());

    LogicalPlan analyzed = analyze("""
        from test metadata _id, _index, _score
        | fork ( where first_name:"foo" )
        ( where first_name:"bar" )
        | rrf
        """);

    Limit topLimit = as(analyzed, Limit.class);
    OrderBy sort = as(topLimit.child(), OrderBy.class);

    // The sort keys are expected in exactly this order.
    String[] expectedSortKeys = { "_score", "_id", "_index" };
    assertThat(sort.order().size(), equalTo(expectedSortKeys.length));
    for (int i = 0; i < expectedSortKeys.length; i++) {
        var sortKey = sort.order().get(i).child();
        assertThat(sortKey, instanceOf(ReferenceAttribute.class));
        assertThat(((ReferenceAttribute) sortKey).name(), equalTo(expectedSortKeys[i]));
    }

    Dedup dedup = as(sort.child(), Dedup.class);
    assertThat(dedup.groupings().size(), equalTo(2));
    assertThat(dedup.aggregates().size(), equalTo(15));

    RrfScoreEval rrfEval = as(dedup.child(), RrfScoreEval.class);
    var score = rrfEval.scoreAttribute();
    assertThat(score, instanceOf(MetadataAttribute.class));
    assertThat(score.name(), equalTo("_score"));
    var fork = rrfEval.forkAttribute();
    assertThat(fork, instanceOf(ReferenceAttribute.class));
    assertThat(fork.name(), equalTo("_fork"));
    assertThat(rrfEval.child(), instanceOf(Fork.class));
}
/**
 * Checks the verification errors produced by invalid uses of RRF: missing
 * {@code _score}/{@code _fork}/{@code _index}/{@code _id} columns, and RRF applied to
 * something other than FORK.
 */
public void testRrfError() {
    // Guard on the RRF capability (not FORK): every query below uses the RRF command.
    assumeTrue("requires RRF capability", EsqlCapabilities.Cap.RRF.isEnabled());

    // RRF without FORK and without metadata: both columns are unknown.
    var e = expectThrows(VerificationException.class, () -> analyze("""
        from test
        | rrf
        """));
    assertThat(e.getMessage(), containsString("Unknown column [_score]"));
    assertThat(e.getMessage(), containsString("Unknown column [_fork]"));

    // A hand-made _fork column does not make the child a FORK.
    e = expectThrows(VerificationException.class, () -> analyze("""
        from test metadata _score, _index, _id
        | eval _fork = 1
        | rrf
        """));
    assertThat(e.getMessage(), containsString("RRF can only be used after FORK, but found EVAL"));

    // RRF directly after RRF.
    e = expectThrows(VerificationException.class, () -> analyze("""
        from test metadata _id, _index, _score
        | fork ( where first_name:"foo" )
        ( where first_name:"bar" )
        | rrf
        | rrf
        """));
    assertThat(e.getMessage(), containsString("RRF can only be used after FORK, but found RRF"));

    // FORK without any metadata: _score is unknown.
    e = expectThrows(VerificationException.class, () -> analyze("""
        from test
        | FORK ( WHERE emp_no == 1 )
        ( WHERE emp_no > 1 )
        | RRF
        """));
    assertThat(e.getMessage(), containsString("Unknown column [_score]"));

    // Missing _index metadata.
    e = expectThrows(VerificationException.class, () -> analyze("""
        from test metadata _score, _id
        | FORK ( WHERE emp_no == 1 )
        ( WHERE emp_no > 1 )
        | RRF
        """));
    assertThat(e.getMessage(), containsString("Unknown column [_index]"));

    // Missing _id metadata.
    e = expectThrows(VerificationException.class, () -> analyze("""
        from test metadata _score, _index
        | FORK ( WHERE emp_no == 1 )
        ( WHERE emp_no > 1 )
        | RRF
        """));
    assertThat(e.getMessage(), containsString("Unknown column [_id]"));
}
// TODO There's too much boilerplate involved here! We need a better way of creating FieldCapabilitiesResponses from a mapping or index.
private static FieldCapabilitiesIndexResponse fieldCapabilitiesIndexResponse(
String indexName,

View file

@ -45,6 +45,7 @@ import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.Les
import org.elasticsearch.xpack.esql.expression.predicate.operator.comparison.LessThanOrEqual;
import org.elasticsearch.xpack.esql.plan.IndexPattern;
import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
import org.elasticsearch.xpack.esql.plan.logical.Dedup;
import org.elasticsearch.xpack.esql.plan.logical.Dissect;
import org.elasticsearch.xpack.esql.plan.logical.Drop;
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
@ -63,6 +64,7 @@ import org.elasticsearch.xpack.esql.plan.logical.OrderBy;
import org.elasticsearch.xpack.esql.plan.logical.Project;
import org.elasticsearch.xpack.esql.plan.logical.Rename;
import org.elasticsearch.xpack.esql.plan.logical.Row;
import org.elasticsearch.xpack.esql.plan.logical.RrfScoreEval;
import org.elasticsearch.xpack.esql.plan.logical.UnresolvedRelation;
import org.elasticsearch.xpack.esql.plan.logical.join.JoinTypes;
import org.elasticsearch.xpack.esql.plan.logical.join.LookupJoin;
@ -3112,4 +3114,42 @@ public class StatementParserTests extends AbstractStatementParserTests {
/**
 * Test helper that builds an {@link Alias} named {@code name} for {@code value},
 * using the {@code EMPTY} source.
 */
static Alias alias(String name, Expression value) {
    Alias aliased = new Alias(EMPTY, name, value);
    return aliased;
}
/**
 * Parses a FORK followed by RRF and checks the unresolved plan shape:
 * OrderBy(_score, _id, _index) -> Dedup(_id, _index) -> RrfScoreEval -> Fork.
 */
public void testValidRrf() {
    assumeTrue("RRF requires corresponding capability", EsqlCapabilities.Cap.RRF.isEnabled());

    LogicalPlan parsed = statement("""
        FROM foo* METADATA _id, _index, _score
        | FORK ( WHERE a:"baz" )
        ( WHERE b:"bar" )
        | RRF
        """);

    var sort = as(parsed, OrderBy.class);

    // The sort keys are expected in exactly this order.
    String[] expectedSortKeys = { "_score", "_id", "_index" };
    assertThat(sort.order().size(), equalTo(expectedSortKeys.length));
    for (int i = 0; i < expectedSortKeys.length; i++) {
        var sortKey = sort.order().get(i).child();
        assertThat(sortKey, instanceOf(UnresolvedAttribute.class));
        assertThat(((UnresolvedAttribute) sortKey).name(), equalTo(expectedSortKeys[i]));
    }

    var dedup = as(sort.child(), Dedup.class);
    String[] expectedGroupings = { "_id", "_index" };
    assertThat(dedup.groupings().size(), equalTo(expectedGroupings.length));
    for (int i = 0; i < expectedGroupings.length; i++) {
        var grouping = dedup.groupings().get(i);
        assertThat(grouping, instanceOf(UnresolvedAttribute.class));
        assertThat(grouping.name(), equalTo(expectedGroupings[i]));
    }
    assertThat(dedup.aggregates().size(), equalTo(1));
    assertThat(dedup.aggregates().get(0), instanceOf(Alias.class));

    var rrfEval = as(dedup.child(), RrfScoreEval.class);
    var score = rrfEval.scoreAttribute();
    assertThat(score, instanceOf(UnresolvedAttribute.class));
    assertThat(score.name(), equalTo("_score"));
    var fork = rrfEval.forkAttribute();
    assertThat(fork, instanceOf(UnresolvedAttribute.class));
    assertThat(fork.name(), equalTo("_fork"));
    assertThat(rrfEval.child(), instanceOf(Fork.class));
}
}

View file

@ -58,6 +58,7 @@ import org.elasticsearch.xpack.esql.plan.physical.MergeExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.type.EsFieldTests;
import org.mockito.exceptions.base.MockitoException;
import java.io.IOException;
@ -503,6 +504,9 @@ public class EsqlNodeSubclassTests<T extends B, B extends Node<B>> extends NodeS
if (argClass == Configuration.class) {
return randomConfiguration();
}
if (argClass == EsField.class) {
return EsFieldTests.randomEsField(4);
}
if (argClass == EsIndex.class) {
return randomEsIndex();
}