diff --git a/docs/changelog/128036.yaml b/docs/changelog/128036.yaml new file mode 100644 index 000000000000..94cc17b90444 --- /dev/null +++ b/docs/changelog/128036.yaml @@ -0,0 +1,6 @@ +pr: 128036 +summary: Fix inner hits + aggregations concurrency bug +area: Search +type: bug +issues: + - 122419 diff --git a/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/InnerHitsIT.java b/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/InnerHitsIT.java index 6d6072b2992c..b311b15d0af3 100644 --- a/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/InnerHitsIT.java +++ b/modules/parent-join/src/internalClusterTest/java/org/elasticsearch/join/query/InnerHitsIT.java @@ -26,6 +26,8 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.AggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.TopHits; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -51,6 +53,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_T import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery; +import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCountAndNoFailures; @@ -64,6 +67,7 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; @@ -698,4 +702,68 @@ public class InnerHitsIT extends ParentChildTestCase { ) ); } + + public void testTopHitsOnParentChild() throws Exception { + assertAcked( + prepareCreate("idx").setMapping( + jsonBuilder().startObject() + .startObject("_doc") + .startObject("properties") + .startObject("id") + .field("type", "keyword") + .endObject() + .startObject("join_field") + .field("type", "join") + .startObject("relations") + .field("parent", new String[] { "child1", "child2" }) + .endObject() + .endObject() + .endObject() + .endObject() + .endObject() + ) + ); + ensureGreen("idx"); + + List requestBuilders = new ArrayList<>(); + int numDocs = scaledRandomIntBetween(10, 100); + int child1 = 0; + int child2 = 0; + int[] child1InnerObjects = new int[numDocs]; + int[] child2InnerObjects = new int[numDocs]; + for (int parent = 0; parent < numDocs; parent++) { + String parentId = String.format(Locale.ENGLISH, "p_%03d", parent); + requestBuilders.add(createIndexRequest("idx", "parent", parentId, null)); + + int numChildDocs = child1InnerObjects[parent] = scaledRandomIntBetween(1, numDocs); + int limit = child1 + numChildDocs; + for (; child1 < limit; child1++) { + requestBuilders.add(createIndexRequest("idx", "child1", String.format(Locale.ENGLISH, "c1_%04d", child1), parentId)); + } + numChildDocs = child2InnerObjects[parent] = scaledRandomIntBetween(1, numDocs); + limit = child2 + numChildDocs; + for (; child2 < limit; child2++) { + requestBuilders.add(createIndexRequest("idx", "child2", String.format(Locale.ENGLISH, "c2_%04d", child2), parentId)); + } + } + + indexRandom(true, requestBuilders); + ensureSearchable(); + + QueryBuilder hasChildQuery = hasChildQuery("child2", matchAllQuery(), ScoreMode.None).innerHit(new InnerHitBuilder().setSize(2)); + AggregationBuilder topHitsAgg = topHits("top-children").size(3); + + assertNoFailuresAndResponse(prepareSearch("idx").setQuery(hasChildQuery).addAggregation(topHitsAgg), response -> { + assertHitCount(response, numDocs); + + TopHits topHits = response.getAggregations().get("top-children"); + SearchHits hits = topHits.getHits(); + assertThat(hits.getHits().length, equalTo(3)); + + for (SearchHit hit : hits) { + SearchHits innerHits = hit.getInnerHits().get("child2"); + assertThat(innerHits.getHits().length, lessThanOrEqualTo(2)); + } + }); + } } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java index 460437e8541a..ad192bff2299 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/query/ParentChildInnerHitContextBuilder.java @@ -88,12 +88,35 @@ class ParentChildInnerHitContextBuilder extends InnerHitContextBuilder { private final String typeName; private final boolean fetchChildInnerHits; private final Joiner joiner; + private final SearchExecutionContext searchExecutionContext; JoinFieldInnerHitSubContext(String name, SearchContext context, String typeName, boolean fetchChildInnerHits, Joiner joiner) { super(name, context); this.typeName = typeName; this.fetchChildInnerHits = fetchChildInnerHits; this.joiner = joiner; + this.searchExecutionContext = null; + } + + JoinFieldInnerHitSubContext( + JoinFieldInnerHitSubContext joinFieldInnerHitSubContext, + SearchExecutionContext searchExecutionContext + ) { + super(joinFieldInnerHitSubContext); + this.typeName = joinFieldInnerHitSubContext.typeName; + this.fetchChildInnerHits = joinFieldInnerHitSubContext.fetchChildInnerHits; + this.joiner = joinFieldInnerHitSubContext.joiner; + this.searchExecutionContext = searchExecutionContext; + } + + @Override + public JoinFieldInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) { + return new JoinFieldInnerHitSubContext(this, searchExecutionContext); + } + + @Override + public SearchExecutionContext getSearchExecutionContext() { + return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext(); } @Override diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java index 34f258840e6c..58e639a260ab 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java @@ -19,7 +19,9 @@ import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.fielddata.ScriptDocValues; +import org.elasticsearch.index.query.InnerHitBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.plugins.Plugin; @@ -30,6 +32,7 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode; import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -45,6 +48,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightField; import org.elasticsearch.search.lookup.FieldLookup; import org.elasticsearch.search.lookup.LeafSearchLookup; import org.elasticsearch.search.rescore.QueryRescorerBuilder; +import org.elasticsearch.search.sort.NestedSortBuilder; import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortOrder; @@ -983,6 +987,67 @@ public class TopHitsIT extends ESIntegTestCase { ); } + public void testTopHitsOnInnerHits() { + QueryBuilder nestedQuery = nestedQuery("comments", matchQuery("comments.message", "text"), ScoreMode.Avg).innerHit( + new InnerHitBuilder().setSize(2) + ); + AggregationBuilder topHitsAgg = topHits("top-comments").size(3) + .sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments"))); + + assertNoFailuresAndResponse(prepareSearch("articles").setQuery(nestedQuery).addAggregation(topHitsAgg), response -> { + TopHits topHits = response.getAggregations().get("top-comments"); + SearchHits hits = topHits.getHits(); + assertThat(hits.getHits().length, equalTo(3)); + + for (SearchHit hit : hits) { + SearchHits innerHits = hit.getInnerHits().get("comments"); + assertThat(innerHits.getHits().length, lessThanOrEqualTo(2)); + for (SearchHit innerHit : innerHits) { + assertThat(innerHit.getNestedIdentity().getField().string(), equalTo("comments")); + Map source = innerHit.getSourceAsMap(); + assertTrue(source.containsKey("message")); + assertFalse(source.containsKey("reviewers")); + } + } + }); + } + + public void testTopHitsOnMultipleNestedInnerHits() { + QueryBuilder doubleNestedQuery = nestedQuery( + "comments", + nestedQuery("comments.reviewers", matchQuery("comments.reviewers.name", "user c"), ScoreMode.Avg).innerHit( + new InnerHitBuilder() + ), + ScoreMode.Avg + ).innerHit(new InnerHitBuilder("review")); + AggregationBuilder topHitsAgg = topHits("top-reviewers").size(2) + .sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments"))); + + assertNoFailuresAndResponse(prepareSearch("articles").setQuery(doubleNestedQuery).addAggregation(topHitsAgg), response -> { + TopHits topHits = response.getAggregations().get("top-reviewers"); + SearchHits hits = topHits.getHits(); + assertThat(hits.getHits().length, equalTo(1)); + + SearchHit hit = hits.getAt(0); + SearchHits innerHits = hit.getInnerHits().get("review"); + assertThat(innerHits.getHits().length, equalTo(2)); + + assertThat(innerHits.getAt(0).getId(), equalTo("1")); + assertThat(innerHits.getAt(0).getNestedIdentity().getField().string(), equalTo("comments")); + assertThat(innerHits.getAt(0).getNestedIdentity().getOffset(), equalTo(0)); + Map source0 = innerHits.getAt(0).getSourceAsMap(); + assertTrue(source0.containsKey("message")); + assertTrue(source0.containsKey("reviewers")); + + assertThat(innerHits.getAt(1).getId(), equalTo("1")); + assertThat(innerHits.getAt(1).getNestedIdentity().getField().string(), equalTo("comments")); + assertThat(innerHits.getAt(1).getNestedIdentity().getOffset(), equalTo(1)); + Map source1 = innerHits.getAt(1).getSourceAsMap(); + assertTrue(source1.containsKey("message")); + assertTrue(source1.containsKey("reviewers")); + }); + } + public void testUseMaxDocInsteadOfSize() throws Exception { updateIndexSettings( Settings.builder().put(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING.getKey(), ArrayUtil.MAX_ARRAY_LENGTH), diff --git a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java index 523572c25ae2..34c5ede62a65 100644 --- a/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/NestedQueryBuilder.java @@ -396,6 +396,7 @@ public class NestedQueryBuilder extends AbstractQueryBuilder private final NestedObjectMapper parentObjectMapper; private final NestedObjectMapper childObjectMapper; + private final SearchExecutionContext searchExecutionContext; NestedInnerHitSubContext( String name, @@ -406,6 +407,24 @@ public class NestedQueryBuilder extends AbstractQueryBuilder super(name, context); this.parentObjectMapper = parentObjectMapper; this.childObjectMapper = childObjectMapper; + this.searchExecutionContext = null; + } + + NestedInnerHitSubContext(NestedInnerHitSubContext nestedInnerHitSubContext, SearchExecutionContext searchExecutionContext) { + super(nestedInnerHitSubContext); + this.parentObjectMapper = nestedInnerHitSubContext.parentObjectMapper; + this.childObjectMapper = nestedInnerHitSubContext.childObjectMapper; + this.searchExecutionContext = searchExecutionContext; + } + + @Override + public NestedInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) { + return new NestedInnerHitSubContext(this, searchExecutionContext); + } + + @Override + public SearchExecutionContext getSearchExecutionContext() { + return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext(); } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java index 1a5f784027ae..0d21f09e699b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/TopHitsAggregator.java @@ -41,6 +41,8 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector; import org.elasticsearch.search.aggregations.LeafBucketCollectorBase; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.fetch.FetchSearchResult; +import org.elasticsearch.search.fetch.subphase.InnerHitsContext; +import org.elasticsearch.search.fetch.subphase.InnerHitsContext.InnerHitSubContext; import org.elasticsearch.search.internal.SubSearchContext; import org.elasticsearch.search.profile.ProfileResult; import org.elasticsearch.search.rescore.RescoreContext; @@ -223,16 +225,40 @@ class TopHitsAggregator extends MetricsAggregator { private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) { // Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet. SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext()); + // InnerHitSubContext is not thread-safe, so we fork it as well to support concurrent execution + InnerHitsContext innerHitsContext = new InnerHitsContext( + getForkedInnerHits(subSearchContext.innerHits().getInnerHits(), searchExecutionContext) + ); + SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) { @Override public SearchExecutionContext getSearchExecutionContext() { return searchExecutionContext; } + + @Override + public InnerHitsContext innerHits() { + return innerHitsContext; + } }; + fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null); return fetchSubSearchContext.fetchResult(); } + private static Map getForkedInnerHits( + Map originalInnerHits, + SearchExecutionContext searchExecutionContext + ) { + Map forkedInnerHits = new HashMap<>(); + for (Map.Entry entry : originalInnerHits.entrySet()) { + var forkedContext = entry.getValue().copyWithSearchExecutionContext(searchExecutionContext); + forkedInnerHits.put(entry.getKey(), forkedContext); + } + + return forkedInnerHits; + } + @Override public InternalTopHits buildEmptyAggregation() { TopDocs topDocs; diff --git a/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsContext.java b/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsContext.java index 9a38f603478c..d0ee05f40d01 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsContext.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/subphase/InnerHitsContext.java @@ -22,6 +22,7 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.Weight; import org.apache.lucene.util.Bits; import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore; +import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SubSearchContext; @@ -44,7 +45,7 @@ public final class InnerHitsContext { this.innerHits = new HashMap<>(); } - InnerHitsContext(Map innerHits) { + public InnerHitsContext(Map innerHits) { this.innerHits = Objects.requireNonNull(innerHits); } @@ -84,6 +85,14 @@ public final class InnerHitsContext { this.context = context; } + public InnerHitSubContext(InnerHitSubContext innerHitSubContext) { + super(innerHitSubContext); + this.name = innerHitSubContext.name; + this.context = innerHitSubContext.context; + } + + public abstract InnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext); + public abstract TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException; public String getName() { diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 2559966ba5e7..5e451e2e79f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -145,6 +145,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType; import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase; import org.elasticsearch.search.fetch.subphase.FetchSourcePhase; +import org.elasticsearch.search.fetch.subphase.InnerHitsContext; import org.elasticsearch.search.internal.AliasFilter; import org.elasticsearch.search.internal.ContextIndexSearcher; import org.elasticsearch.search.internal.SearchContext; @@ -525,6 +526,7 @@ public abstract class AggregatorTestCase extends ESTestCase { when(ctx.indexShard()).thenReturn(indexShard); when(ctx.newSourceLoader(null)).thenAnswer(inv -> searchExecutionContext.newSourceLoader(null, false)); when(ctx.newIdLoader()).thenReturn(IdLoader.fromLeafStoredFieldLoader()); + when(ctx.innerHits()).thenReturn(new InnerHitsContext()); var res = new SubSearchContext(ctx); releasables.add(res); // TODO: nasty workaround for not getting the standard resource handling behavior of a real search context return res;