Support value retrieval in top_hits (#95828)

This is used when the `top_hits` output is passed to pipeline
aggregators like bucket selectors. The logic retrieves the requested
field from the source of the first SearchHit. This implies that (a) the
spec of the wrapping aggregator (e.g. `bucket_path`) points to an
appropriate field using a bracketed reference (e.g.
`my_top_hits[my_metric]`) and (b) the `top_hits` contains a `size: 1`
setting.

This PR also includes extensions to YAML tests for `top_metrics` and
`top_hits` to cover the cases where these are used in pipeline
aggregations through `bucket_selector`, similar to a HAVING clause in
SQL.

Related to https://github.com/elastic/elasticsearch/issues/73429.
This commit is contained in:
Kostas Krikellas 2023-05-15 16:21:11 +03:00 committed by GitHub
parent a02ffaf351
commit deffa800db
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 659 additions and 2 deletions

View file

@ -0,0 +1,5 @@
pr: 95828
summary: Support value retrieval in `top_hits`
area: Aggregations
type: enhancement
issues: []

View file

@ -416,3 +416,44 @@ Which returns the much more expected:
}
----
// TESTRESPONSE
===== Use in pipeline aggregations
`top_metrics` can be used in pipeline aggregations that consume a single value per bucket, such as `bucket_selector`
that applies per bucket filtering, similar to using a HAVING clause in SQL. This requires setting `size` to 1, and
specifying the right path for the (single) metric to be passed to the wrapping aggregator. For example:
[source,console]
----
POST /test*/_search?filter_path=aggregations
{
"aggs": {
"ip": {
"terms": {
"field": "ip"
},
"aggs": {
"tm": {
"top_metrics": {
"metrics": {"field": "m"},
"sort": {"s": "desc"},
"size": 1
}
},
"having_tm": {
"bucket_selector": {
"buckets_path": {
"top_m": "tm[m]"
},
"script": "params.top_m < 1000"
}
}
}
}
}
}
----
// TEST[continued]
The `bucket_path` uses the `top_metrics` name `tm` and a keyword for the metric providing the aggregate value,
namely `m`.

View file

@ -415,3 +415,56 @@ the second slow of the `nested_child_field` field:
...
--------------------------------------------------
// NOTCONSOLE
==== Use in pipeline aggregations
`top_hits` can be used in pipeline aggregations that consume a single value per bucket, such as `bucket_selector`
that applies per bucket filtering, similar to using a HAVING clause in SQL. This requires setting `size` to 1, and
specifying the right path for the value to be passed to the wrapping aggregator. The latter can be a `_source`, a
`_sort` or a `_score` value. For example:
[source,console]
--------------------------------------------------
POST /sales/_search?size=0
{
"aggs": {
"top_tags": {
"terms": {
"field": "type",
"size": 3
},
"aggs": {
"top_sales_hits": {
"top_hits": {
"sort": [
{
"date": {
"order": "desc"
}
}
],
"_source": {
"includes": [ "date", "price" ]
},
"size": 1
}
},
"having.top_salary": {
"bucket_selector": {
"buckets_path": {
"tp": "top_sales_hits[_source.price]"
},
"script": "params.tp < 180"
}
}
}
}
}
}
--------------------------------------------------
// TEST[setup:sales]
The `bucket_path` uses the `top_hits` name `top_sales_hits` and a keyword for the field providing the aggregate value,
namely `_source` field `price` in the example above. Other options include `top_sales_hits[_sort]`, for filtering on the
sort value `date` above, and `top_sales_hits[_score]`, for filtering on the score of the top hit.

View file

