Refactor Cardinality Aggregation (#85840)

Move more of the strategy decision making for which Cardinality collector to use into the factory, and also the ValuesSourceRegistry.  Lays the ground work for future improvements to Cardinality.

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
This commit is contained in:
Mark Tozzi 2022-04-20 08:47:14 -04:00 committed by GitHub
parent b6f880149b
commit 406dd9320a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 211 additions and 115 deletions

View file

@ -81,6 +81,8 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->
task.skipTest("search.aggregation/20_terms/string profiler via map", "The profiler results aren't backwards compatible.") task.skipTest("search.aggregation/20_terms/string profiler via map", "The profiler results aren't backwards compatible.")
task.skipTest("search.aggregation/20_terms/numeric profiler", "The profiler results aren't backwards compatible.") task.skipTest("search.aggregation/20_terms/numeric profiler", "The profiler results aren't backwards compatible.")
task.skipTest("migration/10_get_feature_upgrade_status/Get feature upgrade status", "Awaits backport") task.skipTest("migration/10_get_feature_upgrade_status/Get feature upgrade status", "Awaits backport")
task.skipTest("search.aggregation/170_cardinality_metric/profiler int", "The profiler results aren't backwards compatible.")
task.skipTest("search.aggregation/170_cardinality_metric/profiler double", "The profiler results aren't backwards compatible.")
task.replaceValueInMatch("_type", "_doc") task.replaceValueInMatch("_type", "_doc")
task.addAllowedWarningRegex("\\[types removal\\].*") task.addAllowedWarningRegex("\\[types removal\\].*")

View file

@ -213,58 +213,6 @@ setup:
field: int_field field: int_field
precision_threshold: -1 precision_threshold: -1
---
"profiler int":
- skip:
version: " - 7.9.99"
reason: introduced in 7.10.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_int:
cardinality:
field: int_field
- match: { aggregations.distinct_int.value: 4 }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
---
"profiler double":
- skip:
version: " - 7.9.99"
reason: introduced in 7.10.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_double:
cardinality:
field: double_field
- match: { aggregations.distinct_double.value: 4 }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.empty_collectors_used: 0 }
- gt: { profile.shards.0.aggregations.0.debug.numeric_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
--- ---
"profiler string": "profiler string":
- skip: - skip:

View file

@ -175,7 +175,7 @@ public abstract class AggregatorBase extends Aggregator {
* {@link Aggregator} that returns a customer {@linkplain LeafBucketCollector} * {@link Aggregator} that returns a customer {@linkplain LeafBucketCollector}
* from this method runs at best {@code O(hits)} time. See the * from this method runs at best {@code O(hits)} time. See the
* {@link SumAggregator#getLeafCollector(LeafReaderContext, LeafBucketCollector) sum} * {@link SumAggregator#getLeafCollector(LeafReaderContext, LeafBucketCollector) sum}
* {@linkplain Aggregator} for a fairly strait forward example of this. * {@linkplain Aggregator} for a fairly straight forward example of this.
* <p> * <p>
* Some {@linkplain Aggregator}s are able to correctly collect results on * Some {@linkplain Aggregator}s are able to correctly collect results on
* their own, without being iterated by the top level query or the rest * their own, without being iterated by the top level query or the rest

View file

@ -17,6 +17,12 @@ import java.util.Optional;
import static org.elasticsearch.search.aggregations.support.AggregationUsageService.OTHER_SUBTYPE; import static org.elasticsearch.search.aggregations.support.AggregationUsageService.OTHER_SUBTYPE;
/**
* Aggregator factories are responsible for creating the per-shard aggregator instances. They should select the aggregator instance type
* based on the type of the data being aggregated (using the {@link org.elasticsearch.search.aggregations.support.ValuesSourceRegistry}
* when appropriate), and any optimizations that factory can make. The factory layer is the correct place for heuristics to select
* different operating modes (such as using Global Ordinals or not).
*/
public abstract class AggregatorFactory { public abstract class AggregatorFactory {
protected final String name; protected final String name;
protected final AggregatorFactory parent; protected final AggregatorFactory parent;
@ -94,4 +100,5 @@ public abstract class AggregatorFactory {
public String getStatsSubtype() { public String getStatsSubtype() {
return OTHER_SUBTYPE; return OTHER_SUBTYPE;
} }
} }

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.Map;
/**
* Cardinality aggregator that operates directly on the bytes values of a field. This is necessary for fields that don't use ordinals at
* all, such as Range Fields.
*/
public class BytesCardinalityAggregator extends CardinalityAggregator {
public BytesCardinalityAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
AggregationContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, precision, context, parent, metadata);
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
postCollectLastCollector();
stringHashingCollectorsUsed++;
return new DirectCollector(counts, MurmurHash3Values.hash(valuesSourceConfig.getValuesSource().bytesValues(ctx)));
}
}

View file

@ -40,22 +40,22 @@ import java.util.function.BiConsumer;
/** /**
* An aggregator that computes approximate counts of unique values. * An aggregator that computes approximate counts of unique values.
*/ */
public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue { public abstract class CardinalityAggregator extends NumericMetricsAggregator.SingleValue {
private final int precision; protected final int precision;
protected final ValuesSourceConfig valuesSourceConfig;
private final ValuesSource valuesSource; private final ValuesSource valuesSource;
// Expensive to initialize, so we only initialize it when we have an actual value source // Expensive to initialize, so we only initialize it when we have an actual value source
@Nullable @Nullable
private HyperLogLogPlusPlus counts; protected final HyperLogLogPlusPlus counts;
private Collector collector; private Collector collector;
private int emptyCollectorsUsed; protected int emptyCollectorsUsed;
private int numericCollectorsUsed; protected int ordinalsCollectorsUsed;
private int ordinalsCollectorsUsed; protected int ordinalsCollectorsOverheadTooHigh;
private int ordinalsCollectorsOverheadTooHigh; protected int stringHashingCollectorsUsed;
private int stringHashingCollectorsUsed;
public CardinalityAggregator( public CardinalityAggregator(
String name, String name,
@ -68,6 +68,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
super(name, context, parent, metadata); super(name, context, parent, metadata);
// TODO: Stop using nulls here // TODO: Stop using nulls here
this.valuesSource = valuesSourceConfig.hasValues() ? valuesSourceConfig.getValuesSource() : null; this.valuesSource = valuesSourceConfig.hasValues() ? valuesSourceConfig.getValuesSource() : null;
this.valuesSourceConfig = valuesSourceConfig;
this.precision = precision; this.precision = precision;
this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1); this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1);
} }
@ -77,51 +78,10 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES; return valuesSource != null && valuesSource.needsScores() ? ScoreMode.COMPLETE : ScoreMode.COMPLETE_NO_SCORES;
} }
private Collector pickCollector(LeafReaderContext ctx) throws IOException {
if (valuesSource == null) {
emptyCollectorsUsed++;
return new EmptyCollector();
}
if (valuesSource instanceof ValuesSource.Numeric source) {
MurmurHash3Values hashValues = source.isFloatingPoint()
? MurmurHash3Values.hash(source.doubleValues(ctx))
: MurmurHash3Values.hash(source.longValues(ctx));
numericCollectorsUsed++;
return new DirectCollector(counts, hashValues);
}
if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals source) {
final SortedSetDocValues ordinalValues = source.ordinalsValues(ctx);
final long maxOrd = ordinalValues.getValueCount();
if (maxOrd == 0) {
emptyCollectorsUsed++;
return new EmptyCollector();
}
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
// only use ordinals if they don't increase memory usage by more than 25%
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
ordinalsCollectorsUsed++;
return new OrdinalsCollector(counts, ordinalValues, bigArrays());
}
ordinalsCollectorsOverheadTooHigh++;
}
stringHashingCollectorsUsed++;
return new DirectCollector(counts, MurmurHash3Values.hash(valuesSource.bytesValues(ctx)));
}
@Override @Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException { public abstract LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException;
postCollectLastCollector();
collector = pickCollector(ctx); protected void postCollectLastCollector() throws IOException {
return collector;
}
private void postCollectLastCollector() throws IOException {
if (collector != null) { if (collector != null) {
try { try {
collector.postCollect(); collector.postCollect();
@ -167,19 +127,18 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
public void collectDebugInfo(BiConsumer<String, Object> add) { public void collectDebugInfo(BiConsumer<String, Object> add) {
super.collectDebugInfo(add); super.collectDebugInfo(add);
add.accept("empty_collectors_used", emptyCollectorsUsed); add.accept("empty_collectors_used", emptyCollectorsUsed);
add.accept("numeric_collectors_used", numericCollectorsUsed);
add.accept("ordinals_collectors_used", ordinalsCollectorsUsed); add.accept("ordinals_collectors_used", ordinalsCollectorsUsed);
add.accept("ordinals_collectors_overhead_too_high", ordinalsCollectorsOverheadTooHigh); add.accept("ordinals_collectors_overhead_too_high", ordinalsCollectorsOverheadTooHigh);
add.accept("string_hashing_collectors_used", stringHashingCollectorsUsed); add.accept("string_hashing_collectors_used", stringHashingCollectorsUsed);
} }
private abstract static class Collector extends LeafBucketCollector implements Releasable { protected abstract static class Collector extends LeafBucketCollector implements Releasable {
public abstract void postCollect() throws IOException; public abstract void postCollect() throws IOException;
} }
private static class EmptyCollector extends Collector { protected static class EmptyCollector extends Collector {
@Override @Override
public void collect(int doc, long bucketOrd) { public void collect(int doc, long bucketOrd) {
@ -197,7 +156,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
} }
} }
private static class DirectCollector extends Collector { protected static class DirectCollector extends Collector {
private final MurmurHash3Values hashes; private final MurmurHash3Values hashes;
private final HyperLogLogPlusPlus counts; private final HyperLogLogPlusPlus counts;
@ -229,7 +188,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
} }
private static class OrdinalsCollector extends Collector { protected static class OrdinalsCollector extends Collector {
private static final long SHALLOW_FIXEDBITSET_SIZE = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class); private static final long SHALLOW_FIXEDBITSET_SIZE = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class);

View file

@ -13,6 +13,7 @@ import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.CardinalityUpperBound; import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource; import org.elasticsearch.search.aggregations.support.ValuesSource;
@ -48,7 +49,20 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
public static void registerAggregators(ValuesSourceRegistry.Builder builder) { public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register( builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY, CardinalityAggregationBuilder.REGISTRY_KEY,
CoreValuesSourceType.ALL_CORE, List.of(CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN, CoreValuesSourceType.DATE),
NumericCardinalityAggregator::new,
true
);
builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY,
List.of(CoreValuesSourceType.GEOPOINT, CoreValuesSourceType.RANGE),
BytesCardinalityAggregator::new,
true
);
builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY,
List.of(CoreValuesSourceType.KEYWORD, CoreValuesSourceType.IP),
(name, valuesSourceConfig, precision, context, parent, metadata) -> { (name, valuesSourceConfig, precision, context, parent, metadata) -> {
// check global ords // check global ords
if (valuesSourceConfig.hasValues()) { if (valuesSourceConfig.hasValues()) {
@ -65,10 +79,12 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
metadata metadata
); );
} }
// fallback in the default aggregator
return new SegmentOrdinalsCardinalityAggregator(name, valuesSourceConfig, precision, context, parent, metadata);
} }
} }
// fallback in the default aggregator // If we don't have ordinals, don't try to use an ordinals collector
return new CardinalityAggregator(name, valuesSourceConfig, precision, context, parent, metadata); return new BytesCardinalityAggregator(name, valuesSourceConfig, precision, context, parent, metadata);
}, },
true true
); );
@ -91,7 +107,12 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
@Override @Override
protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metadata) throws IOException { protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metadata) throws IOException {
return new CardinalityAggregator(name, config, precision(), context, parent, metadata); return new CardinalityAggregator(name, config, precision(), context, parent, metadata) {
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
return new EmptyCollector();
}
};
} }
@Override @Override

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.index.LeafReaderContext;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.Map;
/**
* Specialization of the cardinality aggregator to collect numeric values.
*/
public class NumericCardinalityAggregator extends CardinalityAggregator {
private final ValuesSource.Numeric source;
public NumericCardinalityAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
AggregationContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, precision, context, parent, metadata);
this.source = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
postCollectLastCollector();
MurmurHash3Values hashValues = source.isFloatingPoint()
? MurmurHash3Values.hash(source.doubleValues(ctx))
: MurmurHash3Values.hash(source.longValues(ctx));
return new DirectCollector(counts, hashValues);
}
}

