Fix AsyncSearchActionIT tests (#127010)

Missed a spot here when moving this to delayed deserialization, we can leak pending batch results here on exceptions.

closes #126994
closes #126995
closes #126975
closes #126999
closes #127001
closes #126974
closes #127008
This commit is contained in:
Armin Braun 2025-04-17 19:31:58 +02:00 committed by GitHub
parent b6c9b9b54d
commit 5a3c9e7dc1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 11 additions and 25 deletions

View file

@ -408,27 +408,6 @@ tests:
- class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests
method: testStdinWithMultipleValues
issue: https://github.com/elastic/elasticsearch/issues/126882
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testDeleteCancelRunningTask
issue: https://github.com/elastic/elasticsearch/issues/126994
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testMaxResponseSize
issue: https://github.com/elastic/elasticsearch/issues/126995
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testRemoveAsyncIndex
issue: https://github.com/elastic/elasticsearch/issues/126975
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testCleanupOnFailure
issue: https://github.com/elastic/elasticsearch/issues/126999
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testUpdateStoreKeepAlive
issue: https://github.com/elastic/elasticsearch/issues/127001
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testRestartAfterCompletion
issue: https://github.com/elastic/elasticsearch/issues/126974
- class: org.elasticsearch.xpack.search.AsyncSearchActionIT
method: testDeleteCleanupIndex
issue: https://github.com/elastic/elasticsearch/issues/127008
- class: org.elasticsearch.packaging.test.DockerTests
method: test024InstallPluginFromArchiveUsingConfigFile
issue: https://github.com/elastic/elasticsearch/issues/126936

View file

@ -26,6 +26,7 @@ import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
@ -162,7 +163,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
consume(querySearchResult, next);
}
private final List<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayList<>();
private final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults = new ArrayDeque<>();
/**
* Unlinks partial merge results from this instance and returns them as a partial merge result to be sent to the coordinating node.
@ -214,7 +215,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
buffer.sort(RESULT_COMPARATOR);
final TopDocsStats topDocsStats = this.topDocsStats;
var mergeResult = this.mergeResult;
final List<Tuple<TopDocsStats, MergeResult>> batchedResults;
final ArrayDeque<Tuple<TopDocsStats, MergeResult>> batchedResults;
synchronized (this.batchedResults) {
batchedResults = this.batchedResults;
}
@ -226,8 +227,8 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
if (mergeResult != null) {
consumePartialMergeResult(mergeResult, topDocsList, aggsList);
}
for (int i = 0; i < batchedResults.size(); i++) {
Tuple<TopDocsStats, MergeResult> batchedResult = batchedResults.set(i, null);
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
topDocsStats.add(batchedResult.v1());
consumePartialMergeResult(batchedResult.v2(), topDocsList, aggsList);
}
@ -528,6 +529,12 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
querySearchResult.releaseAggs();
}
}
synchronized (this.batchedResults) {
Tuple<TopDocsStats, MergeResult> batchedResult;
while ((batchedResult = batchedResults.poll()) != null) {
Releasables.close(batchedResult.v2().reducedAggs());
}
}
}
private synchronized void onMergeFailure(Exception exc) {