@ -428,3 +428,295 @@ synthetic _source:
- match:
aggregations.page.buckets.1.top_hits.hits.hits.0._source:
extra: bar
---
"having":
- skip:
version: " - 8.8.99"
reason: fixed in 8.9.0
- do:
bulk:
index: test2
refresh: true
body:
- { index: { } }
- { gender: 1, salary: 1000, birth_date: 1981 }
- { index: { } }
- { gender: 1, salary: 2000, birth_date: 1982 }
- { index: { } }
- { gender: 1, salary: 3000, birth_date: 1981 }
- { index: { } }
- { gender: 1, salary: 4000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 6000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 7000, birth_date: 1981 }
- { index: { } }
- { gender: 2, salary: 8000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 9000, birth_date: 1981 }
# Similar to a SQL query for top salaries per birth date, grouped by gender:
#
# SELECT gender, FIRST(salary, birth_date) as first
# FROM test_emp
# GROUP BY gender
# ORDER BY salary desc
- do:
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 1
- length: { aggregations.genders.buckets: 2}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982}
- match: { aggregations.genders.buckets.1.top_salary_hits.hits.total.value: 4}
- match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.gender: 2}
- match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.salary: 9000}
- match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.birth_date: 1981}
# Similar to a SQL query with HAVING clause:
#
# SELECT gender, FIRST(salary, birth_date) as first
# FROM test_emp
# GROUP BY gender
# HAVING first < 80000
# ORDER BY salary desc
- do:
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[_source.salary]
script: "params.ts < 8000"
# The bucket for gender '2' gets filtered out by the selector.
- length: { aggregations.genders.buckets: 1}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982}
- do:
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[_sort]
script: "params.ts < 8000"
# Bucket path on sort value works the same.
- length: { aggregations.genders.buckets: 1 }
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4 }
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1 }
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982 }
- do:
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[_score]
script: "params.ts < 8000"
# Bucket path on score is supported. Here, score is NaN so all values are filtered out.
- length: { aggregations.genders.buckets: 0 }
- do:
bulk:
index: test2
body:
- { index: { } }
- { gender: 3 }
- do:
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[_source.salary]
script: "params.ts < 8000"
# Empty bucket for gender '3' affects nothing.
- length: { aggregations.genders.buckets: 1}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000}
- match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982}
- do:
catch: /path not supported for \[top_salary_hits\]:\ \[_source.nosuchfield\]./
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[_source.nosuchfield]
script: "params.ts < 8000"
- do:
catch: / No aggregation found for path \[nosuchagg\[_source.salary\]\]/
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: nosuchagg[_source.salary]
script: "params.ts < 8000"
- do:
catch: /property paths for top_hits \[top_salary_hits\] require configuring it with size to 1/
search:
index: test2
size: 0
body:
aggs:
genders:
terms:
field: gender
aggs:
top_salary_hits:
top_hits:
sort:
salary:
order: "desc"
_source:
includes:
- gender
- salary
- birth_date
size: 10
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary_hits[salary]
script: "params.ts < 8000"

View file

