Fix inner hits + aggregations concurrency bug (#128036)

Fork InnerHitSubContext instances before source is fetched in 
aggregations to prevent inter-segment race conditions.

Relates to #122419
This commit is contained in:
Ben Chaplin 2025-06-02 16:44:53 -04:00 committed by GitHub
parent 4762111b44
commit 13bce60be9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 219 additions and 1 deletions

View file

@ -0,0 +1,6 @@
pr: 128036
summary: Fix inner hits + aggregations concurrency bug
area: Search
type: bug
issues:
- 122419

View file

@ -26,6 +26,8 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.TopHits;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.FieldSortBuilder;
@ -51,6 +53,7 @@ import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_T
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery;
import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.topHits;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCountAndNoFailures;
@ -64,6 +67,7 @@ import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ -698,4 +702,68 @@ public class InnerHitsIT extends ParentChildTestCase {
)
);
}
public void testTopHitsOnParentChild() throws Exception {
assertAcked(
prepareCreate("idx").setMapping(
jsonBuilder().startObject()
.startObject("_doc")
.startObject("properties")
.startObject("id")
.field("type", "keyword")
.endObject()
.startObject("join_field")
.field("type", "join")
.startObject("relations")
.field("parent", new String[] { "child1", "child2" })
.endObject()
.endObject()
.endObject()
.endObject()
.endObject()
)
);
ensureGreen("idx");
List<IndexRequestBuilder> requestBuilders = new ArrayList<>();
int numDocs = scaledRandomIntBetween(10, 100);
int child1 = 0;
int child2 = 0;
int[] child1InnerObjects = new int[numDocs];
int[] child2InnerObjects = new int[numDocs];
for (int parent = 0; parent < numDocs; parent++) {
String parentId = String.format(Locale.ENGLISH, "p_%03d", parent);
requestBuilders.add(createIndexRequest("idx", "parent", parentId, null));
int numChildDocs = child1InnerObjects[parent] = scaledRandomIntBetween(1, numDocs);
int limit = child1 + numChildDocs;
for (; child1 < limit; child1++) {
requestBuilders.add(createIndexRequest("idx", "child1", String.format(Locale.ENGLISH, "c1_%04d", child1), parentId));
}
numChildDocs = child2InnerObjects[parent] = scaledRandomIntBetween(1, numDocs);
limit = child2 + numChildDocs;
for (; child2 < limit; child2++) {
requestBuilders.add(createIndexRequest("idx", "child2", String.format(Locale.ENGLISH, "c2_%04d", child2), parentId));
}
}
indexRandom(true, requestBuilders);
ensureSearchable();
QueryBuilder hasChildQuery = hasChildQuery("child2", matchAllQuery(), ScoreMode.None).innerHit(new InnerHitBuilder().setSize(2));
AggregationBuilder topHitsAgg = topHits("top-children").size(3);
assertNoFailuresAndResponse(prepareSearch("idx").setQuery(hasChildQuery).addAggregation(topHitsAgg), response -> {
assertHitCount(response, numDocs);
TopHits topHits = response.getAggregations().get("top-children");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(3));
for (SearchHit hit : hits) {
SearchHits innerHits = hit.getInnerHits().get("child2");
assertThat(innerHits.getHits().length, lessThanOrEqualTo(2));
}
});
}
}

View file

@ -88,12 +88,35 @@ class ParentChildInnerHitContextBuilder extends InnerHitContextBuilder {
private final String typeName;
private final boolean fetchChildInnerHits;
private final Joiner joiner;
private final SearchExecutionContext searchExecutionContext;
JoinFieldInnerHitSubContext(String name, SearchContext context, String typeName, boolean fetchChildInnerHits, Joiner joiner) {
super(name, context);
this.typeName = typeName;
this.fetchChildInnerHits = fetchChildInnerHits;
this.joiner = joiner;
this.searchExecutionContext = null;
}
JoinFieldInnerHitSubContext(
JoinFieldInnerHitSubContext joinFieldInnerHitSubContext,
SearchExecutionContext searchExecutionContext
) {
super(joinFieldInnerHitSubContext);
this.typeName = joinFieldInnerHitSubContext.typeName;
this.fetchChildInnerHits = joinFieldInnerHitSubContext.fetchChildInnerHits;
this.joiner = joinFieldInnerHitSubContext.joiner;
this.searchExecutionContext = searchExecutionContext;
}
@Override
public JoinFieldInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) {
return new JoinFieldInnerHitSubContext(this, searchExecutionContext);
}
@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext();
}
@Override

