mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
I was mistaken and the only BigArray implementation that it initializes with zero pages is the Byte array and therefore this change will allocate all the array upfront.
This commit is contained in:
parent
cfe8dfa322
commit
f211f6a65b
6 changed files with 391 additions and 30 deletions
|
@ -134,7 +134,7 @@ public abstract class BucketOrder implements ToXContentObject, Writeable {
|
||||||
* The comparator might need to reduce the {@link DelayedBucket} and therefore we need to provide the
|
* The comparator might need to reduce the {@link DelayedBucket} and therefore we need to provide the
|
||||||
* reducer and the reduce context. The context must be on the final reduce phase.
|
* reducer and the reduce context. The context must be on the final reduce phase.
|
||||||
*/
|
*/
|
||||||
public abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
abstract <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
||||||
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
AggregationReduceContext reduceContext
|
AggregationReduceContext reduceContext
|
||||||
);
|
);
|
||||||
|
|
|
@ -93,7 +93,7 @@ public final class DelayedBucket<B extends InternalMultiBucketAggregation.Intern
|
||||||
* Called to mark a bucket as non-competitive so it can release
|
* Called to mark a bucket as non-competitive so it can release
|
||||||
* any sub-buckets from the breaker.
|
* any sub-buckets from the breaker.
|
||||||
*/
|
*/
|
||||||
public void nonCompetitive(AggregationReduceContext reduceContext) {
|
void nonCompetitive(AggregationReduceContext reduceContext) {
|
||||||
if (reduced != null) {
|
if (reduced != null) {
|
||||||
// -1 for itself, -countInnerBucket for all the sub-buckets.
|
// -1 for itself, -countInnerBucket for all the sub-buckets.
|
||||||
reduceContext.consumeBucketsAndMaybeBreak(-1 - InternalMultiBucketAggregation.countInnerBucket(reduced));
|
reduceContext.consumeBucketsAndMaybeBreak(-1 - InternalMultiBucketAggregation.countInnerBucket(reduced));
|
||||||
|
|
|
@ -84,7 +84,7 @@ public abstract class InternalOrder extends BucketOrder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
||||||
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
AggregationReduceContext reduceContext
|
AggregationReduceContext reduceContext
|
||||||
) {
|
) {
|
||||||
|
@ -219,7 +219,7 @@ public abstract class InternalOrder extends BucketOrder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
||||||
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
AggregationReduceContext reduceContext
|
AggregationReduceContext reduceContext
|
||||||
) {
|
) {
|
||||||
|
@ -287,7 +287,7 @@ public abstract class InternalOrder extends BucketOrder {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
<B extends InternalMultiBucketAggregation.InternalBucket> Comparator<DelayedBucket<B>> delayedBucketComparator(
|
||||||
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
AggregationReduceContext reduceContext
|
AggregationReduceContext reduceContext
|
||||||
) {
|
) {
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the "Elastic License
|
||||||
|
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
|
||||||
|
* Public License v 1"; you may not use this file except in compliance with, at
|
||||||
|
* your election, the "Elastic License 2.0", the "GNU Affero General Public
|
||||||
|
* License v3.0 only", or the "Server Side Public License, v 1".
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.search.aggregations;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.ArrayUtil;
|
||||||
|
import org.apache.lucene.util.PriorityQueue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.BiFunction;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Merges many buckets into the "top" buckets as sorted by {@link BucketOrder}.
|
||||||
|
*/
|
||||||
|
public abstract class TopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> {
|
||||||
|
/**
|
||||||
|
* The number of buckets required before we switch to the
|
||||||
|
* {@link BufferingTopBucketBuilder}. If we need fewer buckets we use
|
||||||
|
* {@link PriorityQueueTopBucketBuilder}.
|
||||||
|
* <p>
|
||||||
|
* The value we picked for this boundary is fairly arbitrary, but it
|
||||||
|
* is important that its bigger than the default size of the terms
|
||||||
|
* aggregation. It's basically the amount of memory you are willing to
|
||||||
|
* waste when reduce small terms aggregations so it shouldn't be too
|
||||||
|
* large either. The value we have, {@code 1024}, preallocates about
|
||||||
|
* 32k for the priority queue.
|
||||||
|
*/
|
||||||
|
static final int USE_BUFFERING_BUILDER = 1024;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a {@link TopBucketBuilder} to build a list of the top buckets.
|
||||||
|
* <p>
|
||||||
|
* If there are few required results we use a {@link PriorityQueueTopBucketBuilder}
|
||||||
|
* which is simpler and when the priority queue is full but allocates {@code size + 1}
|
||||||
|
* slots in an array. If there are many required results we prefer a
|
||||||
|
* {@link BufferingTopBucketBuilder} which doesn't preallocate and is faster for the
|
||||||
|
* first {@code size} results. But it's a little slower when the priority queue is full.
|
||||||
|
* <p>
|
||||||
|
* It's important for this <strong>not</strong> to preallocate a bunch of memory when
|
||||||
|
* {@code size} is very very large because this backs the reduction of the {@code terms}
|
||||||
|
* aggregation and folks often set the {@code size} of that to something quite large.
|
||||||
|
* The choice in the paragraph above handles this case.
|
||||||
|
*
|
||||||
|
* @param size the requested size of the list
|
||||||
|
* @param order the sort order of the buckets
|
||||||
|
* @param nonCompetitive called with non-competitive buckets
|
||||||
|
* @param reduce function to reduce a list of buckets
|
||||||
|
* @param reduceContext the reduce context
|
||||||
|
*/
|
||||||
|
public static <B extends InternalMultiBucketAggregation.InternalBucket> TopBucketBuilder<B> build(
|
||||||
|
int size,
|
||||||
|
BucketOrder order,
|
||||||
|
Consumer<DelayedBucket<B>> nonCompetitive,
|
||||||
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
|
AggregationReduceContext reduceContext
|
||||||
|
) {
|
||||||
|
if (size < USE_BUFFERING_BUILDER) {
|
||||||
|
return new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
|
||||||
|
}
|
||||||
|
return new BufferingTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final Consumer<DelayedBucket<B>> nonCompetitive;
|
||||||
|
|
||||||
|
private TopBucketBuilder(Consumer<DelayedBucket<B>> nonCompetitive) {
|
||||||
|
this.nonCompetitive = nonCompetitive;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a bucket if it is competitive. If there isn't space but the
|
||||||
|
* bucket is competitive then this will drop the least competitive bucket
|
||||||
|
* to make room for the new bucket.
|
||||||
|
* <p>
|
||||||
|
* Instead of operating on complete buckets we this operates on a
|
||||||
|
* wrapper containing what we need to merge the buckets called
|
||||||
|
* {@link DelayedBucket}. We can evaluate some common sort criteria
|
||||||
|
* directly on the {@linkplain DelayedBucket}s so we only need to
|
||||||
|
* merge <strong>exactly</strong> the sub-buckets we need.
|
||||||
|
*/
|
||||||
|
public abstract void add(DelayedBucket<B> bucket);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the most competitive buckets sorted by the comparator.
|
||||||
|
*/
|
||||||
|
public abstract List<B> build();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collects the "top" buckets by adding them directly to a {@link PriorityQueue}.
|
||||||
|
* This is always going to be faster than {@link BufferingTopBucketBuilder}
|
||||||
|
* but it requires allocating an array of {@code size + 1}.
|
||||||
|
*/
|
||||||
|
static class PriorityQueueTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
|
||||||
|
private final PriorityQueue<DelayedBucket<B>> queue;
|
||||||
|
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
|
||||||
|
private final AggregationReduceContext reduceContext;
|
||||||
|
|
||||||
|
PriorityQueueTopBucketBuilder(
|
||||||
|
int size,
|
||||||
|
BucketOrder order,
|
||||||
|
Consumer<DelayedBucket<B>> nonCompetitive,
|
||||||
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
|
AggregationReduceContext reduceContext
|
||||||
|
) {
|
||||||
|
super(nonCompetitive);
|
||||||
|
if (size >= ArrayUtil.MAX_ARRAY_LENGTH) {
|
||||||
|
throw new IllegalArgumentException("can't reduce more than [" + ArrayUtil.MAX_ARRAY_LENGTH + "] buckets");
|
||||||
|
}
|
||||||
|
this.reduce = reduce;
|
||||||
|
this.reduceContext = reduceContext;
|
||||||
|
queue = new PriorityQueue<>(size) {
|
||||||
|
private final Comparator<DelayedBucket<B>> comparator = order.delayedBucketComparator(reduce, reduceContext);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean lessThan(DelayedBucket<B> a, DelayedBucket<B> b) {
|
||||||
|
return comparator.compare(a, b) > 0;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(DelayedBucket<B> bucket) {
|
||||||
|
DelayedBucket<B> removed = queue.insertWithOverflow(bucket);
|
||||||
|
if (removed != null) {
|
||||||
|
nonCompetitive.accept(removed);
|
||||||
|
removed.nonCompetitive(reduceContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<B> build() {
|
||||||
|
List<B> result = new ArrayList<>(queue.size());
|
||||||
|
for (int i = queue.size() - 1; i >= 0; i--) {
|
||||||
|
result.add(queue.pop().reduced(reduce, reduceContext));
|
||||||
|
}
|
||||||
|
Collections.reverse(result);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collects the "top" buckets by adding them to a {@link List} that grows
|
||||||
|
* as more buckets arrive and is converting into a
|
||||||
|
* {@link PriorityQueueTopBucketBuilder} when {@code size} buckets arrive.
|
||||||
|
*/
|
||||||
|
private static class BufferingTopBucketBuilder<B extends InternalMultiBucketAggregation.InternalBucket> extends TopBucketBuilder<B> {
|
||||||
|
private final int size;
|
||||||
|
private final BucketOrder order;
|
||||||
|
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
|
||||||
|
private final AggregationReduceContext reduceContext;
|
||||||
|
|
||||||
|
private List<DelayedBucket<B>> buffer;
|
||||||
|
private PriorityQueueTopBucketBuilder<B> next;
|
||||||
|
|
||||||
|
BufferingTopBucketBuilder(
|
||||||
|
int size,
|
||||||
|
BucketOrder order,
|
||||||
|
Consumer<DelayedBucket<B>> nonCompetitive,
|
||||||
|
BiFunction<List<B>, AggregationReduceContext, B> reduce,
|
||||||
|
AggregationReduceContext reduceContext
|
||||||
|
) {
|
||||||
|
super(nonCompetitive);
|
||||||
|
this.reduce = reduce;
|
||||||
|
this.reduceContext = reduceContext;
|
||||||
|
this.size = size;
|
||||||
|
this.order = order;
|
||||||
|
buffer = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void add(DelayedBucket<B> bucket) {
|
||||||
|
if (next != null) {
|
||||||
|
assert buffer == null;
|
||||||
|
next.add(bucket);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
buffer.add(bucket);
|
||||||
|
if (buffer.size() < size) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
next = new PriorityQueueTopBucketBuilder<>(size, order, nonCompetitive, reduce, reduceContext);
|
||||||
|
for (DelayedBucket<B> b : buffer) {
|
||||||
|
next.queue.add(b);
|
||||||
|
}
|
||||||
|
buffer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<B> build() {
|
||||||
|
if (next != null) {
|
||||||
|
assert buffer == null;
|
||||||
|
return next.build();
|
||||||
|
}
|
||||||
|
List<B> result = new ArrayList<>(buffer.size());
|
||||||
|
for (DelayedBucket<B> b : buffer) {
|
||||||
|
result.add(b.reduced(reduce, reduceContext));
|
||||||
|
}
|
||||||
|
result.sort(order.comparator());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,13 +21,13 @@ import org.elasticsearch.search.aggregations.InternalAggregations;
|
||||||
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
|
||||||
import org.elasticsearch.search.aggregations.InternalOrder;
|
import org.elasticsearch.search.aggregations.InternalOrder;
|
||||||
import org.elasticsearch.search.aggregations.KeyComparable;
|
import org.elasticsearch.search.aggregations.KeyComparable;
|
||||||
|
import org.elasticsearch.search.aggregations.TopBucketBuilder;
|
||||||
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
|
import org.elasticsearch.search.aggregations.bucket.IteratorAndCurrent;
|
||||||
import org.elasticsearch.search.aggregations.support.SamplingContext;
|
import org.elasticsearch.search.aggregations.support.SamplingContext;
|
||||||
import org.elasticsearch.xcontent.XContentBuilder;
|
import org.elasticsearch.xcontent.XContentBuilder;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -296,34 +296,19 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else if (reduceContext.isFinalReduce()) {
|
} else if (reduceContext.isFinalReduce()) {
|
||||||
final Comparator<DelayedBucket<B>> comparator = getOrder().delayedBucketComparator(
|
TopBucketBuilder<B> top = TopBucketBuilder.build(
|
||||||
|
getRequiredSize(),
|
||||||
|
getOrder(),
|
||||||
|
removed -> otherDocCount[0] += removed.getDocCount(),
|
||||||
AbstractInternalTerms.this::reduceBucket,
|
AbstractInternalTerms.this::reduceBucket,
|
||||||
reduceContext
|
reduceContext
|
||||||
);
|
);
|
||||||
try (
|
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
|
||||||
BucketPriorityQueue<DelayedBucket<B>> top = new BucketPriorityQueue<>(
|
if (bucket.getDocCount() >= getMinDocCount()) {
|
||||||
getRequiredSize(),
|
top.add(bucket);
|
||||||
reduceContext.bigArrays(),
|
|
||||||
comparator
|
|
||||||
)
|
|
||||||
) {
|
|
||||||
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
|
|
||||||
if (bucket.getDocCount() >= getMinDocCount()) {
|
|
||||||
final DelayedBucket<B> removed = top.insertWithOverflow(bucket);
|
|
||||||
if (removed != null) {
|
|
||||||
otherDocCount[0] += removed.getDocCount();
|
|
||||||
removed.nonCompetitive(reduceContext);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
// size is an integer as it should be <= getRequiredSize()
|
|
||||||
final int size = (int) top.size();
|
|
||||||
result = new ArrayList<>(size);
|
|
||||||
for (int i = 0; i < size; i++) {
|
|
||||||
result.add(top.pop().reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
|
|
||||||
}
|
}
|
||||||
Collections.reverse(result);
|
});
|
||||||
}
|
result = top.build();
|
||||||
} else {
|
} else {
|
||||||
result = new ArrayList<>();
|
result = new ArrayList<>();
|
||||||
thisReduceOrder = reduceBuckets(
|
thisReduceOrder = reduceBuckets(
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.search.aggregations;

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.core.Strings;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.InternalBucket;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

import static org.elasticsearch.search.aggregations.DelayedBucketTests.mockReduce;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class TopBucketBuilderTests extends ESTestCase {
    /**
     * With {@code size == 1} only the first (smallest key) bucket survives;
     * every other bucket must be reported to the non-competitive callback.
     */
    public void testSizeOne() {
        int count = between(1, 1000);
        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
        List<String> nonCompetitive = new ArrayList<>();
        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
            1,
            BucketOrder.key(true),
            b -> nonCompetitive.add(b.toString()),
            reduce,
            context
        );

        for (int i = 0; i < count; i++) {
            builder.add(new DelayedBucket<>(List.of(bucket(i))));
        }

        List<InternalBucket> top = builder.build();
        assertThat(top, hasSize(1));
        assertThat(top.get(0).getKeyAsString(), equalTo("000000"));
        assertThat(top.get(0).getDocCount(), equalTo(1L));
        // Buckets were added in key order, so they were evicted in the same order.
        for (int i = 1; i < count; i++) {
            assertThat(nonCompetitive.get(i - 1), equalTo("Delayed[" + bucketKey(i) + "]"));
        }
    }

    /**
     * When fewer buckets arrive than {@code size}, all of them are returned
     * in order and the non-competitive callback is never invoked.
     */
    public void testAllCompetitive() {
        int size = between(3, 1000);
        int count = between(1, size);
        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
            size,
            BucketOrder.key(true),
            b -> fail("unexpected uncompetitive bucket " + b),
            reduce,
            context
        );

        for (int i = 0; i < count; i++) {
            builder.add(new DelayedBucket<>(List.of(bucket(i))));
        }

        List<InternalBucket> top = builder.build();
        assertThat(top, hasSize(count));
        for (int i = 0; i < count; i++) {
            assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
            assertThat(top.get(i).getDocCount(), equalTo(1L));
        }
    }

    /**
     * Shared body for the "some buckets lose" cases: adds more buckets than
     * {@code size} and checks the winners are returned in order while the
     * losers are reported to the non-competitive callback.
     */
    public void someNonCompetitiveTestCase(int size) {
        int count = between(size + 1, size * 30);
        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
        List<String> nonCompetitive = new ArrayList<>();
        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
            size,
            BucketOrder.key(true),
            b -> nonCompetitive.add(b.toString()),
            reduce,
            context
        );

        for (int i = 0; i < count; i++) {
            builder.add(new DelayedBucket<>(List.of(bucket(i))));
        }

        List<InternalBucket> top = builder.build();
        assertThat(top, hasSize(size));
        for (int i = 0; i < count; i++) {
            if (i < size) {
                assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
                assertThat(top.get(i).getDocCount(), equalTo(1L));
            } else {
                assertThat(nonCompetitive.get(i - size), equalTo("Delayed[" + bucketKey(i) + "]"));
            }
        }
    }

    // Exercises the PriorityQueueTopBucketBuilder path (size below the buffering threshold).
    public void testSomeNonCompetitiveSmall() {
        someNonCompetitiveTestCase(between(2, TopBucketBuilder.USE_BUFFERING_BUILDER - 1));
    }

    // Exercises the BufferingTopBucketBuilder path (size at or above the threshold).
    public void testSomeNonCompetitiveLarge() {
        someNonCompetitiveTestCase(between(TopBucketBuilder.USE_BUFFERING_BUILDER, TopBucketBuilder.USE_BUFFERING_BUILDER * 5));
    }

    /**
     * A huge requested size must not preallocate a huge queue: with few
     * buckets the buffering builder never converts to the priority queue.
     */
    public void testHuge() {
        int count = between(1, 1000);
        AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
        BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
        TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
            Integer.MAX_VALUE,
            BucketOrder.key(true),
            b -> fail("unexpected uncompetitive bucket " + b),
            reduce,
            context
        );

        for (int i = 0; i < count; i++) {
            builder.add(new DelayedBucket<>(List.of(bucket(i))));
        }

        List<InternalBucket> top = builder.build();
        assertThat(top, hasSize(count));
        for (int i = 0; i < count; i++) {
            assertThat(top.get(i).getKeyAsString(), equalTo(bucketKey(i)));
            assertThat(top.get(i).getDocCount(), equalTo(1L));
        }
    }

    /**
     * Constructing the priority-queue builder with a size that can't fit in
     * an array must fail fast with a clear message.
     */
    public void testHugeQueueError() {
        Exception e = expectThrows(
            IllegalArgumentException.class,
            () -> new TopBucketBuilder.PriorityQueueTopBucketBuilder<>(
                ArrayUtil.MAX_ARRAY_LENGTH,
                BucketOrder.key(true),
                b -> fail("unexpected uncompetitive bucket " + b),
                null,
                null
            )
        );
        assertThat(e.getMessage(), equalTo("can't reduce more than [" + ArrayUtil.MAX_ARRAY_LENGTH + "] buckets"));
    }

    // Zero-padded key so lexicographic order matches numeric order.
    private String bucketKey(int index) {
        return Strings.format("%06d", index);
    }

    // A single-doc string terms bucket with the zero-padded key for index.
    private InternalBucket bucket(int index) {
        return new StringTerms.Bucket(new BytesRef(bucketKey(index)), 1, InternalAggregations.EMPTY, false, 0, DocValueFormat.RAW);
    }
}
|
Loading…
Add table
Add a link
Reference in a new issue