Use LongArray instead of long[] for owning ordinals when building Internal aggregations (#116874)

This commit changes the signature of Aggregator#buildAggregations(long[]) to
Aggregator#buildAggregations(LongArray) so that owning bucket ordinals are carried in paged BigArrays storage instead of a single, potentially humongous, heap-allocated array.
Ignacio Vera 2024-11-19 12:26:37 +01:00 committed by GitHub
parent 7ba63f26f0
commit 9296fb40ff
49 changed files with 875 additions and 739 deletions
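The change is mechanical across the touched files: owningBucketOrds.length becomes owningBucketOrds.size(), [] indexing becomes get()/set(), and scratch ordinal buffers are taken from bigArrays() and released with try-with-resources. Below is a minimal before/after sketch of that idiom, using only the org.elasticsearch.common.util classes that appear in this commit; the class and method names of the sketch itself are illustrative, not part of the change.

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

class OwningOrdsSketch {

    // Before: owning ordinals arrived as a plain long[], sized up front and
    // read with [] indexing.
    static long sumBefore(long[] owningBucketOrds) {
        long sum = 0;
        for (int i = 0; i < owningBucketOrds.length; i++) {
            sum += owningBucketOrds[i];
        }
        return sum;
    }

    // After: owning ordinals arrive as a LongArray backed by paged BigArrays
    // storage, addressed with long indices through size()/get(). Temporary
    // ordinal buffers are allocated from BigArrays and released with
    // try-with-resources, as the aggregators in this diff do.
    static long sumAfter(BigArrays bigArrays, LongArray owningBucketOrds) {
        try (LongArray copy = bigArrays.newLongArray(owningBucketOrds.size())) {
            for (long i = 0; i < owningBucketOrds.size(); i++) {
                copy.set(i, owningBucketOrds.get(i));
            }
            long sum = 0;
            for (long i = 0; i < copy.size(); i++) {
                sum += copy.get(i);
            }
            return sum;
        }
    }
}

Result arrays of InternalAggregation remain plain Java arrays, so the diff sizes them with Math.toIntExact(owningBucketOrds.size()), which fails fast if a shard ever produces more than Integer.MAX_VALUE owning ordinals.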

View file

@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -177,65 +178,66 @@ public class AdjacencyMatrixAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
// Buckets are ordered into groups - [keyed filters] [key1&key2 intersects]
int maxOrd = owningBucketOrds.length * totalNumKeys;
int totalBucketsToBuild = 0;
for (int ord = 0; ord < maxOrd; ord++) {
long maxOrd = owningBucketOrds.size() * totalNumKeys;
long totalBucketsToBuild = 0;
for (long ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
totalBucketsToBuild++;
}
}
long[] bucketOrdsToBuild = new long[totalBucketsToBuild];
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
bucketOrdsToBuild[builtBucketIndex++] = ord;
}
}
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < owningBucketOrds.length; owningBucketOrdIdx++) {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], i);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned because this aggregation will commonly be used under a
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
keys[i],
docCount,
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
try (LongArray bucketOrdsToBuild = bigArrays().newLongArray(totalBucketsToBuild)) {
int builtBucketIndex = 0;
for (int ord = 0; ord < maxOrd; ord++) {
if (bucketDocCount(ord) > 0) {
bucketOrdsToBuild.set(builtBucketIndex++, ord);
}
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrds[owningBucketOrdIdx], pos);
assert builtBucketIndex == totalBucketsToBuild;
builtBucketIndex = 0;
var bucketSubAggs = buildSubAggsForBuckets(bucketOrdsToBuild);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int owningBucketOrdIdx = 0; owningBucketOrdIdx < results.length; owningBucketOrdIdx++) {
List<InternalAdjacencyMatrix.InternalBucket> buckets = new ArrayList<>(filters.length);
for (int i = 0; i < keys.length; i++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), i);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned due to potential for very sparse matrices
// Empty buckets are not returned because this aggregation will commonly be used under a
// a date-histogram where we will look for transactions over time and can expect many
// empty buckets.
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
intersectKey,
keys[i],
docCount,
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
pos++;
}
int pos = keys.length;
for (int i = 0; i < keys.length; i++) {
for (int j = i + 1; j < keys.length; j++) {
long bucketOrd = bucketOrd(owningBucketOrds.get(owningBucketOrdIdx), pos);
long docCount = bucketDocCount(bucketOrd);
// Empty buckets are not returned due to potential for very sparse matrices
if (docCount > 0) {
String intersectKey = keys[i] + separator + keys[j];
InternalAdjacencyMatrix.InternalBucket bucket = new InternalAdjacencyMatrix.InternalBucket(
intersectKey,
docCount,
bucketSubAggs.apply(builtBucketIndex++)
);
buckets.add(bucket);
}
pos++;
}
}
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
}
results[owningBucketOrdIdx] = new InternalAdjacencyMatrix(name, buckets, metadata());
assert builtBucketIndex == totalBucketsToBuild;
return results;
}
assert builtBucketIndex == totalBucketsToBuild;
return results;
}
@Override
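The two-pass shape above — count the buckets worth building, then allocate a LongArray of exactly that size from bigArrays() inside try-with-resources — recurs in most of the aggregators below. A reduced sketch of just that flow, where the keep predicate is an assumed stand-in for the real bucketDocCount(ord) > 0 check:

import java.util.function.LongPredicate;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

class CountThenCollectSketch {
    // First count the ordinals worth keeping, then allocate a LongArray of
    // exactly that size and fill it in a second pass.
    static LongArray collect(BigArrays bigArrays, long maxOrd, LongPredicate keep) {
        long toBuild = 0;
        for (long ord = 0; ord < maxOrd; ord++) {
            if (keep.test(ord)) {
                toBuild++;
            }
        }
        LongArray ords = bigArrays.newLongArray(toBuild);
        int idx = 0;
        for (long ord = 0; ord < maxOrd; ord++) {
            if (keep.test(ord)) {
                ords.set(idx++, ord);
            }
        }
        return ords; // caller is responsible for releasing this array
    }
}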

View file

@ -141,7 +141,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator {
protected final InternalAggregation[] buildAggregations(
LongKeyedBucketOrds bucketOrds,
LongToIntFunction roundingIndexFor,
long[] owningBucketOrds
LongArray owningBucketOrds
) throws IOException {
return buildAggregationsForVariableBuckets(
owningBucketOrds,
@ -324,7 +324,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregations(bucketOrds, l -> roundingIdx, owningBucketOrds);
}
@ -594,7 +594,7 @@ abstract class AutoDateHistogramAggregator extends DeferableBucketAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
/*
* Rebucket before building the aggregation to build as small as result
* as possible.

View file

@ -11,6 +11,8 @@ package org.elasticsearch.aggregations.bucket.timeseries;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.mapper.RoutingPathFields;
@ -30,6 +32,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -67,42 +70,43 @@ public class TimeSeriesAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
BytesRef spare = new BytesRef();
InternalTimeSeries.InternalBucket[][] allBucketsPerOrd = new InternalTimeSeries.InternalBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
ordsEnum.readValue(spare);
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
docCount,
null,
keyed
);
bucket.bucketOrd = ordsEnum.ord();
buckets.add(bucket);
if (buckets.size() >= size) {
break;
try (ObjectArray<InternalTimeSeries.InternalBucket[]> allBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < allBucketsPerOrd.size(); ordIdx++) {
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
List<InternalTimeSeries.InternalBucket> buckets = new ArrayList<>();
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
ordsEnum.readValue(spare);
InternalTimeSeries.InternalBucket bucket = new InternalTimeSeries.InternalBucket(
BytesRef.deepCopyOf(spare), // Closing bucketOrds will corrupt the bytes ref, so need to make a deep copy here.
docCount,
null,
keyed
);
bucket.bucketOrd = ordsEnum.ord();
buckets.add(bucket);
if (buckets.size() >= size) {
break;
}
}
// NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp).
// _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value
// changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making
// the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no true anymore
// because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp).
buckets.sort(Comparator.comparing(bucket -> bucket.key));
allBucketsPerOrd.set(ordIdx, buckets.toArray(new InternalTimeSeries.InternalBucket[0]));
}
// NOTE: after introducing _tsid hashing time series are sorted by (_tsid hash, @timestamp) instead of (_tsid, timestamp).
// _tsid hash and _tsid might sort differently, and out of order data might result in incorrect buckets due to _tsid value
// changes not matching _tsid hash changes. Changes in _tsid hash are handled creating a new bucket as a result of making
// the assumption that sorting data results in new buckets whenever there is a change in _tsid hash. This is no true anymore
// because we collect data sorted on (_tsid hash, timestamp) but build aggregation results sorted by (_tsid, timestamp).
buckets.sort(Comparator.comparing(bucket -> bucket.key));
allBucketsPerOrd[ordIdx] = buckets.toArray(new InternalTimeSeries.InternalBucket[0]);
}
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
buildSubAggsForAllBuckets(allBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(allBucketsPerOrd[ordIdx]);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(allBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(allBucketsPerOrd.get(ordIdx));
}
return result;
}
return result;
}
@Override
@ -185,7 +189,7 @@ public class TimeSeriesAggregator extends BucketsAggregator {
}
InternalTimeSeries buildResult(InternalTimeSeries.InternalBucket[] topBuckets) {
return new InternalTimeSeries(name, List.of(topBuckets), keyed, metadata());
return new InternalTimeSeries(name, Arrays.asList(topBuckets), keyed, metadata());
}
@FunctionalInterface
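Per-owning-ordinal bucket arrays that used to live in an InternalBucket[][] now live in an ObjectArray<InternalBucket[]> from bigArrays(), which is also why buildSubAggsForAllBuckets (further down, in BucketsAggregator) now accepts an ObjectArray. A hedged sketch of the container itself, with String[] standing in for the real bucket type:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;

class PerOrdBucketsSketch {
    // ObjectArray replaces Bucket[][]: one slot per owning bucket ordinal,
    // each holding that ordinal's buckets, released when done.
    static long totalBuckets(BigArrays bigArrays, String[][] perOrd) {
        try (ObjectArray<String[]> buckets = bigArrays.newObjectArray(perOrd.length)) {
            for (int i = 0; i < perOrd.length; i++) {
                buckets.set(i, perOrd[i]);
            }
            long total = 0;
            for (long i = 0; i < buckets.size(); i++) {
                total += buckets.get(i).length;
            }
            return total;
        }
    }
}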

View file

@ -9,6 +9,7 @@
package org.elasticsearch.join.aggregations;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@ -44,7 +45,7 @@ public class ChildrenToParentAggregator extends ParentJoinAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalParent(

View file

@ -21,6 +21,7 @@ import org.apache.lucene.util.Bits;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@ -115,7 +116,7 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements
}
@Override
protected void prepareSubAggs(long[] ordsToCollect) throws IOException {
protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {
IndexReader indexReader = searcher().getIndexReader();
for (LeafReaderContext ctx : indexReader.leaves()) {
Scorer childDocsScorer = outFilter.scorer(ctx);
@ -153,9 +154,10 @@ public abstract class ParentJoinAggregator extends BucketsAggregator implements
* structure that maps a primitive long to a list of primitive
* longs.
*/
for (long owningBucketOrd : ordsToCollect) {
if (collectionStrategy.exists(owningBucketOrd, globalOrdinal)) {
collectBucket(sub, docId, owningBucketOrd);
for (long ord = 0; ord < ordsToCollect.size(); ord++) {
long ordToCollect = ordsToCollect.get(ord);
if (collectionStrategy.exists(ordToCollect, globalOrdinal)) {
collectBucket(sub, docId, ordToCollect);
}
}
}

View file

@ -9,6 +9,7 @@
package org.elasticsearch.join.aggregations;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@ -40,7 +41,7 @@ public class ParentToChildrenAggregator extends ParentJoinAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalChildren(

View file

@ -27,6 +27,7 @@ import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexSettings;
@ -669,7 +670,7 @@ public class TransportSearchIT extends ESIntegTestCase {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) {
return new InternalAggregation[] { buildEmptyAggregation() };
}

View file

@ -10,6 +10,7 @@
package org.elasticsearch.search.aggregations;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.search.profile.aggregation.InternalAggregationProfileTree;
@ -98,10 +99,10 @@ public abstract class AdaptingAggregator extends Aggregator {
}
@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] delegateResults = delegate.buildAggregations(owningBucketOrds);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = adapt(delegateResults[ordIdx]);
}
return result;

View file

@ -13,6 +13,8 @@ import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.search.sort.SortOrder;
@ -142,7 +144,7 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
* @return the results for each ordinal, in the same order as the array
* of ordinals
*/
public abstract InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException;
public abstract InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException;
/**
* Release this aggregation and its sub-aggregations.
@ -153,11 +155,11 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
* Build the result of this aggregation if it is at the "top level"
* of the aggregation tree. If, instead, it is a sub-aggregation of
* another aggregation then the aggregation that contains it will call
* {@link #buildAggregations(long[])}.
* {@link #buildAggregations(LongArray)}.
*/
public final InternalAggregation buildTopLevel() throws IOException {
assert parent() == null;
return buildAggregations(new long[] { 0 })[0];
return buildAggregations(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true))[0];
}
/**
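buildTopLevel is the one caller that fabricates its own owning ordinals, so the commit hands it a one-element LongArray from BigArrays.NON_RECYCLING_INSTANCE; presumably a single long needs no circuit-breaker accounting, which is also why nothing explicitly releases it. A minimal illustration of that call:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

class TopLevelSketch {
    // The single owning ordinal of a top-level aggregation, wrapped the same
    // way buildTopLevel does above: one element, zero-filled, from the
    // non-recycling BigArrays instance (plain heap storage).
    static LongArray topLevelOrd() {
        return BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true);
    }
}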

View file

@ -9,6 +9,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import java.io.IOException;
@ -39,9 +40,9 @@ public abstract class NonCollectingAggregator extends AggregatorBase {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildEmptyAggregation();
}
return results;

View file

@ -20,6 +20,7 @@ import org.apache.lucene.search.Weight;
import org.apache.lucene.util.packed.PackedInts;
import org.apache.lucene.util.packed.PackedLongValues;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -146,7 +147,7 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
* Replay the wrapped collector, but only on a selection of buckets.
*/
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
public void prepareSelectedBuckets(LongArray selectedBuckets) throws IOException {
if (finished == false) {
throw new IllegalStateException("Cannot replay yet, collection is not finished: postCollect() has not been called");
}
@ -154,9 +155,9 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
throw new IllegalStateException("Already been replayed");
}
this.selectedBuckets = new LongHash(selectedBuckets.length, BigArrays.NON_RECYCLING_INSTANCE);
for (long ord : selectedBuckets) {
this.selectedBuckets.add(ord);
this.selectedBuckets = new LongHash(selectedBuckets.size(), BigArrays.NON_RECYCLING_INSTANCE);
for (long i = 0; i < selectedBuckets.size(); i++) {
this.selectedBuckets.add(selectedBuckets.get(i));
}
boolean needsScores = scoreMode().needsScores();
@ -232,21 +233,22 @@ public class BestBucketsDeferringCollector extends DeferringBucketCollector {
* been collected directly.
*/
@Override
public Aggregator wrap(final Aggregator in) {
public Aggregator wrap(final Aggregator in, BigArrays bigArrays) {
return new WrappedAggregator(in) {
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
if (selectedBuckets == null) {
throw new IllegalStateException("Collection has not been replayed yet.");
}
long[] rebasedOrds = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
rebasedOrds[ordIdx] = selectedBuckets.find(owningBucketOrds[ordIdx]);
if (rebasedOrds[ordIdx] == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
try (LongArray rebasedOrds = bigArrays.newLongArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
rebasedOrds.set(ordIdx, selectedBuckets.find(owningBucketOrds.get(ordIdx)));
if (rebasedOrds.get(ordIdx) == -1) {
throw new IllegalStateException("Cannot build for a bucket which has not been collected");
}
}
return in.buildAggregations(rebasedOrds);
}
return in.buildAggregations(rebasedOrds);
}
};
}
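wrap(...) now receives a BigArrays so the rebased ordinals can live in a temporary LongArray that is released as soon as the delegate has consumed it. A sketch of the rebasing step in isolation; it relies on LongHash#find returning -1 for keys that were never added, which is exactly the "not been collected" check in the diff:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;

class RebaseSketch {
    // Map each owning ordinal to its slot in the selected-buckets hash;
    // a -1 from find() means that bucket was never replayed.
    static LongArray rebase(BigArrays bigArrays, LongHash selected, LongArray owningOrds) {
        LongArray rebased = bigArrays.newLongArray(owningOrds.size());
        for (long i = 0; i < owningOrds.size(); i++) {
            long slot = selected.find(owningOrds.get(i));
            if (slot == -1) {
                rebased.close();
                throw new IllegalStateException("bucket was not collected");
            }
            rebased.set(i, slot);
        }
        return rebased; // caller releases once the delegate has consumed it
    }
}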

View file

@ -10,7 +10,9 @@ package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.search.aggregations.AggregationErrors;
import org.elasticsearch.search.aggregations.Aggregator;
@ -155,22 +157,22 @@ public abstract class BucketsAggregator extends AggregatorBase {
/**
* Hook to allow taking an action before building the sub agg results.
*/
protected void prepareSubAggs(long[] ordsToCollect) throws IOException {}
protected void prepareSubAggs(LongArray ordsToCollect) throws IOException {}
/**
* Build the results of the sub-aggregations of the buckets at each of
* the provided ordinals.
* <p>
* Most aggregations should probably use something like
* {@link #buildSubAggsForAllBuckets(Object[][], ToLongFunction, BiConsumer)}
* or {@link #buildAggregationsForVariableBuckets(long[], LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
* or {@link #buildAggregationsForFixedBucketCount(long[], int, BucketBuilderForFixedCount, Function)}
* or {@link #buildAggregationsForSingleBucket(long[], SingleBucketResultBuilder)}
* {@link #buildSubAggsForAllBuckets(ObjectArray, ToLongFunction, BiConsumer)}
* or {@link #buildAggregationsForVariableBuckets(LongArray, LongKeyedBucketOrds, BucketBuilderForVariable, ResultBuilderForVariable)}
* or {@link #buildAggregationsForFixedBucketCount(LongArray, int, BucketBuilderForFixedCount, Function)}
* or {@link #buildAggregationsForSingleBucket(LongArray, SingleBucketResultBuilder)}
* instead of calling this directly.
* @return the sub-aggregation results in the same order as the provided
* array of ordinals
*/
protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(long[] bucketOrdsToCollect) throws IOException {
protected final IntFunction<InternalAggregations> buildSubAggsForBuckets(LongArray bucketOrdsToCollect) throws IOException {
prepareSubAggs(bucketOrdsToCollect);
InternalAggregation[][] aggregations = new InternalAggregation[subAggregators.length][];
for (int i = 0; i < subAggregators.length; i++) {
@ -204,26 +206,28 @@ public abstract class BucketsAggregator extends AggregatorBase {
* @param setAggs how to set the sub-aggregation results on a bucket
*/
protected final <B> void buildSubAggsForAllBuckets(
B[][] buckets,
ObjectArray<B[]> buckets,
ToLongFunction<B> bucketToOrd,
BiConsumer<B, InternalAggregations> setAggs
) throws IOException {
int totalBucketOrdsToCollect = 0;
for (B[] bucketsForOneResult : buckets) {
totalBucketOrdsToCollect += bucketsForOneResult.length;
long totalBucketOrdsToCollect = 0;
for (long b = 0; b < buckets.size(); b++) {
totalBucketOrdsToCollect += buckets.get(b).length;
}
long[] bucketOrdsToCollect = new long[totalBucketOrdsToCollect];
int s = 0;
for (B[] bucketsForOneResult : buckets) {
for (B bucket : bucketsForOneResult) {
bucketOrdsToCollect[s++] = bucketToOrd.applyAsLong(bucket);
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalBucketOrdsToCollect)) {
int s = 0;
for (long ord = 0; ord < buckets.size(); ord++) {
for (B bucket : buckets.get(ord)) {
bucketOrdsToCollect.set(s++, bucketToOrd.applyAsLong(bucket));
}
}
}
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
s = 0;
for (B[] bucket : buckets) {
for (int b = 0; b < bucket.length; b++) {
setAggs.accept(bucket[b], results.apply(s++));
var results = buildSubAggsForBuckets(bucketOrdsToCollect);
s = 0;
for (long ord = 0; ord < buckets.size(); ord++) {
for (B value : buckets.get(ord)) {
setAggs.accept(value, results.apply(s++));
}
}
}
}
@ -237,37 +241,38 @@ public abstract class BucketsAggregator extends AggregatorBase {
* @param resultBuilder how to build a result from buckets
*/
protected final <B> InternalAggregation[] buildAggregationsForFixedBucketCount(
long[] owningBucketOrds,
LongArray owningBucketOrds,
int bucketsPerOwningBucketOrd,
BucketBuilderForFixedCount<B> bucketBuilder,
Function<List<B>, InternalAggregation> resultBuilder
) throws IOException {
int totalBuckets = owningBucketOrds.length * bucketsPerOwningBucketOrd;
long[] bucketOrdsToCollect = new long[totalBuckets];
int bucketOrdIdx = 0;
for (long owningBucketOrd : owningBucketOrds) {
long ord = owningBucketOrd * bucketsPerOwningBucketOrd;
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
bucketOrdsToCollect[bucketOrdIdx++] = ord++;
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(owningBucketOrds.size() * bucketsPerOwningBucketOrd)) {
int bucketOrdIdx = 0;
for (long i = 0; i < owningBucketOrds.size(); i++) {
long ord = owningBucketOrds.get(i) * bucketsPerOwningBucketOrd;
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
bucketOrdsToCollect.set(bucketOrdIdx++, ord++);
}
}
}
bucketOrdIdx = 0;
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
buckets.add(
bucketBuilder.build(
offsetInOwningOrd,
bucketDocCount(bucketOrdsToCollect[bucketOrdIdx]),
subAggregationResults.apply(bucketOrdIdx++)
)
);
bucketOrdIdx = 0;
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int owningOrdIdx = 0; owningOrdIdx < results.length; owningOrdIdx++) {
List<B> buckets = new ArrayList<>(bucketsPerOwningBucketOrd);
for (int offsetInOwningOrd = 0; offsetInOwningOrd < bucketsPerOwningBucketOrd; offsetInOwningOrd++) {
buckets.add(
bucketBuilder.build(
offsetInOwningOrd,
bucketDocCount(bucketOrdsToCollect.get(bucketOrdIdx)),
subAggregationResults.apply(bucketOrdIdx++)
)
);
}
results[owningOrdIdx] = resultBuilder.apply(buckets);
}
results[owningOrdIdx] = resultBuilder.apply(buckets);
return results;
}
return results;
}
@FunctionalInterface
@ -280,17 +285,19 @@ public abstract class BucketsAggregator extends AggregatorBase {
* @param owningBucketOrds owning bucket ordinals for which to build the results
* @param resultBuilder how to build a result from the sub aggregation results
*/
protected final InternalAggregation[] buildAggregationsForSingleBucket(long[] owningBucketOrds, SingleBucketResultBuilder resultBuilder)
throws IOException {
protected final InternalAggregation[] buildAggregationsForSingleBucket(
LongArray owningBucketOrds,
SingleBucketResultBuilder resultBuilder
) throws IOException {
/*
* It'd be entirely reasonable to call
* `consumeBucketsAndMaybeBreak(owningBucketOrds.length)`
* here but we don't because single bucket aggs never have.
*/
var subAggregationResults = buildSubAggsForBuckets(owningBucketOrds);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], subAggregationResults.apply(ordIdx));
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = resultBuilder.build(owningBucketOrds.get(ordIdx), subAggregationResults.apply(ordIdx));
}
return results;
}
@ -307,54 +314,60 @@ public abstract class BucketsAggregator extends AggregatorBase {
* @param bucketOrds hash of values to the bucket ordinal
*/
protected final <B> InternalAggregation[] buildAggregationsForVariableBuckets(
long[] owningBucketOrds,
LongArray owningBucketOrds,
LongKeyedBucketOrds bucketOrds,
BucketBuilderForVariable<B> bucketBuilder,
ResultBuilderForVariable<B> resultBuilder
) throws IOException {
long totalOrdsToCollect = 0;
final int[] bucketsInOrd = new int[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
bucketsInOrd[ordIdx] = (int) bucketCount;
totalOrdsToCollect += bucketCount;
}
if (totalOrdsToCollect > Integer.MAX_VALUE) {
// TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is not
// much the user can do about that. If this occurs with any frequency, we should do something about it.
throw new IllegalArgumentException(
"Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
);
}
long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
int b = 0;
for (long owningBucketOrd : owningBucketOrds) {
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
bucketOrdsToCollect[b++] = ordsEnum.ord();
try (IntArray bucketsInOrd = bigArrays().newIntArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx));
bucketsInOrd.set(ordIdx, (int) bucketCount);
totalOrdsToCollect += bucketCount;
}
}
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
List<B> buckets = new ArrayList<>(bucketsInOrd[ordIdx]);
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (bucketOrdsToCollect[b] != ordsEnum.ord()) {
// If we hit this, something has gone horribly wrong and we need to investigate
throw AggregationErrors.iterationOrderChangedWithoutMutating(
bucketOrds.toString(),
ordsEnum.ord(),
bucketOrdsToCollect[b]
);
if (totalOrdsToCollect > Integer.MAX_VALUE) {
// TODO: We should instrument this error. While it is correct for it to be a 400 class IllegalArgumentException, there is
// not
// much the user can do about that. If this occurs with any frequency, we should do something about it.
throw new IllegalArgumentException(
"Can't collect more than [" + Integer.MAX_VALUE + "] buckets but attempted [" + totalOrdsToCollect + "]"
);
}
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
int b = 0;
for (long i = 0; i < owningBucketOrds.size(); i++) {
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
while (ordsEnum.next()) {
bucketOrdsToCollect.set(b++, ordsEnum.ord());
}
}
buckets.add(bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++)));
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
b = 0;
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
List<B> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
if (bucketOrdsToCollect.get(b) != ordsEnum.ord()) {
// If we hit this, something has gone horribly wrong and we need to investigate
throw AggregationErrors.iterationOrderChangedWithoutMutating(
bucketOrds.toString(),
ordsEnum.ord(),
bucketOrdsToCollect.get(b)
);
}
buckets.add(
bucketBuilder.build(ordsEnum.value(), bucketDocCount(ordsEnum.ord()), subAggregationResults.apply(b++))
);
}
results[ordIdx] = resultBuilder.build(owningBucketOrd, buckets);
}
return results;
}
results[ordIdx] = resultBuilder.build(owningBucketOrds[ordIdx], buckets);
}
return results;
}
@FunctionalInterface
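buildAggregationsForVariableBuckets keeps its Integer.MAX_VALUE guard because buildSubAggsForBuckets still returns an IntFunction, so the flattened ordinals must remain addressable by int; only the backing storage changes. A compressed sketch of its two scratch arrays (per-ordinal counts in an IntArray, flattened ordinals in a LongArray, both released by nested try-with-resources); the perOrdCounts input is an assumed stand-in for LongKeyedBucketOrds#bucketsInOrd:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;

class VariableBucketsSketch {
    static long flattenAndCount(BigArrays bigArrays, long[] perOrdCounts) {
        long total = 0;
        try (IntArray bucketsInOrd = bigArrays.newIntArray(perOrdCounts.length)) {
            for (int i = 0; i < perOrdCounts.length; i++) {
                bucketsInOrd.set(i, (int) perOrdCounts[i]);
                total += perOrdCounts[i];
            }
            if (total > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("too many buckets: " + total);
            }
            try (LongArray ordsToCollect = bigArrays.newLongArray(total)) {
                int b = 0;
                for (int i = 0; i < perOrdCounts.length; i++) {
                    for (int j = 0; j < bucketsInOrd.get(i); j++) {
                        // a placeholder ordinal; the real code reads it from
                        // the BucketOrdsEnum for each owning ordinal
                        ordsToCollect.set(b++, j);
                    }
                }
                return ordsToCollect.size();
            }
        }
    }
}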

View file

@ -9,6 +9,7 @@
package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;
@ -65,7 +66,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
}
deferredAggregations.add(subAggregators[i]);
deferredAggregationNames.add(subAggregators[i].name());
subAggregators[i] = deferringCollector.wrap(subAggregators[i]);
subAggregators[i] = deferringCollector.wrap(subAggregators[i], bigArrays());
} else {
collectors.add(subAggregators[i]);
}
@ -87,7 +88,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
/**
* Build the {@link DeferringBucketCollector}. The default implementation
* replays all hits against the buckets selected by
* {#link {@link DeferringBucketCollector#prepareSelectedBuckets(long...)}.
* {#link {@link DeferringBucketCollector#prepareSelectedBuckets(LongArray)}.
*/
protected DeferringBucketCollector buildDeferringCollector() {
return new BestBucketsDeferringCollector(topLevelQuery(), searcher(), descendsFromGlobalAggregator(parent()));
@ -107,7 +108,7 @@ public abstract class DeferableBucketAggregator extends BucketsAggregator {
}
@Override
protected final void prepareSubAggs(long[] bucketOrdsToCollect) throws IOException {
protected final void prepareSubAggs(LongArray bucketOrdsToCollect) throws IOException {
if (deferringCollector != null) {
deferringCollector.prepareSelectedBuckets(bucketOrdsToCollect);
}

View file

@ -10,6 +10,8 @@
package org.elasticsearch.search.aggregations.bucket;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketCollector;
@ -37,13 +39,13 @@ public abstract class DeferringBucketCollector extends BucketCollector {
/**
* Replay the deferred hits on the selected buckets.
*/
public abstract void prepareSelectedBuckets(long... selectedBuckets) throws IOException;
public abstract void prepareSelectedBuckets(LongArray selectedBuckets) throws IOException;
/**
* Wrap the provided aggregator so that it behaves (almost) as if it had
* been collected directly.
*/
public Aggregator wrap(final Aggregator in) {
public Aggregator wrap(final Aggregator in, BigArrays bigArrays) {
return new WrappedAggregator(in);
}
@ -80,7 +82,7 @@ public abstract class DeferringBucketCollector extends BucketCollector {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return in.buildAggregations(owningBucketOrds);
}

View file

@ -35,6 +35,7 @@ import org.apache.lucene.util.Bits;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Strings;
import org.elasticsearch.index.IndexSortConfig;
@ -184,50 +185,51 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
// Composite aggregator must be at the top of the aggregation tree
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0L;
assert owningBucketOrds.size() == 1 && owningBucketOrds.get(0) == 0L;
if (deferredCollectors != NO_OP_BUCKET_COLLECTOR) {
// Replay all documents that contain at least one top bucket (collected during the first pass).
runDeferredCollections();
}
int num = Math.min(size, (int) queue.size());
final int num = Math.min(size, (int) queue.size());
final InternalComposite.InternalBucket[] buckets = new InternalComposite.InternalBucket[num];
long[] bucketOrdsToCollect = new long[(int) queue.size()];
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect[i] = i;
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(queue.size())) {
for (int i = 0; i < queue.size(); i++) {
bucketOrdsToCollect.set(i, i);
}
var subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = subAggsForBuckets.apply(slot);
long docCount = queue.getDocCount(slot);
buckets[(int) queue.size()] = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
docCount,
aggs
);
}
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
name,
size,
sourceNames,
formats,
Arrays.asList(buckets),
lastBucket,
reverseMuls,
missingOrders,
earlyTerminated,
metadata()
) };
}
var subAggsForBuckets = buildSubAggsForBuckets(bucketOrdsToCollect);
while (queue.size() > 0) {
int slot = queue.pop();
CompositeKey key = queue.toCompositeKey(slot);
InternalAggregations aggs = subAggsForBuckets.apply(slot);
long docCount = queue.getDocCount(slot);
buckets[(int) queue.size()] = new InternalComposite.InternalBucket(
sourceNames,
formats,
key,
reverseMuls,
missingOrders,
docCount,
aggs
);
}
CompositeKey lastBucket = num > 0 ? buckets[num - 1].getRawKey() : null;
return new InternalAggregation[] {
new InternalComposite(
name,
size,
sourceNames,
formats,
Arrays.asList(buckets),
lastBucket,
reverseMuls,
missingOrders,
earlyTerminated,
metadata()
) };
}
@Override
@ -244,6 +246,7 @@ public final class CompositeAggregator extends BucketsAggregator implements Size
false,
metadata()
);
}
private void finishLeaf() {

View file

@ -13,6 +13,8 @@ import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@ -108,70 +110,80 @@ class CountedTermsAggregator extends TermsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
StringTerms.Bucket[][] topBucketsPerOrd = new StringTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
try (
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size());
ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
StringTerms.Bucket spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(new BytesRef(), 0, null, false, 0, format);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (spare == null) {
spare = emptyBucketBuilder.get();
// as users can't control sort order, in practice we'll always sort by doc count descending
try (
BucketPriorityQueue<StringTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
StringTerms.Bucket spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
Supplier<StringTerms.Bucket> emptyBucketBuilder = () -> new StringTerms.Bucket(
new BytesRef(),
0,
null,
false,
0,
format
);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (spare == null) {
spare = emptyBucketBuilder.get();
}
ordsEnum.readValue(spare.getTermBytes());
spare.setDocCount(docCount);
spare.setBucketOrd(ordsEnum.ord());
spare = ordered.insertWithOverflow(spare);
}
ordsEnum.readValue(spare.getTermBytes());
spare.setDocCount(docCount);
spare.setBucketOrd(ordsEnum.ord());
spare = ordered.insertWithOverflow(spare);
}
topBucketsPerOrd[ordIdx] = new StringTerms.Bucket[(int) ordered.size()];
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
topBucketsPerOrd[ordIdx][i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd[ordIdx][i].getTermBytes()));
topBucketsPerOrd.set(ordIdx, new StringTerms.Bucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());
topBucketsPerOrd.get(ordIdx)[i].setTermBytes(BytesRef.deepCopyOf(topBucketsPerOrd.get(ordIdx)[i].getTermBytes()));
}
}
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBucketsPerOrd[ordIdx], reduceOrder.comparator());
} else {
reduceOrder = order;
buildSubAggsForAllBuckets(topBucketsPerOrd, InternalTerms.Bucket::getBucketOrd, InternalTerms.Bucket::setAggregations);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
final BucketOrder reduceOrder;
if (isKeyOrder(order) == false) {
reduceOrder = InternalOrder.key(true);
Arrays.sort(topBucketsPerOrd.get(ordIdx), reduceOrder.comparator());
} else {
reduceOrder = order;
}
result[ordIdx] = new StringTerms(
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
false,
otherDocCounts.get(ordIdx),
Arrays.asList(topBucketsPerOrd.get(ordIdx)),
null
);
}
result[ordIdx] = new StringTerms(
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
false,
otherDocCounts[ordIdx],
Arrays.asList(topBucketsPerOrd[ordIdx]),
null
);
return result;
}
return result;
}
@Override
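The otherDocCounts tally above moves from a long[] updated with += to a LongArray updated with increment(index, delta), including a negative delta when a bucket is promoted into the top-N. A toy illustration of that accounting, with plain long[] inputs standing in for the aggregator's doc counts:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

class OtherDocCountsSketch {
    // Add every doc count for an owning ordinal, then subtract back the counts
    // of the buckets that made the top list, leaving the "other" total.
    static long otherDocCount(BigArrays bigArrays, long[] allCounts, long[] topCounts) {
        try (LongArray other = bigArrays.newLongArray(1)) {
            for (long c : allCounts) {
                other.increment(0, c);
            }
            for (long c : topCounts) {
                other.increment(0, -c);
            }
            return other.get(0);
        }
    }
}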

View file

@ -20,6 +20,7 @@ import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@ -208,7 +209,7 @@ public abstract class FiltersAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(
owningBucketOrds,
filters.size() + (otherBucketKey == null ? 0 : 1),

View file

@ -12,6 +12,8 @@ import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -132,39 +134,40 @@ public abstract class GeoGridAggregator<T extends InternalGeoGrid<?>> extends Bu
protected abstract InternalGeoGridBucket newEmptyBucket();
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalGeoGridBucket[][] topBucketsPerOrd = new InternalGeoGridBucket[owningBucketOrds.length][];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]), shardSize);
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
try (ObjectArray<InternalGeoGridBucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
int size = (int) Math.min(bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx)), shardSize);
try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, bigArrays())) {
InternalGeoGridBucket spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
if (spare == null) {
spare = newEmptyBucket();
try (BucketPriorityQueue<InternalGeoGridBucket> ordered = new BucketPriorityQueue<>(size, bigArrays())) {
InternalGeoGridBucket spare = null;
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
if (spare == null) {
spare = newEmptyBucket();
}
// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
// need a special function to keep the source bucket
// up-to-date so it can get the appropriate key
spare.hashAsLong = ordsEnum.value();
spare.docCount = bucketDocCount(ordsEnum.ord());
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
topBucketsPerOrd[ordIdx] = new InternalGeoGridBucket[(int) ordered.size()];
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
topBucketsPerOrd.set(ordIdx, new InternalGeoGridBucket[(int) ordered.size()]);
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
}
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd.get(ordIdx)), metadata());
}
return results;
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildAggregation(name, requiredSize, Arrays.asList(topBucketsPerOrd[ordIdx]), metadata());
}
return results;
}
@Override

View file

@ -14,6 +14,7 @@ import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.Weight;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
@ -62,8 +63,8 @@ public final class GlobalAggregator extends BucketsAggregator implements SingleB
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
assert owningBucketOrds.length == 1 && owningBucketOrds[0] == 0 : "global aggregator can only be a top level aggregator";
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
assert owningBucketOrds.size() == 1 && owningBucketOrds.get(0) == 0 : "global aggregator can only be a top level aggregator";
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalGlobal(

View file

@ -10,6 +10,7 @@
package org.elasticsearch.search.aggregations.bucket.histogram;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
@ -79,7 +80,7 @@ public abstract class AbstractHistogramAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
double roundKey = Double.longBitsToDouble(bucketValue);
double key = roundKey * interval + offset;

View file

@ -17,6 +17,7 @@ import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.Rounding.DateTimeUnit;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
@ -337,7 +338,7 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(owningBucketOrds, bucketOrds, (bucketValue, docCount, subAggregationResults) -> {
return new InternalDateHistogram.Bucket(bucketValue, docCount, keyed, formatter, subAggregationResults);
}, (owningBucketOrd, buckets) -> {

View file

@ -12,6 +12,7 @@ import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.FieldData;
@ -163,7 +164,7 @@ class DateRangeHistogramAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForVariableBuckets(
owningBucketOrds,
bucketOrds,

View file

@ -14,6 +14,7 @@ import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.InPlaceMergeSorter;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -565,35 +566,36 @@ public class VariableWidthHistogramAggregator extends DeferableBucketAggregator
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
int numClusters = collector.finalNumBuckets();
long[] bucketOrdsToCollect = new long[numClusters];
for (int i = 0; i < numClusters; i++) {
bucketOrdsToCollect[i] = i;
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(numClusters)) {
for (int i = 0; i < numClusters; i++) {
bucketOrdsToCollect.set(i, i);
}
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
List<InternalVariableWidthHistogram.Bucket> buckets = new ArrayList<>(numClusters);
for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) {
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults.apply(bucketOrd)));
}
Function<List<InternalVariableWidthHistogram.Bucket>, InternalAggregation> resultBuilder = bucketsToFormat -> {
// The contract of the histogram aggregation is that shards must return
// buckets ordered by centroid in ascending order
CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator());
InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo(
buildEmptySubAggregations()
);
return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata());
};
return new InternalAggregation[] { resultBuilder.apply(buckets) };
}
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
List<InternalVariableWidthHistogram.Bucket> buckets = new ArrayList<>(numClusters);
for (int bucketOrd = 0; bucketOrd < numClusters; bucketOrd++) {
buckets.add(collector.buildBucket(bucketOrd, subAggregationResults.apply(bucketOrd)));
}
Function<List<InternalVariableWidthHistogram.Bucket>, InternalAggregation> resultBuilder = bucketsToFormat -> {
// The contract of the histogram aggregation is that shards must return
// buckets ordered by centroid in ascending order
CollectionUtil.introSort(bucketsToFormat, BucketOrder.key(true).comparator());
InternalVariableWidthHistogram.EmptyBucketInfo emptyBucketInfo = new InternalVariableWidthHistogram.EmptyBucketInfo(
buildEmptySubAggregations()
);
return new InternalVariableWidthHistogram(name, bucketsToFormat, emptyBucketInfo, numBuckets, formatter, metadata());
};
return new InternalAggregation[] { resultBuilder.apply(buckets) };
}
@Override

View file

@ -8,6 +8,7 @@
*/
package org.elasticsearch.search.aggregations.bucket.missing;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.DocValueBits;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -67,7 +68,7 @@ public class MissingAggregator extends BucketsAggregator implements SingleBucket
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalMissing(

View file

@ -21,6 +21,7 @@ import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.NestedObjectMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -124,7 +125,7 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalNested(

View file

@ -13,6 +13,7 @@ import org.apache.lucene.search.Query;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.NestedObjectMapper;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
@ -86,7 +87,7 @@ public class ReverseNestedAggregator extends BucketsAggregator implements Single
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalReverseNested(

View file

@ -12,6 +12,8 @@ package org.elasticsearch.search.aggregations.bucket.prefix;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
@ -160,57 +162,63 @@ public final class IpPrefixAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
long totalOrdsToCollect = 0;
final int[] bucketsInOrd = new int[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
bucketsInOrd[ordIdx] = (int) bucketCount;
totalOrdsToCollect += bucketCount;
}
long[] bucketOrdsToCollect = new long[(int) totalOrdsToCollect];
int b = 0;
for (long owningBucketOrd : owningBucketOrds) {
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
bucketOrdsToCollect[b++] = ordsEnum.ord();
try (IntArray bucketsInOrd = bigArrays().newIntArray(owningBucketOrds.size())) {
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
final long bucketCount = bucketOrds.bucketsInOrd(owningBucketOrds.get(ordIdx));
bucketsInOrd.set(ordIdx, (int) bucketCount);
totalOrdsToCollect += bucketCount;
}
}
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
b = 0;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
List<InternalIpPrefix.Bucket> buckets = new ArrayList<>(bucketsInOrd[ordIdx]);
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long ordinal = ordsEnum.ord();
if (bucketOrdsToCollect[b] != ordinal) {
throw AggregationErrors.iterationOrderChangedWithoutMutating(bucketOrds.toString(), ordinal, bucketOrdsToCollect[b]);
try (LongArray bucketOrdsToCollect = bigArrays().newLongArray(totalOrdsToCollect)) {
int b = 0;
for (long i = 0; i < owningBucketOrds.size(); i++) {
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(i));
while (ordsEnum.next()) {
bucketOrdsToCollect.set(b++, ordsEnum.ord());
}
}
BytesRef ipAddress = new BytesRef();
ordsEnum.readValue(ipAddress);
long docCount = bucketDocCount(ordinal);
buckets.add(
new InternalIpPrefix.Bucket(
config.format(),
BytesRef.deepCopyOf(ipAddress),
keyed,
ipPrefix.isIpv6,
ipPrefix.prefixLength,
ipPrefix.appendPrefixLength,
docCount,
subAggregationResults.apply(b++)
)
);
// NOTE: the aggregator is expected to return sorted results
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
var subAggregationResults = buildSubAggsForBuckets(bucketOrdsToCollect);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
b = 0;
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
List<InternalIpPrefix.Bucket> buckets = new ArrayList<>(bucketsInOrd.get(ordIdx));
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds.get(ordIdx));
while (ordsEnum.next()) {
long ordinal = ordsEnum.ord();
if (bucketOrdsToCollect.get(b) != ordinal) {
throw AggregationErrors.iterationOrderChangedWithoutMutating(
bucketOrds.toString(),
ordinal,
bucketOrdsToCollect.get(b)
);
}
BytesRef ipAddress = new BytesRef();
ordsEnum.readValue(ipAddress);
long docCount = bucketDocCount(ordinal);
buckets.add(
new InternalIpPrefix.Bucket(
config.format(),
BytesRef.deepCopyOf(ipAddress),
keyed,
ipPrefix.isIpv6,
ipPrefix.prefixLength,
ipPrefix.appendPrefixLength,
docCount,
subAggregationResults.apply(b++)
)
);
// NOTE: the aggregator is expected to return sorted results
CollectionUtil.introSort(buckets, BucketOrder.key(true).comparator());
}
results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata());
}
return results;
}
results[ordIdx] = new InternalIpPrefix(name, config.format(), keyed, minDocCount, buckets, metadata());
}
return results;
}
@Override
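The IP-prefix aggregator keeps its two-pass shape: a first pass counts how many buckets each owning ordinal holds, a second pass flattens the bucket ordinals to collect, and only the backing storage changes from int[]/long[] to IntArray/LongArray. A small sketch of the same two passes, with a plain long[][] standing in (purely for illustration) for the BytesKeyedBucketOrds enumeration:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;

public class TwoPassSketch {
    public static void main(String[] args) {
        long[][] bucketsPerOwningOrd = { { 5, 7 }, { 9 } }; // stand-in for bucketOrds.ordsEnum(...)
        try (IntArray bucketsInOrd = BigArrays.NON_RECYCLING_INSTANCE.newIntArray(bucketsPerOwningOrd.length)) {
            long totalOrdsToCollect = 0;
            for (int ord = 0; ord < bucketsPerOwningOrd.length; ord++) {
                bucketsInOrd.set(ord, bucketsPerOwningOrd[ord].length); // pass 1: bucket count per owning ordinal
                totalOrdsToCollect += bucketsPerOwningOrd[ord].length;
            }
            try (LongArray bucketOrdsToCollect = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(totalOrdsToCollect)) {
                int b = 0;
                for (long[] ords : bucketsPerOwningOrd) {
                    for (long ord : ords) {
                        bucketOrdsToCollect.set(b++, ord); // pass 2: flatten the ordinals to collect
                    }
                }
                System.out.println(bucketsInOrd.get(0) + " buckets in ord 0, first collected ord " + bucketOrdsToCollect.get(0));
            }
        }
    }
}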

View file

@ -14,6 +14,7 @@ import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.search.DocValueFormat;
@ -359,7 +360,7 @@ public final class BinaryRangeAggregator extends BucketsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(
owningBucketOrds,
ranges.length,

View file

@ -14,6 +14,7 @@ import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
@ -531,7 +532,7 @@ public abstract class RangeAggregator extends BucketsAggregator {
@Override
@SuppressWarnings("unchecked")
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForFixedBucketCount(
owningBucketOrds,
ranges.length,

View file

@ -19,6 +19,7 @@ import org.apache.lucene.search.TopScoreDocCollectorManager;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -120,7 +121,7 @@ public class BestDocsDeferringCollector extends DeferringBucketCollector impleme
}
@Override
public void prepareSelectedBuckets(long... selectedBuckets) throws IOException {
public void prepareSelectedBuckets(LongArray selectedBuckets) {
// no-op - deferred aggs processed in postCollection call
}

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler;
import org.apache.lucene.misc.search.DiversifiedTopDocsCollector;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@ -212,7 +213,7 @@ public class SamplerAggregator extends DeferableBucketAggregator implements Sing
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalSampler(

View file

@ -15,6 +15,7 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -60,7 +61,7 @@ public class RandomSamplerAggregator extends BucketsAggregator implements Single
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return buildAggregationsForSingleBucket(
owningBucketOrds,
(owningBucketOrd, subAggregationResults) -> new InternalRandomSampler(

View file

@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
@ -191,7 +192,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
}
@ -696,61 +697,66 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
B extends InternalMultiBucketAggregation.InternalBucket,
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
if (valueCount == 0) { // no context in this reader
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildNoValuesResult(owningBucketOrds[ordIdx]);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildNoValuesResult(owningBucketOrds.get(ordIdx));
}
return results;
}
try (
LongArray otherDocCount = bigArrays().newLongArray(owningBucketOrds.size(), true);
ObjectArray<B[]> topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.size())
) {
GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd;
for (long ordIdx = 0; ordIdx < topBucketsPreOrd.size(); ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
try (ObjectArrayPriorityQueue<TB> ordered = buildPriorityQueue(size)) {
final long finalOrdIdx = ordIdx;
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
BucketUpdater<TB> updater = bucketUpdater(owningBucketOrd, lookupGlobalOrd);
collectionStrategy.forEach(owningBucketOrd, new BucketInfoConsumer() {
TB spare = null;
B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCount = new long[owningBucketOrds.length];
GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd;
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets than maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
}
try (ObjectArrayPriorityQueue<TB> ordered = buildPriorityQueue(size)) {
final int finalOrdIdx = ordIdx;
BucketUpdater<TB> updater = bucketUpdater(owningBucketOrds[ordIdx], lookupGlobalOrd);
collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() {
TB spare = null;
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount.increment(finalOrdIdx, docCount);
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
updater.updateBucket(spare, globalOrd, bucketOrd, docCount);
spare = ordered.insertWithOverflow(spare);
}
updater.updateBucket(spare, globalOrd, bucketOrd, docCount);
spare = ordered.insertWithOverflow(spare);
}
}
});
});
// Get the top buckets
topBucketsPreOrd[ordIdx] = buildBuckets((int) ordered.size());
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd);
otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount();
// Get the top buckets
topBucketsPreOrd.set(ordIdx, buildBuckets((int) ordered.size()));
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
B bucket = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd);
topBucketsPreOrd.get(ordIdx)[i] = bucket;
otherDocCount.increment(ordIdx, -bucket.getDocCount());
}
}
}
}
buildSubAggs(topBucketsPreOrd);
buildSubAggs(topBucketsPreOrd);
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCount[ordIdx], topBucketsPreOrd[ordIdx]);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(topBucketsPreOrd.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCount.get(ordIdx), topBucketsPreOrd.get(ordIdx));
}
return results;
}
return results;
}
/**
@ -785,7 +791,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
abstract ObjectArray<B[]> buildTopBucketsPerOrd(long size);
/**
* Build an array of buckets for a particular ordinal to collect the
@ -802,7 +808,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[][] topBucketsPreOrd) throws IOException;
abstract void buildSubAggs(ObjectArray<B[]> topBucketsPreOrd) throws IOException;
/**
* Turn the buckets into an aggregation result.
@ -841,8 +847,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new StringTerms.Bucket[size][];
ObjectArray<StringTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -879,7 +885,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void buildSubAggs(StringTerms.Bucket[][] topBucketsPreOrd) throws IOException {
void buildSubAggs(ObjectArray<StringTerms.Bucket[]> topBucketsPreOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@ -973,8 +979,8 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantStringTerms.Bucket[size][];
ObjectArray<SignificantStringTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -1026,7 +1032,7 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
}
@Override
void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOException {
void buildSubAggs(ObjectArray<SignificantStringTerms.Bucket[]> topBucketsPreOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPreOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
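Alongside the owning ordinals, the per-ordinal results in the terms aggregators move from B[][] to ObjectArray<B[]>, so the outer dimension is also a paged, releasable structure sized with a long. A sketch of that container, using String[] as a placeholder element type instead of the aggregator's bucket classes:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;

public class TopBucketsPerOrdSketch {
    public static void main(String[] args) {
        long numOwningOrds = 3;
        try (ObjectArray<String[]> topBucketsPerOrd = BigArrays.NON_RECYCLING_INSTANCE.newObjectArray(numOwningOrds)) {
            for (long ord = 0; ord < topBucketsPerOrd.size(); ord++) {
                // one small inner array per owning ordinal, as buildBuckets((int) ordered.size()) does above
                topBucketsPerOrd.set(ord, new String[] { "bucket-" + ord });
            }
            System.out.println(topBucketsPerOrd.get(2)[0]);
        } // only the outer ObjectArray is releasable; the inner arrays stay ordinary Java arrays
    }
}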

View file

@ -62,7 +62,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
long supersetSize;
/**
* Ordinal of the bucket while it is being built. Not used after it is
* returned from {@link Aggregator#buildAggregations(long[])} and not
* returned from {@link Aggregator#buildAggregations(org.elasticsearch.common.util.LongArray)} and not
* serialized.
*/
transient long bucketOrd;

View file

@ -12,7 +12,9 @@ import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongHash;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.DocValueFormat;
@ -118,70 +120,75 @@ public class LongRareTermsAggregator extends AbstractRareTermsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
/*
* Collect the list of buckets, populate the filter with terms
* that are too frequent, and figure out how to merge sub-buckets.
*/
LongRareTerms.Bucket[][] rarestPerOrd = new LongRareTerms.Bucket[owningBucketOrds.length][];
SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
long keepCount = 0;
long[] mergeMap = new long[(int) bucketOrds.size()];
Arrays.fill(mergeMap, -1);
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, bigArrays())) {
filters[owningOrdIdx] = newFilter();
List<LongRareTerms.Bucket> builtBuckets = new ArrayList<>();
LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
while (collectedBuckets.next()) {
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value());
mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd;
builtBuckets.add(bucket);
keepCount++;
} else {
filters[owningOrdIdx].add(collectedBuckets.value());
try (
ObjectArray<LongRareTerms.Bucket[]> rarestPerOrd = bigArrays().newObjectArray(owningBucketOrds.size());
ObjectArray<SetBackedScalingCuckooFilter> filters = bigArrays().newObjectArray(owningBucketOrds.size())
) {
try (LongArray mergeMap = bigArrays().newLongArray(bucketOrds.size())) {
mergeMap.fill(0, mergeMap.size(), -1);
long keepCount = 0;
long offset = 0;
for (long owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.size(); owningOrdIdx++) {
try (LongHash bucketsInThisOwningBucketToCollect = new LongHash(1, bigArrays())) {
filters.set(owningOrdIdx, newFilter());
List<LongRareTerms.Bucket> builtBuckets = new ArrayList<>();
LongKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds.get(owningOrdIdx));
while (collectedBuckets.next()) {
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
LongRareTerms.Bucket bucket = new LongRareTerms.Bucket(collectedBuckets.value(), docCount, null, format);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(collectedBuckets.value());
mergeMap.set(collectedBuckets.ord(), bucket.bucketOrd);
builtBuckets.add(bucket);
keepCount++;
} else {
filters.get(owningOrdIdx).add(collectedBuckets.value());
}
}
rarestPerOrd.set(owningOrdIdx, builtBuckets.toArray(LongRareTerms.Bucket[]::new));
offset += bucketsInThisOwningBucketToCollect.size();
}
}
rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(LongRareTerms.Bucket[]::new);
offset += bucketsInThisOwningBucketToCollect.size();
}
}
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.length) {
LongUnaryOperator howToMerge = b -> mergeMap[(int) b];
rewriteBuckets(offset, howToMerge);
if (deferringCollector() != null) {
((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge);
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.size()) {
LongUnaryOperator howToMerge = mergeMap::get;
rewriteBuckets(offset, howToMerge);
if (deferringCollector() != null) {
((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge);
}
}
}
}
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
result[ordIdx] = new LongRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(rarestPerOrd[ordIdx]),
maxDocCount,
filters[ordIdx]
);
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
LongRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx);
Arrays.sort(buckets, ORDER.comparator());
result[ordIdx] = new LongRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(buckets),
maxDocCount,
filters.get(ordIdx)
);
}
return result;
}
return result;
}
@Override
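The rare-terms merge map follows the same migration: the long[] that was filled with -1 and read through b -> mergeMap[(int) b] becomes a LongArray filled with fill(0, size(), -1) and read through the mergeMap::get method reference, which already satisfies LongUnaryOperator. A compact sketch of that lookup table:

import java.util.function.LongUnaryOperator;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

public class MergeMapSketch {
    public static void main(String[] args) {
        try (LongArray mergeMap = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(4)) {
            mergeMap.fill(0, mergeMap.size(), -1); // -1 marks ordinals that were dropped
            mergeMap.set(0, 0);
            mergeMap.set(2, 1);                    // ordinal 2 is rewritten to 1; ordinals 1 and 3 stay dropped
            LongUnaryOperator howToMerge = mergeMap::get;
            System.out.println(howToMerge.applyAsLong(2)); // prints 1
        }
    }
}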

View file

@ -18,6 +18,7 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -117,7 +118,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
}
@ -282,45 +283,49 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
implements
Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx], excludeDeletedDocs);
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
try (
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true);
ObjectArray<B[]> topBucketsPerOrd = buildTopBucketsPerOrd(Math.toIntExact(owningBucketOrds.size()))
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
long owningOrd = owningBucketOrds.get(ordIdx);
collectZeroDocEntriesIfNeeded(owningOrd, excludeDeletedDocs);
int size = (int) Math.min(bucketOrds.size(), bucketCountThresholds.getShardSize());
try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
B spare = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningOrd);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningOrd);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
topBucketsPerOrd[ordIdx] = buildBuckets((int) ordered.size());
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd[ordIdx][i] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][i].getDocCount();
finalizeBucket(topBucketsPerOrd[ordIdx][i]);
topBucketsPerOrd.set(ordIdx, buildBuckets((int) ordered.size()));
for (int i = (int) ordered.size() - 1; i >= 0; --i) {
topBucketsPerOrd.get(ordIdx)[i] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[i].getDocCount());
finalizeBucket(topBucketsPerOrd.get(ordIdx)[i]);
}
}
}
}
buildSubAggs(topBucketsPerOrd);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
buildSubAggs(topBucketsPerOrd);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx));
}
return result;
}
return result;
}
/**
@ -361,7 +366,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
abstract ObjectArray<B[]> buildTopBucketsPerOrd(long size);
/**
* Build an array of buckets for a particular ordinal to collect the
@ -379,7 +384,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException;
abstract void buildSubAggs(ObjectArray<B[]> topBucketsPerOrd) throws IOException;
/**
* Turn the buckets into an aggregation result.
@ -501,8 +506,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
}
@Override
StringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new StringTerms.Bucket[size][];
ObjectArray<StringTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -521,7 +526,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
}
@Override
void buildSubAggs(StringTerms.Bucket[][] topBucketsPerOrd) throws IOException {
void buildSubAggs(ObjectArray<StringTerms.Bucket[]> topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
}
@ -637,8 +642,8 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
}
@Override
SignificantStringTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantStringTerms.Bucket[size][];
ObjectArray<SignificantStringTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -657,7 +662,7 @@ public final class MapStringTermsAggregator extends AbstractStringTermsAggregato
}
@Override
void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPerOrd) throws IOException {
void buildSubAggs(ObjectArray<SignificantStringTerms.Bucket[]> topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
}

View file

@ -15,6 +15,7 @@ import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.NumericUtils;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -39,7 +40,6 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
@ -136,7 +136,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
return resultStrategy.buildAggregations(owningBucketOrds);
}
@ -163,48 +163,52 @@ public final class NumericTermsAggregator extends TermsAggregator {
abstract class ResultStrategy<R extends InternalAggregation, B extends InternalMultiBucketAggregation.InternalBucket>
implements
Releasable {
private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
B[][] topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.length);
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
collectZeroDocEntriesIfNeeded(owningBucketOrds[ordIdx], excludeDeletedDocs);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
private InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
try (
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true);
ObjectArray<B[]> topBucketsPerOrd = buildTopBucketsPerOrd(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < topBucketsPerOrd.size(); ordIdx++) {
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
collectZeroDocEntriesIfNeeded(owningBucketOrd, excludeDeletedDocs);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd);
int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
B spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
try (ObjectArrayPriorityQueue<B> ordered = buildPriorityQueue(size)) {
B spare = null;
BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
Supplier<B> emptyBucketBuilder = emptyBucketBuilder(owningBucketOrd);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
if (spare == null) {
spare = emptyBucketBuilder.get();
}
updateBucket(spare, ordsEnum, docCount);
spare = ordered.insertWithOverflow(spare);
}
// Get the top buckets
B[] bucketsForOrd = buildBuckets((int) ordered.size());
topBucketsPerOrd[ordIdx] = bucketsForOrd;
for (int b = (int) ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
// Get the top buckets
B[] bucketsForOrd = buildBuckets((int) ordered.size());
topBucketsPerOrd.set(ordIdx, bucketsForOrd);
for (int b = (int) ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd.get(ordIdx)[b] = ordered.pop();
otherDocCounts.increment(ordIdx, -topBucketsPerOrd.get(ordIdx)[b].getDocCount());
}
}
}
}
buildSubAggs(topBucketsPerOrd);
buildSubAggs(topBucketsPerOrd);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds[ordIdx], otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(topBucketsPerOrd.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(owningBucketOrds.get(ordIdx), otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx));
}
return result;
}
return result;
}
/**
@ -227,7 +231,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
/**
* Build an array to hold the "top" buckets for each ordinal.
*/
abstract B[][] buildTopBucketsPerOrd(int size);
abstract ObjectArray<B[]> buildTopBucketsPerOrd(long size);
/**
* Build an array of buckets for a particular ordinal. These arrays
@ -258,7 +262,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
* Build the sub-aggregations into the buckets. This will usually
* delegate to {@link #buildSubAggsForAllBuckets}.
*/
abstract void buildSubAggs(B[][] topBucketsPerOrd) throws IOException;
abstract void buildSubAggs(ObjectArray<B[]> topBucketsPerOrd) throws IOException;
/**
* Collect extra entries for "zero" hit documents if they were requested
@ -297,7 +301,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
final void buildSubAggs(B[][] topBucketsPerOrd) throws IOException {
final void buildSubAggs(ObjectArray<B[]> topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@ -356,8 +360,8 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
LongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new LongTerms.Bucket[size][];
ObjectArray<LongTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -397,7 +401,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
Arrays.asList(topBuckets),
null
);
}
@ -438,8 +442,8 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
DoubleTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new DoubleTerms.Bucket[size][];
ObjectArray<DoubleTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -479,7 +483,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
Arrays.asList(topBuckets),
null
);
}
@ -551,8 +555,8 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
SignificantLongTerms.Bucket[][] buildTopBucketsPerOrd(int size) {
return new SignificantLongTerms.Bucket[size][];
ObjectArray<SignificantLongTerms.Bucket[]> buildTopBucketsPerOrd(long size) {
return bigArrays().newObjectArray(size);
}
@Override
@ -583,7 +587,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
}
@Override
void buildSubAggs(SignificantLongTerms.Bucket[][] topBucketsPerOrd) throws IOException {
void buildSubAggs(ObjectArray<SignificantLongTerms.Bucket[]> topBucketsPerOrd) throws IOException {
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
}
@ -601,7 +605,7 @@ public final class NumericTermsAggregator extends TermsAggregator {
subsetSizes.get(owningBucketOrd),
supersetSize,
significanceHeuristic,
List.of(topBuckets)
Arrays.asList(topBuckets)
);
}
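A small side change in the numeric and multi-terms aggregators is the switch from List.of(topBuckets) to Arrays.asList(topBuckets). The behavioural difference is plain Java: Arrays.asList returns a fixed-size view backed by the existing bucket array without copying, while List.of copies the array and rejects null elements. A sketch of that difference (why the commit prefers the view is not stated here, so this only illustrates the semantics):

import java.util.Arrays;
import java.util.List;

public class ListWrapSketch {
    public static void main(String[] args) {
        String[] buckets = { "a", "b" };
        List<String> view = Arrays.asList(buckets); // no copy: a fixed-size view over the same array
        List<String> copy = List.of(buckets);       // copies the elements and would throw on nulls
        buckets[0] = "changed";
        System.out.println(view.get(0) + " vs " + copy.get(0)); // prints "changed vs a"
    }
}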

View file

@ -12,6 +12,8 @@ import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.fielddata.FieldData;
@ -119,72 +121,82 @@ public class StringRareTermsAggregator extends AbstractRareTermsAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
/*
* Collect the list of buckets, populate the filter with terms
* that are too frequent, and figure out how to merge sub-buckets.
*/
StringRareTerms.Bucket[][] rarestPerOrd = new StringRareTerms.Bucket[owningBucketOrds.length][];
SetBackedScalingCuckooFilter[] filters = new SetBackedScalingCuckooFilter[owningBucketOrds.length];
long keepCount = 0;
long[] mergeMap = new long[(int) bucketOrds.size()];
Arrays.fill(mergeMap, -1);
long offset = 0;
for (int owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.length; owningOrdIdx++) {
try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(1, bigArrays())) {
filters[owningOrdIdx] = newFilter();
List<StringRareTerms.Bucket> builtBuckets = new ArrayList<>();
BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds[owningOrdIdx]);
BytesRef scratch = new BytesRef();
while (collectedBuckets.next()) {
collectedBuckets.readValue(scratch);
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(BytesRef.deepCopyOf(scratch), docCount, null, format);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(scratch);
mergeMap[(int) collectedBuckets.ord()] = bucket.bucketOrd;
builtBuckets.add(bucket);
keepCount++;
} else {
filters[owningOrdIdx].add(scratch);
try (
ObjectArray<StringRareTerms.Bucket[]> rarestPerOrd = bigArrays().newObjectArray(owningBucketOrds.size());
ObjectArray<SetBackedScalingCuckooFilter> filters = bigArrays().newObjectArray(owningBucketOrds.size())
) {
try (LongArray mergeMap = bigArrays().newLongArray(bucketOrds.size())) {
mergeMap.fill(0, mergeMap.size(), -1);
long keepCount = 0;
long offset = 0;
for (long owningOrdIdx = 0; owningOrdIdx < owningBucketOrds.size(); owningOrdIdx++) {
try (BytesRefHash bucketsInThisOwningBucketToCollect = new BytesRefHash(1, bigArrays())) {
filters.set(owningOrdIdx, newFilter());
List<StringRareTerms.Bucket> builtBuckets = new ArrayList<>();
BytesKeyedBucketOrds.BucketOrdsEnum collectedBuckets = bucketOrds.ordsEnum(owningBucketOrds.get(owningOrdIdx));
BytesRef scratch = new BytesRef();
while (collectedBuckets.next()) {
collectedBuckets.readValue(scratch);
long docCount = bucketDocCount(collectedBuckets.ord());
// if the key is below threshold, reinsert into the new ords
if (docCount <= maxDocCount) {
StringRareTerms.Bucket bucket = new StringRareTerms.Bucket(
BytesRef.deepCopyOf(scratch),
docCount,
null,
format
);
bucket.bucketOrd = offset + bucketsInThisOwningBucketToCollect.add(scratch);
mergeMap.set(collectedBuckets.ord(), bucket.bucketOrd);
builtBuckets.add(bucket);
keepCount++;
} else {
filters.get(owningOrdIdx).add(scratch);
}
}
rarestPerOrd.set(owningOrdIdx, builtBuckets.toArray(StringRareTerms.Bucket[]::new));
offset += bucketsInThisOwningBucketToCollect.size();
}
}
rarestPerOrd[owningOrdIdx] = builtBuckets.toArray(StringRareTerms.Bucket[]::new);
offset += bucketsInThisOwningBucketToCollect.size();
}
}
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.length) {
LongUnaryOperator howToMerge = b -> mergeMap[(int) b];
rewriteBuckets(offset, howToMerge);
if (deferringCollector() != null) {
((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge);
/*
* Only merge/delete the ordinals if we have actually deleted one,
* to save on some redundant work.
*/
if (keepCount != mergeMap.size()) {
LongUnaryOperator howToMerge = mergeMap::get;
rewriteBuckets(offset, howToMerge);
if (deferringCollector() != null) {
((BestBucketsDeferringCollector) deferringCollector()).rewriteBuckets(howToMerge);
}
}
}
}
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Arrays.sort(rarestPerOrd[ordIdx], ORDER.comparator());
result[ordIdx] = new StringRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(rarestPerOrd[ordIdx]),
maxDocCount,
filters[ordIdx]
);
/*
* Now build the results!
*/
buildSubAggsForAllBuckets(rarestPerOrd, b -> b.bucketOrd, (b, aggs) -> b.aggregations = aggs);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
StringRareTerms.Bucket[] buckets = rarestPerOrd.get(ordIdx);
Arrays.sort(buckets, ORDER.comparator());
result[ordIdx] = new StringRareTerms(
name,
ORDER,
metadata(),
format,
Arrays.asList(buckets),
maxDocCount,
filters.get(ordIdx)
);
}
return result;
}
return result;
}
@Override

View file

@ -9,6 +9,7 @@
package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorBase;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -36,10 +37,10 @@ public abstract class MetricsAggregator extends AggregatorBase {
public abstract InternalAggregation buildAggregation(long owningBucketOrd) throws IOException;
@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
results[ordIdx] = buildAggregation(owningBucketOrds[ordIdx]);
public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildAggregation(owningBucketOrds.get(ordIdx));
}
return results;
}
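Because the returned results are still a plain InternalAggregation[], the long count of owning ordinals is narrowed with Math.toIntExact, which fails loudly on overflow instead of silently truncating the way a cast would. A trivial illustration:

public class ToIntExactSketch {
    public static void main(String[] args) {
        int size = Math.toIntExact(42L);              // fits: behaves like a plain cast
        System.out.println(new Object[size].length);  // 42
        try {
            Math.toIntExact(Integer.MAX_VALUE + 1L);  // does not fit: ArithmeticException
        } catch (ArithmeticException e) {
            System.out.println("too many owning ordinals for one array: " + e.getMessage());
        }
    }
}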

View file

@ -10,6 +10,7 @@
package org.elasticsearch.search.profile.aggregation;
import org.apache.lucene.search.ScoreMode;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -68,7 +69,7 @@ public class ProfilingAggregator extends Aggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
Timer timer = profileBreakdown.getNewTimer(AggregationTimingType.BUILD_AGGREGATION);
InternalAggregation[] result;
timer.start();

View file

@ -9,6 +9,7 @@
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.CheckedFunction;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperServiceTestCase;
@ -113,7 +114,7 @@ public class AdaptingAggregatorTests extends MapperServiceTestCase {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) {
return new InternalAggregation[] { null };
}

View file

@ -15,6 +15,7 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
@ -47,7 +48,7 @@ public class AggregatorBaseTests extends MapperServiceTestCase {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) {
throw new UnsupportedOperationException();
}

View file

@ -28,6 +28,8 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketCollector;
@ -77,7 +79,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(termQuery, collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);
collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true));
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
@ -91,7 +93,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(new MatchAllDocsQuery(), collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);
collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true));
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
@ -141,7 +143,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9));
equalTo(Map.of(0L, List.of(0, 1, 2, 3, 4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9)));
});
@ -158,7 +160,7 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9));
assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(4, 5, 6, 7), 1L, List.of(8), 2L, List.of(9))));
});
@ -176,12 +178,20 @@ public class BestBucketsDeferringCollectorTests extends AggregatorTestCase {
}
}
}, (deferringCollector, finalCollector) -> {
deferringCollector.prepareSelectedBuckets(0, 8, 9);
deferringCollector.prepareSelectedBuckets(toLongArray(0, 8, 9));
assertThat(finalCollector.collection, equalTo(Map.of(0L, List.of(0, 1, 2, 3), 1L, List.of(8), 2L, List.of(9))));
});
}
private LongArray toLongArray(long... longs) {
LongArray longArray = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(longs.length);
for (int i = 0; i < longs.length; i++) {
longArray.set(i, longs[i]);
}
return longArray;
}
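In the collector tests the old varargs call prepareSelectedBuckets(0) now needs a LongArray, so the single-ordinal case is expressed as a one-element array whose slot is left at its zero default (owning ordinal 0), and the helper above wraps explicit values for the multi-ordinal cases. A self-contained sketch of both call shapes; the names are illustrative, and only the BigArrays/LongArray calls are taken from the test:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

public class SelectedBucketsSketch {
    public static void main(String[] args) {
        // equivalent of the old prepareSelectedBuckets(0): one zero-initialized slot, i.e. ordinal 0
        try (LongArray single = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true)) {
            // equivalent of the old prepareSelectedBuckets(0, 8, 9)
            try (LongArray several = BigArrays.NON_RECYCLING_INSTANCE.newLongArray(3)) {
                several.set(0, 0);
                several.set(1, 8);
                several.set(2, 9);
                System.out.println(single.get(0) + " / " + several.get(2));
            }
        }
    }
}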
private void testCase(
BiFunction<BestBucketsDeferringCollector, LeafBucketCollector, LeafBucketCollector> leafCollector,
CheckedBiConsumer<BestBucketsDeferringCollector, CollectingBucketCollector, IOException> verify

View file

@ -16,6 +16,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
@ -72,7 +73,7 @@ public class BucketsAggregatorTests extends AggregatorTestCase {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) {
return new InternalAggregation[0];
}

View file

@ -22,6 +22,7 @@ import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
@ -68,7 +69,7 @@ public class BestDocsDeferringCollectorTests extends AggregatorTestCase {
collector.preCollection();
indexSearcher.search(termQuery, collector.asCollector());
collector.postCollection();
collector.prepareSelectedBuckets(0);
collector.prepareSelectedBuckets(BigArrays.NON_RECYCLING_INSTANCE.newLongArray(1, true));
assertEquals(topDocs.scoreDocs.length, deferredCollectedDocIds.size());
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {

View file

@ -20,6 +20,8 @@ import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.util.ObjectArrayPriorityQueue;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Releasables;
@ -235,57 +237,62 @@ class MultiTermsAggregator extends DeferableBucketAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalMultiTerms.Bucket[][] topBucketsPerOrd = new InternalMultiTerms.Bucket[owningBucketOrds.length][];
long[] otherDocCounts = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrds[ordIdx]);
public InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
try (
LongArray otherDocCounts = bigArrays().newLongArray(owningBucketOrds.size(), true);
ObjectArray<InternalMultiTerms.Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(owningBucketOrds.size())
) {
for (long ordIdx = 0; ordIdx < owningBucketOrds.size(); ordIdx++) {
final long owningBucketOrd = owningBucketOrds.get(ordIdx);
long bucketsInOrd = bucketOrds.bucketsInOrd(owningBucketOrd);
int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
try (
ObjectArrayPriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
InternalMultiTerms.Bucket spare = null;
BytesRef spareKey = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrds[ordIdx]);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts[ordIdx] += docCount;
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
int size = (int) Math.min(bucketsInOrd, bucketCountThresholds.getShardSize());
try (
ObjectArrayPriorityQueue<InternalMultiTerms.Bucket> ordered = new BucketPriorityQueue<>(
size,
bigArrays(),
partiallyBuiltBucketComparator
)
) {
InternalMultiTerms.Bucket spare = null;
BytesRef spareKey = null;
BytesKeyedBucketOrds.BucketOrdsEnum ordsEnum = bucketOrds.ordsEnum(owningBucketOrd);
while (ordsEnum.next()) {
long docCount = bucketDocCount(ordsEnum.ord());
otherDocCounts.increment(ordIdx, docCount);
if (docCount < bucketCountThresholds.getShardMinDocCount()) {
continue;
}
if (spare == null) {
spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters);
spareKey = new BytesRef();
}
ordsEnum.readValue(spareKey);
spare.terms = unpackTerms(spareKey);
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
if (spare == null) {
spare = new InternalMultiTerms.Bucket(null, 0, null, showTermDocCountError, 0, formats, keyConverters);
spareKey = new BytesRef();
}
ordsEnum.readValue(spareKey);
spare.terms = unpackTerms(spareKey);
spare.docCount = docCount;
spare.bucketOrd = ordsEnum.ord();
spare = ordered.insertWithOverflow(spare);
}
// Get the top buckets
InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()];
topBucketsPerOrd[ordIdx] = bucketsForOrd;
for (int b = (int) ordered.size() - 1; b >= 0; --b) {
topBucketsPerOrd[ordIdx][b] = ordered.pop();
otherDocCounts[ordIdx] -= topBucketsPerOrd[ordIdx][b].getDocCount();
// Get the top buckets
InternalMultiTerms.Bucket[] bucketsForOrd = new InternalMultiTerms.Bucket[(int) ordered.size()];
topBucketsPerOrd.set(ordIdx, bucketsForOrd);
for (int b = (int) ordered.size() - 1; b >= 0; --b) {
InternalMultiTerms.Bucket[] buckets = topBucketsPerOrd.get(ordIdx);
buckets[b] = ordered.pop();
otherDocCounts.increment(ordIdx, -buckets[b].getDocCount());
}
}
}
}
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
buildSubAggsForAllBuckets(topBucketsPerOrd, b -> b.bucketOrd, (b, a) -> b.aggregations = a);
InternalAggregation[] result = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
result[ordIdx] = buildResult(otherDocCounts[ordIdx], topBucketsPerOrd[ordIdx]);
InternalAggregation[] result = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < result.length; ordIdx++) {
result[ordIdx] = buildResult(otherDocCounts.get(ordIdx), topBucketsPerOrd.get(ordIdx));
}
return result;
}
return result;
}
InternalMultiTerms buildResult(long otherDocCount, InternalMultiTerms.Bucket[] topBuckets) {
@ -305,7 +312,7 @@ class MultiTermsAggregator extends DeferableBucketAggregator {
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
List.of(topBuckets),
Arrays.asList(topBuckets),
0,
formats,
keyConverters,

View file

@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.Releasables;
@ -110,31 +111,32 @@ public class CategorizeTextAggregator extends DeferableBucketAggregator {
}
@Override
public InternalAggregation[] buildAggregations(long[] ordsToCollect) throws IOException {
Bucket[][] topBucketsPerOrd = new Bucket[ordsToCollect.length][];
for (int ordIdx = 0; ordIdx < ordsToCollect.length; ordIdx++) {
final long ord = ordsToCollect[ordIdx];
final TokenListCategorizer categorizer = (ord < categorizers.size()) ? categorizers.get(ord) : null;
if (categorizer == null) {
topBucketsPerOrd[ordIdx] = new Bucket[0];
continue;
public InternalAggregation[] buildAggregations(LongArray ordsToCollect) throws IOException {
try (ObjectArray<Bucket[]> topBucketsPerOrd = bigArrays().newObjectArray(ordsToCollect.size())) {
for (long ordIdx = 0; ordIdx < ordsToCollect.size(); ordIdx++) {
final long ord = ordsToCollect.get(ordIdx);
final TokenListCategorizer categorizer = (ord < categorizers.size()) ? categorizers.get(ord) : null;
if (categorizer == null) {
topBucketsPerOrd.set(ordIdx, new Bucket[0]);
continue;
}
int size = (int) Math.min(bucketOrds.bucketsInOrd(ordIdx), bucketCountThresholds.getShardSize());
topBucketsPerOrd.set(ordIdx, categorizer.toOrderedBuckets(size));
}
int size = (int) Math.min(bucketOrds.bucketsInOrd(ordIdx), bucketCountThresholds.getShardSize());
topBucketsPerOrd[ordIdx] = categorizer.toOrderedBuckets(size);
buildSubAggsForAllBuckets(topBucketsPerOrd, Bucket::getBucketOrd, Bucket::setAggregations);
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(ordsToCollect.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = new InternalCategorizationAggregation(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
similarityThreshold,
metadata(),
Arrays.asList(topBucketsPerOrd.get(ordIdx))
);
}
return results;
}
buildSubAggsForAllBuckets(topBucketsPerOrd, Bucket::getBucketOrd, Bucket::setAggregations);
InternalAggregation[] results = new InternalAggregation[ordsToCollect.length];
for (int ordIdx = 0; ordIdx < ordsToCollect.length; ordIdx++) {
results[ordIdx] = new InternalCategorizationAggregation(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
similarityThreshold,
metadata(),
Arrays.asList(topBucketsPerOrd[ordIdx])
);
}
return results;
}
@Override
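The CategorizeTextAggregator hunk above makes the same move for object state, replacing the Bucket[][] with an ObjectArray<Bucket[]> held in try-with-resources. One detail worth noting is that newObjectArray leaves every slot null, which is why the code still installs an explicit new Bucket[0] for ordinals without a categorizer. A small sketch of that access pattern, with an illustrative element type rather than the real Bucket:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;

final class ObjectArrayPatternSketch {
    // Illustrative only: shows the get/set access that replaces 2D-array indexing.
    static int countNonEmpty(BigArrays bigArrays, long ordCount) {
        try (ObjectArray<String[]> topPerOrd = bigArrays.newObjectArray(ordCount)) {
            for (long ordIdx = 0; ordIdx < topPerOrd.size(); ordIdx++) {
                // slots start out null, so install an explicit default, mirroring the
                // new Bucket[0] fallback in the aggregator above
                topPerOrd.set(ordIdx, new String[0]);
            }
            int nonEmpty = 0;
            for (long ordIdx = 0; ordIdx < topPerOrd.size(); ordIdx++) {
                if (topPerOrd.get(ordIdx).length > 0) {
                    nonEmpty++;
                }
            }
            return nonEmpty;
        }
    }
}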

View file

@ -40,10 +40,12 @@ import java.util.function.Consumer;
* At the time of writing circuit breakers are a global gauge.)
*
* After the map phase and before reduce, the {@link ItemSetMapReduceAggregator} creates instances of
* {@link InternalItemSetMapReduceAggregation}, see {@link ItemSetMapReduceAggregator#buildAggregations(long[])}.
* {@link InternalItemSetMapReduceAggregation}, see
* {@link ItemSetMapReduceAggregator#buildAggregations(org.elasticsearch.common.util.LongArray)}.
*
* (Note 1: Instead of keeping the existing instance, it would have been possible to deep-copy the object like
* {@link CardinalityAggregator#buildAggregations(long[])}. I decided against this approach mainly because the deep-copy isn't
* {@link CardinalityAggregator#buildAggregations(org.elasticsearch.common.util.LongArray)}.
* I decided against this approach mainly because the deep-copy isn't
* secured by circuit breakers, meaning the node could run out of memory during the deep-copy.)
* (Note 2: Between {@link ItemSetMapReduceAggregator#doClose()} and serializing {@link InternalItemSetMapReduceAggregation}
* memory accounting is broken, meaning the agg context gets closed and bytes get returned to the circuit breaker before memory is
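Note 1 above is the same concern that motivates the LongArray signature: a plain new long[n] is invisible to the circuit breakers, while an allocation through BigArrays reserves the bytes against a breaker up front and can fail fast instead of exhausting the heap. A minimal sketch of the difference, assuming a BigArrays instance wired to a circuit breaker service; the untracked branch is what the old long[] signature forced on every aggregator:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;

final class BreakerAccountingSketch {
    // Illustrative only: contrasts untracked and breaker-tracked allocation of an
    // owning-ordinals buffer. With a breaker-backed BigArrays, an oversized request
    // is rejected with a CircuitBreakingException rather than triggering an OOM.
    static LongArray trackedOrdinals(BigArrays breakerBackedBigArrays, long count) {
        return breakerBackedBigArrays.newLongArray(count); // bytes reserved against the breaker
    }

    static long[] untrackedOrdinals(int count) {
        return new long[count]; // humongous allocation the breaker never sees
    }
}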

View file

@ -17,6 +17,7 @@ import org.apache.lucene.util.SetOnce;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.Tuple;
@ -117,9 +118,9 @@ public abstract class ItemSetMapReduceAggregator<
}
@Override
public final InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
public final InternalAggregation[] buildAggregations(LongArray owningBucketOrds) throws IOException {
InternalAggregation[] results = new InternalAggregation[Math.toIntExact(owningBucketOrds.size())];
for (int ordIdx = 0; ordIdx < results.length; ordIdx++) {
results[ordIdx] = buildAggregation(ordIdx);
}
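For callers, the new signature means the owning ordinals are handed over as a LongArray instead of a long[], and the caller stays responsible for releasing it. A hedged sketch of what a call site could look like; the aggregator variable and the consecutive ordinal values are hypothetical, and real call sites live in the aggregation framework plumbing rather than user code:

import java.io.IOException;

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;

final class BuildAggregationsCallSketch {
    // Illustrative only: wraps the owning ordinals in a breaker-tracked LongArray and
    // releases it once the per-ordinal results have been built.
    static InternalAggregation[] buildForOrdinals(Aggregator aggregator, BigArrays bigArrays, long ordCount) throws IOException {
        try (LongArray owningBucketOrds = bigArrays.newLongArray(ordCount)) {
            for (long i = 0; i < ordCount; i++) {
                owningBucketOrds.set(i, i); // hypothetical: one result per consecutive ordinal
            }
            return aggregator.buildAggregations(owningBucketOrds);
        }
    }
}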