Add ability to select execution mode for cardinality aggregation (#87704)

Plumbs through a new parameter for the cardinality aggregation, to allow configuring the execution mode.  This can have significant impacts on speed and memory usage.  This PR exposes three collection modes and two heuristics that we can tune going forward.  All of these are treated as hints and can be silently ignored, e.g. if not applicable to the given field type.  I've change the default behavior to optimize for time, which potentially uses more memory.  Users can override this for the old behavior if needed.
This commit is contained in:
Mark Tozzi 2022-07-05 09:11:22 -04:00 committed by GitHub
parent 8779330638
commit 9ee6a19187
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 344 additions and 17 deletions

View file

@ -237,3 +237,24 @@ POST /sales/_search?size=0
--------------------------------------------------
// TEST[setup:sales]
<1> Documents without a value in the `tag` field will fall into the same bucket as documents that have the value `N/A`.
==== Execution Hint
There are different mechanisms by which cardinality aggregations can be executed:
- by using field values directly (`direct`)
- by using global ordinals of the field and resolving those values after
finishing a shard (`global_ordinals`)
- by using segment ordinal values and resolving those values after each
segment (`segment_ordinals`)
Additionally, there are two "heuristic based" modes. These modes will cause
Elasticsearch to use some data about the state of the index to choose an
appropriate execution method. The two heuristics are:
- `save_time_heuristic` - this is the default in Elasticsearch 8.4 and later.
- `save_memory_heuristic` - this was the default in Elasticsearch 8.3 and
earlier
When not specified, Elasticsearch will apply a heuristic to chose the
appropriate mode. Also note that some data (i.e. non-ordinal fields), `direct`
is the only option, and the hint will be ignored in these cases. Generally
speaking, it should not be necessary to set this value.

View file

@ -286,3 +286,135 @@ setup:
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
---
"profiler string save memory hint":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: save_memory_heuristic
- match: { aggregations.distinct_string.value: 1 }
- match: { profile.shards.0.aggregations.0.type: "GlobalOrdCardinalityAggregator" }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
---
"profiler string save time hint":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: save_time_heuristic
- match: { aggregations.distinct_string.value: 1 }
- match: { profile.shards.0.aggregations.0.type: "GlobalOrdCardinalityAggregator" }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
---
"profiler string global ords hint":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: global_ordinals
- match: { aggregations.distinct_string.value: 1 }
- match: { profile.shards.0.aggregations.0.type: "GlobalOrdCardinalityAggregator" }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
---
"profiler string segment ords hint":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: segment_ordinals
- match: { aggregations.distinct_string.value: 1 }
- match: { profile.shards.0.aggregations.0.type: "CardinalityAggregator" }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- gt: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- match: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
---
"profiler string direct hint":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: direct
- match: { aggregations.distinct_string.value: 1 }
- match: { profile.shards.0.aggregations.0.type: "CardinalityAggregator" }
- gt: { profile.shards.0.aggregations.0.breakdown.initialize: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_leaf_collector: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.collect: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.build_aggregation: 0 }
- gt: { profile.shards.0.aggregations.0.breakdown.post_collection: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_used: 0 }
- match: { profile.shards.0.aggregations.0.debug.ordinals_collectors_overhead_too_high: 0 }
- gt: { profile.shards.0.aggregations.0.debug.string_hashing_collectors_used: 0 }
---
"invalid execution hint throws":
- skip:
version: " - 8.3.99"
reason: execution hints introduced in 8.4.0
- do:
catch: /Invalid execution mode for cardinality aggregation/
search:
body:
profile: true
size: 0
aggs:
distinct_string:
cardinality:
field: string_field
execution_hint: bogus

View file

@ -108,7 +108,7 @@ public class TermsAggregatorFactory extends ValuesSourceAggregatorFactory {
execution = ExecutionMode.fromString(executionHint);
}
// In some cases, using ordinals is just not supported: override it
if (valuesSource instanceof ValuesSource.Bytes.WithOrdinals == false) {
if (valuesSource.hasOrdinals() == false) {
execution = ExecutionMode.MAP;
}
if (execution == null) {

View file

@ -39,6 +39,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
private static final ParseField REHASH = new ParseField("rehash").withAllDeprecated("no replacement - values will always be rehashed");
public static final ParseField PRECISION_THRESHOLD_FIELD = new ParseField("precision_threshold");
public static final ParseField EXECUTION_HINT_FIELD_NAME = new ParseField("execution_hint");
public static final ObjectParser<CardinalityAggregationBuilder, String> PARSER = ObjectParser.fromBuilder(
NAME,
@ -48,6 +49,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
ValuesSourceAggregationBuilder.declareFields(PARSER, true, false, false);
PARSER.declareLong(CardinalityAggregationBuilder::precisionThreshold, CardinalityAggregationBuilder.PRECISION_THRESHOLD_FIELD);
PARSER.declareLong((b, v) -> {/*ignore*/}, REHASH);
PARSER.declareString(CardinalityAggregationBuilder::executionHint, EXECUTION_HINT_FIELD_NAME);
}
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
@ -56,6 +58,8 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
private Long precisionThreshold = null;
private String executionHint = null;
public CardinalityAggregationBuilder(String name) {
super(name);
}
@ -67,6 +71,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
) {
super(clone, factoriesBuilder, metadata);
this.precisionThreshold = clone.precisionThreshold;
this.executionHint = clone.executionHint;
}
@Override
@ -82,6 +87,9 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
if (in.readBoolean()) {
precisionThreshold = in.readLong();
}
if (in.getVersion().onOrAfter(Version.V_8_4_0)) {
executionHint = in.readOptionalString();
}
}
@Override
@ -96,6 +104,9 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
if (hasPrecisionThreshold) {
out.writeLong(precisionThreshold);
}
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
out.writeOptionalString(executionHint);
}
}
@Override
@ -126,6 +137,26 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
return precisionThreshold;
}
/**
* Get the execution hint. This is an optional user specified hint that
* will be used to decide on the specific collection algorithm. Since this
* is a hint, the implementation may choose to ignore it (typically when
* the specified method is not applicable to the given field type)
*/
public String ExecutionHint() {
return executionHint;
}
/**
* Set the execution hint. This is an optional user specified hint that
* will be used to decide on the specific collection algorithm. Since this
* is a hint, the implementation may choose to ignore it (typically when
* the specified method is not applicable to the given field type)
*/
public void executionHint(String executionHint) {
this.executionHint = executionHint;
}
@Override
protected CardinalityAggregatorFactory innerBuild(
AggregationContext context,
@ -139,6 +170,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
name,
config,
precisionThreshold,
executionHint,
context,
parent,
subFactoriesBuilder,
@ -152,12 +184,15 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
if (precisionThreshold != null) {
builder.field(PRECISION_THRESHOLD_FIELD.getPreferredName(), precisionThreshold);
}
if (executionHint != null) {
builder.field(EXECUTION_HINT_FIELD_NAME.getPreferredName(), executionHint);
}
return builder;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), precisionThreshold);
return Objects.hash(super.hashCode(), precisionThreshold, executionHint);
}
@Override
@ -166,7 +201,7 @@ public final class CardinalityAggregationBuilder extends ValuesSourceAggregation
if (obj == null || getClass() != obj.getClass()) return false;
if (super.equals(obj) == false) return false;
CardinalityAggregationBuilder other = (CardinalityAggregationBuilder) obj;
return Objects.equals(precisionThreshold, other.precisionThreshold);
return Objects.equals(precisionThreshold, other.precisionThreshold) && Objects.equals(executionHint, other.executionHint);
}
@Override

