Optimize InternalAggregations construction a little (#120868)

We can streamline and optimize this logic a little,
reducing copying and producing more compact results.
Armin Braun 2025-01-28 11:50:47 +01:00 committed by GitHub
parent ddc2362592
commit 453db3fd71
18 changed files with 134 additions and 101 deletions
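In short: InternalAggregations gains a single-aggregation from(InternalAggregation) overload, so callers stop allocating List.of(...) / Collections.singletonList(...) wrappers; an append(InternalAggregations, InternalAggregation) helper, so pipeline aggregators stop streaming sub-aggregations into a fresh ArrayList just to add one result; and a reduce(InternalAggregations, AggregationReduceContext) overload for the single-result case. A minimal self-contained sketch of the first two ideas, using simplified stand-in types rather than the actual Elasticsearch classes (the real append delegates the copy to CollectionUtils.appendToCopyNoNullElements):

import java.util.ArrayList;
import java.util.List;

// Stand-in for InternalAggregations; illustrative only, not the ES class.
final class Aggs {
    static final Aggs EMPTY = new Aggs(List.of());
    private final List<String> items;

    private Aggs(List<String> items) {
        this.items = items;
    }

    // New single-element overload: no wrapper list at the call site,
    // and List.of(x) is a compact immutable representation.
    static Aggs from(String item) {
        return new Aggs(List.of(item));
    }

    static Aggs from(List<String> items) {
        if (items.isEmpty()) {
            return EMPTY;
        }
        if (items.size() == 1) {
            return from(items.getFirst()); // List.getFirst() needs Java 21
        }
        return new Aggs(items);
    }

    // One right-sized copy instead of a stream pipeline plus add().
    static Aggs append(Aggs aggs, String toAppend) {
        if (aggs.items.isEmpty()) {
            return from(toAppend);
        }
        List<String> copy = new ArrayList<>(aggs.items.size() + 1);
        copy.addAll(aggs.items);
        copy.add(toAppend);
        return new Aggs(copy);
    }
}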

@@ -111,7 +111,7 @@ public class TermsReduceBenchmark {
dict[i] = new BytesRef(Long.toString(rand.nextLong()));
}
for (int i = 0; i < numShards; i++) {
- aggsList.add(InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, true))));
+ aggsList.add(InternalAggregations.from(newTerms(rand, dict, true)));
}
}
@@ -124,7 +124,7 @@ public class TermsReduceBenchmark {
for (BytesRef term : randomTerms) {
InternalAggregations subAggs;
if (withNested) {
- subAggs = InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, false)));
+ subAggs = InternalAggregations.from(newTerms(rand, dict, false));
} else {
subAggs = InternalAggregations.EMPTY;
}

@@ -50,13 +50,13 @@ public class StringTermsSerializationBenchmark {
@Setup
public void initResults() {
- results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true))));
+ results = DelayableWriteable.referencing(InternalAggregations.from(newTerms(true)));
}
private StringTerms newTerms(boolean withNested) {
List<StringTerms.Bucket> resultBuckets = new ArrayList<>(buckets);
for (int i = 0; i < buckets; i++) {
- InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY;
+ InternalAggregations inner = withNested ? InternalAggregations.from(newTerms(false)) : InternalAggregations.EMPTY;
resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW));
}
return new StringTerms(

@@ -414,7 +414,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
Bucket lastBucket = null;
ListIterator<Bucket> iter = list.listIterator();
- InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext);
+ InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(bucketInfo.emptySubAggregations, reduceContext);
// Add the empty buckets within the data,
// e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6

@@ -222,7 +222,7 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation<InternalT
InternalBucket reducedBucket;
if (bucketsWithSameKey.size() == 1) {
reducedBucket = bucketsWithSameKey.get(0);
- reducedBucket.aggregations = InternalAggregations.reduce(List.of(reducedBucket.aggregations), reduceContext);
+ reducedBucket.aggregations = InternalAggregations.reduce(reducedBucket.aggregations, reduceContext);
} else {
reducedBucket = reduceBucket(bucketsWithSameKey, reduceContext);
}

@@ -22,8 +22,6 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -69,11 +67,16 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
if (xAxisUnits != null) {
xDiff = (thisBucketKey.doubleValue() - lastBucketKey.doubleValue()) / xAxisUnits;
}
- final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toCollection(ArrayList::new));
- aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata()));
- Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
- newBuckets.add(newBucket);
+ newBuckets.add(
+     factory.createBucket(
+         factory.getKey(bucket),
+         bucket.getDocCount(),
+         InternalAggregations.append(
+             bucket.getAggregations(),
+             new Derivative(name(), gradient, xDiff, formatter, metadata())
+         )
+     )
+ );
} else {
newBuckets.add(bucket);
}

@@ -25,8 +25,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -117,12 +115,11 @@ public class MovFnPipelineAggregator extends PipelineAggregator {
vars,
values.subList(fromIndex, toIndex).stream().mapToDouble(Double::doubleValue).toArray()
);
- List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .map(InternalAggregation.class::cast)
-     .collect(Collectors.toCollection(ArrayList::new));
- aggs.add(new InternalSimpleValue(name(), result, formatter, metadata()));
- newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
+ newBucket = factory.createBucket(
+     factory.getKey(bucket),
+     bucket.getDocCount(),
+     InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), result, formatter, metadata()))
+ );
index++;
}
newBuckets.add(newBucket);

@@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
+ import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
@@ -47,7 +48,7 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
/**
 * Constructs a new aggregation.
 */
- public InternalAggregations(List<InternalAggregation> aggregations) {
+ private InternalAggregations(List<InternalAggregation> aggregations) {
this.aggregations = aggregations;
if (aggregations.isEmpty()) {
aggregationsAsMap = Map.of();
@@ -70,14 +71,15 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
}
private Map<String, InternalAggregation> asMap() {
- if (aggregationsAsMap == null) {
+ var res = aggregationsAsMap;
+ if (res == null) {
Map<String, InternalAggregation> newAggregationsAsMap = Maps.newMapWithExpectedSize(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
newAggregationsAsMap.put(aggregation.getName(), aggregation);
}
- this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
+ res = this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap);
}
- return aggregationsAsMap;
+ return res;
}
/**
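Note on the asMap() change above: copying the field into the local res means the null check and the return observe the same value, so the method never re-reads the lazily initialized field; it also saves a second field read on the hot path. A minimal sketch of the idiom, with illustrative names rather than the actual Elasticsearch field:

import java.util.Map;

final class LazyMapHolder {
    private Map<String, Integer> cached; // built at most once; the value is immutable

    Map<String, Integer> get() {
        var res = cached;                  // read the field exactly once
        if (res == null) {
            res = cached = Map.of("a", 1); // publish and keep the local copy
        }
        return res;                        // returns the value that was null-checked
    }
}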
@@ -121,13 +123,27 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
return builder;
}
+ public static InternalAggregations from(InternalAggregation aggregation) {
+     return new InternalAggregations(List.of(aggregation));
+ }
public static InternalAggregations from(List<InternalAggregation> aggregations) {
if (aggregations.isEmpty()) {
return EMPTY;
}
+ if (aggregations.size() == 1) {
+     return from(aggregations.getFirst());
+ }
return new InternalAggregations(aggregations);
}
+ public static InternalAggregations append(InternalAggregations aggs, InternalAggregation toAppend) {
+     if (aggs.aggregations.isEmpty()) {
+         return from(toAppend);
+     }
+     return new InternalAggregations(CollectionUtils.appendToCopyNoNullElements(aggs.aggregations, toAppend));
+ }
public static InternalAggregations readFrom(StreamInput in) throws IOException {
return from(in.readNamedWriteableCollectionAsList(InternalAggregation.class));
}
@@ -227,19 +243,7 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
}
// handle special case when there is just one aggregation
if (aggregationsList.size() == 1) {
- final List<InternalAggregation> internalAggregations = aggregationsList.get(0).asList();
- final List<InternalAggregation> reduced = new ArrayList<>(internalAggregations.size());
- for (InternalAggregation aggregation : internalAggregations) {
-     if (aggregation.mustReduceOnSingleInternalAgg()) {
-         try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
-             aggregatorReducer.accept(aggregation);
-             reduced.add(aggregatorReducer.get());
-         }
-     } else {
-         reduced.add(aggregation);
-     }
- }
- return from(reduced);
+ return reduce(aggregationsList.getFirst(), context);
}
// general case
try (AggregatorsReducer reducer = new AggregatorsReducer(aggregationsList.get(0), context, aggregationsList.size())) {
@@ -250,6 +254,29 @@ public final class InternalAggregations implements Iterable<InternalAggregation>
}
}
+ public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) {
+     final List<InternalAggregation> internalAggregations = aggregations.asList();
+     int size = internalAggregations.size();
+     if (size == 0) {
+         return EMPTY;
+     }
+     boolean noneReduced = true;
+     final List<InternalAggregation> reduced = new ArrayList<>(size);
+     for (int i = 0; i < size; i++) {
+         InternalAggregation aggregation = internalAggregations.get(i);
+         if (aggregation.mustReduceOnSingleInternalAgg()) {
+             noneReduced = false;
+             try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) {
+                 aggregatorReducer.accept(aggregation);
+                 reduced.add(aggregatorReducer.get());
+             }
+         } else {
+             reduced.add(aggregation);
+         }
+     }
+     return noneReduced ? aggregations : from(reduced);
+ }
/**
* Finalizes the sampling for all the internal aggregations
* @param samplingContext the sampling context
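The new reduce(InternalAggregations, AggregationReduceContext) overload above also short-circuits: when no aggregation reports mustReduceOnSingleInternalAgg(), it returns the input object unchanged rather than rebuilding it from the copied list. A hedged sketch of that "return the input when nothing changed" pattern, with generic stand-in types:

import java.util.ArrayList;
import java.util.List;

final class ReuseWhenUnchanged {
    // Stand-in for the per-element mustReduceOnSingleInternalAgg() test.
    private static boolean mustRewrite(String s) {
        return s.startsWith(" ");
    }

    static List<String> reduceOne(List<String> input) {
        boolean noneReduced = true;
        List<String> reduced = new ArrayList<>(input.size());
        for (String s : input) {
            if (mustRewrite(s)) {
                noneReduced = false;
                reduced.add(s.strip());
            } else {
                reduced.add(s);
            }
        }
        // Reuse the caller's object when the pass turned out to be a no-op.
        return noneReduced ? input : reduced;
    }
}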

@@ -377,7 +377,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
iterateEmptyBuckets(list, list.listIterator(), counter);
reduceContext.consumeBucketsAndMaybeBreak(counter.size);
- InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext);
+ InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, new LongConsumer() {
private int size = 0;

@@ -349,10 +349,7 @@ public class InternalHistogram extends InternalMultiBucketAggregation<InternalHi
/*
 * Now that we're sure we have space we allocate all the buckets.
 */
- InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(
-     Collections.singletonList(emptyBucketInfo.subAggregations),
-     reduceContext
- );
+ InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext);
ListIterator<Bucket> iter = list.listIterator();
iterateEmptyBuckets(list, iter, new DoubleConsumer() {
private int size;

@@ -22,8 +22,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -80,13 +78,11 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
if (returned == null) {
newBuckets.add(bucket);
} else {
- final List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toCollection(ArrayList::new));
- InternalSimpleValue simpleValue = new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata());
- aggs.add(simpleValue);
InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(
- InternalAggregations.from(aggs),
+ InternalAggregations.append(
+     bucket.getAggregations(),
+     new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata())
+ ),
bucket
);
newBuckets.add(newBucket);

@@ -21,8 +21,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -53,12 +51,13 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
if (thisBucketValue != null && thisBucketValue.isInfinite() == false && thisBucketValue.isNaN() == false) {
sum += thisBucketValue;
}
- List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toCollection(ArrayList::new));
- aggs.add(new InternalSimpleValue(name(), sum, formatter, metadata()));
- Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
- newBuckets.add(newBucket);
+ newBuckets.add(
+     factory.createBucket(
+         factory.getKey(bucket),
+         bucket.getDocCount(),
+         InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), sum, formatter, metadata()))
+     )
+ );
}
return factory.createAggregation(newBuckets);
}

@@ -23,8 +23,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -84,11 +82,11 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
// Both have values, calculate diff and replace the "empty" bucket
if (Double.isNaN(thisBucketValue) == false && Double.isNaN(lagValue) == false) {
double diff = thisBucketValue - lagValue;
- List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toCollection(ArrayList::new));
- aggs.add(new InternalSimpleValue(name(), diff, formatter, metadata()));
- newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
+ newBucket = factory.createBucket(
+     factory.getKey(bucket),
+     bucket.getDocCount(),
+     InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), diff, formatter, metadata()))
+ );
}
newBuckets.add(newBucket);

@@ -9,11 +9,11 @@
package org.elasticsearch.search.aggregations.pipeline;
+ import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
- import java.util.List;
import java.util.Map;
public abstract class SiblingPipelineAggregator extends PipelineAggregator {
@@ -23,11 +23,11 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
@Override
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
- return aggregation.copyWithRewritenBuckets(aggregations -> {
-     List<InternalAggregation> aggs = aggregations.copyResults();
-     aggs.add(doReduce(aggregations, reduceContext));
-     return InternalAggregations.from(aggs);
- });
+ return aggregation.copyWithRewritenBuckets(
+     aggregations -> InternalAggregations.from(
+         CollectionUtils.appendToCopyNoNullElements(aggregations.copyResults(), doReduce(aggregations, reduceContext))
+     )
+ );
}
public abstract InternalAggregation doReduce(InternalAggregations aggregations, AggregationReduceContext context);

@@ -986,7 +986,7 @@ public enum SearchResponseUtils {
}
}
}
- return new InternalAggregations(aggregations);
+ return InternalAggregations.from(aggregations);
}
private static final InstantiatingObjectParser<ProfileResult, Void> PROFILE_RESULT_PARSER;

@@ -25,8 +25,6 @@ import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator {
private final DocValueFormat formatter;
@@ -57,12 +55,16 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator
hll.merge(0, bucketHll, 0);
cardinality = hll.cardinality(0);
}
- List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toList());
- aggs.add(new InternalSimpleLongValue(name(), cardinality, formatter, metadata()));
- Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
- newBuckets.add(newBucket);
+ newBuckets.add(
+     factory.createBucket(
+         factory.getKey(bucket),
+         bucket.getDocCount(),
+         InternalAggregations.append(
+             bucket.getAggregations(),
+             new InternalSimpleLongValue(name(), cardinality, formatter, metadata())
+         )
+     )
+ );
}
return factory.createAggregation(newBuckets);
} finally {

@@ -27,7 +27,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
- import java.util.stream.Collectors;
public class MovingPercentilesPipelineAggregator extends PipelineAggregator {
@@ -101,9 +100,14 @@ public class MovingPercentilesPipelineAggregator extends PipelineAggregator {
}
if (state != null) {
- List<InternalAggregation> aggs = bucket.getAggregations().asList().stream().collect(Collectors.toList());
- aggs.add(new InternalTDigestPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()));
- newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
+ newBucket = factory.createBucket(
+     factory.getKey(bucket),
+     bucket.getDocCount(),
+     InternalAggregations.append(
+         bucket.getAggregations(),
+         new InternalTDigestPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata())
+     )
+ );
}
newBuckets.add(newBucket);
index++;
@@ -147,9 +151,14 @@ public class MovingPercentilesPipelineAggregator extends PipelineAggregator {
}
if (state != null) {
- List<InternalAggregation> aggs = new ArrayList<>(bucket.getAggregations().asList());
- aggs.add(new InternalHDRPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()));
- newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs));
+ newBucket = factory.createBucket(
+     factory.getKey(bucket),
+     bucket.getDocCount(),
+     InternalAggregations.append(
+         bucket.getAggregations(),
+         new InternalHDRPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata())
+     )
+ );
}
newBuckets.add(newBucket);
index++;

@@ -21,8 +21,6 @@ import java.util.List;
import java.util.Map;
import java.util.function.DoubleUnaryOperator;
import java.util.function.Function;
- import java.util.stream.Collectors;
- import java.util.stream.StreamSupport;
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue;
@@ -70,11 +68,15 @@ public class NormalizePipelineAggregator extends PipelineAggregator {
normalizedBucketValue = method.applyAsDouble(values[i]);
}
- List<InternalAggregation> aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false)
-     .collect(Collectors.toList());
- aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata()));
- InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(InternalAggregations.from(aggs), bucket);
- newBuckets.add(newBucket);
+ newBuckets.add(
+     originalAgg.createBucket(
+         InternalAggregations.append(
+             bucket.getAggregations(),
+             new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata())
+         ),
+         bucket
+     )
+ );
}
return originalAgg.create(newBuckets);

@@ -100,12 +100,15 @@ public class InferencePipelineAggregator extends PipelineAggregator {
} catch (Exception e) {
inference = new WarningInferenceResults(e.getMessage());
}
- final List<InternalAggregation> aggs = new ArrayList<>(bucket.getAggregations().asList());
- InternalInferenceAggregation aggResult = new InternalInferenceAggregation(name(), metadata(), inference);
- aggs.add(aggResult);
- InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(InternalAggregations.from(aggs), bucket);
- newBuckets.add(newBucket);
+ newBuckets.add(
+     originalAgg.createBucket(
+         InternalAggregations.append(
+             bucket.getAggregations(),
+             new InternalInferenceAggregation(name(), metadata(), inference)
+         ),
+         bucket
+     )
+ );
}
// the model is released at the end of this block.