diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java
index cd10b429767f..2d360705f75b 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/BucketOrder.java
@@ -134,7 +134,7 @@ public abstract class BucketOrder implements ToXContentObject, Writeable {
      * The comparator might need to reduce the {@link DelayedBucket} and therefore we need to provide the
      * reducer and the reduce context. The context must be on the final reduce phase.
      */
-    public abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
+    abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
         BiFunction<List<B>, AggregationReduceContext, B> reduce,
         AggregationReduceContext reduceContext
     );
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java b/server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java
index eb671eb921c7..fa8fe9e4628d 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/DelayedBucket.java
@@ -93,7 +93,7 @@ public final class DelayedBucket<B extends InternalMultiBucketAggregation.InternalBucket>
-    public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
+    <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
         BiFunction<List<B>, AggregationReduceContext, B> reduce,
         AggregationReduceContext reduceContext
     ) {
@@ -219,7 +219,7 @@ public abstract class InternalOrder extends BucketOrder {
         }
 
         @Override
-        public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
+        <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
             BiFunction<List<B>, AggregationReduceContext, B> reduce,
             AggregationReduceContext reduceContext
         ) {
@@ -287,7 +287,7 @@ public abstract class InternalOrder extends BucketOrder {
         }
 
         @Override
-        public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
+        <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
             BiFunction<List<B>, AggregationReduceContext, B> reduce,
             AggregationReduceContext reduceContext
         ) {
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/TopBucketBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/TopBucketBuilder.java
new file mode 100644
index 000000000000..a3d04ecc2074
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/TopBucketBuilder.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.aggregations;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.PriorityQueue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+
+/**
+ * Merges many buckets into the "top" buckets as sorted by {@link BucketOrder}.
+ */
+public abstract class TopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> {
+    /**
+     * The number of buckets required before we switch to the
+     * {@link BufferingTopBucketBuilder}. If we need fewer buckets we use
+     * {@link PriorityQueueTopBucketBuilder}.
+     * <p>
+     * The value we picked for this boundary is fairly arbitrary, but it
+     * is important that it's bigger than the default size of the terms
+     * aggregation. It's basically the amount of memory you are willing to
+     * waste when reducing small terms aggregations, so it shouldn't be too
+     * large either. The value we have, {@code 1024}, preallocates about
+     * 32k for the priority queue.
+     */
+    static final int USE_BUFFERING_BUILDER = 1024;
+
+    /**
+     * Create a {@link TopBucketBuilder} to build a list of the top buckets.
+     * <p>
+     * If there are few required results we use a {@link PriorityQueueTopBucketBuilder},
+     * which is simpler, and faster when the priority queue is full, but allocates {@code size + 1}
+     * slots in an array. If there are many required results we prefer a
+     * {@link BufferingTopBucketBuilder} which doesn't preallocate and is faster for the
+     * first {@code size} results. But it's a little slower when the priority queue is full.
+     * <p>
+     * It's important for this not to preallocate a bunch of memory when
+     * {@code size} is very very large because this backs the reduction of the {@code terms}
+     * aggregation and folks often set the {@code size} of that to something quite large.
+     * The choice in the paragraph above handles this case.
+     *
+     * @param size the requested size of the list
+     * @param order the sort order of the buckets
+     * @param nonCompetitive called with non-competitive buckets
+     * @param reduce function to reduce a list of buckets
+     * @param reduceContext the reduce context
+     */
+    public static <B extends InternalMultiBucketAggregation.InternalBucket> TopBucketBuilder<B> build(
+        int size,
+        BucketOrder order,
+        Consumer<DelayedBucket<B>> nonCompetitive,
+        BiFunction<List<B>, AggregationReduceContext, B> reduce,
+        AggregationReduceContext reduceContext
+    ) {
+        if (size < USE_BUFFERING_BUILDER) {
+            return new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
+        }
+        return new BufferingTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
+    }
+
+    protected final Consumer<DelayedBucket<B>> nonCompetitive;
+
+    private TopBucketBuilder(Consumer<DelayedBucket<B>> nonCompetitive) {
+        this.nonCompetitive = nonCompetitive;
+    }
+
+    /**
+     * Add a bucket if it is competitive. If there isn't space but the
+     * bucket is competitive then this will drop the least competitive bucket
+     * to make room for the new bucket.
+     * <p>
+     * Instead of operating on complete buckets, this operates on a
+     * wrapper containing what we need to merge the buckets, called
+     * {@link DelayedBucket}. We can evaluate some common sort criteria
+     * directly on the {@linkplain DelayedBucket}s so we only need to
+     * merge exactly the sub-buckets we need.
+     */
+    public abstract void add(DelayedBucket<B> bucket);
+
+    /**
+     * Return the most competitive buckets sorted by the comparator.
+     */
+    public abstract List<B> build();
+
+    /**
+     * Collects the "top" buckets by adding them directly to a {@link PriorityQueue}.
+     * This is always going to be faster than {@link BufferingTopBucketBuilder}
+     * but it requires allocating an array of {@code size + 1}.
+     */
+    static class PriorityQueueTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
+        private final PriorityQueue<DelayedBucket<B>> queue;
+        private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
+        private final AggregationReduceContext reduceContext;
+
+        PriorityQueueTopBucketBuilder(
+            int size,
+            BucketOrder order,
+            Consumer<DelayedBucket<B>> nonCompetitive,
+            BiFunction<List<B>, AggregationReduceContext, B> reduce,
+            AggregationReduceContext reduceContext
+        ) {
+            super(nonCompetitive);
+            if (size >= ArrayUtil.MAX_ARRAY_LENGTH) {
+                throw new IllegalArgumentException("can't reduce more than [" + ArrayUtil.MAX_ARRAY_LENGTH + "] buckets");
+            }
+            this.reduce = reduce;
+            this.reduceContext = reduceContext;
+            queue = new PriorityQueue<>(size) {
+                private final Comparator<DelayedBucket<B>> comparator = order.delayedBucketComparator(reduce, reduceContext);
+
+                @Override
+                protected boolean lessThan(DelayedBucket<B> a, DelayedBucket<B> b) {
+                    return comparator.compare(a, b) > 0;
+                }
+            };
+        }
+
+        @Override
+        public void add(DelayedBucket<B> bucket) {
+            DelayedBucket<B> removed = queue.insertWithOverflow(bucket);
+            if (removed != null) {
+                nonCompetitive.accept(removed);
+                removed.nonCompetitive(reduceContext);
+            }
+        }
+
+        @Override
+        public List<B> build() {
+            List<B> result = new ArrayList<>(queue.size());
+            for (int i = queue.size() - 1; i >= 0; i--) {
+                result.add(queue.pop().reduced(reduce, reduceContext));
+            }
+            Collections.reverse(result);
+            return result;
+        }
+    }
+
+    /**
+     * Collects the "top" buckets by adding them to a {@link List} that grows
+     * as more buckets arrive and is converted into a
+     * {@link PriorityQueueTopBucketBuilder} when {@code size} buckets arrive.
+     */
+    private static class BufferingTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
+        private final int size;
+        private final BucketOrder order;
+        private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
+        private final AggregationReduceContext reduceContext;
+
+        private List<DelayedBucket<B>> buffer;
+        private PriorityQueueTopBucketBuilder<B> next;
+
+        BufferingTopBucketBuilder(
+            int size,
+            BucketOrder order,
+            Consumer<DelayedBucket<B>> nonCompetitive,
+            BiFunction<List<B>, AggregationReduceContext, B> reduce,
+            AggregationReduceContext reduceContext
+        ) {
+            super(nonCompetitive);
+            this.reduce = reduce;
+            this.reduceContext = reduceContext;
+            this.size = size;
+            this.order = order;
+            buffer = new ArrayList<>();
+        }
+
+        @Override
+        public void add(DelayedBucket<B> bucket) {
+            if (next != null) {
+                assert buffer == null;
+                next.add(bucket);
+                return;
+            }
+            buffer.add(bucket);
+            if (buffer.size() < size) {
+                return;
+            }
+            next = new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
+            for (DelayedBucket<B> b : buffer) {
+                next.queue.add(b);
+            }
+            buffer = null;
+        }
+
+        @Override
+        public List<B> build() {
+            if (next != null) {
+                assert buffer == null;
+                return next.build();
+            }
+            List<B> result = new ArrayList<>(buffer.size());
+            for (DelayedBucket<B> b : buffer) {
+                result.add(b.reduced(reduce, reduceContext));
+            }
+            result.sort(order.comparator());
+            return result;
+        }
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java
index 2d728de49511..71a06fb02034 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java
@@ -21,13 +21,13 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
 import org.elasticsearch.search.aggregations.InternalOrder;
 import org.elasticsearch.search.aggregations.KeyComparable;
+import org.elasticsearch.search.aggregations.TopBucketBuilder;
 import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
 import org.elasticsearch.search.aggregations.support.SamplingContext;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -296,34 +296,19 @@ public abstract class AbstractInternalTerms
-            final Comparator<DelayedBucket<B>> comparator = getOrder().delayedBucketComparator(
+            TopBucketBuilder<B> top = TopBucketBuilder.build(
+                getRequiredSize(),
+                getOrder(),
+                removed -> otherDocCount[0] += removed.getDocCount(),
                 AbstractInternalTerms.this::reduceBucket,
                 reduceContext
             );
-            try (
-                BucketPriorityQueue<B, DelayedBucket<B>> top = new BucketPriorityQueue<>(
-                    getRequiredSize(),
-                    reduceContext.bigArrays(),
-                    comparator
-                )
-            ) {
-                thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
-                    if (bucket.getDocCount() >= getMinDocCount()) {
-                        final DelayedBucket<B> removed = top.insertWithOverflow(bucket);
-                        if (removed != null) {
-                            otherDocCount[0] += removed.getDocCount();
-                            removed.nonCompetitive(reduceContext);
-                        }
-                    }
-                });
-                // size is an integer as it should be <= getRequiredSize()
-                final int size = (int) top.size();
-                result = new ArrayList<>(size);
-                for (int i = 0; i < size; i++) {
-                    result.add(top.pop().reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
+            thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
+                if (bucket.getDocCount() >= getMinDocCount()) {
+                    top.add(bucket);
                 }
-                Collections.reverse(result);
-            }
+            });
+            result = top.build();
         } else {
             result = new ArrayList<>();
             thisReduceOrder = reduceBuckets(
diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/TopBucketBuilderTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/TopBucketBuilderTests.java
new file mode 100644
index 000000000000..05ac96d0a7e0
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/search/aggregations/TopBucketBuilderTests.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.search.aggregations;
+
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.InternalBucket;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.InternalAggregationTestCase;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+import static org.elasticsearch.search.aggregations.DelayedBucketTests.mockReduce;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+
+public class TopBucketBuilderTests extends ESTestCase {
+    public void testSizeOne() {
+        int count = between(1, 1000);
+        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
+        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
+        List<String> nonCompetitive = new ArrayList<>();
+        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
+            1,
+            BucketOrder.key(true),
+            b -> nonCompetitive.add(b.toString()),
+            reduce,
+            context
+        );
+
+        for (int i = 0; i < count; i++) {
+            builder.add(new DelayedBucket<>(List.of(bucket(i))));
+        }
+
+        List<InternalBucket> top = builder.build();
+        assertThat(top, hasSize(1));
+        assertThat(top.get(0).getKeyAsString(), equalTo("000000"));
+        assertThat(top.get(0).getDocCount(), equalTo(1L));
+        for (int i = 1; i < count; i++) {
+            assertThat(nonCompetitive.get(i - 1), equalTo("Delayed[" + bucketKey(i) + "]"));
+        }
+    }
+
+    public void testAllCompetitive() {
+        int size = between(3, 1000);
+        int count = between(1, size);
+        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
+        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
+        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
+            size,
+            BucketOrder.key(true),
+            b -> fail("unexpected uncompetitive bucket " + b),
+            reduce,
+            context
+        );
+
+        for (int i = 0; i < count; i++) {
+            builder.add(new DelayedBucket<>(List.of(bucket(i))));
+        }
+
+        List<InternalBucket> top = builder.build();
+        assertThat(top, hasSize(count));
+        for (int i = 0; i < count; i++) {
+            assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
+            assertThat(top.get(i).getDocCount(), equalTo(1L));
+        }
+    }
+
+    public void someNonCompetitiveTestCase(int size) {
+        int count = between(size + 1, size * 30);
+        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
+        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
+        List<String> nonCompetitive = new ArrayList<>();
+        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
+            size,
+            BucketOrder.key(true),
+            b -> nonCompetitive.add(b.toString()),
+            reduce,
+            context
+        );
+
+        for (int i = 0; i < count; i++) {
+            builder.add(new DelayedBucket<>(List.of(bucket(i))));
+        }
+
+        List<InternalBucket> top = builder.build();
+        assertThat(top, hasSize(size));
+        for (int i = 0; i < count; i++) {
+            if (i < size) {
+                assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
+                assertThat(top.get(i).getDocCount(), equalTo(1L));
+            } else {
+                assertThat(nonCompetitive.get(i - size), equalTo("Delayed[" + bucketKey(i) + "]"));
+            }
+        }
+    }
+
+    public void testSomeNonCompetitiveSmall() {
+        someNonCompetitiveTestCase(between(2, TopBucketBuilder.USE_BUFFERING_BUILDER - 1));
+    }
+
+    public void testSomeNonCompetitiveLarge() {
+        someNonCompetitiveTestCase(between(TopBucketBuilder.USE_BUFFERING_BUILDER, TopBucketBuilder.USE_BUFFERING_BUILDER * 5));
+    }
+
+    public void testHuge() {
+        int count = between(1, 1000);
+        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
+        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
+        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
+            Integer.MAX_VALUE,
+            BucketOrder.key(true),
+            b -> fail("unexpected uncompetitive bucket " + b),
+            reduce,
+            context
+        );
+
+        for (int i = 0; i < count; i++) {
+            builder.add(new DelayedBucket<>(List.of(bucket(i))));
+        }
+
+        List<InternalBucket> top = builder.build();
+        assertThat(top, hasSize(count));
+        assertThat(top.get(0).getKeyAsString(), equalTo("000000"));
+        assertThat(top.get(0).getDocCount(), equalTo(1L));
+        for (int i = 0; i < count; i++) {
+            assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
+            assertThat(top.get(i).getDocCount(), equalTo(1L));
+        }
+    }
+
+    public void testHugeQueueError() {
+        Exception e = expectThrows(
+            IllegalArgumentException.class,
+            () -> new TopBucketBuilder.PriorityQueueTopBucketBuilder<>(
+                ArrayUtil.MAX_ARRAY_LENGTH,
+                BucketOrder.key(true),
+                b -> fail("unexpected uncompetitive bucket " + b),
+                null,
+                null
+            )
+        );
+        assertThat(e.getMessage(), equalTo("can't reduce more than [" + ArrayUtil.MAX_ARRAY_LENGTH + "] buckets"));
+    }
+
+    private String bucketKey(int index) {
+        return Strings.format("%06d", index);
+    }
+
+    private InternalBucket bucket(int index) {
+        return new StringTerms.Bucket(new BytesRef(bucketKey(index)), 1, InternalAggregations.EMPTY, false, 0, DocValueFormat.RAW);
+    }
+}
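
A minimal usage sketch of the API introduced above, mirroring how the new AbstractInternalTerms code drives it during the final reduce; `requiredSize`, `buckets`, `reduce`, and `context` are illustrative placeholders, not names from this change:

    // Sketch only: drive TopBucketBuilder the same way the new AbstractInternalTerms code path does.
    long[] otherDocCount = new long[] { 0 };
    TopBucketBuilder<StringTerms.Bucket> top = TopBucketBuilder.build(
        requiredSize,                                           // requested number of top buckets
        BucketOrder.count(false),                               // most documents first
        removed -> otherDocCount[0] += removed.getDocCount(),   // account for buckets that fall out
        reduce,                                                 // BiFunction that merges same-key buckets
        context                                                 // final-reduce AggregationReduceContext
    );
    for (DelayedBucket<StringTerms.Bucket> bucket : buckets) {
        top.add(bucket);                                        // only competitive buckets are kept
    }
    List<StringTerms.Bucket> result = top.build();              // sorted, at most requiredSize entries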