View file

@ -43,6 +43,7 @@ import java.util.function.BiConsumer;
public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue {
private final int precision;
private final CardinalityAggregatorFactory.ExecutionMode executionMode;
private final ValuesSource valuesSource;
// Expensive to initialize, so we only initialize it when we have an actual value source
@ -61,6 +62,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
CardinalityAggregatorFactory.ExecutionMode executionMode,
AggregationContext context,
Aggregator parent,
Map<String, Object> metadata
@ -70,6 +72,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
this.valuesSource = valuesSourceConfig.hasValues() ? valuesSourceConfig.getValuesSource() : null;
this.precision = precision;
this.counts = valuesSource == null ? null : new HyperLogLogPlusPlus(precision, context.bigArrays(), 1);
this.executionMode = executionMode;
}
@Override
@ -99,15 +102,16 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
return new EmptyCollector();
}
final long ordinalsMemoryUsage = OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
// only use ordinals if they don't increase memory usage by more than 25%
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
if (executionMode.useSegmentOrdinals(maxOrd, precision)) {
ordinalsCollectorsUsed++;
return new OrdinalsCollector(counts, ordinalValues, bigArrays());
}
if (executionMode.isHeuristicBased()) {
// if we could have used segment ordinals, and it was our heuristic that made the choice not to, increment the counter
ordinalsCollectorsOverheadTooHigh++;
}
}
stringHashingCollectorsUsed++;
return new DirectCollector(counts, MurmurHash3Values.hash(valuesSource.bytesValues(ctx)));
@ -229,7 +233,7 @@ public class CardinalityAggregator extends NumericMetricsAggregator.SingleValue
}
private static class OrdinalsCollector extends Collector {
static class OrdinalsCollector extends Collector {
private static final long SHALLOW_FIXEDBITSET_SIZE = RamUsageEstimator.shallowSizeOfInstance(FixedBitSet.class);

View file

@ -22,17 +22,118 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
public class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
public enum ExecutionMode {
GLOBAL_ORDINALS(false) {
@Override
public boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision) {
return true;
}
@Override
public boolean useSegmentOrdinals(long maxOrd, int precision) {
return false;
}
},
SEGMENT_ORDINALS(false) {
@Override
public boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision) {
return false;
}
@Override
public boolean useSegmentOrdinals(long maxOrd, int precision) {
return true;
}
},
DIRECT(false) {
@Override
public boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision) {
return false;
}
@Override
public boolean useSegmentOrdinals(long maxOrd, int precision) {
return false;
}
},
SAVE_MEMORY_HEURISTIC(true) {
@Override
public boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision)
throws IOException {
return useGlobalOrds(context, source, precision);
}
@Override
public boolean useSegmentOrdinals(long maxOrd, int precision) {
final long ordinalsMemoryUsage = CardinalityAggregator.OrdinalsCollector.memoryOverhead(maxOrd);
final long countsMemoryUsage = HyperLogLogPlusPlus.memoryUsage(precision);
// only use ordinals if they don't increase memory usage by more than 25%
if (ordinalsMemoryUsage < countsMemoryUsage / 4) {
return true;
}
return false;
}
},
SAVE_TIME_HEURISTIC(true) {
@Override
public boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision)
throws IOException {
return useGlobalOrds(context, source, precision);
}
@Override
public boolean useSegmentOrdinals(long maxOrd, int precision) {
// Using segment ordinals is much faster than using the direct collector, even when it uses more memory
return true;
}
};
public static ExecutionMode fromString(String value) {
if (value == null) {
return null;
}
try {
return ExecutionMode.valueOf(value.toUpperCase(Locale.ROOT));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid execution mode for cardinality aggregation. Got ["
+ value
+ "]"
+ "expected one of [global_ordinal, segment_ordinal, direct]"
);
}
}
boolean isHeuristicBased;
ExecutionMode(boolean isHeuristicBased) {
this.isHeuristicBased = isHeuristicBased;
}
public boolean isHeuristicBased() {
return isHeuristicBased;
}
public abstract boolean useGlobalOrdinals(AggregationContext context, ValuesSource.Bytes.WithOrdinals source, int precision)
throws IOException;
public abstract boolean useSegmentOrdinals(long maxOrd, int precision);
}
private final Long precisionThreshold;
private final CardinalityAggregatorSupplier aggregatorSupplier;
private final ExecutionMode executionMode;
CardinalityAggregatorFactory(
String name,
ValuesSourceConfig config,
Long precisionThreshold,
String executionHint,
AggregationContext context,
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder,
@ -43,17 +144,19 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
this.aggregatorSupplier = aggregatorSupplier;
this.precisionThreshold = precisionThreshold;
// For BWC reasons, the parameter is nullable.
this.executionMode = executionHint == null ? ExecutionMode.SAVE_TIME_HEURISTIC : ExecutionMode.fromString(executionHint);
}
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY,
CoreValuesSourceType.ALL_CORE,
(name, valuesSourceConfig, precision, context, parent, metadata) -> {
(name, valuesSourceConfig, precision, executionMode, context, parent, metadata) -> {
// check global ords
if (valuesSourceConfig.hasValues()) {
if (valuesSourceConfig.getValuesSource()instanceof final ValuesSource.Bytes.WithOrdinals source) {
if (useGlobalOrds(context, source, precision)) {
if (executionMode.useGlobalOrdinals(context, source, precision)) {
final long maxOrd = source.globalMaxOrd(context.searcher());
return new GlobalOrdCardinalityAggregator(
name,
@ -68,7 +171,7 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
}
}
// fallback in the default aggregator
return new CardinalityAggregator(name, valuesSourceConfig, precision, context, parent, metadata);
return new CardinalityAggregator(name, valuesSourceConfig, precision, executionMode, context, parent, metadata);
},
true
);
@ -91,13 +194,13 @@ class CardinalityAggregatorFactory extends ValuesSourceAggregatorFactory {
@Override
protected Aggregator createUnmapped(Aggregator parent, Map<String, Object> metadata) throws IOException {
return new CardinalityAggregator(name, config, precision(), context, parent, metadata);
return new CardinalityAggregator(name, config, precision(), null, context, parent, metadata);
}
@Override
protected Aggregator doCreateInternal(Aggregator parent, CardinalityUpperBound cardinality, Map<String, Object> metadata)
throws IOException {
return aggregatorSupplier.build(name, config, precision(), context, parent, metadata);
return aggregatorSupplier.build(name, config, precision(), executionMode, context, parent, metadata);
}
private int precision() {

View file

@ -20,6 +20,7 @@ public interface CardinalityAggregatorSupplier {
String name,
ValuesSourceConfig valuesSourceConfig,
int precision,
CardinalityAggregatorFactory.ExecutionMode executionMode,
AggregationContext context,
Aggregator parent,
Map<String, Object> metadata

View file

@ -56,6 +56,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
@ -669,12 +670,16 @@ public class CardinalityAggregatorTests extends AggregatorTestCase {
}
private void testAggregation(
AggregationBuilder aggregationBuilder,
CardinalityAggregationBuilder aggregationBuilder,
Query query,
CheckedConsumer<RandomIndexWriter, IOException> buildIndex,
Consumer<InternalCardinality> verify,
MappedFieldType... fieldTypes
) throws IOException {
testCase(aggregationBuilder, query, buildIndex, verify, fieldTypes);
for (CardinalityAggregatorFactory.ExecutionMode mode : CardinalityAggregatorFactory.ExecutionMode.values()) {
aggregationBuilder.executionHint(mode.toString().toLowerCase(Locale.ROOT));
testCase(aggregationBuilder, query, buildIndex, verify, fieldTypes);
}
}
}

View file

@ -10,6 +10,8 @@ package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
import java.util.List;
public class CardinalityTests extends BaseAggregationTestCase<CardinalityAggregationBuilder> {
@Override
@ -20,6 +22,11 @@ public class CardinalityTests extends BaseAggregationTestCase<CardinalityAggrega
if (randomBoolean()) {
factory.missing("MISSING");
}
if (randomBoolean()) {
factory.executionHint(
randomFrom(List.of("segment_ordinals", "global_ordinals", "direct", "save_time_heuristic", "save_memory_heuristic"))
);
}
return factory;
}

View file

@ -324,7 +324,21 @@ public class SpatialPlugin extends Plugin implements ActionPlugin, MapperPlugin,
}
private static void registerCardinalityAggregator(ValuesSourceRegistry.Builder builder) {
builder.register(CardinalityAggregationBuilder.REGISTRY_KEY, GeoShapeValuesSourceType.instance(), CardinalityAggregator::new, true);
builder.register(
CardinalityAggregationBuilder.REGISTRY_KEY,
GeoShapeValuesSourceType.instance(),
(name, valuesSourceConfig, precision, executionMode, context, parent, metadata) -> new CardinalityAggregator(
name,
valuesSourceConfig,
precision,
// Force execution mode to null
null,
context,
parent,
metadata
),
true
);
}
private <T> ContextParser<String, T> checkLicense(ContextParser<String, T> realParser, LicensedFeature.Momentary feature) {

View file

@ -95,6 +95,11 @@ subprojects {
'search.aggregation/20_terms/string profiler via global ordinals native implementation',
'search.aggregation/20_terms/Global ordinals are loaded with the global_ordinals execution hint',
'search.aggregation/170_cardinality_metric/profiler string',
'search.aggregation/170_cardinality_metric/profiler string segment ords hint',
'search.aggregation/170_cardinality_metric/profiler string global ords hint',
'search.aggregation/170_cardinality_metric/profiler string direct hint',
'search.aggregation/170_cardinality_metric/profiler string save memory hint',
'search.aggregation/170_cardinality_metric/profiler string save time hint',
'search.aggregation/235_composite_sorted/*',
// timeseries dimensions can't be runtime fields
'search.aggregation/450_time_series/*',