@ -28,6 +28,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
/**
@ -181,13 +182,52 @@ public class InternalTopHits extends InternalAggregation implements TopHits {
return true;
}
// Supported property prefixes.
private static final String SOURCE = "_source";
private static final String SORT_VALUE = "_sort";
private static final String SCORE = "_score";
@Override
public Object getProperty(List<String> path) {
if (path.isEmpty()) {
return this;
} else {
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
if (path.size() != 1) {
throw new IllegalArgumentException(
"property paths for top_hits ["
+ getName()
+ "] can only contain a single field in _source, score or sort values, got "
+ path
);
}
String[] tokens = path.get(0).toLowerCase(Locale.ROOT).split(":|>|\\.");
if (searchHits.getHits().length > 1) {
throw new IllegalArgumentException("property paths for top_hits [" + getName() + "] require configuring it with size to 1");
}
SearchHit topHit = searchHits.getAt(0);
if (tokens[0].equals(SORT_VALUE)) {
Object[] sortValues = topHit.getSortValues();
if (sortValues != null) {
if (sortValues.length != 1) {
throw new IllegalArgumentException(
"property path for top_hits [\" + getName() + \"] requires a single sort value, got " + sortValues.length
);
}
return sortValues[0];
}
} else if (tokens[0].equals(SCORE)) {
return topHit.getScore();
} else if (tokens[0].equals(SOURCE)) {
Map<String, Object> sourceAsMap = topHit.getSourceAsMap();
if (sourceAsMap != null) {
Object property = sourceAsMap.get(tokens[1]);
if (property != null) {
return property;
}
}
}
throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path);
}
@Override

View file

@ -24,16 +24,19 @@ import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.support.SamplingContext;
import org.elasticsearch.search.lookup.Source;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.test.NotEqualMessageBuilder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import java.io.IOException;
@ -275,6 +278,33 @@ public class InternalTopHitsTests extends InternalAggregationTestCase<InternalTo
assertEqualsWithErrorMessageFromXContent(expectedHits, actualHits);
}
public void testGetProperty() {
// Create a SearchHit containing: { "foo": 1000.0 } and use it to initialize an InternalTopHits instance.
SearchHit hit = new SearchHit(0);
hit = hit.sourceRef(Source.fromMap(Map.of("foo", 1000.0), XContentType.YAML).internalSourceRef());
hit.sortValues(new Object[] { 10.0 }, new DocValueFormat[] { DocValueFormat.RAW });
hit.score(1.0f);
SearchHits hits = new SearchHits(new SearchHit[] { hit }, null, 0);
InternalTopHits internalTopHits = new InternalTopHits("test", 0, 0, null, hits, null);
assertEquals(internalTopHits, internalTopHits.getProperty(Collections.emptyList()));
assertEquals(1000.0, internalTopHits.getProperty(List.of("_source.foo")));
assertEquals(10.0, internalTopHits.getProperty(List.of("_sort")));
assertEquals(1.0f, internalTopHits.getProperty(List.of("_score")));
expectThrows(IllegalArgumentException.class, () -> internalTopHits.getProperty(List.of("nosuchfield")));
expectThrows(IllegalArgumentException.class, () -> internalTopHits.getProperty(List.of("too", "many", "fields")));
// Sort value retrieval requires a single value.
hit.sortValues(new Object[] { 10.0, 20.0 }, new DocValueFormat[] { DocValueFormat.RAW, DocValueFormat.RAW });
expectThrows(IllegalArgumentException.class, () -> internalTopHits.getProperty(List.of("_sort")));
// Two SearchHit instances are not allowed, only the first will be used without assertion.
hits = new SearchHits(new SearchHit[] { hit, hit }, null, 0);
InternalTopHits internalTopHits3 = new InternalTopHits("test", 0, 0, null, hits, null);
expectThrows(IllegalArgumentException.class, () -> internalTopHits3.getProperty(List.of("foo")));
}
@Override
protected boolean supportsSampling() {
return true;

View file

@ -757,3 +757,199 @@
- length: { hits.hits: 2 }
- match: { aggregations.the_counter_top_metrics.top.0.sort: [1] }
- match: { aggregations.the_counter_top_metrics.top.0.metrics.counter_field: 2 }
---
"having":
- do:
bulk:
index: test
refresh: true
body:
- { index: { } }
- { gender: 1, salary: 1000, birth_date: 1981 }
- { index: { } }
- { gender: 1, salary: 2000, birth_date: 1982 }
- { index: { } }
- { gender: 1, salary: 3000, birth_date: 1981 }
- { index: { } }
- { gender: 1, salary: 4000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 6000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 7000, birth_date: 1981 }
- { index: { } }
- { gender: 2, salary: 8000, birth_date: 1982 }
- { index: { } }
- { gender: 2, salary: 9000, birth_date: 1981 }
# Similar to a SQL query for top salaries per birth date, grouped by gender:
#
# SELECT gender, FIRST(salary, birth_date) as first
# FROM test_emp
# GROUP BY gender
# ORDER BY gender
- do:
search:
index: test
size: 0
body:
aggs:
groupby:
composite:
sources:
- gender_comp:
terms:
field: gender
order: asc
aggs:
top_salary:
top_metrics:
metrics:
field: salary
sort:
birth_date: asc
size: 1
- length: { aggregations.groupby.buckets: 2}
- match: { aggregations.groupby.buckets.0.key.gender_comp: 1}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000}
- match: { aggregations.groupby.buckets.1.key.gender_comp: 2}
- match: { aggregations.groupby.buckets.1.top_salary.top.0.sort.0: 1981}
- match: { aggregations.groupby.buckets.1.top_salary.top.0.metrics.salary: 7000}
# Similar to a SQL query with HAVING clause:
#
# SELECT gender, FIRST(salary, birth_date) as first
# FROM test_emp
# GROUP BY gender
# HAVING first < 80000
# ORDER BY gender
- do:
search:
index: test
size: 0
body:
aggs:
groupby:
composite:
sources:
- gender_comp:
terms:
field: gender
order: asc
aggs:
top_salary:
top_metrics:
metrics:
field: salary
sort:
birth_date: asc
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary[salary]
script: "params.ts < 6000"
# The bucket for gender '2' gets filtered out by the selector.
- length: { aggregations.groupby.buckets: 1}
- match: { aggregations.groupby.buckets.0.key.gender_comp: 1}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000}
- do:
bulk:
index: test
body:
- { index: { } }
- { gender: 3 }
- do:
search:
index: test
size: 0
body:
aggs:
groupby:
composite:
sources:
- gender_comp:
terms:
field: gender
order: asc
aggs:
top_salary:
top_metrics:
metrics:
field: salary
sort:
birth_date: asc
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary[salary]
script: "params.ts < 6000"
# Empty bucket for gender '3' doesn't impact results.
- length: { aggregations.groupby.buckets: 1}
- match: { aggregations.groupby.buckets.0.key.gender_comp: 1}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981}
- match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000}
- do:
catch: /path not supported for \[top_salary\]:\ \[nosuchfield\]./
search:
index: test
size: 0
body:
aggs:
groupby:
composite:
sources:
- gender_comp:
terms:
field: gender
order: asc
aggs:
top_salary:
top_metrics:
metrics:
field: salary
sort:
birth_date: asc
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: top_salary[nosuchfield]
script: "params.ts < 6000"
- do:
catch: /No aggregation found for path \[nosuchagg\[salary\]\]/
search:
index: test
size: 0
body:
aggs:
groupby:
composite:
sources:
- gender_comp:
terms:
field: gender
order: asc
aggs:
top_salary:
top_metrics:
metrics:
field: salary
sort:
birth_date: asc
size: 1
having.top_salary:
bucket_selector:
buckets_path:
ts: nosuchagg[salary]
script: "params.ts < 6000"