Bring ReduceContext to top level class (#81394)

This brings `ReduceContext` to a top level class so we can add more
things to it without making `InternalAggregation` huge. It also removes
the now unused `pipelineTreeForBwcSerialization`.
This commit is contained in:
Nik Everett 2021-12-07 11:37:47 -05:00 committed by GitHub
parent ada1972c54
commit fd185e4661
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
114 changed files with 520 additions and 494 deletions

View file

@ -24,8 +24,8 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
@ -65,29 +65,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
@Fork(value = 1)
public class TermsReduceBenchmark {
private final SearchPhaseController controller = new SearchPhaseController(
(task, req) -> new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(null, null, () -> PipelineAggregator.PipelineTree.EMPTY, task);
}
@Override
public InternalAggregation.ReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return InternalAggregation.ReduceContext.forFinalReduction(
null,
null,
bucketConsumer,
PipelineAggregator.PipelineTree.EMPTY,
task
);
}
private final SearchPhaseController controller = new SearchPhaseController((task, req) -> new AggregationReduceContext.Builder() {
@Override
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(null, null, task);
}
);
@Override
public AggregationReduceContext forFinalReduction() {
final MultiBucketConsumerService.MultiBucketConsumer bucketConsumer = new MultiBucketConsumerService.MultiBucketConsumer(
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
return new AggregationReduceContext.ForFinal(null, null, bucketConsumer, PipelineAggregator.PipelineTree.EMPTY, task);
}
});
@State(Scope.Benchmark)
public static class TermsList extends AbstractList<InternalAggregations> {

View file

@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.matrix.stats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -226,7 +227,7 @@ public class InternalMatrixStats extends InternalAggregation implements MatrixSt
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// merge stats across all shards
List<InternalAggregation> aggs = new ArrayList<>(aggregations);
aggs.removeIf(p -> ((InternalMatrixStats) p).stats == null);

View file

@ -15,6 +15,7 @@ import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin;
@ -153,7 +154,7 @@ public class InternalMatrixStatsTests extends InternalAggregationTestCase<Intern
ScriptService mockScriptService = mockScriptService();
MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
b -> {},

View file

@ -21,7 +21,7 @@ import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContextBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.query.QuerySearchResult;
@ -57,7 +57,7 @@ public class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhas
private final CircuitBreaker circuitBreaker;
private final SearchPhaseController controller;
private final SearchProgressListener progressListener;
private final ReduceContextBuilder aggReduceContextBuilder;
private final AggregationReduceContext.Builder aggReduceContextBuilder;
private final int topNSize;
private final boolean hasTopDocs;

View file

@ -31,8 +31,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.dfs.AggregatedDfs;
@ -64,10 +63,10 @@ import java.util.stream.Collectors;
public final class SearchPhaseController {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
private final BiFunction<Supplier<Boolean>, SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final BiFunction<Supplier<Boolean>, SearchRequest, AggregationReduceContext.Builder> requestToAggReduceContextBuilder;
public SearchPhaseController(
BiFunction<Supplier<Boolean>, SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
BiFunction<Supplier<Boolean>, SearchRequest, AggregationReduceContext.Builder> requestToAggReduceContextBuilder
) {
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
}
@ -380,14 +379,14 @@ public final class SearchPhaseController {
* @param queryResults a list of non-null query shard results
*/
ReducedQueryPhase reducedScrollQueryPhase(Collection<? extends SearchPhaseResult> queryResults) {
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder = new InternalAggregation.ReduceContextBuilder() {
AggregationReduceContext.Builder aggReduceContextBuilder = new AggregationReduceContext.Builder() {
@Override
public ReduceContext forPartialReduction() {
public AggregationReduceContext forPartialReduction() {
throw new UnsupportedOperationException("Scroll requests don't have aggs");
}
@Override
public ReduceContext forFinalReduction() {
public AggregationReduceContext forFinalReduction() {
throw new UnsupportedOperationException("Scroll requests don't have aggs");
}
};
@ -423,7 +422,7 @@ public final class SearchPhaseController {
TopDocsStats topDocsStats,
int numReducePhases,
boolean isScrollRequest,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
AggregationReduceContext.Builder aggReduceContextBuilder,
boolean performFinalReduce
) {
assert numReducePhases >= 0 : "num reduce phases must be >= 0 but was: " + numReducePhases;
@ -528,7 +527,7 @@ public final class SearchPhaseController {
}
private static InternalAggregations reduceAggs(
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
AggregationReduceContext.Builder aggReduceContextBuilder,
boolean performFinalReduce,
List<InternalAggregations> toReduce
) {
@ -679,7 +678,7 @@ public final class SearchPhaseController {
}
}
InternalAggregation.ReduceContextBuilder getReduceContext(Supplier<Boolean> isCanceled, SearchRequest request) {
AggregationReduceContext.Builder getReduceContext(Supplier<Boolean> isCanceled, SearchRequest request) {
return requestToAggReduceContextBuilder.apply(isCanceled, request);
}

View file

@ -24,7 +24,7 @@ import org.elasticsearch.lucene.grouping.TopFieldGroups;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.profile.SearchProfileResults;
@ -69,7 +69,7 @@ final class SearchResponseMerger {
final int size;
final int trackTotalHitsUpTo;
private final SearchTimeProvider searchTimeProvider;
private final InternalAggregation.ReduceContextBuilder aggReduceContextBuilder;
private final AggregationReduceContext.Builder aggReduceContextBuilder;
private final List<SearchResponse> searchResponses = new CopyOnWriteArrayList<>();
SearchResponseMerger(
@ -77,7 +77,7 @@ final class SearchResponseMerger {
int size,
int trackTotalHitsUpTo,
SearchTimeProvider searchTimeProvider,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder
AggregationReduceContext.Builder aggReduceContextBuilder
) {
this.from = from;
this.size = size;

View file

@ -52,7 +52,7 @@ import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
@ -507,7 +507,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
OriginalIndices localIndices,
Map<String, OriginalIndices> remoteIndices,
SearchTimeProvider timeProvider,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
AggregationReduceContext.Builder aggReduceContextBuilder,
RemoteClusterService remoteClusterService,
ThreadPool threadPool,
ActionListener<SearchResponse> listener,
@ -632,7 +632,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
static SearchResponseMerger createSearchResponseMerger(
SearchSourceBuilder source,
SearchTimeProvider timeProvider,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder
AggregationReduceContext.Builder aggReduceContextBuilder
) {
final int from;
final int size;

View file

@ -70,9 +70,8 @@ import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.script.FieldScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.SearchContextAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
@ -1591,25 +1590,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
}
/**
* Returns a builder for {@link InternalAggregation.ReduceContext}. This
* Returns a builder for {@link AggregationReduceContext}. This
* builder retains a reference to the provided {@link SearchRequest}.
*/
public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(Supplier<Boolean> isCanceled, SearchRequest request) {
return new InternalAggregation.ReduceContextBuilder() {
public AggregationReduceContext.Builder aggReduceContextBuilder(Supplier<Boolean> isCanceled, SearchRequest request) {
return new AggregationReduceContext.Builder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(
bigArrays,
scriptService,
() -> requestToPipelineTree(request),
isCanceled
);
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(bigArrays, scriptService, isCanceled);
}
@Override
public ReduceContext forFinalReduction() {
public AggregationReduceContext forFinalReduction() {
PipelineTree pipelineTree = requestToPipelineTree(request);
return InternalAggregation.ReduceContext.forFinalReduction(
return new AggregationReduceContext.ForFinal(
bigArrays,
scriptService,
multiBucketConsumerService.create(),

View file

@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.tasks.TaskCancelledException;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
public abstract sealed class AggregationReduceContext permits AggregationReduceContext.ForPartial,AggregationReduceContext.ForFinal {
/**
* Builds {@link AggregationReduceContext}s.
*/
public interface Builder {
/**
* Build an {@linkplain AggregationReduceContext} to perform a partial reduction.
*/
AggregationReduceContext forPartialReduction();
/**
* Build an {@linkplain AggregationReduceContext} to perform the final reduction.
*/
AggregationReduceContext forFinalReduction();
}
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final Supplier<Boolean> isCanceled;
public AggregationReduceContext(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.isCanceled = isCanceled;
}
/**
* Returns <code>true</code> iff the current reduce phase is the final
* reduce phase. This indicates if operations like pipeline aggregations
* should be applied or if specific features like {@code minDocCount}
* should be taken into account. Operations that are potentially losing
* information can only be applied during the final reduce phase.
*/
public abstract boolean isFinalReduce();
public final BigArrays bigArrays() {
return bigArrays;
}
public final ScriptService scriptService() {
return scriptService;
}
public final Supplier<Boolean> isCanceled() {
return isCanceled;
}
/**
* The root of the tree of pipeline aggregations for this request.
*/
public abstract PipelineTree pipelineTreeRoot();
/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
*/
public final void consumeBucketsAndMaybeBreak(int size) {
// This is a volatile read.
if (isCanceled.get()) {
throw new TaskCancelledException("Cancelled");
}
consumeBucketCountAndMaybeBreak(size);
}
protected abstract void consumeBucketCountAndMaybeBreak(int size);
/**
* A {@linkplain AggregationReduceContext} to perform a partial reduction.
*/
public static final class ForPartial extends AggregationReduceContext {
public ForPartial(BigArrays bigArrays, ScriptService scriptService, Supplier<Boolean> isCanceled) {
super(bigArrays, scriptService, isCanceled);
}
@Override
public boolean isFinalReduce() {
return false;
}
@Override
protected void consumeBucketCountAndMaybeBreak(int size) {}
@Override
public PipelineTree pipelineTreeRoot() {
return null;
}
}
/**
* A {@linkplain AggregationReduceContext} to perform the final reduction.
*/
public static final class ForFinal extends AggregationReduceContext {
private final IntConsumer multiBucketConsumer;
private final PipelineTree pipelineTreeRoot;
public ForFinal(
BigArrays bigArrays,
ScriptService scriptService,
IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot,
Supplier<Boolean> isCanceled
) {
super(bigArrays, scriptService, isCanceled);
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
}
@Override
public boolean isFinalReduce() {
return true;
}
@Override
protected void consumeBucketCountAndMaybeBreak(int size) {
multiBucketConsumer.accept(size);
}
@Override
public PipelineTree pipelineTreeRoot() {
return pipelineTreeRoot;
}
}
}

View file

@ -16,8 +16,8 @@ import java.util.function.BiFunction;
* as long as possible. It's stateful and not even close to thread safe.
*/
public final class DelayedBucket<B extends InternalMultiBucketAggregation.InternalBucket> {
private final BiFunction<List<B>, InternalAggregation.ReduceContext, B> reduce;
private final InternalAggregation.ReduceContext reduceContext;
private final BiFunction<List<B>, AggregationReduceContext, B> reduce;
private final AggregationReduceContext reduceContext;
/**
* The buckets to reduce or {@code null} if we've already reduced the buckets.
*/
@ -40,8 +40,8 @@ public final class DelayedBucket<B extends InternalMultiBucketAggregation.Intern
* {@link InternalMultiBucketAggregation#reduceBucket}.
*/
public DelayedBucket(
BiFunction<List<B>, InternalAggregation.ReduceContext, B> reduce,
InternalAggregation.ReduceContext reduceContext,
BiFunction<List<B>, AggregationReduceContext, B> reduce,
AggregationReduceContext reduceContext,
List<B> toReduce
) {
this.reduce = reduce;

View file

@ -11,13 +11,10 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
@ -27,142 +24,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntConsumer;
import java.util.function.Supplier;
import static java.util.Objects.requireNonNull;
/**
* An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
*/
public abstract class InternalAggregation implements Aggregation, NamedWriteable {
/**
* Builds {@link ReduceContext}.
*/
public interface ReduceContextBuilder {
/**
* Build a {@linkplain ReduceContext} to perform a partial reduction.
*/
ReduceContext forPartialReduction();
/**
* Build a {@linkplain ReduceContext} to perform the final reduction.
*/
ReduceContext forFinalReduction();
}
public static class ReduceContext {
private final BigArrays bigArrays;
private final ScriptService scriptService;
private final IntConsumer multiBucketConsumer;
private final PipelineTree pipelineTreeRoot;
private final Supplier<Boolean> isCanceled;
/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
private final Supplier<PipelineTree> pipelineTreeForBwcSerialization;
/**
* Build a {@linkplain ReduceContext} to perform a partial reduction.
*/
public static ReduceContext forPartialReduction(
BigArrays bigArrays,
ScriptService scriptService,
Supplier<PipelineTree> pipelineTreeForBwcSerialization,
Supplier<Boolean> isCanceled
) {
return new ReduceContext(bigArrays, scriptService, (s) -> {}, null, pipelineTreeForBwcSerialization, isCanceled);
}
/**
* Build a {@linkplain ReduceContext} to perform the final reduction.
* @param pipelineTreeRoot The root of tree of pipeline aggregations for this request
*/
public static ReduceContext forFinalReduction(
BigArrays bigArrays,
ScriptService scriptService,
IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot,
Supplier<Boolean> isCanceled
) {
return new ReduceContext(
bigArrays,
scriptService,
multiBucketConsumer,
requireNonNull(pipelineTreeRoot, "prefer EMPTY to null"),
() -> pipelineTreeRoot,
isCanceled
);
}
private ReduceContext(
BigArrays bigArrays,
ScriptService scriptService,
IntConsumer multiBucketConsumer,
PipelineTree pipelineTreeRoot,
Supplier<PipelineTree> pipelineTreeForBwcSerialization,
Supplier<Boolean> isCanceled
) {
this.bigArrays = bigArrays;
this.scriptService = scriptService;
this.multiBucketConsumer = multiBucketConsumer;
this.pipelineTreeRoot = pipelineTreeRoot;
this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
this.isCanceled = isCanceled;
}
/**
* Returns <code>true</code> iff the current reduce phase is the final reduce phase. This indicates if operations like
* pipeline aggregations should be applied or if specific features like {@code minDocCount} should be taken into account.
* Operations that are potentially losing information can only be applied during the final reduce phase.
*/
public boolean isFinalReduce() {
return pipelineTreeRoot != null;
}
public BigArrays bigArrays() {
return bigArrays;
}
public ScriptService scriptService() {
return scriptService;
}
/**
* The root of the tree of pipeline aggregations for this request.
*/
public PipelineTree pipelineTreeRoot() {
return pipelineTreeRoot;
}
/**
* Supplies the pipelines when the result of the reduce is serialized
* to node versions that need pipeline aggregators to be serialized
* to them.
*/
public Supplier<PipelineTree> pipelineTreeForBwcSerialization() {
return pipelineTreeForBwcSerialization;
}
/**
* Adds {@code count} buckets to the global count for the request and fails if this number is greater than
* the maximum number of buckets allowed in a response
*/
public void consumeBucketsAndMaybeBreak(int size) {
// This is a volatile read.
if (isCanceled.get()) {
throw new TaskCancelledException("Cancelled");
}
multiBucketConsumer.accept(size);
}
public Supplier<Boolean> isCanceled() {
return isCanceled;
}
}
protected final String name;
protected final Map<String, Object> metadata;
@ -234,7 +100,7 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
*/
public InternalAggregation reducePipelines(
InternalAggregation reducedAggs,
ReduceContext reduceContext,
AggregationReduceContext reduceContext,
PipelineTree pipelinesForThisAgg
) {
assert reduceContext.isFinalReduce();
@ -252,7 +118,7 @@ public abstract class InternalAggregation implements Aggregation, NamedWriteable
*
* @see #mustReduceOnSingleInternalAgg()
*/
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext);
public abstract InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext);
/**
* Signal the framework if the {@linkplain InternalAggregation#reduce(List, ReduceContext)} phase needs to be called

View file

@ -10,7 +10,6 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.aggregations.support.AggregationPath;
@ -99,7 +98,7 @@ public final class InternalAggregations extends Aggregations implements Writeabl
* This method first reduces the aggregations, and if it is the final reduce, then reduce the pipeline
* aggregations (both embedded parent/sibling as well as top-level sibling pipelines)
*/
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
public static InternalAggregations topLevelReduce(List<InternalAggregations> aggregationsList, AggregationReduceContext context) {
InternalAggregations reduced = reduce(aggregationsList, context);
if (reduced == null) {
return null;
@ -127,7 +126,7 @@ public final class InternalAggregations extends Aggregations implements Writeabl
* Note that pipeline aggregations _are not_ reduced by this method. Pipelines are handled
* separately by {@link InternalAggregations#topLevelReduce(List, ReduceContext)}
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, ReduceContext context) {
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList, AggregationReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}

View file

@ -63,7 +63,7 @@ public abstract class InternalMultiBucketAggregation<
* Reduce a list of same-keyed buckets (from multiple shards) to a single bucket. This
* requires all buckets to have the same key.
*/
protected abstract B reduceBucket(List<B> buckets, ReduceContext context);
protected abstract B reduceBucket(List<B> buckets, AggregationReduceContext context);
@Override
public abstract List<B> getBuckets();
@ -141,7 +141,7 @@ public abstract class InternalMultiBucketAggregation<
@Override
public final InternalAggregation reducePipelines(
InternalAggregation reducedAggs,
ReduceContext reduceContext,
AggregationReduceContext reduceContext,
PipelineTree pipelineTree
) {
assert reduceContext.isFinalReduce();
@ -182,7 +182,7 @@ public abstract class InternalMultiBucketAggregation<
}
}
private List<B> reducePipelineBuckets(ReduceContext reduceContext, PipelineTree pipelineTree) {
private List<B> reducePipelineBuckets(AggregationReduceContext reduceContext, PipelineTree pipelineTree) {
List<B> reducedBuckets = new ArrayList<>();
for (B bucket : getBuckets()) {
List<InternalAggregation> aggs = new ArrayList<>();

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
@ -89,7 +90,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
protected abstract InternalSingleBucketAggregation newAggregation(String name, long docCount, InternalAggregations subAggregations);
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long docCount = 0L;
List<InternalAggregations> subAggregationsList = new ArrayList<>(aggregations.size());
for (InternalAggregation aggregation : aggregations) {
@ -108,7 +109,7 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregatio
@Override
public final InternalAggregation reducePipelines(
InternalAggregation reducedAggs,
ReduceContext reduceContext,
AggregationReduceContext reduceContext,
PipelineTree pipelineTree
) {
assert reduceContext.isFinalReduce();

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.adjacency;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -167,7 +168,7 @@ public class InternalAdjacencyMatrix extends InternalMultiBucketAggregation<Inte
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
Map<String, List<InternalBucket>> bucketsMap = new HashMap<>();
for (InternalAggregation aggregation : aggregations) {
InternalAdjacencyMatrix filters = (InternalAdjacencyMatrix) aggregation;
@ -197,7 +198,7 @@ public class InternalAdjacencyMatrix extends InternalMultiBucketAggregation<Inte
}
@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
InternalBucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());

View file

@ -14,6 +14,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -186,7 +187,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
PriorityQueue<BucketIterator> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(BucketIterator a, BucketIterator b) {
@ -253,7 +254,7 @@ public class InternalComposite extends InternalMultiBucketAggregation<InternalCo
}
@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.filter;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -175,7 +176,7 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<List<InternalBucket>> bucketsList = null;
for (InternalAggregation aggregation : aggregations) {
InternalFilters filters = (InternalFilters) aggregation;
@ -203,7 +204,7 @@ public class InternalFilters extends InternalMultiBucketAggregation<InternalFilt
}
@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, ReduceContext context) {
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
InternalBucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());

View file

@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.LongObjectPagedHashMap;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -69,7 +70,7 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket> extends I
}
@Override
public InternalGeoGrid<B> reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalGeoGrid<B> reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
LongObjectPagedHashMap<List<InternalGeoGridBucket>> buckets = null;
for (InternalAggregation aggregation : aggregations) {
@SuppressWarnings("unchecked")
@ -104,7 +105,7 @@ public abstract class InternalGeoGrid<B extends InternalGeoGridBucket> extends I
}
@Override
protected InternalGeoGridBucket reduceBucket(List<InternalGeoGridBucket> buckets, ReduceContext context) {
protected InternalGeoGridBucket reduceBucket(List<InternalGeoGridBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
long docCount = 0;

View file

@ -12,6 +12,7 @@ import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -271,7 +272,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
* rounding returned across all the shards so the resolution of the buckets
* is the same and they can be reduced together.
*/
private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
private BucketReduceResult reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// First we need to find the highest level rounding used across all the
// shards
@ -338,7 +339,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
return mergeBucketsIfNeeded(new BucketReduceResult(reducedBuckets, reduceRoundingIdx, 1, reduceRounding, min, max), reduceContext);
}
private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, ReduceContext reduceContext) {
private BucketReduceResult mergeBucketsIfNeeded(BucketReduceResult firstPassResult, AggregationReduceContext reduceContext) {
int idx = firstPassResult.roundingIdx;
RoundingInfo info = bucketInfo.roundingInfos[idx];
List<Bucket> buckets = firstPassResult.buckets;
@ -357,7 +358,11 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
return min <= max ? rounding.prepare(min, max) : rounding.prepareForUnknown();
}
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding.Prepared reduceRounding, ReduceContext reduceContext) {
private List<Bucket> mergeBuckets(
List<Bucket> reducedBuckets,
Rounding.Prepared reduceRounding,
AggregationReduceContext reduceContext
) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();
@ -384,7 +389,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
@ -421,7 +426,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
}
}
private BucketReduceResult addEmptyBuckets(BucketReduceResult current, ReduceContext reduceContext) {
private BucketReduceResult addEmptyBuckets(BucketReduceResult current, AggregationReduceContext reduceContext) {
List<Bucket> list = current.buckets;
if (list.isEmpty()) {
return current;
@ -500,7 +505,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {
@ -531,7 +536,7 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
);
}
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, ReduceContext reduceContext) {
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, AggregationReduceContext reduceContext) {
List<Bucket> buckets = current.buckets;
RoundingInfo roundingInfo = bucketInfo.roundingInfos[current.roundingIdx];
if (buckets.size() > targetBuckets) {
@ -548,7 +553,11 @@ public final class InternalAutoDateHistogram extends InternalMultiBucketAggregat
return current;
}
private BucketReduceResult mergeConsecutiveBuckets(BucketReduceResult current, int mergeInterval, ReduceContext reduceContext) {
private BucketReduceResult mergeConsecutiveBuckets(
BucketReduceResult current,
int mergeInterval,
AggregationReduceContext reduceContext
) {
List<Bucket> mergedBuckets = new ArrayList<>();
List<Bucket> sameKeyedBuckets = new ArrayList<>();

View file

@ -13,6 +13,7 @@ import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -287,7 +288,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
@ -348,7 +349,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
* requires all buckets to have the same key.
*/
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
@ -372,7 +373,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
*/
private static final int REPORT_EMPTY_EVERY = 10_000;
private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
* counting all of the empties we plan to add and firing them into
@ -456,7 +457,7 @@ public final class InternalDateHistogram extends InternalMultiBucketAggregation<
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
boolean alreadyAccountedForBuckets = false;
if (reduceContext.isFinalReduce()) {

View file

@ -12,6 +12,7 @@ import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -269,8 +270,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
return new Bucket(prototype.key, prototype.docCount, prototype.keyed, prototype.format, aggregations);
}
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
private List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
final PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
@ -327,7 +327,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
@ -359,7 +359,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
*/
private static final int REPORT_EMPTY_EVERY = 10_000;
private void addEmptyBuckets(List<Bucket> list, ReduceContext reduceContext) {
private void addEmptyBuckets(List<Bucket> list, AggregationReduceContext reduceContext) {
/*
* Make sure we have space for the empty buckets we're going to add by
* counting all of the empties we plan to add and firing them into
@ -431,7 +431,7 @@ public final class InternalHistogram extends InternalMultiBucketAggregation<Inte
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
boolean alreadyAccountedForBuckets = false;
if (reduceContext.isFinalReduce()) {

View file

@ -12,6 +12,7 @@ import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -316,7 +317,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
double min = Double.POSITIVE_INFINITY;
@ -335,7 +336,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
return new Bucket(centroid, bounds, docCount, format, aggs);
}
public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public List<Bucket> reduceBuckets(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
PriorityQueue<IteratorAndCurrent<Bucket>> pq = new PriorityQueue<>(aggregations.size()) {
@Override
protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Bucket> b) {
@ -420,7 +421,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
* For each range {startIdx, endIdx} in <code>ranges</code>, all the buckets in that index range
* from <code>buckets</code> are merged, and this merged bucket replaces the entire range.
*/
private void mergeBucketsWithPlan(List<Bucket> buckets, List<BucketRange> plan, ReduceContext reduceContext) {
private void mergeBucketsWithPlan(List<Bucket> buckets, List<BucketRange> plan, AggregationReduceContext reduceContext) {
for (int i = plan.size() - 1; i >= 0; i--) {
BucketRange range = plan.get(i);
int endIdx = range.endIdx;
@ -450,7 +451,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
*
* Requires: <code>buckets</code> is sorted by centroid.
*/
private void mergeBucketsIfNeeded(List<Bucket> buckets, int targetNumBuckets, ReduceContext reduceContext) {
private void mergeBucketsIfNeeded(List<Bucket> buckets, int targetNumBuckets, AggregationReduceContext reduceContext) {
// Make a plan for getting the target number of buckets
// Each range represents a set of adjacent bucket indices of buckets that will be merged together
List<BucketRange> ranges = new ArrayList<>();
@ -489,7 +490,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
mergeBucketsWithPlan(buckets, ranges, reduceContext);
}
private void mergeBucketsWithSameMin(List<Bucket> buckets, ReduceContext reduceContext) {
private void mergeBucketsWithSameMin(List<Bucket> buckets, AggregationReduceContext reduceContext) {
// Create a merge plan
List<BucketRange> ranges = new ArrayList<>();
@ -526,7 +527,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
*
* After this adjustment, A will contain more values than indicated and B will have less.
*/
private void adjustBoundsForOverlappingBuckets(List<Bucket> buckets, ReduceContext reduceContext) {
private void adjustBoundsForOverlappingBuckets(List<Bucket> buckets, AggregationReduceContext reduceContext) {
for (int i = 1; i < buckets.size(); i++) {
Bucket curBucket = buckets.get(i);
Bucket prevBucket = buckets.get(i - 1);
@ -540,7 +541,7 @@ public class InternalVariableWidthHistogram extends InternalMultiBucketAggregati
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Bucket> reducedBuckets = reduceBuckets(aggregations, reduceContext);
if (reduceContext.isFinalReduce()) {

View file

@ -12,6 +12,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -223,7 +224,7 @@ public final class InternalBinaryRange extends InternalMultiBucketAggregation<In
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(buckets.size());
long[] docCounts = new long[buckets.size()];
InternalAggregations[][] aggs = new InternalAggregations[buckets.size()][];
@ -260,7 +261,7 @@ public final class InternalBinaryRange extends InternalMultiBucketAggregation<In
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregationsList = buckets.stream().map(bucket -> bucket.aggregations).collect(Collectors.toList());
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.range;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -304,7 +305,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
@SuppressWarnings("unchecked")
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
reduceContext.consumeBucketsAndMaybeBreak(ranges.size());
@SuppressWarnings("rawtypes")
List<B>[] rangeList = new List[ranges.size()];
@ -327,7 +328,7 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRan
}
@Override
protected B reduceBucket(List<B> buckets, ReduceContext context) {
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
long docCount = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());

View file

@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.bucket.sampler;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.xcontent.XContentBuilder;
@ -37,7 +38,7 @@ public class UnmappedSampler extends InternalSampler {
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
return new UnmappedSampler(name, metadata);
}

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.DelayedBucket;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -84,7 +85,7 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
protected abstract B createBucket(long docCount, InternalAggregations aggs, long docCountError, B prototype);
@Override
public B reduceBucket(List<B> buckets, ReduceContext context) {
public B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
long docCount = 0;
// For the per term doc count error we add up the errors from the
@ -151,7 +152,7 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
*/
private BucketOrder reduceBuckets(
List<InternalAggregation> aggregations,
InternalAggregation.ReduceContext reduceContext,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
) {
/*
@ -174,7 +175,7 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
private void reduceMergeSort(
List<InternalAggregation> aggregations,
BucketOrder thisReduceOrder,
InternalAggregation.ReduceContext reduceContext,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
) {
assert isKeyOrder(thisReduceOrder);
@ -231,7 +232,7 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
private void reduceLegacy(
List<InternalAggregation> aggregations,
InternalAggregation.ReduceContext reduceContext,
AggregationReduceContext reduceContext,
Function<DelayedBucket<B>, Boolean> sink
) {
Map<Object, List<B>> bucketMap = new HashMap<>();
@ -254,7 +255,7 @@ public abstract class AbstractInternalTerms<A extends AbstractInternalTerms<A, B
}
}
public InternalAggregation reduce(List<InternalAggregation> aggregations, InternalAggregation.ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long sumDocCountError = 0;
long[] otherDocCount = new long[] { 0 };
A referenceTerms = null;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -186,7 +187,7 @@ public class DoubleTerms extends InternalMappedTerms<DoubleTerms, DoubleTerms.Bu
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
boolean promoteToDouble = false;
for (InternalAggregation agg : aggregations) {
if (agg instanceof LongTerms

View file

@ -16,6 +16,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -81,7 +82,7 @@ public abstract class InternalMappedRareTerms<A extends InternalRareTerms<A, B>,
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
Map<Object, List<B>> buckets = new HashMap<>();
InternalRareTerms<A, B> referenceTerms = null;
SetBackedScalingCuckooFilter filter = null;

View file

@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -144,14 +145,14 @@ public abstract class InternalRareTerms<A extends InternalRareTerms<A, B>, B ext
public abstract B getBucketByKey(String term);
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException();
}
abstract B createBucket(long docCount, InternalAggregations aggs, B prototype);
@Override
protected B reduceBucket(List<B> buckets, ReduceContext context) {
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
long docCount = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -195,7 +196,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
public abstract List<B> getBuckets();
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long globalSubsetSize = 0;
long globalSupersetSize = 0;
// Compute the overall result set size and the corpus size using the
@ -256,7 +257,7 @@ public abstract class InternalSignificantTerms<A extends InternalSignificantTerm
}
@Override
protected B reduceBucket(List<B> buckets, ReduceContext context) {
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
long subsetDf = 0;
long supersetDf = 0;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -198,7 +199,7 @@ public class LongTerms extends InternalMappedTerms<LongTerms, LongTerms.Bucket>
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
boolean unsignedLongFormat = false;
boolean rawFormat = false;
for (InternalAggregation agg : aggregations) {

View file

@ -11,6 +11,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.SetBackedScalingCuckooFilter;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.xcontent.XContentBuilder;
@ -81,7 +82,7 @@ public class UnmappedRareTerms extends InternalRareTerms<UnmappedRareTerms, Unma
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
return new UnmappedRareTerms(name, metadata);
}

View file

@ -11,6 +11,7 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
@ -103,7 +104,7 @@ public class UnmappedSignificantTerms extends InternalSignificantTerms<UnmappedS
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
return new UnmappedSignificantTerms(name, requiredSize, minDocCount, metadata);
}

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.bucket.terms;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -91,7 +92,7 @@ public class UnmappedTerms extends InternalTerms<UnmappedTerms, UnmappedTerms.Bu
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
return new UnmappedTerms(name, order, requiredSize, minDocCount, metadata);
}

View file

@ -12,7 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.SignificantTermsHeuristicScoreScript;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentBuilder;
@ -87,7 +87,7 @@ public class ScriptHeuristic extends SignificanceHeuristic {
}
@Override
public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext context) {
public SignificanceHeuristic rewrite(AggregationReduceContext context) {
SignificantTermsHeuristicScoreScript.Factory factory = context.scriptService()
.compile(script, SignificantTermsHeuristicScoreScript.CONTEXT);
return new ExecutableScriptHeuristic(script, factory.newInstance());

View file

@ -9,7 +9,7 @@
package org.elasticsearch.search.aggregations.bucket.terms.heuristic;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.bucket.terms.SignificantTerms;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.xcontent.ToXContentFragment;
@ -53,7 +53,7 @@ public abstract class SignificanceHeuristic implements NamedWriteable, ToXConten
* @param reduceContext the reduce context on the coordinating node
* @return a version of this heuristic suitable for execution
*/
public SignificanceHeuristic rewrite(InternalAggregation.ReduceContext reduceContext) {
public SignificanceHeuristic rewrite(AggregationReduceContext reduceContext) {
return this;
}

View file

@ -12,6 +12,7 @@ import org.HdrHistogram.DoubleHistogram;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -119,7 +120,7 @@ abstract class AbstractInternalHDRPercentiles extends InternalNumericMetricsAggr
}
@Override
public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public AbstractInternalHDRPercentiles reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
DoubleHistogram merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalHDRPercentiles percentiles = (AbstractInternalHDRPercentiles) aggregation;

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -103,7 +104,7 @@ abstract class AbstractInternalTDigestPercentiles extends InternalNumericMetrics
}
@Override
public AbstractInternalTDigestPercentiles reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public AbstractInternalTDigestPercentiles reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
TDigestState merged = null;
for (InternalAggregation aggregation : aggregations) {
final AbstractInternalTDigestPercentiles percentiles = (AbstractInternalTDigestPercentiles) aggregation;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -74,7 +75,7 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAvg reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
CompensatedSum kahanSummation = new CompensatedSum(0, 0);
long count = 0;
// Compute the sum of double values with Kahan summation algorithm which is more

View file

@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -72,7 +73,7 @@ public final class InternalCardinality extends InternalNumericMetricsAggregation
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
HyperLogLogPlusPlus reduced = null;
for (InternalAggregation aggregation : aggregations) {
final InternalCardinality cardinality = (InternalCardinality) aggregation;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -256,7 +257,7 @@ public class InternalExtendedStats extends InternalStats implements ExtendedStat
}
@Override
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
double sumOfSqrs = 0;
double compensationOfSqrs = 0;
for (InternalAggregation aggregation : aggregations) {

View file

@ -12,6 +12,7 @@ import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -81,7 +82,7 @@ public class InternalGeoBounds extends InternalAggregation implements GeoBounds
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
double top = Double.NEGATIVE_INFINITY;
double bottom = Double.POSITIVE_INFINITY;
double posLeft = Double.POSITIVE_INFINITY;

View file

@ -13,6 +13,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentBuilder;
@ -102,7 +103,7 @@ public class InternalGeoCentroid extends InternalAggregation implements GeoCentr
}
@Override
public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalGeoCentroid reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
double lonSum = Double.NaN;
double latSum = Double.NaN;
long totalCount = 0;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -58,7 +59,7 @@ public class InternalMax extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
double max = Double.NEGATIVE_INFINITY;
for (InternalAggregation aggregation : aggregations) {
max = Math.max(max, ((InternalMax) aggregation).max);

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -63,7 +64,7 @@ public class InternalMedianAbsoluteDeviation extends InternalNumericMetricsAggre
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
final TDigestState valueMerged = new TDigestState(valuesSketch.compression());
for (InternalAggregation aggregation : aggregations) {
final InternalMedianAbsoluteDeviation madAggregation = (InternalMedianAbsoluteDeviation) aggregation;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -58,7 +59,7 @@ public class InternalMin extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalMin reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMin reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
double min = Double.POSITIVE_INFINITY;
for (InternalAggregation aggregation : aggregations) {
min = Math.min(min, ((InternalMin) aggregation).min);

View file

@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptedMetricAggContexts;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -87,7 +88,7 @@ public class InternalScriptedMetric extends InternalAggregation implements Scrip
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
List<Object> aggregationObjects = new ArrayList<>();
for (InternalAggregation aggregation : aggregations) {
InternalScriptedMetric mapReduceAggregation = (InternalScriptedMetric) aggregation;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -161,7 +162,7 @@ public class InternalStats extends InternalNumericMetricsAggregation.MultiValue
}
@Override
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long count = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -58,7 +59,7 @@ public class InternalSum extends InternalNumericMetricsAggregation.SingleValue i
}
@Override
public InternalSum reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSum reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
CompensatedSum kahanSummation = new CompensatedSum(0, 0);

View file

@ -19,6 +19,7 @@ import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -93,7 +94,7 @@ public class InternalTopHits extends InternalAggregation implements TopHits {
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
final SearchHits[] shardHits = new SearchHits[aggregations.size()];
final int from;
final int size;

View file

@ -9,6 +9,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -57,7 +58,7 @@ public class InternalValueCount extends InternalNumericMetricsAggregation.Single
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long valueCount = 0;
for (InternalAggregation aggregation : aggregations) {
valueCount += ((InternalValueCount) aggregation).value;

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -74,7 +75,7 @@ public class InternalWeightedAvg extends InternalNumericMetricsAggregation.Singl
}
@Override
public InternalWeightedAvg reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalWeightedAvg reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
CompensatedSum sumCompensation = new CompensatedSum(0, 0);
CompensatedSum weightCompensation = new CompensatedSum(0, 0);

View file

@ -10,9 +10,9 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
import org.elasticsearch.search.aggregations.support.AggregationPath;
@ -42,7 +42,7 @@ public abstract class BucketMetricsPipelineAggregator extends SiblingPipelineAgg
}
@Override
public final InternalAggregation doReduce(Aggregations aggregations, ReduceContext context) {
public final InternalAggregation doReduce(Aggregations aggregations, AggregationReduceContext context) {
preCollection();
List<String> bucketsPath = AggregationPath.parse(bucketsPaths()[0]).getPathElementsAsStringList();
for (Aggregation aggregation : aggregations) {

View file

@ -11,8 +11,8 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.script.BucketAggregationScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
@ -48,7 +48,7 @@ public class BucketScriptPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings({ "rawtypes", "unchecked" })
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;

View file

@ -10,8 +10,8 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.script.BucketAggregationSelectorScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
@ -41,7 +41,7 @@ public class BucketSelectorPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings({ "rawtypes", "unchecked" })
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;

View file

@ -7,8 +7,8 @@
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
@ -44,7 +44,7 @@ public class BucketSortPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings({ "rawtypes", "unchecked" })
InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket> originalAgg =
(InternalMultiBucketAggregation<InternalMultiBucketAggregation, InternalMultiBucketAggregation.InternalBucket>) aggregation;

View file

@ -9,8 +9,8 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
@ -34,7 +34,7 @@ public class CumulativeSumPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation<
? extends InternalMultiBucketAggregation,

View file

@ -9,8 +9,8 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
@ -45,7 +45,7 @@ public class DerivativePipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation<
? extends InternalMultiBucketAggregation,

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.xcontent.ParseField;
@ -73,7 +74,7 @@ public class InternalBucketMetricValue extends InternalNumericMetricsAggregation
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalExtendedStats;
@ -45,7 +46,7 @@ public class InternalExtendedStatsBucket extends InternalExtendedStats implement
}
@Override
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalExtendedStats reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
@ -129,7 +130,7 @@ public class InternalPercentilesBucket extends InternalNumericMetricsAggregation
}
@Override
public InternalMax reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalMax reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View file

@ -11,6 +11,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -64,7 +65,7 @@ public class InternalSimpleValue extends InternalNumericMetricsAggregation.Singl
}
@Override
public InternalSimpleValue reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSimpleValue reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalStats;
@ -43,7 +44,7 @@ public class InternalStatsBucket extends InternalStats implements StatsBucket {
}
@Override
public InternalStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStats reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}
}

View file

@ -10,6 +10,7 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
@ -70,7 +71,7 @@ public class MovFnPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, InternalAggregation.ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation<
? extends InternalMultiBucketAggregation,

View file

@ -8,8 +8,8 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
@ -114,5 +114,5 @@ public abstract class PipelineAggregator {
return metadata;
}
public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
public abstract InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext);
}

View file

@ -11,8 +11,8 @@ package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.collect.EvictingQueue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
@ -47,7 +47,7 @@ public class SerialDiffPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("rawtypes")
InternalMultiBucketAggregation<
? extends InternalMultiBucketAggregation,

View file

@ -8,9 +8,9 @@
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import java.util.List;
@ -22,7 +22,7 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
return aggregation.copyWithRewritenBuckets(aggregations -> {
List<InternalAggregation> aggs = aggregations.copyResults();
aggs.add(doReduce(aggregations, reduceContext));
@ -30,5 +30,5 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
});
}
public abstract InternalAggregation doReduce(Aggregations aggregations, ReduceContext context);
public abstract InternalAggregation doReduce(Aggregations aggregations, AggregationReduceContext context);
}

View file

@ -20,7 +20,7 @@ import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.query.QuerySearchResult;
@ -46,19 +46,14 @@ public class QueryPhaseResultConsumerTests extends ESTestCase {
@Before
public void setup() {
searchPhaseController = new SearchPhaseController((t, s) -> new InternalAggregation.ReduceContextBuilder() {
searchPhaseController = new SearchPhaseController((t, s) -> new AggregationReduceContext.Builder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
() -> PipelineAggregator.PipelineTree.EMPTY,
t
);
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t);
}
public InternalAggregation.ReduceContext forFinalReduction() {
return InternalAggregation.ReduceContext.forFinalReduction(
public AggregationReduceContext forFinalReduction() {
return new AggregationReduceContext.ForFinal(
BigArrays.NON_RECYCLING_INSTANCE,
null,
b -> {},

View file

@ -41,8 +41,7 @@ import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.metrics.InternalMax;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
@ -112,27 +111,16 @@ public class SearchPhaseControllerTests extends ESTestCase {
@Before
public void setup() {
reductions = new CopyOnWriteArrayList<>();
searchPhaseController = new SearchPhaseController((t, s) -> new InternalAggregation.ReduceContextBuilder() {
searchPhaseController = new SearchPhaseController((t, s) -> new AggregationReduceContext.Builder() {
@Override
public ReduceContext forPartialReduction() {
public AggregationReduceContext forPartialReduction() {
reductions.add(false);
return InternalAggregation.ReduceContext.forPartialReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
() -> PipelineTree.EMPTY,
t
);
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, t);
}
public ReduceContext forFinalReduction() {
public AggregationReduceContext forFinalReduction() {
reductions.add(true);
return InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
b -> {},
PipelineTree.EMPTY,
t
);
return new AggregationReduceContext.ForFinal(BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, PipelineTree.EMPTY, t);
};
});
threadPool = new TestThreadPool(SearchPhaseControllerTests.class.getName());

View file

@ -47,7 +47,7 @@ import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.collapse.CollapseBuilder;
@ -465,7 +465,7 @@ public class TransportSearchActionTests extends ESTestCase {
boolean local = randomBoolean();
OriginalIndices localIndices = local ? new OriginalIndices(new String[] { "index" }, SearchRequest.DEFAULT_INDICES_OPTIONS) : null;
TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
Function<Boolean, InternalAggregation.ReduceContext> reduceContext = finalReduce -> null;
Function<Boolean, AggregationReduceContext> reduceContext = finalReduce -> null;
try (MockTransportService service = MockTransportService.createNewService(settings, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();

View file

@ -26,11 +26,11 @@ import org.elasticsearch.index.query.TypeQueryV7Builder;
import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.BaseAggregationBuilder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.heuristic.ChiSquare;
import org.elasticsearch.search.aggregations.pipeline.AbstractPipelineAggregationBuilder;
@ -588,7 +588,7 @@ public class SearchModuleTests extends ESTestCase {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
return null;
}
}

View file

@ -75,7 +75,7 @@ import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.filter.FiltersAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
@ -1241,16 +1241,16 @@ public class SearchServiceTests extends ESSingleNodeTestCase {
public void testCreateReduceContext() {
SearchService service = getInstanceFromNode(SearchService.class);
InternalAggregation.ReduceContextBuilder reduceContextBuilder = service.aggReduceContextBuilder(() -> false, new SearchRequest());
AggregationReduceContext.Builder reduceContextBuilder = service.aggReduceContextBuilder(() -> false, new SearchRequest());
{
InternalAggregation.ReduceContext reduceContext = reduceContextBuilder.forFinalReduction();
AggregationReduceContext reduceContext = reduceContextBuilder.forFinalReduction();
expectThrows(
MultiBucketConsumerService.TooManyBucketsException.class,
() -> reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1)
);
}
{
InternalAggregation.ReduceContext reduceContext = reduceContextBuilder.forPartialReduction();
AggregationReduceContext reduceContext = reduceContextBuilder.forPartialReduction();
reduceContext.consumeBucketsAndMaybeBreak(MultiBucketConsumerService.DEFAULT_MAX_BUCKETS + 1);
}
}

View file

@ -10,22 +10,20 @@ package org.elasticsearch.search.aggregations;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.InternalBucket;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class DelayedBucketTests extends ESTestCase {
public void testToString() {
@ -33,18 +31,18 @@ public class DelayedBucketTests extends ESTestCase {
}
public void testReduced() {
ReduceContext context = mock(ReduceContext.class);
AtomicInteger buckets = new AtomicInteger();
AggregationReduceContext context = new AggregationReduceContext.ForFinal(null, null, buckets::addAndGet, null, () -> false);
DelayedBucket<?> b = new DelayedBucket<>(mockReduce(context), context, List.of(bucket("test", 1), bucket("test", 2)));
assertThat(b.getDocCount(), equalTo(3L));
assertThat(b.reduced(), sameInstance(b.reduced()));
assertThat(b.reduced().getKeyAsString(), equalTo("test"));
assertThat(b.reduced().getDocCount(), equalTo(3L));
verify(context).consumeBucketsAndMaybeBreak(1);
verifyNoMoreInteractions(context);
assertEquals(1, buckets.get());
}
public void testCompareKey() {
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
DelayedBucket<?> a = new DelayedBucket<>(mockReduce(context), context, List.of(bucket("a", 1)));
DelayedBucket<?> b = new DelayedBucket<>(mockReduce(context), context, List.of(bucket("b", 1)));
if (randomBoolean()) {
@ -59,26 +57,31 @@ public class DelayedBucketTests extends ESTestCase {
}
public void testNonCompetitiveNotReduced() {
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
null,
null,
b -> fail("shouldn't be called"),
null,
() -> false
);
new DelayedBucket<>(mockReduce(context), context, List.of(bucket("test", 1))).nonCompetitive();
verifyNoMoreInteractions(context);
}
public void testNonCompetitiveReduced() {
ReduceContext context = mock(ReduceContext.class);
AtomicInteger buckets = new AtomicInteger();
AggregationReduceContext context = new AggregationReduceContext.ForFinal(null, null, buckets::addAndGet, null, () -> false);
DelayedBucket<?> b = new DelayedBucket<>(mockReduce(context), context, List.of(bucket("test", 1)));
b.reduced();
verify(context).consumeBucketsAndMaybeBreak(1);
assertEquals(1, buckets.get());
b.nonCompetitive();
verify(context).consumeBucketsAndMaybeBreak(-1);
verifyNoMoreInteractions(context);
assertEquals(0, buckets.get());
}
private static InternalBucket bucket(String key, long docCount) {
return new StringTerms.Bucket(new BytesRef(key), docCount, InternalAggregations.EMPTY, false, 0, DocValueFormat.RAW);
}
static BiFunction<List<InternalBucket>, ReduceContext, InternalBucket> mockReduce(ReduceContext context) {
static BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> mockReduce(AggregationReduceContext context) {
return (l, c) -> {
assertThat(c, sameInstance(context));
return bucket(l.get(0).getKeyAsString(), l.stream().mapToLong(Bucket::getDocCount).sum());

View file

@ -43,8 +43,8 @@ public class InternalAggregationsTests extends ESTestCase {
public void testReduceEmptyAggs() {
List<InternalAggregations> aggs = Collections.emptyList();
InternalAggregation.ReduceContextBuilder builder = InternalAggregationTestCase.emptyReduceContextBuilder();
InternalAggregation.ReduceContext reduceContext = randomBoolean() ? builder.forFinalReduction() : builder.forPartialReduction();
AggregationReduceContext.Builder builder = InternalAggregationTestCase.emptyReduceContextBuilder();
AggregationReduceContext reduceContext = randomBoolean() ? builder.forFinalReduction() : builder.forPartialReduction();
assertNull(InternalAggregations.reduce(aggs, reduceContext));
}
@ -92,7 +92,7 @@ public class InternalAggregationsTests extends ESTestCase {
assertEquals(2, reducedAggs.aggregations.size());
}
private InternalAggregation.ReduceContextBuilder maxBucketReduceContext() {
private AggregationReduceContext.Builder maxBucketReduceContext() {
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
PipelineAggregator.PipelineTree tree = new PipelineAggregator.PipelineTree(
emptyMap(),

View file

@ -11,10 +11,10 @@ package org.elasticsearch.search.aggregations;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation.InternalBucket;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalAggregationTestCase;
import java.util.ArrayList;
import java.util.List;
@ -23,12 +23,11 @@ import java.util.Locale;
import static org.elasticsearch.search.aggregations.DelayedBucketTests.mockReduce;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Mockito.mock;
public class TopBucketBuilderTests extends ESTestCase {
public void testSizeOne() {
int count = between(1, 1000);
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
List<String> nonCompetitive = new ArrayList<>();
TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(1, BucketOrder.key(true), b -> nonCompetitive.add(b.toString()));
@ -48,7 +47,7 @@ public class TopBucketBuilderTests extends ESTestCase {
public void testAllCompetitive() {
int size = between(3, 1000);
int count = between(1, size);
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
size,
BucketOrder.key(true),
@ -69,7 +68,7 @@ public class TopBucketBuilderTests extends ESTestCase {
public void someNonCompetitiveTestCase(int size) {
int count = between(size + 1, size * 30);
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
List<String> nonCompetitive = new ArrayList<>();
TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
size,
@ -103,7 +102,7 @@ public class TopBucketBuilderTests extends ESTestCase {
public void testHuge() {
int count = between(1, 1000);
ReduceContext context = mock(ReduceContext.class);
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
TopBucketBuilder<InternalBucket> builder = TopBucketBuilder.build(
Integer.MAX_VALUE,
BucketOrder.key(true),

View file

@ -57,6 +57,7 @@ import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -540,7 +541,7 @@ public class FiltersAggregatorTests extends AggregatorTestCase {
InternalAggregation result = aggregator.buildTopLevel();
result = result.reduce(
List.of(result),
InternalAggregation.ReduceContext.forFinalReduction(
new AggregationReduceContext.ForFinal(
context.bigArrays(),
getMockScriptService(),
b -> {},

View file

@ -8,8 +8,8 @@
package org.elasticsearch.search.aggregations.bucket.filter;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalSingleBucketAggregationTestCase;
import org.elasticsearch.search.aggregations.bucket.ParsedSingleBucketAggregation;
@ -61,7 +61,7 @@ public class InternalFilterTests extends InternalSingleBucketAggregationTestCase
InternalFilter test = createTestInstance("test", randomNonNegativeLong(), sub, emptyMap());
PipelineAggregator mockPipeline = new PipelineAggregator(null, null, null) {
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
return dummy;
}
};

View file

@ -8,8 +8,8 @@
package org.elasticsearch.search.aggregations.bucket.filter;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.filter.InternalFilters.InternalBucket;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -124,7 +124,7 @@ public class InternalFiltersTests extends InternalMultiBucketAggregationTestCase
InternalFilters test = createTestInstance("test", emptyMap(), sub);
PipelineAggregator mockPipeline = new PipelineAggregator(null, null, null) {
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
return dummy;
}
};

View file

@ -27,9 +27,9 @@ import org.elasticsearch.index.mapper.BooleanFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.FieldNamesFieldMapper;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.bucket.range.RangeAggregator;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
@ -925,7 +925,7 @@ public class DateHistogramAggregatorTests extends DateHistogramAggregatorTestCas
InternalDateHistogram result = (InternalDateHistogram) agg.buildTopLevel();
result = (InternalDateHistogram) result.reduce(
List.of(result),
ReduceContext.forFinalReduction(
new AggregationReduceContext.ForFinal(
context.bigArrays(),
null,
context.multiBucketConsumer(),

View file

@ -16,6 +16,7 @@ import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
@ -153,7 +154,7 @@ public class InternalVariableWidthHistogramTests extends InternalMultiBucketAggr
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,
@ -207,7 +208,7 @@ public class InternalVariableWidthHistogramTests extends InternalMultiBucketAggr
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,
@ -297,7 +298,7 @@ public class InternalVariableWidthHistogramTests extends InternalMultiBucketAggr
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,
@ -353,7 +354,7 @@ public class InternalVariableWidthHistogramTests extends InternalMultiBucketAggr
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,
@ -414,7 +415,7 @@ public class InternalVariableWidthHistogramTests extends InternalMultiBucketAggr
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,

View file

@ -77,6 +77,7 @@ import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
import org.elasticsearch.search.aggregations.BucketOrder;
@ -1268,7 +1269,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
}
dir.close();
}
InternalAggregation.ReduceContext ctx = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext ctx = new AggregationReduceContext.ForFinal(
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
null,
b -> {},
@ -2197,7 +2198,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
Integer.MAX_VALUE,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
getMockScriptService(),
reduceBucketConsumer,

View file

@ -18,8 +18,8 @@ import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.aggregations.Aggregation.CommonFields;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.elasticsearch.test.InternalAggregationTestCase;
@ -163,7 +163,7 @@ public class InternalScriptedMetricTests extends InternalAggregationTestCase<Int
InternalScriptedMetric aggregation = createTestInstance();
return (InternalScriptedMetric) aggregation.reduce(
singletonList(aggregation),
ReduceContext.forFinalReduction(null, mockScriptService(), null, PipelineTree.EMPTY, () -> false)
new AggregationReduceContext.ForFinal(null, mockScriptService(), null, PipelineTree.EMPTY, () -> false)
);
}

View file

@ -106,7 +106,6 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.NestedDocuments;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.MetricsAggregator;
@ -581,10 +580,9 @@ public abstract class AggregatorTestCase extends ESTestCase {
Collections.shuffle(aggs, random());
int r = randomIntBetween(1, toReduceSize);
List<InternalAggregation> toReduce = aggs.subList(0, r);
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forPartialReduction(
AggregationReduceContext reduceContext = new AggregationReduceContext.ForPartial(
context.bigArrays(),
getMockScriptService(),
() -> PipelineAggregator.PipelineTree.EMPTY,
() -> false
);
A reduced = (A) aggs.get(0).reduce(toReduce, reduceContext);
@ -598,7 +596,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
maxBucket,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
context.bigArrays(),
getMockScriptService(),
reduceBucketConsumer,
@ -732,7 +730,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
InternalAggregation r = aggregator.buildTopLevel();
r = r.reduce(
List.of(r),
ReduceContext.forFinalReduction(
new AggregationReduceContext.ForFinal(
context.bigArrays(),
getMockScriptService(),
context.multiBucketConsumer(),
@ -1325,7 +1323,7 @@ public abstract class AggregatorTestCase extends ESTestCase {
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
aggregations.forEach(ia -> { assertThat(((InternalAggCardinalityUpperBound) ia).cardinality, equalTo(cardinality)); });
return new InternalAggCardinalityUpperBound(name, cardinality, metadata);
}

View file

@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.bucket.terms.InternalMappedSignificantTerms;
@ -132,7 +133,7 @@ public abstract class AbstractSignificanceHeuristicTestCase extends ESTestCase {
public void testReduce() {
List<InternalAggregation> aggs = createInternalAggregations();
InternalAggregation.ReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
AggregationReduceContext context = InternalAggregationTestCase.emptyReduceContextBuilder().forFinalReduction();
SignificantTerms reducedAgg = (SignificantTerms) aggs.get(0).reduce(aggs, context);
assertThat(reducedAgg.getBuckets().size(), equalTo(2));
assertThat(reducedAgg.getBuckets().get(0).getSubsetDf(), equalTo(8L));

View file

@ -27,8 +27,8 @@ import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService.MultiBucketConsumer;
import org.elasticsearch.search.aggregations.ParsedAggregation;
@ -170,37 +170,26 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
*/
public abstract class InternalAggregationTestCase<T extends InternalAggregation> extends AbstractNamedWriteableTestCase<T> {
/**
* Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid but empty.
* Builds an {@link AggregationReduceContext} that is valid but empty.
*/
public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder() {
public static AggregationReduceContext.Builder emptyReduceContextBuilder() {
return emptyReduceContextBuilder(PipelineTree.EMPTY);
}
/**
* Builds an {@link InternalAggregation.ReduceContextBuilder} that is valid and nearly
* Builds an {@link AggregationReduceContext} that is valid and nearly
* empty <strong>except</strong> that it contain {@link PipelineAggregator}s.
*/
public static InternalAggregation.ReduceContextBuilder emptyReduceContextBuilder(PipelineTree pipelineTree) {
return new InternalAggregation.ReduceContextBuilder() {
public static AggregationReduceContext.Builder emptyReduceContextBuilder(PipelineTree pipelineTree) {
return new AggregationReduceContext.Builder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
() -> pipelineTree,
() -> false
);
public AggregationReduceContext forPartialReduction() {
return new AggregationReduceContext.ForPartial(BigArrays.NON_RECYCLING_INSTANCE, null, () -> false);
}
@Override
public ReduceContext forFinalReduction() {
return InternalAggregation.ReduceContext.forFinalReduction(
BigArrays.NON_RECYCLING_INSTANCE,
null,
b -> {},
pipelineTree,
() -> false
);
public AggregationReduceContext forFinalReduction() {
return new AggregationReduceContext.ForFinal(BigArrays.NON_RECYCLING_INSTANCE, null, b -> {}, pipelineTree, () -> false);
}
};
}
@ -374,12 +363,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
List<InternalAggregation> toPartialReduce = toReduce.subList(0, r);
// Sort aggs so that unmapped come last. This mimicks the behavior of InternalAggregations.reduce()
toPartialReduce.sort(INTERNAL_AGG_COMPARATOR);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(
bigArrays,
mockScriptService,
() -> PipelineAggregator.PipelineTree.EMPTY,
() -> false
);
AggregationReduceContext context = new AggregationReduceContext.ForPartial(bigArrays, mockScriptService, () -> false);
@SuppressWarnings("unchecked")
T reduced = (T) toPartialReduce.get(0).reduce(toPartialReduce, context);
int initialBucketCount = 0;
@ -404,7 +388,7 @@ public abstract class InternalAggregationTestCase<T extends InternalAggregation>
DEFAULT_MAX_BUCKETS,
new NoneCircuitBreakerService().getBreaker(CircuitBreaker.REQUEST)
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext context = new AggregationReduceContext.ForFinal(
bigArrays,
mockScriptService,
bucketConsumer,

View file

@ -10,6 +10,7 @@ package org.elasticsearch.test;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -202,10 +203,10 @@ public abstract class InternalMultiBucketAggregationTestCase<T extends InternalA
}
/**
* Build a reuce
* Expect that reducing this aggregation will pass the bucket limit.
*/
protected static void expectReduceUsesTooManyBuckets(InternalAggregation agg, int bucketLimit) {
InternalAggregation.ReduceContext reduceContext = InternalAggregation.ReduceContext.forFinalReduction(
AggregationReduceContext reduceContext = new AggregationReduceContext.ForFinal(
BigArrays.NON_RECYCLING_INSTANCE,
null,
new IntConsumer() {

View file

@ -12,6 +12,7 @@ import com.tdunning.math.stats.Centroid;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
@ -270,7 +271,7 @@ public class InternalBoxplot extends InternalNumericMetricsAggregation.MultiValu
}
@Override
public InternalBoxplot reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalBoxplot reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
TDigestState merged = null;
for (InternalAggregation aggregation : aggregations) {
final InternalBoxplot percentiles = (InternalBoxplot) aggregation;

View file

@ -8,8 +8,8 @@ package org.elasticsearch.xpack.analytics.cumulativecardinality;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@ -37,7 +37,7 @@ public class CumulativeCardinalityPipelineAggregator extends PipelineAggregator
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
InternalMultiBucketAggregation<?, ?> histo = (InternalMultiBucketAggregation<?, ?>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();
HistogramFactory factory = (HistogramFactory) histo;

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.analytics.cumulativecardinality;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.SimpleValue;
@ -63,7 +64,7 @@ public class InternalSimpleLongValue extends InternalNumericMetricsAggregation.S
}
@Override
public InternalSimpleLongValue reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalSimpleLongValue reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
throw new UnsupportedOperationException("Not supported");
}

View file

@ -9,8 +9,8 @@ package org.elasticsearch.xpack.analytics.movingPercentiles;
import org.HdrHistogram.DoubleHistogram;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
@ -41,7 +41,7 @@ public class MovingPercentilesPipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("unchecked")
InternalMultiBucketAggregation<?, ?> histo = (InternalMultiBucketAggregation<?, ?>) aggregation;
List<? extends InternalMultiBucketAggregation.InternalBucket> buckets = histo.getBuckets();

View file

@ -13,6 +13,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
@ -558,7 +559,7 @@ public class InternalMultiTerms extends AbstractInternalTerms<InternalMultiTerms
public InternalAggregation reduce(
List<InternalAggregation> aggregations,
ReduceContext reduceContext,
AggregationReduceContext reduceContext,
boolean[] needsPromotionToDouble
) {
if (needsPromotionToDouble != null) {
@ -573,7 +574,7 @@ public class InternalMultiTerms extends AbstractInternalTerms<InternalMultiTerms
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
return reduce(aggregations, reduceContext, needsPromotionToDouble(aggregations));
}

View file

@ -8,8 +8,8 @@
package org.elasticsearch.xpack.analytics.normalize;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalMultiBucketAggregation;
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
@ -43,7 +43,7 @@ public class NormalizePipelineAggregator extends PipelineAggregator {
}
@Override
public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
public InternalAggregation reduce(InternalAggregation aggregation, AggregationReduceContext reduceContext) {
@SuppressWarnings("unchecked")
InternalMultiBucketAggregation<?, InternalMultiBucketAggregation.InternalBucket> originalAgg = (InternalMultiBucketAggregation<
?,

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.analytics.rate;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
@ -68,7 +69,7 @@ public class InternalRate extends InternalNumericMetricsAggregation.SingleValue
}
@Override
public InternalRate reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalRate reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
// Compute the sum of double values with Kahan summation algorithm which is more
// accurate than naive summation.
CompensatedSum kahanSummation = new CompensatedSum(0, 0);

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.analytics.stringstats;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
import org.elasticsearch.xcontent.ParseField;
@ -200,7 +201,7 @@ public class InternalStringStats extends InternalAggregation {
@Override
@SuppressWarnings("HiddenField")
public InternalStringStats reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalStringStats reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
long count = 0;
long totalLength = 0;
int minLength = Integer.MAX_VALUE;

View file

@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalMultiValueAggregation;
import org.elasticsearch.search.sort.SortOrder;
@ -109,7 +110,7 @@ public class InternalTopMetrics extends InternalMultiValueAggregation {
}
@Override
public InternalTopMetrics reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalTopMetrics reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
if (false == isMapped()) {
return this;
}

View file

@ -10,6 +10,7 @@ package org.elasticsearch.xpack.analytics.ttest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.xcontent.XContentBuilder;
@ -55,7 +56,7 @@ public class InternalTTest extends InternalNumericMetricsAggregation.SingleValue
}
@Override
public InternalTTest reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalTTest reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
TTestState reduced = state.reduce(aggregations.stream().map(a -> ((InternalTTest) a).state));
return new InternalTTest(name, reduced, format, getMetadata());
}

View file

@ -18,11 +18,10 @@ import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.test.InternalAggregationTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
@ -371,12 +370,7 @@ public class InternalMultiTermsTests extends InternalAggregationTestCase<Interna
keyConverters2,
null
);
InternalAggregation.ReduceContext context = InternalAggregation.ReduceContext.forPartialReduction(
bigArrays,
mockScriptService,
() -> PipelineAggregator.PipelineTree.EMPTY,
() -> false
);
AggregationReduceContext context = new AggregationReduceContext.ForPartial(bigArrays, mockScriptService, () -> false);
InternalMultiTerms result = (InternalMultiTerms) terms1.reduce(List.of(terms1, terms2), context);
assertThat(result.buckets, hasSize(3));

View file

@ -23,7 +23,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;
@ -53,7 +53,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
private final AsyncExecutionId searchId;
private final Client client;
private final ThreadPool threadPool;
private final Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier;
private final Supplier<AggregationReduceContext> aggReduceContextSupplier;
private final Listener progressListener;
private final Map<String, String> originHeaders;
@ -95,7 +95,7 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
AsyncExecutionId searchId,
Client client,
ThreadPool threadPool,
Function<Supplier<Boolean>, Supplier<InternalAggregation.ReduceContext>> aggReduceContextSupplierFactory
Function<Supplier<Boolean>, Supplier<AggregationReduceContext>> aggReduceContextSupplierFactory
) {
super(id, type, action, () -> "async_search{" + descriptionSupplier.get() + "}", parentTaskId, taskHeaders);
this.expirationTimeMillis = getStartTime() + keepAlive.getMillis();

View file

@ -23,7 +23,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportService;
@ -44,7 +44,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
private final NodeClient nodeClient;
private final BiFunction<Supplier<Boolean>, SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
private final BiFunction<Supplier<Boolean>, SearchRequest, AggregationReduceContext> requestToAggReduceContextBuilder;
private final TransportSearchAction searchAction;
private final ThreadContext threadContext;
private final AsyncTaskIndexService<AsyncSearchResponse> store;
@ -149,8 +149,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
@Override
public AsyncSearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> taskHeaders) {
AsyncExecutionId searchId = new AsyncExecutionId(docID, new TaskId(nodeClient.getLocalNodeId(), id));
Function<Supplier<Boolean>, Supplier<InternalAggregation.ReduceContext>> aggReduceContextSupplierFactory = (
isCancelled) -> () -> requestToAggReduceContextBuilder.apply(isCancelled, request.getSearchRequest());
Function<Supplier<Boolean>, Supplier<AggregationReduceContext>> aggReduceContextSupplierFactory =
isCancelled -> () -> requestToAggReduceContextBuilder.apply(isCancelled, request.getSearchRequest());
return new AsyncSearchTask(
id,
type,

View file

@ -14,6 +14,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.BytesRefHash;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.AggregationReduceContext;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -53,7 +54,7 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
return docCount;
}
public Bucket reduce(BucketKey bucketKey, ReduceContext reduceContext) {
public Bucket reduce(BucketKey bucketKey, AggregationReduceContext reduceContext) {
List<InternalAggregations> innerAggs = new ArrayList<>(toReduce.size());
long totalDocCount = 0;
for (Bucket bucket : toReduce) {
@ -335,7 +336,7 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
}
@Override
protected Bucket reduceBucket(List<Bucket> buckets, ReduceContext context) {
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
throw new IllegalArgumentException("For optimization purposes, typical bucket path is not supported");
}
@ -350,7 +351,7 @@ public class InternalCategorizationAggregation extends InternalMultiBucketAggreg
}
@Override
public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
public InternalAggregation reduce(List<InternalAggregation> aggregations, AggregationReduceContext reduceContext) {
try (CategorizationBytesRefHash hash = new CategorizationBytesRefHash(new BytesRefHash(1L, reduceContext.bigArrays()))) {
CategorizationTokenTree categorizationTokenTree = new CategorizationTokenTree(
maxUniqueTokens,

Some files were not shown because too many files have changed in this diff Show more