From 9296fb40ff2f30a6f53428ae2cfe09446dffe38f Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 19 Nov 2024 12:26:37 +0100 Subject: [PATCH] Use LongArray instead of long[] for owning ordinals when building Internal aggregations (#116874) This commit changes the signature of InternalAggregation#buildAggregations(long[]) to InternalAggregation#buildAggregations(LongArray) to avoid allocations of humongous arrays. --- .../adjacency/AdjacencyMatrixAggregator.java | 84 ++++---- .../AutoDateHistogramAggregator.java | 6 +- .../timeseries/TimeSeriesAggregator.java | 68 ++++--- .../ChildrenToParentAggregator.java | 3 +- .../aggregations/ParentJoinAggregator.java | 10 +- .../ParentToChildrenAggregator.java | 3 +- .../action/search/TransportSearchIT.java | 3 +- .../aggregations/AdaptingAggregator.java | 7 +- .../search/aggregations/Aggregator.java | 8 +- .../aggregations/NonCollectingAggregator.java | 7 +- .../bucket/BestBucketsDeferringCollector.java | 26 +-- .../bucket/BucketsAggregator.java | 191 ++++++++++-------- .../bucket/DeferableBucketAggregator.java | 7 +- .../bucket/DeferringBucketCollector.java | 8 +- .../bucket/composite/CompositeAggregator.java | 75 +++---- .../countedterms/CountedTermsAggregator.java | 124 +++++++----- .../bucket/filter/FiltersAggregator.java | 3 +- .../bucket/geogrid/GeoGridAggregator.java | 57 +++--- .../bucket/global/GlobalAggregator.java | 5 +- .../AbstractHistogramAggregator.java | 3 +- .../histogram/DateHistogramAggregator.java | 3 +- .../DateRangeHistogramAggregator.java | 3 +- .../VariableWidthHistogramAggregator.java | 52 ++--- .../bucket/missing/MissingAggregator.java | 3 +- .../bucket/nested/NestedAggregator.java | 3 +- .../nested/ReverseNestedAggregator.java | 3 +- .../bucket/prefix/IpPrefixAggregator.java | 96 +++++---- .../bucket/range/BinaryRangeAggregator.java | 3 +- .../bucket/range/RangeAggregator.java | 3 +- .../sampler/BestDocsDeferringCollector.java | 3 +- .../bucket/sampler/SamplerAggregator.java | 3 +- 
.../random/RandomSamplerAggregator.java | 3 +- .../GlobalOrdinalsStringTermsAggregator.java | 108 +++++----- .../terms/InternalSignificantTerms.java | 2 +- .../bucket/terms/LongRareTermsAggregator.java | 115 ++++++----- .../terms/MapStringTermsAggregator.java | 87 ++++---- .../bucket/terms/NumericTermsAggregator.java | 102 +++++----- .../terms/StringRareTermsAggregator.java | 124 +++++++----- .../metrics/MetricsAggregator.java | 9 +- .../aggregation/ProfilingAggregator.java | 3 +- .../aggregations/AdaptingAggregatorTests.java | 3 +- .../aggregations/AggregatorBaseTests.java | 3 +- .../BestBucketsDeferringCollectorTests.java | 20 +- .../bucket/BucketsAggregatorTests.java | 3 +- .../BestDocsDeferringCollectorTests.java | 3 +- .../multiterms/MultiTermsAggregator.java | 95 +++++---- .../CategorizeTextAggregator.java | 48 ++--- .../mr/DelegatingCircuitBreakerService.java | 6 +- .../mr/ItemSetMapReduceAggregator.java | 7 +- 49 files changed, 875 insertions(+), 739 deletions(-) diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java index 2b4fea0327e8..29e8aec00a02 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/AdjacencyMatrixAggregator.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; @@ -177,65 +178,66 @@ public 
class AdjacencyMatrixAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { // Buckets are ordered into groups - [keyed filters] [key1&key2 intersects] - int maxOrd = owningBucketOrds.length * totalNumKeys; - int totalBucketsToBuild = 0; - for (int ord = 0; ord < maxOrd; ord++) { + long maxOrd = owningBucketOrds.size() * totalNumKeys; + long totalBucketsToBuild = 0; + for (long ord = 0; ord < maxOrd; ord++) { if (bucketDocCount(ord) > 0) { totalBucketsToBuild++; } } - long[] bucketOrdsToBuild = new long[totalBucketsToBuild]; - int builtBucketIndex = 0; - for (int ord = 0; ord < maxOrd; ord++) { - if (bucketDocCount(ord) > 0) { - bucketOrdsToBuild[builtBucketIndex++] = ord; - } - } - assert builtBucketIndex == totalBucketsToBuild; - builtBucketIndex = 0; - var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < owningBucketOrds.length; owningBucketOrdIdx++) { - List buckets = new ArrayList<>(filters.length); - for (int i = 0; i < keys.length; i++) { - long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], i); - long docCount = bucketDocCount(bucketOrd); - // Empty buckets are not returned because this aggregation will commonly be used under a - // a date-histogram where we will look for transactions over time and can expect many - // empty buckets. 
- if (docCount > 0) { - InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket( - keys[i], - docCount, - bucketSubAggs.apply(builtBucketIndex++) - ); - buckets.add(bucket); + try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) { + int builtBucketIndex = 0; + for (int ord = 0; ord < maxOrd; ord++) { + if (bucketDocCount(ord) > 0) { + bucketOrdsToBuild.set(builtBucketIndex++, ord); } } - int pos = keys.length; - for (int i = 0; i < keys.length; i++) { - for (int j = i + 1; j < keys.length; j++) { - long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], pos); + assert builtBucketIndex == totalBucketsToBuild; + builtBucketIndex = 0; + var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) { + List buckets = new ArrayList<>(filters.length); + for (int i = 0; i < keys.length; i++) { + long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i); long docCount = bucketDocCount(bucketOrd); - // Empty buckets are not returned due to potential for very sparse matrices + // Empty buckets are not returned because this aggregation will commonly be used under a + // a date-histogram where we will look for transactions over time and can expect many + // empty buckets. 
if (docCount > 0) { - String intersectKey = keys[i] + separator + keys[j]; InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket( - intersectKey, + keys[i], docCount, bucketSubAggs.apply(builtBucketIndex++) ); buckets.add(bucket); } - pos++; } + int pos = keys.length; + for (int i = 0; i < keys.length; i++) { + for (int j = i + 1; j < keys.length; j++) { + long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), pos); + long docCount = bucketDocCount(bucketOrd); + // Empty buckets are not returned due to potential for very sparse matrices + if (docCount > 0) { + String intersectKey = keys[i] + separator + keys[j]; + InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket( + intersectKey, + docCount, + bucketSubAggs.apply(builtBucketIndex++) + ); + buckets.add(bucket); + } + pos++; + } + } + results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata()); } - results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata()); + assert builtBucketIndex == totalBucketsToBuild; + return results; } - assert builtBucketIndex == totalBucketsToBuild; - return results; } @Override diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index d4e1c2928c44..6add1b0ac4a1 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -141,7 +141,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { protected final InternalAggregation[] buildAggregations( LongKeyedBucketOrds bucketOrds, LongToIntFunction roundingIndexFor, - long[] owningBucketOrds + 
LongArray owningBucketOrds ) throws IOException { return buildAggregationsForVariableBuckets( owningBucketOrds, @@ -324,7 +324,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds); } @@ -594,7 +594,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { /* * Rebucket before building the aggregation to build as small as result * as possible. diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java index c74637330dd7..1263d4282a18 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/TimeSeriesAggregator.java @@ -11,6 +11,8 @@ package org.elasticsearch.aggregations.bucket.timeseries; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.index.mapper.RoutingPathFields; @@ -30,6 +32,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceConfig; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; 
import java.util.Comparator; import java.util.List; import java.util.Map; @@ -67,42 +70,43 @@ public class TimeSeriesAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { BytesRef spare = new BytesRef(); - InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - List buckets = new ArrayList<>(); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - ordsEnum.readValue(spare); - InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket( - BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here. - docCount, - null, - keyed - ); - bucket.bucketOrd = ordsEnum.ord(); - buckets.add(bucket); - if (buckets.size() >= size) { - break; + try (ObjectArray allBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) { + for (long ordIdx = 0; ordIdx < allBucketsPerOrd.size(); ordIdx++) { + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + List buckets = new ArrayList<>(); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + ordsEnum.readValue(spare); + InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket( + BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here. 
+ docCount, + null, + keyed + ); + bucket.bucketOrd = ordsEnum.ord(); + buckets.add(bucket); + if (buckets.size() >= size) { + break; + } } + // NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp). + // _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value + // changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making + // the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no true anymore + // because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp). + buckets.sort(Comparator.comparing(bucket -> bucket.key)); + allBucketsPerOrd.set(ordIdx, buckets.toArray(new InternalTimeSeries.InternalBucket[0])); } - // NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp). - // _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value - // changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making - // the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no true anymore - // because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp). 
- buckets.sort(Comparator.comparing(bucket -> bucket.key)); - allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]); - } - buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(allBucketsPerOrd[ordIdx]); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx)); + } + return result; } - return result; } @Override @@ -185,7 +189,7 @@ public class TimeSeriesAggregator extends BucketsAggregator { } InternalTimeSeries buildResult(InternalTimeSeries.InternalBucket[] topBuckets) { - return new InternalTimeSeries(name, List.of(topBuckets), keyed, metadata()); + return new InternalTimeSeries(name, Arrays.asList(topBuckets), keyed, metadata()); } @FunctionalInterface diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java index 6985f6da98cf..12489ad37aab 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ChildrenToParentAggregator.java @@ -9,6 +9,7 @@ package org.elasticsearch.join.aggregations; import org.apache.lucene.search.Query; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.CardinalityUpperBound; @@ 
-44,7 +45,7 @@ public class ChildrenToParentAggregator extends ParentJoinAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalParent( diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java index 60412179807a..1b99d2b34046 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentJoinAggregator.java @@ -21,6 +21,7 @@ import org.apache.lucene.util.Bits; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.AggregationExecutionContext; @@ -115,7 +116,7 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements } @Override - protected void prepareSubAggs(long[] ordsToCollect) throws IOException { + protected void prepareSubAggs(LongArray ordsToCollect) throws IOException { IndexReader indexReader = searcher().getIndexReader(); for (LeafReaderContext ctx : indexReader.leaves()) { Scorer childDocsScorer = outFilter.scorer(ctx); @@ -153,9 +154,10 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements * structure that maps a primitive long to a list of primitive * longs. 
*/ - for (long owningBucketOrd : ordsToCollect) { - if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) { - collectBucket(sub, docId, owningBucketOrd); + for (long ord = 0; ord < ordsToCollect.size(); ord++) { + long ordToCollect = ordsToCollect.get(ord); + if (collectionStrategy.exists(ordToCollect, globalOrdinal)) { + collectBucket(sub, docId, ordToCollect); } } } diff --git a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java index d8a061a2de6d..939107f87715 100644 --- a/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java +++ b/modules/parent-join/src/main/java/org/elasticsearch/join/aggregations/ParentToChildrenAggregator.java @@ -9,6 +9,7 @@ package org.elasticsearch.join.aggregations; import org.apache.lucene.search.Query; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.CardinalityUpperBound; @@ -40,7 +41,7 @@ public class ParentToChildrenAggregator extends ParentJoinAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalChildren( diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java index a1395f81eb09..67576059de1e 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java +++ 
b/server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -669,7 +670,7 @@ public class TransportSearchIT extends ESIntegTestCase { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) { return new InternalAggregation[] { buildEmptyAggregation() }; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java index b4d5512331b4..d08a76e51c6b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AdaptingAggregator.java @@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree; @@ -98,10 +99,10 @@ public abstract class AdaptingAggregator extends Aggregator { } @Override - public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds); - InternalAggregation[] result = new 
InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { result[ordIdx] = adapt(delegateResults[ordIdx]); } return result; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java index 0d36469dddfd..aa8d9fba554c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/Aggregator.java @@ -13,6 +13,8 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.search.aggregations.support.AggregationPath; import org.elasticsearch.search.sort.SortOrder; @@ -142,7 +144,7 @@ public abstract class Aggregator extends BucketCollector implements Releasable { * @return the results for each ordinal, in the same order as the array * of ordinals */ - public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException; + public abstract InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException; /** * Release this aggregation and its sub-aggregations. @@ -153,11 +155,11 @@ public abstract class Aggregator extends BucketCollector implements Releasable { * Build the result of this aggregation if it is at the "top level" * of the aggregation tree. If, instead, it is a sub-aggregation of * another aggregation then the aggregation that contains it will call - * {@link #buildAggregations(long[])}. 
+ * {@link #buildAggregations(LongArray)}. */ public final InternalAggregation buildTopLevel() throws IOException { assert parent() == null; - return buildAggregations(new long[] { 0 })[0]; + return buildAggregations(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true))[0]; } /** diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java index 8accc6b15d82..4da2d10cfc0c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/NonCollectingAggregator.java @@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.support.AggregationContext; import java.io.IOException; @@ -39,9 +40,9 @@ public abstract class NonCollectingAggregator extends AggregatorBase { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { results[ordIdx] = buildEmptyAggregation(); } return results; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java index 231130c92034..44d76d31be0e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollector.java @@ -20,6 +20,7 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.util.packed.PackedInts; import org.apache.lucene.util.packed.PackedLongValues; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; @@ -146,7 +147,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { * Replay the wrapped collector, but only on a selection of buckets. */ @Override - public void prepareSelectedBuckets(long... selectedBuckets) throws IOException { + public void prepareSelectedBuckets(LongArray selectedBuckets) throws IOException { if (finished == false) { throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called"); } @@ -154,9 +155,9 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { throw new IllegalStateException("Already been replayed"); } - this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE); - for (long ord : selectedBuckets) { - this.selectedBuckets.add(ord); + this.selectedBuckets = new LongHash(selectedBuckets.size(), BigArrays.NON_RECYCLING_INSTANCE); + for (long i = 0; i < selectedBuckets.size(); i++) { + this.selectedBuckets.add(selectedBuckets.get(i)); } boolean needsScores = scoreMode().needsScores(); @@ -232,21 +233,22 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector { * been collected directly. 
*/ @Override - public Aggregator wrap(final Aggregator in) { + public Aggregator wrap(final Aggregator in, BigArrays bigArrays) { return new WrappedAggregator(in) { @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { if (selectedBuckets == null) { throw new IllegalStateException("Collection has not been replayed yet."); } - long[] rebasedOrds = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]); - if (rebasedOrds[ordIdx] == -1) { - throw new IllegalStateException("Cannot build for a bucket which has not been collected"); + try (LongArray rebasedOrds = bigArrays.newLongArray(owningBucketOrds.size())) { + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + rebasedOrds.set(ordIdx, selectedBuckets.find(owningBucketOrds.get(ordIdx))); + if (rebasedOrds.get(ordIdx) == -1) { + throw new IllegalStateException("Cannot build for a bucket which has not been collected"); + } } + return in.buildAggregations(rebasedOrds); } - return in.buildAggregations(rebasedOrds); } }; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java index e6c26c427880..252eb0877d02 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java @@ -10,7 +10,9 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.index.LeafReaderContext; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.IntArray; import org.elasticsearch.common.util.LongArray; +import 
org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.search.aggregations.AggregationErrors; import org.elasticsearch.search.aggregations.Aggregator; @@ -155,22 +157,22 @@ public abstract class BucketsAggregator extends AggregatorBase { /** * Hook to allow taking an action before building the sub agg results. */ - protected void prepareSubAggs(long[] ordsToCollect) throws IOException {} + protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {} /** * Build the results of the sub-aggregations of the buckets at each of * the provided ordinals. *

* Most aggregations should probably use something like - * {@link #buildSubAggsForAllBuckets(Object[][], ToLongFunction, BiConsumer)} - * or {@link #buildAggregationsForVariableBuckets(long[], LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)} - * or {@link #buildAggregationsForFixedBucketCount(long[], int, BucketBuilderForFixedCount, Function)} - * or {@link #buildAggregationsForSingleBucket(long[], SingleBucketResultBuilder)} + * {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)} + * or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)} + * or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)} + * or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)} * instead of calling this directly. * @return the sub-aggregation results in the same order as the provided * array of ordinals */ - protected final IntFunction buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException { + protected final IntFunction buildSubAggsForBuckets(LongArray bucketOrdsToCollect) throws IOException { prepareSubAggs(bucketOrdsToCollect); InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][]; for (int i = 0; i < subAggregators.length; i++) { @@ -204,26 +206,28 @@ public abstract class BucketsAggregator extends AggregatorBase { * @param setAggs how to set the sub-aggregation results on a bucket */ protected final void buildSubAggsForAllBuckets( - B[][] buckets, + ObjectArray buckets, ToLongFunction bucketToOrd, BiConsumer setAggs ) throws IOException { - int totalBucketOrdsToCollect = 0; - for (B[] bucketsForOneResult : buckets) { - totalBucketOrdsToCollect += bucketsForOneResult.length; + long totalBucketOrdsToCollect = 0; + for (long b = 0; b < buckets.size(); b++) { + totalBucketOrdsToCollect += buckets.get(b).length; } - long[] bucketOrdsToCollect 
= new long[totalBucketOrdsToCollect]; - int s = 0; - for (B[] bucketsForOneResult : buckets) { - for (B bucket : bucketsForOneResult) { - bucketOrdsToCollect[s++] = bucketToOrd.applyAsLong(bucket); + + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalBucketOrdsToCollect)) { + int s = 0; + for (long ord = 0; ord < buckets.size(); ord++) { + for (B bucket : buckets.get(ord)) { + bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket)); + } } - } - var results = buildSubAggsForBuckets(bucketOrdsToCollect); - s = 0; - for (B[] bucket : buckets) { - for (int b = 0; b < bucket.length; b++) { - setAggs.accept(bucket[b], results.apply(s++)); + var results = buildSubAggsForBuckets(bucketOrdsToCollect); + s = 0; + for (long ord = 0; ord < buckets.size(); ord++) { + for (B value : buckets.get(ord)) { + setAggs.accept(value, results.apply(s++)); + } } } } @@ -237,37 +241,38 @@ public abstract class BucketsAggregator extends AggregatorBase { * @param resultBuilder how to build a result from buckets */ protected final InternalAggregation[] buildAggregationsForFixedBucketCount( - long[] owningBucketOrds, + LongArray owningBucketOrds, int bucketsPerOwningBucketOrd, BucketBuilderForFixedCount bucketBuilder, Function, InternalAggregation> resultBuilder ) throws IOException { - int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd; - long[] bucketOrdsToCollect = new long[totalBuckets]; - int bucketOrdIdx = 0; - for (long owningBucketOrd : owningBucketOrds) { - long ord = owningBucketOrd * bucketsPerOwningBucketOrd; - for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { - bucketOrdsToCollect[bucketOrdIdx++] = ord++; + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) { + int bucketOrdIdx = 0; + for (long i = 0; i < owningBucketOrds.size(); i++) { + long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd; + for (int 
offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { + bucketOrdsToCollect.set(bucketOrdIdx++, ord++); + } } - } - bucketOrdIdx = 0; - var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) { - List buckets = new ArrayList<>(bucketsPerOwningBucketOrd); - for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { - buckets.add( - bucketBuilder.build( - offsetInOwningOrd, - bucketDocCount(bucketOrdsToCollect[bucketOrdIdx]), - subAggregationResults.apply(bucketOrdIdx++) - ) - ); + bucketOrdIdx = 0; + var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) { + List buckets = new ArrayList<>(bucketsPerOwningBucketOrd); + for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) { + buckets.add( + bucketBuilder.build( + offsetInOwningOrd, + bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)), + subAggregationResults.apply(bucketOrdIdx++) + ) + ); + } + results[owningOrdIdx] = resultBuilder.apply(buckets); } - results[owningOrdIdx] = resultBuilder.apply(buckets); + return results; } - return results; } @FunctionalInterface @@ -280,17 +285,19 @@ public abstract class BucketsAggregator extends AggregatorBase { * @param owningBucketOrds owning bucket ordinals for which to build the results * @param resultBuilder how to build a result from the sub aggregation results */ - protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, SingleBucketResultBuilder resultBuilder) - throws IOException { + protected final InternalAggregation[] 
buildAggregationsForSingleBucket( + LongArray owningBucketOrds, + SingleBucketResultBuilder resultBuilder + ) throws IOException { /* * It'd be entirely reasonable to call * `consumeBucketsAndMaybeBreak(owningBucketOrds.length)` * here but we don't because single bucket aggs never have. */ var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults.apply(ordIdx)); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx)); } return results; } @@ -307,54 +314,60 @@ public abstract class BucketsAggregator extends AggregatorBase { * @param bucketOrds hash of values to the bucket ordinal */ protected final InternalAggregation[] buildAggregationsForVariableBuckets( - long[] owningBucketOrds, + LongArray owningBucketOrds, LongKeyedBucketOrds bucketOrds, BucketBuilderForVariable bucketBuilder, ResultBuilderForVariable resultBuilder ) throws IOException { long totalOrdsToCollect = 0; - final int[] bucketsInOrd = new int[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - bucketsInOrd[ordIdx] = (int) bucketCount; - totalOrdsToCollect += bucketCount; - } - if (totalOrdsToCollect > Integer.MAX_VALUE) { - // TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is not - // much the user can do about that. If this occurs with any frequency, we should do something about it. 
- throw new IllegalArgumentException( - "Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]" - ); - } - long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect]; - int b = 0; - for (long owningBucketOrd : owningBucketOrds) { - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - while (ordsEnum.next()) { - bucketOrdsToCollect[b++] = ordsEnum.ord(); + try (IntArray bucketsInOrd = bigArrays().newIntArray(owningBucketOrds.size())) { + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)); + bucketsInOrd.set(ordIdx, (int) bucketCount); + totalOrdsToCollect += bucketCount; } - } - var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - b = 0; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - List buckets = new ArrayList<>(bucketsInOrd[ordIdx]); - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - if (bucketOrdsToCollect[b] != ordsEnum.ord()) { - // If we hit this, something has gone horribly wrong and we need to investigate - throw AggregationErrors.iterationOrderChangedWithoutMutating( - bucketOrds.toString(), - ordsEnum.ord(), - bucketOrdsToCollect[b] - ); + if (totalOrdsToCollect > Integer.MAX_VALUE) { + // TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is + // not + // much the user can do about that. If this occurs with any frequency, we should do something about it. 
+ throw new IllegalArgumentException( + "Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]" + ); + } + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) { + int b = 0; + for (long i = 0; i < owningBucketOrds.size(); i++) { + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i)); + while (ordsEnum.next()) { + bucketOrdsToCollect.set(b++, ordsEnum.ord()); + } } - buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))); + var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + b = 0; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + List buckets = new ArrayList<>(bucketsInOrd.get(ordIdx)); + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) { + // If we hit this, something has gone horribly wrong and we need to investigate + throw AggregationErrors.iterationOrderChangedWithoutMutating( + bucketOrds.toString(), + ordsEnum.ord(), + bucketOrdsToCollect.get(b) + ); + } + buckets.add( + bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++)) + ); + } + results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets); + } + return results; } - results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets); } - return results; } @FunctionalInterface diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java index 84a15b6d1c0e..64744b705e22 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferableBucketAggregator.java @@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.bucket; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.BucketCollector; @@ -65,7 +66,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator { } deferredAggregations.add(subAggregators[i]); deferredAggregationNames.add(subAggregators[i].name()); - subAggregators[i] = deferringCollector.wrap(subAggregators[i]); + subAggregators[i] = deferringCollector.wrap(subAggregators[i], bigArrays()); } else { collectors.add(subAggregators[i]); } @@ -87,7 +88,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator { /** * Build the {@link DeferringBucketCollector}. The default implementation * replays all hits against the buckets selected by - * {#link {@link DeferringBucketCollector#prepareSelectedBuckets(long...)}. + * {#link {@link DeferringBucketCollector#prepareSelectedBuckets(LongArray)}. 
*/ protected DeferringBucketCollector buildDeferringCollector() { return new BestBucketsDeferringCollector(topLevelQuery(), searcher(), descendsFromGlobalAggregator(parent())); @@ -107,7 +108,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator { } @Override - protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException { + protected final void prepareSubAggs(LongArray bucketOrdsToCollect) throws IOException { if (deferringCollector != null) { deferringCollector.prepareSelectedBuckets(bucketOrdsToCollect); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java index 44cff2651e27..468fec29a942 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/DeferringBucketCollector.java @@ -10,6 +10,8 @@ package org.elasticsearch.search.aggregations.bucket; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.BucketCollector; @@ -37,13 +39,13 @@ public abstract class DeferringBucketCollector extends BucketCollector { /** * Replay the deferred hits on the selected buckets. */ - public abstract void prepareSelectedBuckets(long... selectedBuckets) throws IOException; + public abstract void prepareSelectedBuckets(LongArray selectedBuckets) throws IOException; /** * Wrap the provided aggregator so that it behaves (almost) as if it had * been collected directly. 
*/ - public Aggregator wrap(final Aggregator in) { + public Aggregator wrap(final Aggregator in, BigArrays bigArrays) { return new WrappedAggregator(in); } @@ -80,7 +82,7 @@ public abstract class DeferringBucketCollector extends BucketCollector { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return in.buildAggregations(owningBucketOrds); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java index 9ee15306ce63..0baecf6e3f92 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java @@ -35,6 +35,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.RoaringDocIdSet; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Strings; import org.elasticsearch.index.IndexSortConfig; @@ -184,50 +185,51 @@ public final class CompositeAggregator extends BucketsAggregator implements Size } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { // Composite aggregator must be at the top of the aggregation tree - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L; + assert owningBucketOrds.size() == 1 && owningBucketOrds.get(0) == 0L; if (deferredCollectors != NO_OP_BUCKET_COLLECTOR) { // Replay all documents that contain at least one top bucket (collected during the 
first pass). runDeferredCollections(); } - int num = Math.min(size, (int) queue.size()); + final int num = Math.min(size, (int) queue.size()); final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num]; - long[] bucketOrdsToCollect = new long[(int) queue.size()]; - for (int i = 0; i < queue.size(); i++) { - bucketOrdsToCollect[i] = i; + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(queue.size())) { + for (int i = 0; i < queue.size(); i++) { + bucketOrdsToCollect.set(i, i); + } + var subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect); + while (queue.size() > 0) { + int slot = queue.pop(); + CompositeKey key = queue.toCompositeKey(slot); + InternalAggregations aggs = subAggsForBuckets.apply(slot); + long docCount = queue.getDocCount(slot); + buckets[(int) queue.size()] = new InternalComposite.InternalBucket( + sourceNames, + formats, + key, + reverseMuls, + missingOrders, + docCount, + aggs + ); + } + CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null; + return new InternalAggregation[] { + new InternalComposite( + name, + size, + sourceNames, + formats, + Arrays.asList(buckets), + lastBucket, + reverseMuls, + missingOrders, + earlyTerminated, + metadata() + ) }; } - var subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect); - while (queue.size() > 0) { - int slot = queue.pop(); - CompositeKey key = queue.toCompositeKey(slot); - InternalAggregations aggs = subAggsForBuckets.apply(slot); - long docCount = queue.getDocCount(slot); - buckets[(int) queue.size()] = new InternalComposite.InternalBucket( - sourceNames, - formats, - key, - reverseMuls, - missingOrders, - docCount, - aggs - ); - } - CompositeKey lastBucket = num > 0 ? 
buckets[num - 1].getRawKey() : null; - return new InternalAggregation[] { - new InternalComposite( - name, - size, - sourceNames, - formats, - Arrays.asList(buckets), - lastBucket, - reverseMuls, - missingOrders, - earlyTerminated, - metadata() - ) }; } @Override @@ -244,6 +246,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size false, metadata() ); + } private void finishLeaf() { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java index af4d60bf424a..05fce2cff64d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/countedterms/CountedTermsAggregator.java @@ -13,6 +13,8 @@ import org.apache.lucene.index.DocValues; import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.AggregationExecutionContext; @@ -108,70 +110,80 @@ class CountedTermsAggregator extends TermsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - StringTerms.Bucket[][] topBucketsPerOrd = new StringTerms.Bucket[owningBucketOrds.length][]; - long[] otherDocCounts = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try ( + LongArray 
otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size()); + ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) + ) { + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - // as users can't control sort order, in practice we'll always sort by doc count descending - try ( - BucketPriorityQueue ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - StringTerms.Bucket spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - Supplier emptyBucketBuilder = () -> new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (spare == null) { - spare = emptyBucketBuilder.get(); + // as users can't control sort order, in practice we'll always sort by doc count descending + try ( + BucketPriorityQueue ordered = new BucketPriorityQueue<>( + size, + bigArrays(), + partiallyBuiltBucketComparator + ) + ) { + StringTerms.Bucket spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + Supplier emptyBucketBuilder = () -> new StringTerms.Bucket( + new BytesRef(), + 0, + null, + false, + 0, + format + ); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (spare == null) { + spare = emptyBucketBuilder.get(); + } + ordsEnum.readValue(spare.getTermBytes()); + spare.setDocCount(docCount); + spare.setBucketOrd(ordsEnum.ord()); + spare = ordered.insertWithOverflow(spare); } - ordsEnum.readValue(spare.getTermBytes()); - spare.setDocCount(docCount); - spare.setBucketOrd(ordsEnum.ord()); - spare = ordered.insertWithOverflow(spare); - } - topBucketsPerOrd[ordIdx] = new 
StringTerms.Bucket[(int) ordered.size()]; - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd[ordIdx][i] = ordered.pop(); - otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount(); - topBucketsPerOrd[ordIdx][i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd[ordIdx][i].getTermBytes())); + topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]); + for (int i = (int) ordered.size() - 1; i >= 0; --i) { + topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); + otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); + topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes())); + } } } - } - buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - final BucketOrder reduceOrder; - if (isKeyOrder(order) == false) { - reduceOrder = InternalOrder.key(true); - Arrays.sort(topBucketsPerOrd[ordIdx], reduceOrder.comparator()); - } else { - reduceOrder = order; + buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + final BucketOrder reduceOrder; + if (isKeyOrder(order) == false) { + reduceOrder = InternalOrder.key(true); + Arrays.sort(topBucketsPerOrd.get(ordIdx), reduceOrder.comparator()); + } else { + reduceOrder = order; + } + result[ordIdx] = new StringTerms( + name, + reduceOrder, + order, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + metadata(), + format, + bucketCountThresholds.getShardSize(), + false, + otherDocCounts.get(ordIdx), + 
Arrays.asList(topBucketsPerOrd.get(ordIdx)), + null + ); } - result[ordIdx] = new StringTerms( - name, - reduceOrder, - order, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - metadata(), - format, - bucketCountThresholds.getShardSize(), - false, - otherDocCounts[ordIdx], - Arrays.asList(topBucketsPerOrd[ordIdx]), - null - ); + return result; } - return result; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java index fede97c7fdde..69eff3630a8f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/FiltersAggregator.java @@ -20,6 +20,7 @@ import org.apache.lucene.search.Scorer; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.AggregationExecutionContext; @@ -208,7 +209,7 @@ public abstract class FiltersAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForFixedBucketCount( owningBucketOrds, filters.size() + (otherBucketKey == null ? 
0 : 1), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java index cde26bb2214e..0e63e26e77a5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/GeoGridAggregator.java @@ -12,6 +12,8 @@ import org.apache.lucene.index.DocValues; import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; @@ -132,39 +134,40 @@ public abstract class GeoGridAggregator> extends Bu protected abstract InternalGeoGridBucket newEmptyBucket(); @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize); + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try (ObjectArray topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) { + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize); - try (BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, bigArrays())) { - InternalGeoGridBucket spare = null; - LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = 
bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - if (spare == null) { - spare = newEmptyBucket(); + try (BucketPriorityQueue ordered = new BucketPriorityQueue<>(size, bigArrays())) { + InternalGeoGridBucket spare = null; + LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + while (ordsEnum.next()) { + if (spare == null) { + spare = newEmptyBucket(); + } + + // need a special function to keep the source bucket + // up-to-date so it can get the appropriate key + spare.hashAsLong = ordsEnum.value(); + spare.docCount = bucketDocCount(ordsEnum.ord()); + spare.bucketOrd = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); } - // need a special function to keep the source bucket - // up-to-date so it can get the appropriate key - spare.hashAsLong = ordsEnum.value(); - spare.docCount = bucketDocCount(ordsEnum.ord()); - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - } - - topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[(int) ordered.size()]; - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd[ordIdx][i] = ordered.pop(); + topBucketsPerOrd.set(ordIdx, new InternalGeoGridBucket[(int) ordered.size()]); + for (int i = (int) ordered.size() - 1; i >= 0; --i) { + topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); + } } } + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata()); + } + return results; } - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < 
owningBucketOrds.length; ordIdx++) { - results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata()); - } - return results; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java index b5d3485e72f8..b83001c34377 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/global/GlobalAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Scorable; import org.apache.lucene.search.Weight; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.CardinalityUpperBound; @@ -62,8 +63,8 @@ public final class GlobalAggregator extends BucketsAggregator implements SingleB } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0 : "global aggregator can only be a top level aggregator"; + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + assert owningBucketOrds.size() == 1 && owningBucketOrds.get(0) == 0 : "global aggregator can only be a top level aggregator"; return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalGlobal( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java 
index b81d8b002b6b..ed687df6377d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AbstractHistogramAggregator.java @@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.histogram; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.aggregations.Aggregator; @@ -79,7 +80,7 @@ public abstract class AbstractHistogramAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> { double roundKey = Double.longBitsToDouble(bucketValue); double key = roundKey * interval + offset; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 86c320d8dc31..cc2db63fa5ec 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -17,6 +17,7 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.common.Rounding; import org.elasticsearch.common.Rounding.DateTimeUnit; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; @@ -337,7 
+338,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> { return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults); }, (owningBucketOrd, buckets) -> { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java index 2bfd85e5fe03..f385f7c34f6b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/DateRangeHistogramAggregator.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.common.Rounding; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.FieldData; @@ -163,7 +164,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForVariableBuckets( owningBucketOrds, bucketOrds, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java index 1afb06067f77..86ec1666e2ce 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.InPlaceMergeSorter; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.DoubleArray; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -565,35 +566,36 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { int numClusters = collector.finalNumBuckets(); - long[] bucketOrdsToCollect = new long[numClusters]; - for (int i = 0; i < numClusters; i++) { - bucketOrdsToCollect[i] = i; + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(numClusters)) { + for (int i = 0; i < numClusters; i++) { + bucketOrdsToCollect.set(i, i); + } + + var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + + List buckets = new ArrayList<>(numClusters); + for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) { + buckets.add(collector.buildBucket(bucketOrd, subAggregationResults.apply(bucketOrd))); + } + + Function, InternalAggregation> resultBuilder = bucketsToFormat -> { + // The contract of the histogram aggregation is that shards must return + // buckets ordered by centroid in ascending order + CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator()); + + 
InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( + buildEmptySubAggregations() + ); + + return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata()); + }; + + return new InternalAggregation[] { resultBuilder.apply(buckets) }; } - var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - - List buckets = new ArrayList<>(numClusters); - for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) { - buckets.add(collector.buildBucket(bucketOrd, subAggregationResults.apply(bucketOrd))); - } - - Function, InternalAggregation> resultBuilder = bucketsToFormat -> { - // The contract of the histogram aggregation is that shards must return - // buckets ordered by centroid in ascending order - CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator()); - - InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo( - buildEmptySubAggregations() - ); - - return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata()); - }; - - return new InternalAggregation[] { resultBuilder.apply(buckets) }; - } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java index 5c8f8ab9c562..b49668e45b88 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/missing/MissingAggregator.java @@ -8,6 +8,7 @@ */ package org.elasticsearch.search.aggregations.bucket.missing; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.fielddata.DocValueBits; import org.elasticsearch.search.aggregations.AggregationExecutionContext; 
import org.elasticsearch.search.aggregations.Aggregator; @@ -67,7 +68,7 @@ public class MissingAggregator extends BucketsAggregator implements SingleBucket } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalMissing( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java index 0fbb9745aa40..23a2d6380c29 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/NestedAggregator.java @@ -21,6 +21,7 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.search.join.BitSetProducer; import org.apache.lucene.util.BitSet; import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.mapper.NestedObjectMapper; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; @@ -124,7 +125,7 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalNested( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java index 0e3e4679c7a2..2477b67367e1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/nested/ReverseNestedAggregator.java @@ -13,6 +13,7 @@ import org.apache.lucene.search.Query; import org.apache.lucene.search.join.BitSetProducer; import org.apache.lucene.util.BitSet; import org.elasticsearch.common.lucene.search.Queries; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.mapper.NestedObjectMapper; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; @@ -86,7 +87,7 @@ public class ReverseNestedAggregator extends BucketsAggregator implements Single } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalReverseNested( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java index 9548cd871e16..e8ba0393208a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/IpPrefixAggregator.java @@ -12,6 +12,8 @@ package org.elasticsearch.search.aggregations.bucket.prefix; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.CollectionUtil; +import org.elasticsearch.common.util.IntArray; +import org.elasticsearch.common.util.LongArray; 
import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.FieldData; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; @@ -160,57 +162,63 @@ public final class IpPrefixAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { long totalOrdsToCollect = 0; - final int[] bucketsInOrd = new int[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); - bucketsInOrd[ordIdx] = (int) bucketCount; - totalOrdsToCollect += bucketCount; - } - - long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect]; - int b = 0; - for (long owningBucketOrd : owningBucketOrds) { - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); - while (ordsEnum.next()) { - bucketOrdsToCollect[b++] = ordsEnum.ord(); + try (IntArray bucketsInOrd = bigArrays().newIntArray(owningBucketOrds.size())) { + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)); + bucketsInOrd.set(ordIdx, (int) bucketCount); + totalOrdsToCollect += bucketCount; } - } - var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - b = 0; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - List buckets = new ArrayList<>(bucketsInOrd[ordIdx]); - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - long ordinal = ordsEnum.ord(); - if (bucketOrdsToCollect[b] != ordinal) { - throw AggregationErrors.iterationOrderChangedWithoutMutating(bucketOrds.toString(), ordinal, 
bucketOrdsToCollect[b]); + try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) { + int b = 0; + for (long i = 0; i < owningBucketOrds.size(); i++) { + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i)); + while (ordsEnum.next()) { + bucketOrdsToCollect.set(b++, ordsEnum.ord()); + } } - BytesRef ipAddress = new BytesRef(); - ordsEnum.readValue(ipAddress); - long docCount = bucketDocCount(ordinal); - buckets.add( - new InternalIpPrefix.Bucket( - config.format(), - BytesRef.deepCopyOf(ipAddress), - keyed, - ipPrefix.isIpv6, - ipPrefix.prefixLength, - ipPrefix.appendPrefixLength, - docCount, - subAggregationResults.apply(b++) - ) - ); - // NOTE: the aggregator is expected to return sorted results - CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + b = 0; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + List buckets = new ArrayList<>(bucketsInOrd.get(ordIdx)); + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx)); + while (ordsEnum.next()) { + long ordinal = ordsEnum.ord(); + if (bucketOrdsToCollect.get(b) != ordinal) { + throw AggregationErrors.iterationOrderChangedWithoutMutating( + bucketOrds.toString(), + ordinal, + bucketOrdsToCollect.get(b) + ); + } + BytesRef ipAddress = new BytesRef(); + ordsEnum.readValue(ipAddress); + long docCount = bucketDocCount(ordinal); + buckets.add( + new InternalIpPrefix.Bucket( + config.format(), + BytesRef.deepCopyOf(ipAddress), + keyed, + ipPrefix.isIpv6, + ipPrefix.prefixLength, + ipPrefix.appendPrefixLength, + docCount, + subAggregationResults.apply(b++) + ) + ); + + // NOTE: the aggregator is expected to return sorted results + CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator()); + } + 
results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata()); + } + return results; } - results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata()); } - return results; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java index 6119af3cb6a5..9bde8d007c1b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/BinaryRangeAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.index.SortedDocValues; import org.apache.lucene.index.SortedSetDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.fielddata.FieldData; import org.elasticsearch.index.fielddata.SortedBinaryDocValues; import org.elasticsearch.search.DocValueFormat; @@ -359,7 +360,7 @@ public final class BinaryRangeAggregator extends BucketsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForFixedBucketCount( owningBucketOrds, ranges.length, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java index 6d63bb786c29..0654a788a10a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/RangeAggregator.java @@ -14,6 +14,7 @@ import 
org.elasticsearch.TransportVersions; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.index.fielddata.FieldData; import org.elasticsearch.index.fielddata.NumericDoubleValues; @@ -531,7 +532,7 @@ public abstract class RangeAggregator extends BucketsAggregator { @Override @SuppressWarnings("unchecked") - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForFixedBucketCount( owningBucketOrds, ranges.length, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java index 37cee75c11b4..70f72fafba7b 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollector.java @@ -19,6 +19,7 @@ import org.apache.lucene.search.TopScoreDocCollectorManager; import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -120,7 +121,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme } @Override - public void prepareSelectedBuckets(long... 
selectedBuckets) throws IOException { + public void prepareSelectedBuckets(LongArray selectedBuckets) { // no-op - deferred aggs processed in postCollection call } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java index 78b2cdfe7655..a4c06a194fbf 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/SamplerAggregator.java @@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler; import org.apache.lucene.misc.search.DiversifiedTopDocsCollector; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.core.Releasables; import org.elasticsearch.search.aggregations.AggregationExecutionContext; @@ -212,7 +213,7 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalSampler( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java index fc03786356f8..921cbb96385a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/sampler/random/RandomSamplerAggregator.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.Bits; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -60,7 +61,7 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return buildAggregationsForSingleBucket( owningBucketOrds, (owningBucketOrd, subAggregationResults) -> new InternalRandomSampler( diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 0f7c61dc9f25..d04d7528ea93 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -191,7 +192,7 @@ public class GlobalOrdinalsStringTermsAggregator extends 
AbstractStringTermsAggr } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return resultStrategy.buildAggregations(owningBucketOrds); } @@ -696,61 +697,66 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr B extends InternalMultiBucketAggregation.InternalBucket, TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable { - private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + if (valueCount == 0) { // no context in this reader - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - results[ordIdx] = buildNoValuesResult(owningBucketOrds[ordIdx]); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = buildNoValuesResult(owningBucketOrds.get(ordIdx)); } return results; } + try ( + LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true); + ObjectArray topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.size()) + ) { + GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; + for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) { + final int size; + if (bucketCountThresholds.getMinDocCount() == 0) { + // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns + size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); + } else { + size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); + } + try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { + final long 
finalOrdIdx = ordIdx; + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + BucketUpdater updater = bucketUpdater(owningBucketOrd, lookupGlobalOrd); + collectionStrategy.forEach(owningBucketOrd, new BucketInfoConsumer() { + TB spare = null; - B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length); - long[] otherDocCount = new long[owningBucketOrds.length]; - GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - final int size; - if (bucketCountThresholds.getMinDocCount() == 0) { - // if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns - size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize()); - } else { - size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize()); - } - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - final int finalOrdIdx = ordIdx; - BucketUpdater updater = bucketUpdater(owningBucketOrds[ordIdx], lookupGlobalOrd); - collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() { - TB spare = null; - - @Override - public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { - otherDocCount[finalOrdIdx] += docCount; - if (docCount >= bucketCountThresholds.getShardMinDocCount()) { - if (spare == null) { - spare = buildEmptyTemporaryBucket(); + @Override + public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException { + otherDocCount.increment(finalOrdIdx, docCount); + if (docCount >= bucketCountThresholds.getShardMinDocCount()) { + if (spare == null) { + spare = buildEmptyTemporaryBucket(); + } + updater.updateBucket(spare, globalOrd, bucketOrd, docCount); + spare = ordered.insertWithOverflow(spare); } - updater.updateBucket(spare, globalOrd, bucketOrd, docCount); - spare = ordered.insertWithOverflow(spare); } - } - }); + }); - // Get the top buckets - topBucketsPreOrd[ordIdx] = 
buildBuckets((int) ordered.size()); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); - otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount(); + // Get the top buckets + topBucketsPreOrd.set(ordIdx, buildBuckets((int) ordered.size())); + for (int i = (int) ordered.size() - 1; i >= 0; --i) { + B bucket = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); + topBucketsPreOrd.get(ordIdx)[i] = bucket; + otherDocCount.increment(ordIdx, -bucket.getDocCount()); + } } } - } - buildSubAggs(topBucketsPreOrd); + buildSubAggs(topBucketsPreOrd); - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - results[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCount[ordIdx], topBucketsPreOrd[ordIdx]); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPreOrd.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCount.get(ordIdx), topBucketsPreOrd.get(ordIdx)); + } + return results; } - return results; } /** @@ -785,7 +791,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr /** * Build an array to hold the "top" buckets for each ordinal. */ - abstract B[][] buildTopBucketsPerOrd(int size); + abstract ObjectArray buildTopBucketsPerOrd(long size); /** * Build an array of buckets for a particular ordinal to collect the @@ -802,7 +808,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. 
*/ - abstract void buildSubAggs(B[][] topBucketsPreOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException; /** * Turn the buckets into an aggregation result. @@ -841,8 +847,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new StringTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -879,7 +885,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void buildSubAggs(StringTerms.Bucket[][] topBucketsPreOrd) throws IOException { + void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } @@ -973,8 +979,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new SignificantStringTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -1026,7 +1032,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr } @Override - void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOException { + void buildSubAggs(ObjectArray topBucketsPreOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index a60911b46684..eeb7305ac51f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ 
b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -62,7 +62,7 @@ public abstract class InternalSignificantTerms builtBuckets = new ArrayList<>(); - LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]); - while (collectedBuckets.next()) { - long docCount = bucketDocCount(collectedBuckets.ord()); - // if the key is below threshold, reinsert into the new ords - if (docCount <= maxDocCount) { - LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format); - bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value()); - mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd; - builtBuckets.add(bucket); - keepCount++; - } else { - filters[owningOrdIdx].add(collectedBuckets.value()); + try ( + ObjectArray rarestPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()); + ObjectArray filters = bigArrays().newObjectArray(owningBucketOrds.size()) + ) { + try (LongArray mergeMap = bigArrays().newLongArray(bucketOrds.size())) { + mergeMap.fill(0, mergeMap.size(), -1); + long keepCount = 0; + long offset = 0; + for (long owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.size(); owningOrdIdx++) { + try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, bigArrays())) { + filters.set(owningOrdIdx, newFilter()); + List builtBuckets = new ArrayList<>(); + LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds.get(owningOrdIdx)); + while (collectedBuckets.next()) { + long docCount = bucketDocCount(collectedBuckets.ord()); + // if the key is below threshold, reinsert into the new ords + if (docCount <= maxDocCount) { + LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format); + bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value()); + 
mergeMap.set(collectedBuckets.ord(), bucket.bucketOrd); + builtBuckets.add(bucket); + keepCount++; + } else { + filters.get(owningOrdIdx).add(collectedBuckets.value()); + } + } + rarestPerOrd.set(owningOrdIdx, builtBuckets.toArray(LongRareTerms.Bucket[]::new)); + offset += bucketsInThisOwningBucketToCollect.size(); } } - rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(LongRareTerms.Bucket[]::new); - offset += bucketsInThisOwningBucketToCollect.size(); - } - } - /* - * Only merge/delete the ordinals if we have actually deleted one, - * to save on some redundant work. - */ - if (keepCount != mergeMap.length) { - LongUnaryOperator howToMerge = b -> mergeMap[(int) b]; - rewriteBuckets(offset, howToMerge); - if (deferringCollector() != null) { - ((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge); + /* + * Only merge/delete the ordinals if we have actually deleted one, + * to save on some redundant work. + */ + if (keepCount != mergeMap.size()) { + LongUnaryOperator howToMerge = mergeMap::get; + rewriteBuckets(offset, howToMerge); + if (deferringCollector() != null) { + ((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge); + } + } } - } - /* - * Now build the results! - */ - buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator()); - result[ordIdx] = new LongRareTerms( - name, - ORDER, - metadata(), - format, - Arrays.asList(rarestPerOrd[ordIdx]), - maxDocCount, - filters[ordIdx] - ); + /* + * Now build the results! 
+ */ + buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + LongRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx); + Arrays.sort(buckets, ORDER.comparator()); + result[ordIdx] = new LongRareTerms( + name, + ORDER, + metadata(), + format, + Arrays.asList(buckets), + maxDocCount, + filters.get(ordIdx) + ); + } + return result; } - return result; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java index 76202b6386a7..c02ed5509e6a 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java @@ -18,6 +18,7 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -117,7 +118,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return resultStrategy.buildAggregations(owningBucketOrds); } @@ -282,45 +283,49 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato implements Releasable { - private InternalAggregation[] buildAggregations(long[] 
owningBucketOrds) throws IOException { - B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length); - long[] otherDocCounts = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx], excludeDeletedDocs); - int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); + private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try ( + LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); + ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(Math.toIntExact(owningBucketOrds.size())) + ) { + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + long owningOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningOrd, excludeDeletedDocs); + int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; + try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { + B spare = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd); + Supplier emptyBucketBuilder = emptyBucketBuilder(owningOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + spare = emptyBucketBuilder.get(); + } + updateBucket(spare, ordsEnum, docCount); + spare = 
ordered.insertWithOverflow(spare); } - if (spare == null) { - spare = emptyBucketBuilder.get(); - } - updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } - topBucketsPerOrd[ordIdx] = buildBuckets((int) ordered.size()); - for (int i = (int) ordered.size() - 1; i >= 0; --i) { - topBucketsPerOrd[ordIdx][i] = ordered.pop(); - otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount(); - finalizeBucket(topBucketsPerOrd[ordIdx][i]); + topBucketsPerOrd.set(ordIdx, buildBuckets((int) ordered.size())); + for (int i = (int) ordered.size() - 1; i >= 0; --i) { + topBucketsPerOrd.get(ordIdx)[i] = ordered.pop(); + otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount()); + finalizeBucket(topBucketsPerOrd.get(ordIdx)[i]); + } } } - } - buildSubAggs(topBucketsPerOrd); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + buildSubAggs(topBucketsPerOrd); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); + } + return result; } - return result; } /** @@ -361,7 +366,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato /** * Build an array to hold the "top" buckets for each ordinal. */ - abstract B[][] buildTopBucketsPerOrd(int size); + abstract ObjectArray buildTopBucketsPerOrd(long size); /** * Build an array of buckets for a particular ordinal to collect the @@ -379,7 +384,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato * Build the sub-aggregations into the buckets. 
This will usually * delegate to {@link #buildSubAggsForAllBuckets}. */ - abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; /** * Turn the buckets into an aggregation result. @@ -501,8 +506,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato } @Override - StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new StringTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -521,7 +526,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato } @Override - void buildSubAggs(StringTerms.Bucket[][] topBucketsPerOrd) throws IOException { + void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); } @@ -637,8 +642,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato } @Override - SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new SignificantStringTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -657,7 +662,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato } @Override - void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOException { + void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java index d39348d80df1..e10f0b894402 100644 --- 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/NumericTermsAggregator.java @@ -15,6 +15,7 @@ import org.apache.lucene.index.SortedNumericDocValues; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.NumericUtils; import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -39,7 +40,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSource; import java.io.IOException; import java.util.Arrays; -import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; @@ -136,7 +136,7 @@ public final class NumericTermsAggregator extends TermsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { return resultStrategy.buildAggregations(owningBucketOrds); } @@ -163,48 +163,52 @@ public final class NumericTermsAggregator extends TermsAggregator { abstract class ResultStrategy implements Releasable { - private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length); - long[] otherDocCounts = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx], excludeDeletedDocs); - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); + private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try ( + LongArray otherDocCounts = 
bigArrays().newLongArray(owningBucketOrds.size(), true); + ObjectArray topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.size()) + ) { + for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { - B spare = null; - BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; + int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + try (ObjectArrayPriorityQueue ordered = buildPriorityQueue(size)) { + B spare = null; + BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + Supplier emptyBucketBuilder = emptyBucketBuilder(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + spare = emptyBucketBuilder.get(); + } + updateBucket(spare, ordsEnum, docCount); + spare = ordered.insertWithOverflow(spare); } - if (spare == null) { - spare = emptyBucketBuilder.get(); - } - updateBucket(spare, ordsEnum, docCount); - spare = ordered.insertWithOverflow(spare); - } - // Get the top buckets - B[] bucketsForOrd = buildBuckets((int) ordered.size()); - topBucketsPerOrd[ordIdx] = bucketsForOrd; - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd[ordIdx][b] = ordered.pop(); - otherDocCounts[ordIdx] -= 
topBucketsPerOrd[ordIdx][b].getDocCount(); + // Get the top buckets + B[] bucketsForOrd = buildBuckets((int) ordered.size()); + topBucketsPerOrd.set(ordIdx, bucketsForOrd); + for (int b = (int) ordered.size() - 1; b >= 0; --b) { + topBucketsPerOrd.get(ordIdx)[b] = ordered.pop(); + otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[b].getDocCount()); + } } } - } - buildSubAggs(topBucketsPerOrd); + buildSubAggs(topBucketsPerOrd); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); + } + return result; } - return result; } /** @@ -227,7 +231,7 @@ public final class NumericTermsAggregator extends TermsAggregator { /** * Build an array to hold the "top" buckets for each ordinal. */ - abstract B[][] buildTopBucketsPerOrd(int size); + abstract ObjectArray buildTopBucketsPerOrd(long size); /** * Build an array of buckets for a particular ordinal. These arrays @@ -258,7 +262,7 @@ public final class NumericTermsAggregator extends TermsAggregator { * Build the sub-aggregations into the buckets. This will usually * delegate to {@link #buildSubAggsForAllBuckets}. 
*/ - abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException; + abstract void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException; /** * Collect extra entries for "zero" hit documents if they were requested @@ -297,7 +301,7 @@ public final class NumericTermsAggregator extends TermsAggregator { } @Override - final void buildSubAggs(B[][] topBucketsPerOrd) throws IOException { + final void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } @@ -356,8 +360,8 @@ public final class NumericTermsAggregator extends TermsAggregator { } @Override - LongTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new LongTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -397,7 +401,7 @@ public final class NumericTermsAggregator extends TermsAggregator { bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, - List.of(topBuckets), + Arrays.asList(topBuckets), null ); } @@ -438,8 +442,8 @@ public final class NumericTermsAggregator extends TermsAggregator { } @Override - DoubleTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new DoubleTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -479,7 +483,7 @@ public final class NumericTermsAggregator extends TermsAggregator { bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, - List.of(topBuckets), + Arrays.asList(topBuckets), null ); } @@ -551,8 +555,8 @@ public final class NumericTermsAggregator extends TermsAggregator { } @Override - SignificantLongTerms.Bucket[][] buildTopBucketsPerOrd(int size) { - return new SignificantLongTerms.Bucket[size][]; + ObjectArray buildTopBucketsPerOrd(long size) { + return bigArrays().newObjectArray(size); } @Override @@ -583,7 +587,7 @@ public final class 
NumericTermsAggregator extends TermsAggregator { } @Override - void buildSubAggs(SignificantLongTerms.Bucket[][] topBucketsPerOrd) throws IOException { + void buildSubAggs(ObjectArray topBucketsPerOrd) throws IOException { buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); } @@ -601,7 +605,7 @@ public final class NumericTermsAggregator extends TermsAggregator { subsetSizes.get(owningBucketOrd), supersetSize, significanceHeuristic, - List.of(topBuckets) + Arrays.asList(topBuckets) ); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java index 2bc2833f0ddc..7200c33c71f7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringRareTermsAggregator.java @@ -12,6 +12,8 @@ import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.SetBackedScalingCuckooFilter; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fielddata.FieldData; @@ -119,72 +121,82 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { /* * Collect the list of buckets, populate the filter with terms * that are too frequent, and figure out how to merge sub-buckets. 
*/ - StringRareTerms.Bucket[][] rarestPerOrd = new StringRareTerms.Bucket[owningBucketOrds.length][]; - SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length]; - long keepCount = 0; - long[] mergeMap = new long[(int) bucketOrds.size()]; - Arrays.fill(mergeMap, -1); - long offset = 0; - for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) { - try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(1, bigArrays())) { - filters[owningOrdIdx] = newFilter(); - List builtBuckets = new ArrayList<>(); - BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]); - BytesRef scratch = new BytesRef(); - while (collectedBuckets.next()) { - collectedBuckets.readValue(scratch); - long docCount = bucketDocCount(collectedBuckets.ord()); - // if the key is below threshold, reinsert into the new ords - if (docCount <= maxDocCount) { - StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(scratch), docCount, null, format); - bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(scratch); - mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd; - builtBuckets.add(bucket); - keepCount++; - } else { - filters[owningOrdIdx].add(scratch); + try ( + ObjectArray rarestPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()); + ObjectArray filters = bigArrays().newObjectArray(owningBucketOrds.size()) + ) { + try (LongArray mergeMap = bigArrays().newLongArray(bucketOrds.size())) { + mergeMap.fill(0, mergeMap.size(), -1); + long keepCount = 0; + long offset = 0; + for (long owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.size(); owningOrdIdx++) { + try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(1, bigArrays())) { + filters.set(owningOrdIdx, newFilter()); + List builtBuckets = new ArrayList<>(); + BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = 
bucketOrds.ordsEnum(owningBucketOrds.get(owningOrdIdx)); + BytesRef scratch = new BytesRef(); + while (collectedBuckets.next()) { + collectedBuckets.readValue(scratch); + long docCount = bucketDocCount(collectedBuckets.ord()); + // if the key is below threshold, reinsert into the new ords + if (docCount <= maxDocCount) { + StringRareTerms.Bucket bucket = new StringRareTerms.Bucket( + BytesRef.deepCopyOf(scratch), + docCount, + null, + format + ); + bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(scratch); + mergeMap.set(collectedBuckets.ord(), bucket.bucketOrd); + builtBuckets.add(bucket); + keepCount++; + } else { + filters.get(owningOrdIdx).add(scratch); + } + } + rarestPerOrd.set(owningOrdIdx, builtBuckets.toArray(StringRareTerms.Bucket[]::new)); + offset += bucketsInThisOwningBucketToCollect.size(); } } - rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(StringRareTerms.Bucket[]::new); - offset += bucketsInThisOwningBucketToCollect.size(); - } - } - /* - * Only merge/delete the ordinals if we have actually deleted one, - * to save on some redundant work. - */ - if (keepCount != mergeMap.length) { - LongUnaryOperator howToMerge = b -> mergeMap[(int) b]; - rewriteBuckets(offset, howToMerge); - if (deferringCollector() != null) { - ((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge); + /* + * Only merge/delete the ordinals if we have actually deleted one, + * to save on some redundant work. + */ + if (keepCount != mergeMap.size()) { + LongUnaryOperator howToMerge = mergeMap::get; + rewriteBuckets(offset, howToMerge); + if (deferringCollector() != null) { + ((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge); + } + } } - } - /* - * Now build the results! 
- */ - buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator()); - result[ordIdx] = new StringRareTerms( - name, - ORDER, - metadata(), - format, - Arrays.asList(rarestPerOrd[ordIdx]), - maxDocCount, - filters[ordIdx] - ); + /* + * Now build the results! + */ + buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + StringRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx); + Arrays.sort(buckets, ORDER.comparator()); + result[ordIdx] = new StringRareTerms( + name, + ORDER, + metadata(), + format, + Arrays.asList(buckets), + maxDocCount, + filters.get(ordIdx) + ); + } + return result; } - return result; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java index 8742136c86ec..0d767e356108 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/MetricsAggregator.java @@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.metrics; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.AggregatorBase; import org.elasticsearch.search.aggregations.AggregatorFactories; @@ -36,10 +37,10 @@ public abstract class MetricsAggregator extends AggregatorBase { public abstract InternalAggregation buildAggregation(long owningBucketOrd) throws IOException; @Override - public 
final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - results[ordIdx] = buildAggregation(owningBucketOrds[ordIdx]); + public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = buildAggregation(owningBucketOrds.get(ordIdx)); } return results; } diff --git a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java index fff1990c2975..90e84acc7cad 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/profile/aggregation/ProfilingAggregator.java @@ -10,6 +10,7 @@ package org.elasticsearch.search.profile.aggregation; import org.apache.lucene.search.ScoreMode; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.InternalAggregation; @@ -68,7 +69,7 @@ public class ProfilingAggregator extends Aggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { Timer timer = profileBreakdown.getNewTimer(AggregationTimingType.BUILD_AGGREGATION); InternalAggregation[] result; timer.start(); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AdaptingAggregatorTests.java 
b/server/src/test/java/org/elasticsearch/search/aggregations/AdaptingAggregatorTests.java index 125b2d20cf9f..6e9bb596e944 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AdaptingAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AdaptingAggregatorTests.java @@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceTestCase; @@ -113,7 +114,7 @@ public class AdaptingAggregatorTests extends MapperServiceTestCase { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) { return new InternalAggregation[] { null }; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorBaseTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorBaseTests.java index 8d3fe0f7f6e7..2d0622dbb632 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorBaseTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/AggregatorBaseTests.java @@ -15,6 +15,7 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TermQuery; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MapperService; @@ -47,7 +48,7 @@ public class AggregatorBaseTests extends MapperServiceTestCase { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) 
{ throw new UnsupportedOperationException(); } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollectorTests.java index 9b6ea7272d0f..e796cee92c0d 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BestBucketsDeferringCollectorTests.java @@ -28,6 +28,8 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.search.aggregations.AggregationExecutionContext; import org.elasticsearch.search.aggregations.AggregatorTestCase; import org.elasticsearch.search.aggregations.BucketCollector; @@ -77,7 +79,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase { collector.preCollection(); indexSearcher.search(termQuery, collector.asCollector()); collector.postCollection(); - collector.prepareSelectedBuckets(0); + collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true)); assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size()); for (ScoreDoc scoreDoc : topDocs.scoreDocs) { @@ -91,7 +93,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase { collector.preCollection(); indexSearcher.search(new MatchAllDocsQuery(), collector.asCollector()); collector.postCollection(); - collector.prepareSelectedBuckets(0); + collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true)); assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size()); for (ScoreDoc scoreDoc : topDocs.scoreDocs) { @@ -141,7 +143,7 @@ public 
class BestBucketsDeferringCollectorTests extends AggregatorTestCase { } } }, (deferringCollector, finalCollector) -> { - deferringCollector.prepareSelectedBuckets(0, 8, 9); + deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9)); equalTo(Map.of(0L, List.of(0, 1, 2, 3, 4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9))); }); @@ -158,7 +160,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase { } } }, (deferringCollector, finalCollector) -> { - deferringCollector.prepareSelectedBuckets(0, 8, 9); + deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9)); assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9)))); }); @@ -176,12 +178,20 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase { } } }, (deferringCollector, finalCollector) -> { - deferringCollector.prepareSelectedBuckets(0, 8, 9); + deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9)); assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(0, 1, 2, 3), 1L, List.of(8), 2L, List.of(9)))); }); } + private LongArray toLongArray(long... 
lons) { + LongArray longArray = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(lons.length); + for (int i = 0; i < lons.length; i++) { + longArray.set(i, lons[i]); + } + return longArray; + } + private void testCase( BiFunction leafCollector, CheckedBiConsumer verify diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java index 80f27b31ca65..fb4c62ad66f1 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregatorTests.java @@ -16,6 +16,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.search.Query; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.NumberFieldMapper; @@ -72,7 +73,7 @@ public class BucketsAggregatorTests extends AggregatorTestCase { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) { return new InternalAggregation[0]; } diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java index 2df6a0cfb91c..a0a24e98ae72 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.search.TopDocs; import 
org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -68,7 +69,7 @@ public class BestDocsDeferringCollectorTests extends AggregatorTestCase { collector.preCollection(); indexSearcher.search(termQuery, collector.asCollector()); collector.postCollection(); - collector.prepareSelectedBuckets(0); + collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true)); assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size()); for (ScoreDoc scoreDoc : topDocs.scoreDocs) { diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java index 85882a5c5685..0c6e94a15ec3 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/multiterms/MultiTermsAggregator.java @@ -20,6 +20,8 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.LongArray; +import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.util.ObjectArrayPriorityQueue; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.Releasables; @@ -235,57 +237,62 @@ class MultiTermsAggregator extends DeferableBucketAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws 
IOException { - InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][]; - long[] otherDocCounts = new long[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]); + public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + try ( + LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true); + ObjectArray<InternalMultiTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size()) + ) { + for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) { + final long owningBucketOrd = owningBucketOrds.get(ordIdx); + long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd); - int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); - try ( - ObjectArrayPriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>( - size, - bigArrays(), - partiallyBuiltBucketComparator - ) - ) { - InternalMultiTerms.Bucket spare = null; - BytesRef spareKey = null; - BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]); - while (ordsEnum.next()) { - long docCount = bucketDocCount(ordsEnum.ord()); - otherDocCounts[ordIdx] += docCount; - if (docCount < bucketCountThresholds.getShardMinDocCount()) { - continue; + int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize()); + try ( + ObjectArrayPriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>( + size, + bigArrays(), + partiallyBuiltBucketComparator + ) + ) { + InternalMultiTerms.Bucket spare = null; + BytesRef spareKey = null; + BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd); + while (ordsEnum.next()) { + long docCount = bucketDocCount(ordsEnum.ord()); + otherDocCounts.increment(ordIdx, docCount); + if (docCount < bucketCountThresholds.getShardMinDocCount()) { + continue; + } + if (spare == null) { + spare = new
InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters); + spareKey = new BytesRef(); + } + ordsEnum.readValue(spareKey); + spare.terms = unpackTerms(spareKey); + spare.docCount = docCount; + spare.bucketOrd = ordsEnum.ord(); + spare = ordered.insertWithOverflow(spare); } - if (spare == null) { - spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters); - spareKey = new BytesRef(); - } - ordsEnum.readValue(spareKey); - spare.terms = unpackTerms(spareKey); - spare.docCount = docCount; - spare.bucketOrd = ordsEnum.ord(); - spare = ordered.insertWithOverflow(spare); - } - // Get the top buckets - InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()]; - topBucketsPerOrd[ordIdx] = bucketsForOrd; - for (int b = (int) ordered.size() - 1; b >= 0; --b) { - topBucketsPerOrd[ordIdx][b] = ordered.pop(); - otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount(); + // Get the top buckets + InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()]; + topBucketsPerOrd.set(ordIdx, bucketsForOrd); + for (int b = (int) ordered.size() - 1; b >= 0; --b) { + InternalMultiTerms.Bucket[] buckets = topBucketsPerOrd.get(ordIdx); + buckets[b] = ordered.pop(); + otherDocCounts.increment(ordIdx, -buckets[b].getDocCount()); + } } } - } - buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); + buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a); - InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { - result[ordIdx] = buildResult(otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]); + InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < result.length; ordIdx++) { + result[ordIdx] 
= buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx)); + } + return result; } - return result; } InternalMultiTerms buildResult(long otherDocCount, InternalMultiTerms.Bucket[] topBuckets) { @@ -305,7 +312,7 @@ class MultiTermsAggregator extends DeferableBucketAggregator { bucketCountThresholds.getShardSize(), showTermDocCountError, otherDocCount, - List.of(topBuckets), + Arrays.asList(topBuckets), 0, formats, keyConverters, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java index e55736cf4360..5b1ed7c954fe 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/categorization/CategorizeTextAggregator.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.util.BytesRefHash; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.ObjectArray; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.core.Releasables; @@ -110,31 +111,32 @@ public class CategorizeTextAggregator extends DeferableBucketAggregator { } @Override - public InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException { - Bucket[][] topBucketsPerOrd = new Bucket[ordsToCollect.length][]; - for (int ordIdx = 0; ordIdx < ordsToCollect.length; ordIdx++) { - final long ord = ordsToCollect[ordIdx]; - final TokenListCategorizer categorizer = (ord < categorizers.size()) ? 
categorizers.get(ord) : null; - if (categorizer == null) { - topBucketsPerOrd[ordIdx] = new Bucket[0]; - continue; + public InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException { + try (ObjectArray<Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(ordsToCollect.size())) { + for (long ordIdx = 0; ordIdx < ordsToCollect.size(); ordIdx++) { + final long ord = ordsToCollect.get(ordIdx); + final TokenListCategorizer categorizer = (ord < categorizers.size()) ? categorizers.get(ord) : null; + if (categorizer == null) { + topBucketsPerOrd.set(ordIdx, new Bucket[0]); + continue; + } + int size = (int) Math.min(bucketOrds.bucketsInOrd(ordIdx), bucketCountThresholds.getShardSize()); + topBucketsPerOrd.set(ordIdx, categorizer.toOrderedBuckets(size)); } - int size = (int) Math.min(bucketOrds.bucketsInOrd(ordIdx), bucketCountThresholds.getShardSize()); - topBucketsPerOrd[ordIdx] = categorizer.toOrderedBuckets(size); + buildSubAggsForAllBuckets(topBucketsPerOrd, Bucket::getBucketOrd, Bucket::setAggregations); + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(ordsToCollect.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { + results[ordIdx] = new InternalCategorizationAggregation( + name, + bucketCountThresholds.getRequiredSize(), + bucketCountThresholds.getMinDocCount(), + similarityThreshold, + metadata(), + Arrays.asList(topBucketsPerOrd.get(ordIdx)) + ); + } + return results; } - buildSubAggsForAllBuckets(topBucketsPerOrd, Bucket::getBucketOrd, Bucket::setAggregations); - InternalAggregation[] results = new InternalAggregation[ordsToCollect.length]; - for (int ordIdx = 0; ordIdx < ordsToCollect.length; ordIdx++) { - results[ordIdx] = new InternalCategorizationAggregation( - name, - bucketCountThresholds.getRequiredSize(), - bucketCountThresholds.getMinDocCount(), - similarityThreshold, - metadata(), - Arrays.asList(topBucketsPerOrd[ordIdx]) - ); - } - return results; } @Override diff --git
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/DelegatingCircuitBreakerService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/DelegatingCircuitBreakerService.java index 350f45afb9e1..1b28ebbb3eec 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/DelegatingCircuitBreakerService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/DelegatingCircuitBreakerService.java @@ -40,10 +40,12 @@ import java.util.function.Consumer; * At the time of writing circuit breakers are a global gauge.) * * After the map phase and before reduce, the {@link ItemSetMapReduceAggregator} creates instances of - * {@link InternalItemSetMapReduceAggregation}, see {@link ItemSetMapReduceAggregator#buildAggregations(long[])}. + * {@link InternalItemSetMapReduceAggregation}, see + * {@link ItemSetMapReduceAggregator#buildAggregations(org.elasticsearch.common.util.LongArray)}. * * (Note 1: Instead of keeping the existing instance, it would have been possible to deep-copy the object like - * {@link CardinalityAggregator#buildAggregations(long[])}. I decided against this approach mainly because the deep-copy isn't + * {@link CardinalityAggregator#buildAggregations(org.elasticsearch.common.util.LongArray)}. + * I decided against this approach mainly because the deep-copy isn't * secured by circuit breakers, meaning the node could run out of memory during the deep-copy.) 
* (Note 2: Between {@link ItemSetMapReduceAggregator#doClose()} and serializing {@link InternalItemSetMapReduceAggregation} * memory accounting is broken, meaning the agg context gets closed and bytes get returned to the circuit breaker before memory is diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java index 0f9555c77341..1a5e5d7a0790 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceAggregator.java @@ -17,6 +17,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongObjectPagedHashMap; import org.elasticsearch.core.Releasables; import org.elasticsearch.core.Tuple; @@ -117,9 +118,9 @@ public abstract class ItemSetMapReduceAggregator< } @Override - public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException { - InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length]; - for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { + public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException { + InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())]; + for (int ordIdx = 0; ordIdx < results.length; ordIdx++) { results[ordIdx] = buildAggregation(ordIdx); }