Use DelayableWriteable with batched query execution (#126864)

We should use DelayableWriteable here as well, just as we do for per-shard results.
The heap savings from this tool can be substantial, and without it batched execution
could actually regress in heap use relative to non-batched execution in the corner
case where the per-node shard count is low but greater than one.
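For context, a minimal sketch of the DelayableWriteable contract this change leans on. The DelayableWriteable calls are the real API; the holder class and method names are invented for illustration:

import java.io.IOException;

import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregations;

// Illustrative sketch only; "AggsHolder" is invented, not part of this commit.
class AggsHolder {

    // referencing(v) wraps an already-deserialized object: no wire-format copy,
    // but the full expanded object stays on the heap.
    static DelayableWriteable<InternalAggregations> onHeap(InternalAggregations aggs) {
        return DelayableWriteable.referencing(aggs);
    }

    // delayed(reader, in) captures the serialized bytes off the stream instead of
    // deserializing them, so only the (typically much smaller) wire form is retained.
    static DelayableWriteable<InternalAggregations> offWire(StreamInput in) throws IOException {
        return DelayableWriteable.delayed(InternalAggregations::readFrom, in);
    }

    // expand() materializes the value on demand; closing releases any retained buffer.
    static InternalAggregations materialize(DelayableWriteable<InternalAggregations> dw) {
        try (dw) {
            return dw.expand();
        }
    }
}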
Armin Braun 2025-04-17 01:27:53 +02:00 committed by GitHub
parent f4fe57516e
commit 880aa52e27
2 changed files with 40 additions and 43 deletions

server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -224,6 +224,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_REPORT_SHARD_PARTITIONING = def(9_050_0_00);
     public static final TransportVersion ESQL_QUERY_PLANNING_DURATION = def(9_051_0_00);
     public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED = def(9_052_0_00);
+    public static final TransportVersion BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE = def(9_053_0_00);
     /*
      * STOP! READ THIS FIRST! No, really,
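The new constant exists only to gate the wire-format change: both endpoints compare the negotiated transport version against it before picking a serialization, as MergeResult does below. Schematically (a sketch of the pattern, not code from this commit; "writeAggs" is an invented helper):

import java.io.IOException;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.InternalAggregations;

class VersionGateSketch {
    static void writeAggs(StreamOutput out, DelayableWriteable<InternalAggregations> aggs) throws IOException {
        if (out.getTransportVersion().onOrAfter(TransportVersions.BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE)) {
            out.writeOptionalWriteable(aggs); // new peers read the delayed (serialized) form
        } else {
            // old peers expect a plain InternalAggregations, so expand before writing
            out.writeOptionalWriteable(aggs == null ? null : aggs.expand());
        }
    }
}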

server/src/main/java/org/elasticsearch/action/search/QueryPhaseResultConsumer.java

@@ -12,6 +12,7 @@ package org.elasticsearch.action.search;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.search.TopDocs;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.search.SearchPhaseController.TopDocsStats;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
@@ -25,7 +26,6 @@ import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchService;
@@ -220,7 +220,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
         }
         final int resultSize = buffer.size() + (mergeResult == null ? 0 : 1) + batchedResults.size();
         final List<TopDocs> topDocsList = hasTopDocs ? new ArrayList<>(resultSize) : null;
-        final Deque<InternalAggregations> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
+        final Deque<DelayableWriteable<InternalAggregations>> aggsList = hasAggs ? new ArrayDeque<>(resultSize) : null;
         // consume partial merge result from the un-batched execution path that is used for BwC, shard-level retries, and shard level
         // execution for shards on the coordinating node itself
         if (mergeResult != null) {
@@ -253,7 +253,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
                 }

                 @Override
-                public InternalAggregations next() {
+                public DelayableWriteable<InternalAggregations> next() {
                     return aggsList.pollFirst();
                 }
             },
@@ -300,7 +300,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
     private static void consumePartialMergeResult(
         MergeResult partialResult,
         List<TopDocs> topDocsList,
-        Collection<InternalAggregations> aggsList
+        Collection<DelayableWriteable<InternalAggregations>> aggsList
     ) {
         if (topDocsList != null) {
             topDocsList.add(partialResult.reducedTopDocs);
@@ -310,7 +310,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
         }
     }

-    private static void addAggsToList(MergeResult partialResult, Collection<InternalAggregations> aggsList) {
+    private static void addAggsToList(MergeResult partialResult, Collection<DelayableWriteable<InternalAggregations>> aggsList) {
         var aggs = partialResult.reducedAggs;
         if (aggs != null) {
             aggsList.add(aggs);
@@ -382,45 +382,34 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
         }
         // we leave the results un-serialized because serializing is slow but we compute the serialized
         // size as an estimate of the memory used by the newly reduced aggregations.
-        return new MergeResult(processedShards, newTopDocs, newAggs, newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0);
+        return new MergeResult(
+            processedShards,
+            newTopDocs,
+            newAggs == null ? null : DelayableWriteable.referencing(newAggs),
+            newAggs != null ? DelayableWriteable.getSerializedSize(newAggs) : 0
+        );
     }

     private static InternalAggregations aggregate(
         Iterator<QuerySearchResult> toConsume,
-        Iterator<InternalAggregations> partialResults,
+        Iterator<DelayableWriteable<InternalAggregations>> partialResults,
         int resultSetSize,
         AggregationReduceContext reduceContext
     ) {
-        interface ReleasableIterator extends Iterator<InternalAggregations>, Releasable {}
-        try (var aggsIter = new ReleasableIterator() {
-
-            private Releasable toRelease;
-
-            @Override
-            public void close() {
-                Releasables.close(toRelease);
-            }
-
-            @Override
-            public boolean hasNext() {
-                return toConsume.hasNext();
-            }
-
-            @Override
-            public InternalAggregations next() {
-                var res = toConsume.next().consumeAggs();
-                Releasables.close(toRelease);
-                toRelease = res;
-                return res.expand();
-            }
-        }) {
-            return InternalAggregations.topLevelReduce(
-                partialResults.hasNext() ? Iterators.concat(partialResults, aggsIter) : aggsIter,
-                resultSetSize,
-                reduceContext
-            );
+        try {
+            Iterator<InternalAggregations> aggsIter = Iterators.map(toConsume, r -> {
+                try (var res = r.consumeAggs()) {
+                    return res.expand();
+                }
+            });
+            return InternalAggregations.topLevelReduce(partialResults.hasNext() ? Iterators.concat(Iterators.map(partialResults, r -> {
+                try (r) {
+                    return r.expand();
+                }
+            }), aggsIter) : aggsIter, resultSetSize, reduceContext);
         } finally {
             toConsume.forEachRemaining(QuerySearchResult::releaseAggs);
+            partialResults.forEachRemaining(Releasable::close);
         }
     }
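The rewritten aggregate(...) keeps heap usage flat by expanding each delayed partial result only at the moment the reduce consumes it, releasing its serialized buffer immediately afterwards; note also that the merge result above stays un-serialized (via referencing) and uses getSerializedSize purely for memory accounting. The expand-and-release pattern in isolation (a sketch; "expandLazily" is an invented helper, the reduce call is elided):

import java.util.Iterator;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.search.aggregations.InternalAggregations;

class LazyExpandSketch {
    static Iterator<InternalAggregations> expandLazily(Iterator<DelayableWriteable<InternalAggregations>> delayed) {
        return Iterators.map(delayed, d -> {
            try (d) {              // release the serialized buffer right after expanding
                return d.expand(); // at most one materialized copy is alive at a time
            }
        });
    }
}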
@@ -648,23 +637,30 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
     record MergeResult(
         List<SearchShard> processedShards,
         TopDocs reducedTopDocs,
-        @Nullable InternalAggregations reducedAggs,
+        @Nullable DelayableWriteable<InternalAggregations> reducedAggs,
         long estimatedSize
     ) implements Writeable {

         static MergeResult readFrom(StreamInput in) throws IOException {
-            return new MergeResult(
-                List.of(),
-                Lucene.readTopDocsIncludingShardIndex(in),
-                in.readOptionalWriteable(InternalAggregations::readFrom),
-                in.readVLong()
-            );
+            return new MergeResult(List.of(), Lucene.readTopDocsIncludingShardIndex(in), in.readOptionalWriteable(i -> {
+                if (i.getTransportVersion().onOrAfter(TransportVersions.BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE)) {
+                    return DelayableWriteable.delayed(InternalAggregations::readFrom, i);
+                } else {
+                    return DelayableWriteable.referencing(InternalAggregations.readFrom(i));
+                }
+            }), in.readVLong());
         }

         @Override
         public void writeTo(StreamOutput out) throws IOException {
             Lucene.writeTopDocsIncludingShardIndex(out, reducedTopDocs);
-            out.writeOptionalWriteable(reducedAggs);
+            out.writeOptionalWriteable(
+                reducedAggs == null
+                    ? null
+                    : (out.getTransportVersion().onOrAfter(TransportVersions.BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE)
+                        ? reducedAggs
+                        : reducedAggs.expand())
+            );
             out.writeVLong(estimatedSize);
         }
     }
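Note the deliberate asymmetry in MergeResult: the delayed form has its own wire representation, so it cannot be sent to peers that expect a plain InternalAggregations. The writer therefore expands for old peers, and the reader wraps plain aggregations from old peers in referencing(...), so every caller downstream handles a single type. The read side restated as a standalone helper (a sketch; "readAggs" is invented):

import java.io.IOException;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregations;

class ReadGateSketch {
    static DelayableWriteable<InternalAggregations> readAggs(StreamInput in) throws IOException {
        if (in.getTransportVersion().onOrAfter(TransportVersions.BATCHED_QUERY_EXECUTION_DELAYABLE_WRITABLE)) {
            // new peer sent the delayed form: capture the bytes without expanding
            return DelayableWriteable.delayed(InternalAggregations::readFrom, in);
        }
        // old peer sent plain aggregations: deserialize and wrap so callers see one type
        return DelayableWriteable.referencing(InternalAggregations.readFrom(in));
    }
}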