View file

@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.search.aggregations.metrics;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedSetDocValues;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import java.io.IOException;
import java.util.Map;
/**
* This aggregator uses a heuristic to decide between direct collection and using segment ordinals, based on the expected memory overhead
* of the ordinals approach.
*/
public class SegmentOrdinalsCardinalityAggregator extends CardinalityAggregator {
private final ValuesSource.Bytes.WithOrdinals source;
public SegmentOrdinalsCardinalityAggregator(
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
AggregationContext context,
Aggregator parent,
Map<String, Object> metadata
) throws IOException {
super(name, valuesSourceConfig, precision, context, parent, metadata);
source = (ValuesSource.Bytes.WithOrdinals) valuesSourceConfig.getValuesSource();
}
@Override
public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
postCollectLastCollector();
final SortedSetDocValues ordinalValues = source.ordinalsValues(ctx);
final long maxOrd = ordinalValues.getValueCount();
if (maxOrd == 0) {
emptyCollectorsUsed++;
return new EmptyCollector();
}
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
// only use ordinals if they don't increase memory usage by more than 25%
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
ordinalsCollectorsUsed++;
return new OrdinalsCollector(counts, ordinalValues, bigArrays());
}
ordinalsCollectorsOverheadTooHigh++;
stringHashingCollectorsUsed++;
return new DirectCollector(counts, MurmurHash3Values.hash(source.bytesValues(ctx)));
}
}

View file

@ -25,8 +25,8 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoHashGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.BytesCardinalityAggregator;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.CardinalityAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.CardinalityAggregator;
import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.GeoBoundsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.GeoCentroidAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder; import org.elasticsearch.search.aggregations.metrics.ValueCountAggregationBuilder;
@ -322,7 +322,12 @@ public class SpatialPlugin extends Plugin implements ActionPlugin, MapperPlugin,
} }
private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) { private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(CardinalityAggregationBuilder.REGISTRY_KEY, GeoShapeValuesSourceType.instance(), CardinalityAggregator::new, true); builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY,
GeoShapeValuesSourceType.instance(),
BytesCardinalityAggregator::new,
true
);
} }
private <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser, LicensedFeature.Momentary feature) { private <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser, LicensedFeature.Momentary feature) {