View file

@ -19,7 +19,9 @@ import org.elasticsearch.common.document.DocumentField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.fielddata.ScriptDocValues;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.plugins.Plugin;
@ -30,6 +32,7 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -45,6 +48,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.lookup.FieldLookup;
import org.elasticsearch.search.lookup.LeafSearchLookup;
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.sort.NestedSortBuilder;
import org.elasticsearch.search.sort.ScriptSortBuilder.ScriptSortType;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
@ -983,6 +987,67 @@ public class TopHitsIT extends ESIntegTestCase {
);
}
public void testTopHitsOnInnerHits() {
QueryBuilder nestedQuery = nestedQuery("comments", matchQuery("comments.message", "text"), ScoreMode.Avg).innerHit(
new InnerHitBuilder().setSize(2)
);
AggregationBuilder topHitsAgg = topHits("top-comments").size(3)
.sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments")));
assertNoFailuresAndResponse(prepareSearch("articles").setQuery(nestedQuery).addAggregation(topHitsAgg), response -> {
TopHits topHits = response.getAggregations().get("top-comments");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(3));
for (SearchHit hit : hits) {
SearchHits innerHits = hit.getInnerHits().get("comments");
assertThat(innerHits.getHits().length, lessThanOrEqualTo(2));
for (SearchHit innerHit : innerHits) {
assertThat(innerHit.getNestedIdentity().getField().string(), equalTo("comments"));
Map<String, Object> source = innerHit.getSourceAsMap();
assertTrue(source.containsKey("message"));
assertFalse(source.containsKey("reviewers"));
}
}
});
}
public void testTopHitsOnMultipleNestedInnerHits() {
QueryBuilder doubleNestedQuery = nestedQuery(
"comments",
nestedQuery("comments.reviewers", matchQuery("comments.reviewers.name", "user c"), ScoreMode.Avg).innerHit(
new InnerHitBuilder()
),
ScoreMode.Avg
).innerHit(new InnerHitBuilder("review"));
AggregationBuilder topHitsAgg = topHits("top-reviewers").size(2)
.sort(SortBuilders.fieldSort("comments.date").order(SortOrder.ASC).setNestedSort(new NestedSortBuilder("comments")));
assertNoFailuresAndResponse(prepareSearch("articles").setQuery(doubleNestedQuery).addAggregation(topHitsAgg), response -> {
TopHits topHits = response.getAggregations().get("top-reviewers");
SearchHits hits = topHits.getHits();
assertThat(hits.getHits().length, equalTo(1));
SearchHit hit = hits.getAt(0);
SearchHits innerHits = hit.getInnerHits().get("review");
assertThat(innerHits.getHits().length, equalTo(2));
assertThat(innerHits.getAt(0).getId(), equalTo("1"));
assertThat(innerHits.getAt(0).getNestedIdentity().getField().string(), equalTo("comments"));
assertThat(innerHits.getAt(0).getNestedIdentity().getOffset(), equalTo(0));
Map<String, Object> source0 = innerHits.getAt(0).getSourceAsMap();
assertTrue(source0.containsKey("message"));
assertTrue(source0.containsKey("reviewers"));
assertThat(innerHits.getAt(1).getId(), equalTo("1"));
assertThat(innerHits.getAt(1).getNestedIdentity().getField().string(), equalTo("comments"));
assertThat(innerHits.getAt(1).getNestedIdentity().getOffset(), equalTo(1));
Map<String, Object> source1 = innerHits.getAt(1).getSourceAsMap();
assertTrue(source1.containsKey("message"));
assertTrue(source1.containsKey("reviewers"));
});
}
public void testUseMaxDocInsteadOfSize() throws Exception {
updateIndexSettings(
Settings.builder().put(IndexSettings.MAX_INNER_RESULT_WINDOW_SETTING.getKey(), ArrayUtil.MAX_ARRAY_LENGTH),

View file

@ -396,6 +396,7 @@ public class NestedQueryBuilder extends AbstractQueryBuilder<NestedQueryBuilder>
private final NestedObjectMapper parentObjectMapper;
private final NestedObjectMapper childObjectMapper;
private final SearchExecutionContext searchExecutionContext;
NestedInnerHitSubContext(
String name,
@ -406,6 +407,24 @@ public class NestedQueryBuilder extends AbstractQueryBuilder<NestedQueryBuilder>
super(name, context);
this.parentObjectMapper = parentObjectMapper;
this.childObjectMapper = childObjectMapper;
this.searchExecutionContext = null;
}
NestedInnerHitSubContext(NestedInnerHitSubContext nestedInnerHitSubContext, SearchExecutionContext searchExecutionContext) {
super(nestedInnerHitSubContext);
this.parentObjectMapper = nestedInnerHitSubContext.parentObjectMapper;
this.childObjectMapper = nestedInnerHitSubContext.childObjectMapper;
this.searchExecutionContext = searchExecutionContext;
}
@Override
public NestedInnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext) {
return new NestedInnerHitSubContext(this, searchExecutionContext);
}
@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext != null ? searchExecutionContext : super.getSearchExecutionContext();
}
@Override

