From deffa800dba1e2d0df341dec71c57cd521d69397 Mon Sep 17 00:00:00 2001 From: Kostas Krikellas <131142368+kkrik-es@users.noreply.github.com> Date: Mon, 15 May 2023 16:21:11 +0300 Subject: [PATCH] Support value retrieval in top_hits (#95828) This is used when the `top_hits` output is passed to pipeline aggregators like bucket selectors. The logic retrieves the requested field from the source of the first SearchHit. This implies that (a) the spec of the wrapping aggregator (e.g. `bucket_path`) points to an appropriate field using a bracketed reference (e.g. `my_top_hits[my_metric]`) and (b) the `top_hits` contains a `size: 1` setting. This PR also includes extensions to YAML tests for `top_metrics` and `top_hits` to cover the cases where these are used in pipeline aggregations through `bucket_selector`, similar to a HAVING clause in SQL. Related to https://github.com/elastic/elasticsearch/issues/73429. --- docs/changelog/95828.yaml | 5 + .../metrics/top-metrics-aggregation.asciidoc | 41 +++ .../metrics/tophits-aggregation.asciidoc | 53 ++++ .../test/aggregations/top_hits.yml | 292 ++++++++++++++++++ .../aggregations/metrics/InternalTopHits.java | 44 ++- .../metrics/InternalTopHitsTests.java | 30 ++ .../test/analytics/top_metrics.yml | 196 ++++++++++++ 7 files changed, 659 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/95828.yaml diff --git a/docs/changelog/95828.yaml b/docs/changelog/95828.yaml new file mode 100644 index 000000000000..ebe327432128 --- /dev/null +++ b/docs/changelog/95828.yaml @@ -0,0 +1,5 @@ +pr: 95828 +summary: Support value retrieval in `top_hits` +area: Aggregations +type: enhancement +issues: [] diff --git a/docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc b/docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc index 559d229c54cd..4cc9d3850919 100644 --- a/docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/top-metrics-aggregation.asciidoc @@ -416,3 +416,44 @@ Which returns the much more expected: } ---- // TESTRESPONSE + +===== Use in pipeline aggregations + +`top_metrics` can be used in pipeline aggregations that consume a single value per bucket, such as `bucket_selector` +that applies per bucket filtering, similar to using a HAVING clause in SQL. This requires setting `size` to 1, and +specifying the right path for the (single) metric to be passed to the wrapping aggregator. For example: + +[source,console] +---- +POST /test*/_search?filter_path=aggregations +{ + "aggs": { + "ip": { + "terms": { + "field": "ip" + }, + "aggs": { + "tm": { + "top_metrics": { + "metrics": {"field": "m"}, + "sort": {"s": "desc"}, + "size": 1 + } + }, + "having_tm": { + "bucket_selector": { + "buckets_path": { + "top_m": "tm[m]" + }, + "script": "params.top_m < 1000" + } + } + } + } + } +} +---- +// TEST[continued] + +The `bucket_path` uses the `top_metrics` name `tm` and a keyword for the metric providing the aggregate value, +namely `m`. diff --git a/docs/reference/aggregations/metrics/tophits-aggregation.asciidoc b/docs/reference/aggregations/metrics/tophits-aggregation.asciidoc index 10ce5763b409..515ad38bd814 100644 --- a/docs/reference/aggregations/metrics/tophits-aggregation.asciidoc +++ b/docs/reference/aggregations/metrics/tophits-aggregation.asciidoc @@ -415,3 +415,56 @@ the second slow of the `nested_child_field` field: ... -------------------------------------------------- // NOTCONSOLE + +==== Use in pipeline aggregations + +`top_hits` can be used in pipeline aggregations that consume a single value per bucket, such as `bucket_selector` +that applies per bucket filtering, similar to using a HAVING clause in SQL. This requires setting `size` to 1, and +specifying the right path for the value to be passed to the wrapping aggregator. The latter can be a `_source`, a +`_sort` or a `_score` value. For example: + +[source,console] +-------------------------------------------------- +POST /sales/_search?size=0 +{ + "aggs": { + "top_tags": { + "terms": { + "field": "type", + "size": 3 + }, + "aggs": { + "top_sales_hits": { + "top_hits": { + "sort": [ + { + "date": { + "order": "desc" + } + } + ], + "_source": { + "includes": [ "date", "price" ] + }, + "size": 1 + } + }, + "having.top_salary": { + "bucket_selector": { + "buckets_path": { + "tp": "top_sales_hits[_source.price]" + }, + "script": "params.tp < 180" + } + } + } + } + } +} + +-------------------------------------------------- +// TEST[setup:sales] + +The `bucket_path` uses the `top_hits` name `top_sales_hits` and a keyword for the field providing the aggregate value, +namely `_source` field `price` in the example above. Other options include `top_sales_hits[_sort]`, for filtering on the +sort value `date` above, and `top_sales_hits[_score]`, for filtering on the score of the top hit. diff --git a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/top_hits.yml b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/top_hits.yml index 282034f6b1e5..14b2abac443f 100644 --- a/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/top_hits.yml +++ b/modules/aggregations/src/yamlRestTest/resources/rest-api-spec/test/aggregations/top_hits.yml @@ -428,3 +428,295 @@ synthetic _source: - match: aggregations.page.buckets.1.top_hits.hits.hits.0._source: extra: bar + + +--- +"having": + - skip: + version: " - 8.8.99" + reason: fixed in 8.9.0 + - do: + bulk: + index: test2 + refresh: true + body: + - { index: { } } + - { gender: 1, salary: 1000, birth_date: 1981 } + - { index: { } } + - { gender: 1, salary: 2000, birth_date: 1982 } + - { index: { } } + - { gender: 1, salary: 3000, birth_date: 1981 } + - { index: { } } + - { gender: 1, salary: 4000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 6000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 7000, birth_date: 1981 } + - { index: { } } + - { gender: 2, salary: 8000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 9000, birth_date: 1981 } + + # Similar to a SQL query for top salaries per birth date, grouped by gender: + # + # SELECT gender, FIRST(salary, birth_date) as first + # FROM test_emp + # GROUP BY gender + # ORDER BY salary desc + - do: + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 1 + + - length: { aggregations.genders.buckets: 2} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982} + - match: { aggregations.genders.buckets.1.top_salary_hits.hits.total.value: 4} + - match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.gender: 2} + - match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.salary: 9000} + - match: { aggregations.genders.buckets.1.top_salary_hits.hits.hits.0._source.birth_date: 1981} + + # Similar to a SQL query with HAVING clause: + # + # SELECT gender, FIRST(salary, birth_date) as first + # FROM test_emp + # GROUP BY gender + # HAVING first < 80000 + # ORDER BY salary desc + - do: + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[_source.salary] + script: "params.ts < 8000" + + # The bucket for gender '2' gets filtered out by the selector. + - length: { aggregations.genders.buckets: 1} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982} + + - do: + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[_sort] + script: "params.ts < 8000" + + # Bucket path on sort value works the same. + - length: { aggregations.genders.buckets: 1 } + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4 } + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1 } + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982 } + + - do: + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[_score] + script: "params.ts < 8000" + + # Bucket path on score is supported. Here, score is NaN so all values are filtered out. + - length: { aggregations.genders.buckets: 0 } + + - do: + bulk: + index: test2 + body: + - { index: { } } + - { gender: 3 } + - do: + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[_source.salary] + script: "params.ts < 8000" + + # Empty bucket for gender '3' affects nothing. + - length: { aggregations.genders.buckets: 1} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.total.value: 4} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.gender: 1} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.salary: 4000} + - match: { aggregations.genders.buckets.0.top_salary_hits.hits.hits.0._source.birth_date: 1982} + + - do: + catch: /path not supported for \[top_salary_hits\]:\ \[_source.nosuchfield\]./ + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[_source.nosuchfield] + script: "params.ts < 8000" + + - do: + catch: / No aggregation found for path \[nosuchagg\[_source.salary\]\]/ + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: nosuchagg[_source.salary] + script: "params.ts < 8000" + + - do: + catch: /property paths for top_hits \[top_salary_hits\] require configuring it with size to 1/ + search: + index: test2 + size: 0 + body: + aggs: + genders: + terms: + field: gender + aggs: + top_salary_hits: + top_hits: + sort: + salary: + order: "desc" + _source: + includes: + - gender + - salary + - birth_date + size: 10 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary_hits[salary] + script: "params.ts < 8000" diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java index 03c35eda056e..18b1f44ce5d7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/InternalTopHits.java @@ -28,6 +28,7 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Locale; import java.util.Map; /** @@ -181,13 +182,52 @@ public class InternalTopHits extends InternalAggregation implements TopHits { return true; } + // Supported property prefixes. + private static final String SOURCE = "_source"; + private static final String SORT_VALUE = "_sort"; + private static final String SCORE = "_score"; + @Override public Object getProperty(List path) { if (path.isEmpty()) { return this; - } else { - throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); } + if (path.size() != 1) { + throw new IllegalArgumentException( + "property paths for top_hits [" + + getName() + + "] can only contain a single field in _source, score or sort values, got " + + path + ); + } + + String[] tokens = path.get(0).toLowerCase(Locale.ROOT).split(":|>|\\."); + if (searchHits.getHits().length > 1) { + throw new IllegalArgumentException("property paths for top_hits [" + getName() + "] require configuring it with size to 1"); + } + SearchHit topHit = searchHits.getAt(0); + if (tokens[0].equals(SORT_VALUE)) { + Object[] sortValues = topHit.getSortValues(); + if (sortValues != null) { + if (sortValues.length != 1) { + throw new IllegalArgumentException( + "property path for top_hits [\" + getName() + \"] requires a single sort value, got " + sortValues.length + ); + } + return sortValues[0]; + } + } else if (tokens[0].equals(SCORE)) { + return topHit.getScore(); + } else if (tokens[0].equals(SOURCE)) { + Map sourceAsMap = topHit.getSourceAsMap(); + if (sourceAsMap != null) { + Object property = sourceAsMap.get(tokens[1]); + if (property != null) { + return property; + } + } + } + throw new IllegalArgumentException("path not supported for [" + getName() + "]: " + path); } @Override diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalTopHitsTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalTopHitsTests.java index 945cba465b4e..35fe9c400888 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalTopHitsTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/metrics/InternalTopHitsTests.java @@ -24,16 +24,19 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Tuple; +import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.ParsedAggregation; import org.elasticsearch.search.aggregations.support.SamplingContext; +import org.elasticsearch.search.lookup.Source; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.InternalAggregationTestCase; import org.elasticsearch.test.NotEqualMessageBuilder; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; @@ -275,6 +278,33 @@ public class InternalTopHitsTests extends InternalAggregationTestCase internalTopHits.getProperty(List.of("nosuchfield"))); + expectThrows(IllegalArgumentException.class, () -> internalTopHits.getProperty(List.of("too", "many", "fields"))); + + // Sort value retrieval requires a single value. + hit.sortValues(new Object[] { 10.0, 20.0 }, new DocValueFormat[] { DocValueFormat.RAW, DocValueFormat.RAW }); + expectThrows(IllegalArgumentException.class, () -> internalTopHits.getProperty(List.of("_sort"))); + + // Two SearchHit instances are not allowed, only the first will be used without assertion. + hits = new SearchHits(new SearchHit[] { hit, hit }, null, 0); + InternalTopHits internalTopHits3 = new InternalTopHits("test", 0, 0, null, hits, null); + expectThrows(IllegalArgumentException.class, () -> internalTopHits3.getProperty(List.of("foo"))); + } + @Override protected boolean supportsSampling() { return true; diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/top_metrics.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/top_metrics.yml index 07cddcbf9eea..61ea58ee067a 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/top_metrics.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/analytics/top_metrics.yml @@ -757,3 +757,199 @@ - length: { hits.hits: 2 } - match: { aggregations.the_counter_top_metrics.top.0.sort: [1] } - match: { aggregations.the_counter_top_metrics.top.0.metrics.counter_field: 2 } + + +--- +"having": + - do: + bulk: + index: test + refresh: true + body: + - { index: { } } + - { gender: 1, salary: 1000, birth_date: 1981 } + - { index: { } } + - { gender: 1, salary: 2000, birth_date: 1982 } + - { index: { } } + - { gender: 1, salary: 3000, birth_date: 1981 } + - { index: { } } + - { gender: 1, salary: 4000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 6000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 7000, birth_date: 1981 } + - { index: { } } + - { gender: 2, salary: 8000, birth_date: 1982 } + - { index: { } } + - { gender: 2, salary: 9000, birth_date: 1981 } + + # Similar to a SQL query for top salaries per birth date, grouped by gender: + # + # SELECT gender, FIRST(salary, birth_date) as first + # FROM test_emp + # GROUP BY gender + # ORDER BY gender + - do: + search: + index: test + size: 0 + body: + aggs: + groupby: + composite: + sources: + - gender_comp: + terms: + field: gender + order: asc + aggs: + top_salary: + top_metrics: + metrics: + field: salary + sort: + birth_date: asc + size: 1 + + - length: { aggregations.groupby.buckets: 2} + - match: { aggregations.groupby.buckets.0.key.gender_comp: 1} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000} + - match: { aggregations.groupby.buckets.1.key.gender_comp: 2} + - match: { aggregations.groupby.buckets.1.top_salary.top.0.sort.0: 1981} + - match: { aggregations.groupby.buckets.1.top_salary.top.0.metrics.salary: 7000} + + # Similar to a SQL query with HAVING clause: + # + # SELECT gender, FIRST(salary, birth_date) as first + # FROM test_emp + # GROUP BY gender + # HAVING first < 80000 + # ORDER BY gender + - do: + search: + index: test + size: 0 + body: + aggs: + groupby: + composite: + sources: + - gender_comp: + terms: + field: gender + order: asc + aggs: + top_salary: + top_metrics: + metrics: + field: salary + sort: + birth_date: asc + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary[salary] + script: "params.ts < 6000" + + # The bucket for gender '2' gets filtered out by the selector. + - length: { aggregations.groupby.buckets: 1} + - match: { aggregations.groupby.buckets.0.key.gender_comp: 1} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000} + + - do: + bulk: + index: test + body: + - { index: { } } + - { gender: 3 } + - do: + search: + index: test + size: 0 + body: + aggs: + groupby: + composite: + sources: + - gender_comp: + terms: + field: gender + order: asc + aggs: + top_salary: + top_metrics: + metrics: + field: salary + sort: + birth_date: asc + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary[salary] + script: "params.ts < 6000" + + # Empty bucket for gender '3' doesn't impact results. + - length: { aggregations.groupby.buckets: 1} + - match: { aggregations.groupby.buckets.0.key.gender_comp: 1} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.sort.0: 1981} + - match: { aggregations.groupby.buckets.0.top_salary.top.0.metrics.salary: 1000} + + - do: + catch: /path not supported for \[top_salary\]:\ \[nosuchfield\]./ + search: + index: test + size: 0 + body: + aggs: + groupby: + composite: + sources: + - gender_comp: + terms: + field: gender + order: asc + aggs: + top_salary: + top_metrics: + metrics: + field: salary + sort: + birth_date: asc + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: top_salary[nosuchfield] + script: "params.ts < 6000" + + - do: + catch: /No aggregation found for path \[nosuchagg\[salary\]\]/ + search: + index: test + size: 0 + body: + aggs: + groupby: + composite: + sources: + - gender_comp: + terms: + field: gender + order: asc + aggs: + top_salary: + top_metrics: + metrics: + field: salary + sort: + birth_date: asc + size: 1 + having.top_salary: + bucket_selector: + buckets_path: + ts: nosuchagg[salary] + script: "params.ts < 6000"