diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java index 9fd319f9e9b1..672f2db7c29e 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/TermsReduceBenchmark.java @@ -111,7 +111,7 @@ public class TermsReduceBenchmark { dict[i] = new BytesRef(Long.toString(rand.nextLong())); } for (int i = 0; i < numShards; i++) { - aggsList.add(InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, true)))); + aggsList.add(InternalAggregations.from(newTerms(rand, dict, true))); } } @@ -124,7 +124,7 @@ public class TermsReduceBenchmark { for (BytesRef term : randomTerms) { InternalAggregations subAggs; if (withNested) { - subAggs = InternalAggregations.from(Collections.singletonList(newTerms(rand, dict, false))); + subAggs = InternalAggregations.from(newTerms(rand, dict, false)); } else { subAggs = InternalAggregations.EMPTY; } diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java index 6065dedc8de4..1e2846f9ba23 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/search/aggregations/bucket/terms/StringTermsSerializationBenchmark.java @@ -50,13 +50,13 @@ public class StringTermsSerializationBenchmark { @Setup public void initResults() { - results = DelayableWriteable.referencing(InternalAggregations.from(List.of(newTerms(true)))); + results = DelayableWriteable.referencing(InternalAggregations.from(newTerms(true))); } private StringTerms newTerms(boolean withNested) { List resultBuckets = new ArrayList<>(buckets); for (int i = 0; i < buckets; i++) { - InternalAggregations inner = withNested ? InternalAggregations.from(List.of(newTerms(false))) : InternalAggregations.EMPTY; + InternalAggregations inner = withNested ? InternalAggregations.from(newTerms(false)) : InternalAggregations.EMPTY; resultBuckets.add(new StringTerms.Bucket(new BytesRef("test" + i), i, inner, false, 0, DocValueFormat.RAW)); } return new StringTerms( diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java index edb7ec4cffce..c6fceb330f49 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -414,7 +414,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat Bucket lastBucket = null; ListIterator iter = list.listIterator(); - InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(bucketInfo.emptySubAggregations), reduceContext); + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(bucketInfo.emptySubAggregations, reduceContext); // Add the empty buckets within the data, // e.g. if the data series is [1,2,3,7] there're 3 empty buckets that will be created for 4,5,6 diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java index c4669b1c2522..ac472cbdd0c3 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java @@ -222,7 +222,7 @@ public class InternalTimeSeries extends InternalMultiBucketAggregation aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toCollection(ArrayList::new)); - aggs.add(new Derivative(name(), gradient, xDiff, formatter, metadata())); - Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); - newBuckets.add(newBucket); + newBuckets.add( + factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append( + bucket.getAggregations(), + new Derivative(name(), gradient, xDiff, formatter, metadata()) + ) + ) + ); } else { newBuckets.add(bucket); } diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java index 626d9c675af3..ad21d149e933 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/pipeline/MovFnPipelineAggregator.java @@ -25,8 +25,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -117,12 +115,11 @@ public class MovFnPipelineAggregator extends PipelineAggregator { vars, values.subList(fromIndex, toIndex).stream().mapToDouble(Double::doubleValue).toArray() ); - - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .map(InternalAggregation.class::cast) - .collect(Collectors.toCollection(ArrayList::new)); - aggs.add(new InternalSimpleValue(name(), result, formatter, metadata())); - newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); + newBucket = factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), result, formatter, metadata())) + ); index++; } newBuckets.add(newBucket); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java index 9ed62add775c..5f81913be32f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalAggregations.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.DelayableWriteable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator; @@ -47,7 +48,7 @@ public final class InternalAggregations implements Iterable /** * Constructs a new aggregation. */ - public InternalAggregations(List aggregations) { + private InternalAggregations(List aggregations) { this.aggregations = aggregations; if (aggregations.isEmpty()) { aggregationsAsMap = Map.of(); @@ -70,14 +71,15 @@ public final class InternalAggregations implements Iterable } private Map asMap() { - if (aggregationsAsMap == null) { + var res = aggregationsAsMap; + if (res == null) { Map newAggregationsAsMap = Maps.newMapWithExpectedSize(aggregations.size()); for (InternalAggregation aggregation : aggregations) { newAggregationsAsMap.put(aggregation.getName(), aggregation); } - this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap); + res = this.aggregationsAsMap = unmodifiableMap(newAggregationsAsMap); } - return aggregationsAsMap; + return res; } /** @@ -121,13 +123,27 @@ public final class InternalAggregations implements Iterable return builder; } + public static InternalAggregations from(InternalAggregation aggregation) { + return new InternalAggregations(List.of(aggregation)); + } + public static InternalAggregations from(List aggregations) { if (aggregations.isEmpty()) { return EMPTY; } + if (aggregations.size() == 1) { + return from(aggregations.getFirst()); + } return new InternalAggregations(aggregations); } + public static InternalAggregations append(InternalAggregations aggs, InternalAggregation toAppend) { + if (aggs.aggregations.isEmpty()) { + return from(toAppend); + } + return new InternalAggregations(CollectionUtils.appendToCopyNoNullElements(aggs.aggregations, toAppend)); + } + public static InternalAggregations readFrom(StreamInput in) throws IOException { return from(in.readNamedWriteableCollectionAsList(InternalAggregation.class)); } @@ -227,19 +243,7 @@ public final class InternalAggregations implements Iterable } // handle special case when there is just one aggregation if (aggregationsList.size() == 1) { - final List internalAggregations = aggregationsList.get(0).asList(); - final List reduced = new ArrayList<>(internalAggregations.size()); - for (InternalAggregation aggregation : internalAggregations) { - if (aggregation.mustReduceOnSingleInternalAgg()) { - try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) { - aggregatorReducer.accept(aggregation); - reduced.add(aggregatorReducer.get()); - } - } else { - reduced.add(aggregation); - } - } - return from(reduced); + return reduce(aggregationsList.getFirst(), context); } // general case try (AggregatorsReducer reducer = new AggregatorsReducer(aggregationsList.get(0), context, aggregationsList.size())) { @@ -250,6 +254,29 @@ public final class InternalAggregations implements Iterable } } + public static InternalAggregations reduce(InternalAggregations aggregations, AggregationReduceContext context) { + final List internalAggregations = aggregations.asList(); + int size = internalAggregations.size(); + if (size == 0) { + return EMPTY; + } + boolean noneReduced = true; + final List reduced = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + InternalAggregation aggregation = internalAggregations.get(i); + if (aggregation.mustReduceOnSingleInternalAgg()) { + noneReduced = false; + try (AggregatorReducer aggregatorReducer = aggregation.getReducer(context.forAgg(aggregation.getName()), 1)) { + aggregatorReducer.accept(aggregation); + reduced.add(aggregatorReducer.get()); + } + } else { + reduced.add(aggregation); + } + } + return noneReduced ? aggregations : from(reduced); + } + /** * Finalizes the sampling for all the internal aggregations * @param samplingContext the sampling context diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index d2badbeec462..8ce7cc757126 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -377,7 +377,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation< iterateEmptyBuckets(list, list.listIterator(), counter); reduceContext.consumeBucketsAndMaybeBreak(counter.size); - InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(List.of(emptyBucketInfo.subAggregations), reduceContext); + InternalAggregations reducedEmptySubAggs = InternalAggregations.reduce(emptyBucketInfo.subAggregations, reduceContext); ListIterator iter = list.listIterator(); iterateEmptyBuckets(list, iter, new LongConsumer() { private int size = 0; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index cb2150736374..73602ac024b9 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -349,10 +349,7 @@ public class InternalHistogram extends InternalMultiBucketAggregation iter = list.listIterator(); iterateEmptyBuckets(list, iter, new DoubleConsumer() { private int size; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java index 95709f278747..224a39bb73d0 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/BucketScriptPipelineAggregator.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -80,13 +78,11 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator { if (returned == null) { newBuckets.add(bucket); } else { - final List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toCollection(ArrayList::new)); - - InternalSimpleValue simpleValue = new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata()); - aggs.add(simpleValue); InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket( - InternalAggregations.from(aggs), + InternalAggregations.append( + bucket.getAggregations(), + new InternalSimpleValue(name(), returned.doubleValue(), formatter, metadata()) + ), bucket ); newBuckets.add(newBucket); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java index 670479ab2f0a..1b14ae6f5f11 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/CumulativeSumPipelineAggregator.java @@ -21,8 +21,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -53,12 +51,13 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator { if (thisBucketValue != null && thisBucketValue.isInfinite() == false && thisBucketValue.isNaN() == false) { sum += thisBucketValue; } - - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toCollection(ArrayList::new)); - aggs.add(new InternalSimpleValue(name(), sum, formatter, metadata())); - Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); - newBuckets.add(newBucket); + newBuckets.add( + factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), sum, formatter, metadata())) + ) + ); } return factory.createAggregation(newBuckets); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffPipelineAggregator.java index 91c64c87c331..839d71559885 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SerialDiffPipelineAggregator.java @@ -23,8 +23,6 @@ import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -84,11 +82,11 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator { // Both have values, calculate diff and replace the "empty" bucket if (Double.isNaN(thisBucketValue) == false && Double.isNaN(lagValue) == false) { double diff = thisBucketValue - lagValue; - - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toCollection(ArrayList::new)); - aggs.add(new InternalSimpleValue(name(), diff, formatter, metadata())); - newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); + newBucket = factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append(bucket.getAggregations(), new InternalSimpleValue(name(), diff, formatter, metadata())) + ); } newBuckets.add(newBucket); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java index 6852356c1e45..fe38e1fa6294 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/SiblingPipelineAggregator.java @@ -9,11 +9,11 @@ package org.elasticsearch.search.aggregations.pipeline; +import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.search.aggregations.AggregationReduceContext; import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregations; -import java.util.List; import java.util.Map; public abstract class SiblingPipelineAggregator extends PipelineAggregator { @@ -23,11 +23,11 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator { @Override public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) { - return aggregation.copyWithRewritenBuckets(aggregations -> { - List aggs = aggregations.copyResults(); - aggs.add(doReduce(aggregations, reduceContext)); - return InternalAggregations.from(aggs); - }); + return aggregation.copyWithRewritenBuckets( + aggregations -> InternalAggregations.from( + CollectionUtils.appendToCopyNoNullElements(aggregations.copyResults(), doReduce(aggregations, reduceContext)) + ) + ); } public abstract InternalAggregation doReduce(InternalAggregations aggregations, AggregationReduceContext context); diff --git a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java index 330058b16a81..21b43636222f 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/search/SearchResponseUtils.java @@ -986,7 +986,7 @@ public enum SearchResponseUtils { } } } - return new InternalAggregations(aggregations); + return InternalAggregations.from(aggregations); } private static final InstantiatingObjectParser PROFILE_RESULT_PARSER; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java index e71cedf38188..b4682ecc9be6 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/cumulativecardinality/CumulativeCardinalityPipelineAggregator.java @@ -25,8 +25,6 @@ import org.elasticsearch.search.aggregations.support.AggregationPath; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator { private final DocValueFormat formatter; @@ -57,12 +55,16 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator hll.merge(0, bucketHll, 0); cardinality = hll.cardinality(0); } - - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toList()); - aggs.add(new InternalSimpleLongValue(name(), cardinality, formatter, metadata())); - Bucket newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); - newBuckets.add(newBucket); + newBuckets.add( + factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append( + bucket.getAggregations(), + new InternalSimpleLongValue(name(), cardinality, formatter, metadata()) + ) + ) + ); } return factory.createAggregation(newBuckets); } finally { diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/movingPercentiles/MovingPercentilesPipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/movingPercentiles/MovingPercentilesPipelineAggregator.java index 663299df54f8..15abd65fcc08 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/movingPercentiles/MovingPercentilesPipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/movingPercentiles/MovingPercentilesPipelineAggregator.java @@ -27,7 +27,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; public class MovingPercentilesPipelineAggregator extends PipelineAggregator { @@ -101,9 +100,14 @@ public class MovingPercentilesPipelineAggregator extends PipelineAggregator { } if (state != null) { - List aggs = bucket.getAggregations().asList().stream().collect(Collectors.toList()); - aggs.add(new InternalTDigestPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata())); - newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); + newBucket = factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append( + bucket.getAggregations(), + new InternalTDigestPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()) + ) + ); } newBuckets.add(newBucket); index++; @@ -147,9 +151,14 @@ public class MovingPercentilesPipelineAggregator extends PipelineAggregator { } if (state != null) { - List aggs = new ArrayList<>(bucket.getAggregations().asList()); - aggs.add(new InternalHDRPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata())); - newBucket = factory.createBucket(factory.getKey(bucket), bucket.getDocCount(), InternalAggregations.from(aggs)); + newBucket = factory.createBucket( + factory.getKey(bucket), + bucket.getDocCount(), + InternalAggregations.append( + bucket.getAggregations(), + new InternalHDRPercentiles(name(), config.keys, state, config.keyed, config.formatter, metadata()) + ) + ); } newBuckets.add(newBucket); index++; diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java index adb8b691a83e..a338b8d98d21 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/normalize/NormalizePipelineAggregator.java @@ -21,8 +21,6 @@ import java.util.List; import java.util.Map; import java.util.function.DoubleUnaryOperator; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.resolveBucketValue; @@ -70,11 +68,15 @@ public class NormalizePipelineAggregator extends PipelineAggregator { normalizedBucketValue = method.applyAsDouble(values[i]); } - List aggs = StreamSupport.stream(bucket.getAggregations().spliterator(), false) - .collect(Collectors.toList()); - aggs.add(new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata())); - InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(InternalAggregations.from(aggs), bucket); - newBuckets.add(newBucket); + newBuckets.add( + originalAgg.createBucket( + InternalAggregations.append( + bucket.getAggregations(), + new InternalSimpleValue(name(), normalizedBucketValue, formatter, metadata()) + ), + bucket + ) + ); } return originalAgg.create(newBuckets); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/inference/InferencePipelineAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/inference/InferencePipelineAggregator.java index fd5c66399c72..14b1aacf549e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/inference/InferencePipelineAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/inference/InferencePipelineAggregator.java @@ -100,12 +100,15 @@ public class InferencePipelineAggregator extends PipelineAggregator { } catch (Exception e) { inference = new WarningInferenceResults(e.getMessage()); } - - final List aggs = new ArrayList<>(bucket.getAggregations().asList()); - InternalInferenceAggregation aggResult = new InternalInferenceAggregation(name(), metadata(), inference); - aggs.add(aggResult); - InternalMultiBucketAggregation.InternalBucket newBucket = originalAgg.createBucket(InternalAggregations.from(aggs), bucket); - newBuckets.add(newBucket); + newBuckets.add( + originalAgg.createBucket( + InternalAggregations.append( + bucket.getAggregations(), + new InternalInferenceAggregation(name(), metadata(), inference) + ), + bucket + ) + ); } // the model is released at the end of this block.