View file

@ -41,6 +41,8 @@ import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.LeafBucketCollectorBase;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext.InnerHitSubContext;
import org.elasticsearch.search.internal.SubSearchContext;
import org.elasticsearch.search.profile.ProfileResult;
import org.elasticsearch.search.rescore.RescoreContext;
@ -223,16 +225,40 @@ class TopHitsAggregator extends MetricsAggregator {
private static FetchSearchResult runFetchPhase(SubSearchContext subSearchContext, int[] docIdsToLoad) {
// Fork the search execution context for each slice, because the fetch phase does not support concurrent execution yet.
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(subSearchContext.getSearchExecutionContext());
// InnerHitSubContext is not thread-safe, so we fork it as well to support concurrent execution
InnerHitsContext innerHitsContext = new InnerHitsContext(
getForkedInnerHits(subSearchContext.innerHits().getInnerHits(), searchExecutionContext)
);
SubSearchContext fetchSubSearchContext = new SubSearchContext(subSearchContext) {
@Override
public SearchExecutionContext getSearchExecutionContext() {
return searchExecutionContext;
}
@Override
public InnerHitsContext innerHits() {
return innerHitsContext;
}
};
fetchSubSearchContext.fetchPhase().execute(fetchSubSearchContext, docIdsToLoad, null);
return fetchSubSearchContext.fetchResult();
}
private static Map<String, InnerHitSubContext> getForkedInnerHits(
Map<String, InnerHitSubContext> originalInnerHits,
SearchExecutionContext searchExecutionContext
) {
Map<String, InnerHitSubContext> forkedInnerHits = new HashMap<>();
for (Map.Entry<String, InnerHitSubContext> entry : originalInnerHits.entrySet()) {
var forkedContext = entry.getValue().copyWithSearchExecutionContext(searchExecutionContext);
forkedInnerHits.put(entry.getKey(), forkedContext);
}
return forkedInnerHits;
}
@Override
public InternalTopHits buildEmptyAggregation() {
TopDocs topDocs;

View file

@ -22,6 +22,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.SubSearchContext;
@ -44,7 +45,7 @@ public final class InnerHitsContext {
this.innerHits = new HashMap<>();
}
InnerHitsContext(Map<String, InnerHitSubContext> innerHits) {
public InnerHitsContext(Map<String, InnerHitSubContext> innerHits) {
this.innerHits = Objects.requireNonNull(innerHits);
}
@ -84,6 +85,14 @@ public final class InnerHitsContext {
this.context = context;
}
public InnerHitSubContext(InnerHitSubContext innerHitSubContext) {
super(innerHitSubContext);
this.name = innerHitSubContext.name;
this.context = innerHitSubContext.context;
}
public abstract InnerHitSubContext copyWithSearchExecutionContext(SearchExecutionContext searchExecutionContext);
public abstract TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException;
public String getName() {

View file

@ -145,6 +145,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.search.fetch.subphase.FetchDocValuesPhase;
import org.elasticsearch.search.fetch.subphase.FetchSourcePhase;
import org.elasticsearch.search.fetch.subphase.InnerHitsContext;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
@ -525,6 +526,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
when(ctx.indexShard()).thenReturn(indexShard);
when(ctx.newSourceLoader(null)).thenAnswer(inv -> searchExecutionContext.newSourceLoader(null, false));
when(ctx.newIdLoader()).thenReturn(IdLoader.fromLeafStoredFieldLoader());
when(ctx.innerHits()).thenReturn(new InnerHitsContext());
var res = new SubSearchContext(ctx);
releasables.add(res); // TODO: nasty workaround for not getting the standard resource handling behavior of a real search context
return res;