mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
[ES|QL] ToAggregateMetricDouble function (#124595)
This commit adds a conversion function from numerics (and aggregate metric doubles) to aggregate metric doubles. It is most useful when you have multiple indices, where one index uses aggregate metric double (e.g. a downsampled index) and another uses a normal numeric type like long or double (e.g. an index prior to downsampling).
This commit is contained in:
parent
3e2cdc774b
commit
08ae54e423
23 changed files with 957 additions and 8 deletions
5
docs/changelog/124595.yaml
Normal file
5
docs/changelog/124595.yaml
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
pr: 124595
|
||||||
|
summary: '`ToAggregateMetricDouble` function'
|
||||||
|
area: "ES|QL"
|
||||||
|
type: enhancement
|
||||||
|
issues: []
|
|
@ -0,0 +1,6 @@
|
||||||
|
% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
|
||||||
|
|
||||||
|
**Description**
|
||||||
|
|
||||||
|
Encode a numeric to an aggregate_metric_double.
|
||||||
|
|
20
docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md
generated
Normal file
20
docs/reference/query-languages/esql/_snippets/functions/layout/to_aggregate_metric_double.md
generated
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
|
||||||
|
|
||||||
|
## `TO_AGGREGATE_METRIC_DOUBLE` [esql-to_aggregate_metric_double]
|
||||||
|
|
||||||
|
**Syntax**
|
||||||
|
|
||||||
|
:::{image} ../../../images/functions/to_aggregate_metric_double.svg
|
||||||
|
:alt: Embedded
|
||||||
|
:class: text-center
|
||||||
|
:::
|
||||||
|
|
||||||
|
|
||||||
|
:::{include} ../parameters/to_aggregate_metric_double.md
|
||||||
|
:::
|
||||||
|
|
||||||
|
:::{include} ../description/to_aggregate_metric_double.md
|
||||||
|
:::
|
||||||
|
|
||||||
|
:::{include} ../types/to_aggregate_metric_double.md
|
||||||
|
:::
|
7
docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md
generated
Normal file
7
docs/reference/query-languages/esql/_snippets/functions/parameters/to_aggregate_metric_double.md
generated
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
|
||||||
|
|
||||||
|
**Parameters**
|
||||||
|
|
||||||
|
`number`
|
||||||
|
: Input value. The input can be a single-valued column or an expression.
|
||||||
|
|
8
docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md
generated
Normal file
8
docs/reference/query-languages/esql/_snippets/functions/types/to_aggregate_metric_double.md
generated
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
% This is generated by ESQL's AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
|
||||||
|
|
||||||
|
**Supported types**
|
||||||
|
|
||||||
|
| number | result |
|
||||||
|
| --- | --- |
|
||||||
|
aggregate_metric_double
|
||||||
|
|
1
docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg
generated
Normal file
1
docs/reference/query-languages/esql/images/functions/to_aggregate_metric_double.svg
generated
Normal file
|
@ -0,0 +1 @@
|
||||||
|
<svg version="1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg" width="528" height="46" viewbox="0 0 528 46"><defs><style type="text/css">.c{fill:none;stroke:#222222;}.k{fill:#000000;font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;font-size:20px;}.s{fill:#e4f4ff;stroke:#222222;}.syn{fill:#8D8D8D;font-family: ui-monospace, SFMono-Regular, Menlo, Monaco, Consolas, "Liberation Mono", "Courier New", monospace;font-size:20px;}</style></defs><path class="c" d="M0 31h5m332 0h10m32 0h10m92 0h10m32 0h5"/><rect class="s" x="5" y="5" width="332" height="36"/><text class="k" x="15" y="31">TO_AGGREGATE_METRIC_DOUBLE</text><rect class="s" x="347" y="5" width="32" height="36" rx="7"/><text class="syn" x="357" y="31">(</text><rect class="s" x="389" y="5" width="92" height="36" rx="7"/><text class="k" x="399" y="31">number</text><rect class="s" x="491" y="5" width="32" height="36" rx="7"/><text class="syn" x="501" y="31">)</text></svg>
|
After Width: | Height: | Size: 1 KiB |
9
docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json
generated
Normal file
9
docs/reference/query-languages/esql/kibana/definition/functions/to_aggregate_metric_double.json
generated
Normal file
|
@ -0,0 +1,9 @@
|
||||||
|
{
|
||||||
|
"comment" : "This is generated by ESQL’s AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.",
|
||||||
|
"type" : "scalar",
|
||||||
|
"name" : "to_aggregate_metric_double",
|
||||||
|
"description" : "Encode a numeric to an aggregate_metric_double.",
|
||||||
|
"signatures" : [ ],
|
||||||
|
"preview" : false,
|
||||||
|
"snapshot_only" : false
|
||||||
|
}
|
7
docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md
generated
Normal file
7
docs/reference/query-languages/esql/kibana/docs/functions/to_aggregate_metric_double.md
generated
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
<!--
|
||||||
|
This is generated by ESQL’s AbstractFunctionTestCase. Do no edit it. See ../README.md for how to regenerate it.
|
||||||
|
-->
|
||||||
|
|
||||||
|
### TO_AGGREGATE_METRIC_DOUBLE
|
||||||
|
Encode a numeric to an aggregate_metric_double.
|
||||||
|
|
|
@ -192,6 +192,7 @@ public class TransportVersions {
|
||||||
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA = def(9_032_0_00);
|
public static final TransportVersion INFERENCE_MODEL_REGISTRY_METADATA = def(9_032_0_00);
|
||||||
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
|
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
|
||||||
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
|
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
|
||||||
|
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* STOP! READ THIS FIRST! No, really,
|
* STOP! READ THIS FIRST! No, really,
|
||||||
|
|
|
@ -557,7 +557,6 @@ public enum DataType {
|
||||||
&& t != SOURCE
|
&& t != SOURCE
|
||||||
&& t != HALF_FLOAT
|
&& t != HALF_FLOAT
|
||||||
&& t != PARTIAL_AGG
|
&& t != PARTIAL_AGG
|
||||||
&& t != AGGREGATE_METRIC_DOUBLE
|
|
||||||
&& t.isCounter() == false;
|
&& t.isCounter() == false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7,9 +7,17 @@
|
||||||
|
|
||||||
package org.elasticsearch.compute.data;
|
package org.elasticsearch.compute.data;
|
||||||
|
|
||||||
|
import org.elasticsearch.TransportVersion;
|
||||||
|
import org.elasticsearch.TransportVersions;
|
||||||
|
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
|
||||||
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.core.Releasables;
|
import org.elasticsearch.core.Releasables;
|
||||||
import org.elasticsearch.index.mapper.BlockLoader;
|
import org.elasticsearch.index.mapper.BlockLoader;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
|
public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder implements BlockLoader.AggregateMetricDoubleBuilder {
|
||||||
|
|
||||||
private DoubleBlockBuilder minBuilder;
|
private DoubleBlockBuilder minBuilder;
|
||||||
|
@ -161,11 +169,40 @@ public class AggregateMetricDoubleBlockBuilder extends AbstractBlockBuilder impl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) {
|
public record AggregateMetricDoubleLiteral(Double min, Double max, Double sum, Integer count) implements GenericNamedWriteable {
|
||||||
public AggregateMetricDoubleLiteral {
|
public AggregateMetricDoubleLiteral {
|
||||||
min = min.isNaN() ? null : min;
|
min = min.isNaN() ? null : min;
|
||||||
max = max.isNaN() ? null : max;
|
max = max.isNaN() ? null : max;
|
||||||
sum = sum.isNaN() ? null : sum;
|
sum = sum.isNaN() ? null : sum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||||
|
GenericNamedWriteable.class,
|
||||||
|
"AggregateMetricDoubleLiteral",
|
||||||
|
AggregateMetricDoubleLiteral::new
|
||||||
|
);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return "AggregateMetricDoubleLiteral";
|
||||||
|
}
|
||||||
|
|
||||||
|
public AggregateMetricDoubleLiteral(StreamInput input) throws IOException {
|
||||||
|
this(input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalDouble(), input.readOptionalInt());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeOptionalDouble(min);
|
||||||
|
out.writeOptionalDouble(max);
|
||||||
|
out.writeOptionalDouble(sum);
|
||||||
|
out.writeOptionalInt(count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TransportVersion getMinimalSupportedVersion() {
|
||||||
|
return TransportVersions.ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -285,7 +285,19 @@ public final class BlockUtils {
|
||||||
DocVector v = ((DocBlock) block).asVector();
|
DocVector v = ((DocBlock) block).asVector();
|
||||||
yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset));
|
yield new Doc(v.shards().getInt(offset), v.segments().getInt(offset), v.docs().getInt(offset));
|
||||||
}
|
}
|
||||||
case COMPOSITE -> throw new IllegalArgumentException("can't read values from composite blocks");
|
case COMPOSITE -> {
|
||||||
|
CompositeBlock compositeBlock = (CompositeBlock) block;
|
||||||
|
var minBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex());
|
||||||
|
var maxBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex());
|
||||||
|
var sumBlock = (DoubleBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex());
|
||||||
|
var countBlock = (IntBlock) compositeBlock.getBlock(AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex());
|
||||||
|
yield new AggregateMetricDoubleLiteral(
|
||||||
|
minBlock.getDouble(offset),
|
||||||
|
maxBlock.getDouble(offset),
|
||||||
|
sumBlock.getDouble(offset),
|
||||||
|
countBlock.getInt(offset)
|
||||||
|
);
|
||||||
|
}
|
||||||
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
|
case UNKNOWN -> throw new IllegalArgumentException("can't read values from [" + block + "]");
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.regex.Regex;
|
import org.elasticsearch.common.regex.Regex;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.BigArrays;
|
import org.elasticsearch.common.util.BigArrays;
|
||||||
|
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
|
||||||
import org.elasticsearch.compute.data.BlockFactory;
|
import org.elasticsearch.compute.data.BlockFactory;
|
||||||
import org.elasticsearch.compute.data.BlockUtils;
|
import org.elasticsearch.compute.data.BlockUtils;
|
||||||
import org.elasticsearch.compute.data.BytesRefBlock;
|
import org.elasticsearch.compute.data.BytesRefBlock;
|
||||||
|
@ -788,6 +789,12 @@ public final class EsqlTestUtils {
|
||||||
case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint());
|
case CARTESIAN_POINT -> CARTESIAN.asWkb(ShapeTestUtils.randomPoint());
|
||||||
case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()));
|
case GEO_SHAPE -> GEO.asWkb(GeometryTestUtils.randomGeometry(randomBoolean()));
|
||||||
case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()));
|
case CARTESIAN_SHAPE -> CARTESIAN.asWkb(ShapeTestUtils.randomGeometry(randomBoolean()));
|
||||||
|
case AGGREGATE_METRIC_DOUBLE -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
|
||||||
|
randomDouble(),
|
||||||
|
randomDouble(),
|
||||||
|
randomDouble(),
|
||||||
|
randomInt()
|
||||||
|
);
|
||||||
case NULL -> null;
|
case NULL -> null;
|
||||||
case SOURCE -> {
|
case SOURCE -> {
|
||||||
try {
|
try {
|
||||||
|
@ -798,8 +805,9 @@ public final class EsqlTestUtils {
|
||||||
throw new UncheckedIOException(e);
|
throw new UncheckedIOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG, AGGREGATE_METRIC_DOUBLE ->
|
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE, TSID_DATA_TYPE, PARTIAL_AGG -> throw new IllegalArgumentException(
|
||||||
throw new IllegalArgumentException("can't make random values for [" + type.typeName() + "]");
|
"can't make random values for [" + type.typeName() + "]"
|
||||||
|
);
|
||||||
}, type);
|
}, type);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -896,7 +896,12 @@ public class EsqlCapabilities {
|
||||||
/**
|
/**
|
||||||
* Non full text functions do not contribute to score
|
* Non full text functions do not contribute to score
|
||||||
*/
|
*/
|
||||||
NON_FULL_TEXT_FUNCTIONS_SCORING;
|
NON_FULL_TEXT_FUNCTIONS_SCORING,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Support for to_aggregate_metric_double function
|
||||||
|
*/
|
||||||
|
AGGREGATE_METRIC_DOUBLE_CONVERT_TO(AGGREGATE_METRIC_DOUBLE_FEATURE_FLAG);
|
||||||
|
|
||||||
private final boolean enabled;
|
private final boolean enabled;
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateWrita
|
||||||
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextWritables;
|
import org.elasticsearch.xpack.esql.expression.function.fulltext.FullTextWritables;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.ScalarFunctionWritables;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.ScalarFunctionWritables;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
|
||||||
|
@ -180,6 +181,7 @@ public class ExpressionWritables {
|
||||||
entries.add(StY.ENTRY);
|
entries.add(StY.ENTRY);
|
||||||
entries.add(Tan.ENTRY);
|
entries.add(Tan.ENTRY);
|
||||||
entries.add(Tanh.ENTRY);
|
entries.add(Tanh.ENTRY);
|
||||||
|
entries.add(ToAggregateMetricDouble.ENTRY);
|
||||||
entries.add(ToBase64.ENTRY);
|
entries.add(ToBase64.ENTRY);
|
||||||
entries.add(ToBoolean.ENTRY);
|
entries.add(ToBoolean.ENTRY);
|
||||||
entries.add(ToCartesianPoint.ENTRY);
|
entries.add(ToCartesianPoint.ENTRY);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Case;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Greatest;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.conditional.Least;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromBase64;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToAggregateMetricDouble;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBase64;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToBoolean;
|
||||||
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
|
import org.elasticsearch.xpack.esql.expression.function.scalar.convert.ToCartesianPoint;
|
||||||
|
@ -376,6 +377,7 @@ public class EsqlFunctionRegistry {
|
||||||
// conversion functions
|
// conversion functions
|
||||||
new FunctionDefinition[] {
|
new FunctionDefinition[] {
|
||||||
def(FromBase64.class, FromBase64::new, "from_base64"),
|
def(FromBase64.class, FromBase64::new, "from_base64"),
|
||||||
|
def(ToAggregateMetricDouble.class, ToAggregateMetricDouble::new, "to_aggregate_metric_double", "to_aggregatemetricdouble"),
|
||||||
def(ToBase64.class, ToBase64::new, "to_base64"),
|
def(ToBase64.class, ToBase64::new, "to_base64"),
|
||||||
def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"),
|
def(ToBoolean.class, ToBoolean::new, "to_boolean", "to_bool"),
|
||||||
def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"),
|
def(ToCartesianPoint.class, ToCartesianPoint::new, "to_cartesianpoint"),
|
||||||
|
|
|
@ -0,0 +1,563 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||||
|
* 2.0.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
|
||||||
|
import org.elasticsearch.compute.data.Block;
|
||||||
|
import org.elasticsearch.compute.data.BlockFactory;
|
||||||
|
import org.elasticsearch.compute.data.CompositeBlock;
|
||||||
|
import org.elasticsearch.compute.data.DoubleBlock;
|
||||||
|
import org.elasticsearch.compute.data.DoubleVector;
|
||||||
|
import org.elasticsearch.compute.data.IntBlock;
|
||||||
|
import org.elasticsearch.compute.data.IntVector;
|
||||||
|
import org.elasticsearch.compute.data.LongBlock;
|
||||||
|
import org.elasticsearch.compute.data.LongVector;
|
||||||
|
import org.elasticsearch.compute.data.Page;
|
||||||
|
import org.elasticsearch.compute.data.Vector;
|
||||||
|
import org.elasticsearch.compute.operator.DriverContext;
|
||||||
|
import org.elasticsearch.compute.operator.EvalOperator;
|
||||||
|
import org.elasticsearch.core.Releasable;
|
||||||
|
import org.elasticsearch.core.Releasables;
|
||||||
|
import org.elasticsearch.search.aggregations.metrics.CompensatedSum;
|
||||||
|
import org.elasticsearch.xpack.esql.core.expression.Expression;
|
||||||
|
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
|
||||||
|
import org.elasticsearch.xpack.esql.core.tree.Source;
|
||||||
|
import org.elasticsearch.xpack.esql.core.type.DataType;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.Param;
|
||||||
|
import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.type.DataType.DOUBLE;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.type.DataType.INTEGER;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.type.DataType.LONG;
|
||||||
|
import static org.elasticsearch.xpack.esql.core.type.DataType.UNSIGNED_LONG;
|
||||||
|
|
||||||
|
public class ToAggregateMetricDouble extends AbstractConvertFunction {
|
||||||
|
|
||||||
|
private static final Map<DataType, AbstractConvertFunction.BuildFactory> EVALUATORS = Map.ofEntries(
|
||||||
|
Map.entry(AGGREGATE_METRIC_DOUBLE, (source, fieldEval) -> fieldEval),
|
||||||
|
Map.entry(DOUBLE, DoubleFactory::new),
|
||||||
|
Map.entry(INTEGER, IntFactory::new),
|
||||||
|
Map.entry(LONG, LongFactory::new),
|
||||||
|
Map.entry(UNSIGNED_LONG, UnsignedLongFactory::new)
|
||||||
|
);
|
||||||
|
|
||||||
|
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||||
|
Expression.class,
|
||||||
|
"ToAggregateMetricDouble",
|
||||||
|
ToAggregateMetricDouble::new
|
||||||
|
);
|
||||||
|
|
||||||
|
@FunctionInfo(returnType = "aggregate_metric_double", description = "Encode a numeric to an aggregate_metric_double.")
|
||||||
|
public ToAggregateMetricDouble(
|
||||||
|
Source source,
|
||||||
|
@Param(
|
||||||
|
name = "number",
|
||||||
|
type = { "double", "long", "unsigned_long", "integer", "aggregate_metric_double" },
|
||||||
|
description = "Input value. The input can be a single-valued column or an expression."
|
||||||
|
) Expression field
|
||||||
|
) {
|
||||||
|
super(source, field);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ToAggregateMetricDouble(StreamInput in) throws IOException {
|
||||||
|
super(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getWriteableName() {
|
||||||
|
return ENTRY.name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TypeResolution resolveType() {
|
||||||
|
if (childrenResolved() == false) {
|
||||||
|
return new TypeResolution("Unresolved children");
|
||||||
|
}
|
||||||
|
return isType(
|
||||||
|
field,
|
||||||
|
dt -> dt == DataType.AGGREGATE_METRIC_DOUBLE || dt == DataType.DOUBLE || dt == LONG || dt == INTEGER || dt == UNSIGNED_LONG,
|
||||||
|
sourceText(),
|
||||||
|
DEFAULT,
|
||||||
|
"numeric or aggregate_metric_double"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public DataType dataType() {
|
||||||
|
return AGGREGATE_METRIC_DOUBLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Expression replaceChildren(List<Expression> newChildren) {
|
||||||
|
return new ToAggregateMetricDouble(source(), newChildren.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NodeInfo<? extends Expression> info() {
|
||||||
|
return NodeInfo.create(this, ToAggregateMetricDouble::new, field);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Map<DataType, AbstractConvertFunction.BuildFactory> factories() {
|
||||||
|
return EVALUATORS;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class AggregateMetricDoubleVectorBuilder implements Releasable {
|
||||||
|
private final DoubleVector.FixedBuilder valuesBuilder;
|
||||||
|
private final BlockFactory blockFactory;
|
||||||
|
|
||||||
|
private AggregateMetricDoubleVectorBuilder(int estimatedSize, BlockFactory blockFactory) {
|
||||||
|
this.blockFactory = blockFactory;
|
||||||
|
this.valuesBuilder = blockFactory.newDoubleVectorFixedBuilder(estimatedSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void appendValue(double value) {
|
||||||
|
valuesBuilder.appendDouble(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block build() {
|
||||||
|
Block[] blocks = new Block[4];
|
||||||
|
Block block;
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
|
block = valuesBuilder.build().asBlock();
|
||||||
|
blocks[AggregateMetricDoubleBlockBuilder.Metric.MIN.getIndex()] = block;
|
||||||
|
blocks[AggregateMetricDoubleBlockBuilder.Metric.MAX.getIndex()] = block;
|
||||||
|
block.incRef();
|
||||||
|
blocks[AggregateMetricDoubleBlockBuilder.Metric.SUM.getIndex()] = block;
|
||||||
|
block.incRef();
|
||||||
|
blocks[AggregateMetricDoubleBlockBuilder.Metric.COUNT.getIndex()] = blockFactory.newConstantIntBlockWith(
|
||||||
|
1,
|
||||||
|
block.getPositionCount()
|
||||||
|
);
|
||||||
|
CompositeBlock compositeBlock = new CompositeBlock(blocks);
|
||||||
|
success = true;
|
||||||
|
return compositeBlock;
|
||||||
|
} finally {
|
||||||
|
if (success == false) {
|
||||||
|
Releasables.closeExpectNoException(blocks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
Releasables.closeExpectNoException(valuesBuilder);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class DoubleFactory implements EvalOperator.ExpressionEvaluator.Factory {
|
||||||
|
private final Source source;
|
||||||
|
|
||||||
|
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
|
||||||
|
|
||||||
|
public DoubleFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
|
||||||
|
this.fieldEvaluator = fieldEvaluator;
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromDoubleEvaluator[" + "field=" + fieldEvaluator + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
|
||||||
|
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
|
||||||
|
|
||||||
|
return new EvalOperator.ExpressionEvaluator() {
|
||||||
|
private Block evalBlock(Block block) {
|
||||||
|
int positionCount = block.getPositionCount();
|
||||||
|
DoubleBlock doubleBlock = (DoubleBlock) block;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
|
||||||
|
.newAggregateMetricDoubleBlockBuilder(positionCount)
|
||||||
|
) {
|
||||||
|
CompensatedSum compensatedSum = new CompensatedSum();
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
int valueCount = doubleBlock.getValueCount(p);
|
||||||
|
if (valueCount == 0) {
|
||||||
|
builder.appendNull();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int start = doubleBlock.getFirstValueIndex(p);
|
||||||
|
int end = start + valueCount;
|
||||||
|
if (valueCount == 1) {
|
||||||
|
double current = doubleBlock.getDouble(start);
|
||||||
|
builder.min().appendDouble(current);
|
||||||
|
builder.max().appendDouble(current);
|
||||||
|
builder.sum().appendDouble(current);
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
double min = Double.POSITIVE_INFINITY;
|
||||||
|
double max = Double.NEGATIVE_INFINITY;
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
double current = doubleBlock.getDouble(i);
|
||||||
|
min = Math.min(min, current);
|
||||||
|
max = Math.max(max, current);
|
||||||
|
compensatedSum.add(current);
|
||||||
|
}
|
||||||
|
builder.min().appendDouble(min);
|
||||||
|
builder.max().appendDouble(max);
|
||||||
|
builder.sum().appendDouble(compensatedSum.value());
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
compensatedSum.reset(0, 0);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block evalVector(Vector vector) {
|
||||||
|
int positionCount = vector.getPositionCount();
|
||||||
|
DoubleVector doubleVector = (DoubleVector) vector;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
|
||||||
|
positionCount,
|
||||||
|
context.blockFactory()
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
double value = doubleVector.getDouble(p);
|
||||||
|
builder.appendValue(value);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Block eval(Page page) {
|
||||||
|
try (Block block = eval.eval(page)) {
|
||||||
|
Vector vector = block.asVector();
|
||||||
|
return vector == null ? evalBlock(block) : evalVector(vector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
Releasables.closeExpectNoException(eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromDoubleEvaluator[field=" + eval + "]";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class IntFactory implements EvalOperator.ExpressionEvaluator.Factory {
|
||||||
|
private final Source source;
|
||||||
|
|
||||||
|
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
|
||||||
|
|
||||||
|
public IntFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
|
||||||
|
this.fieldEvaluator = fieldEvaluator;
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromIntEvaluator[" + "field=" + fieldEvaluator + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
|
||||||
|
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
|
||||||
|
|
||||||
|
return new EvalOperator.ExpressionEvaluator() {
|
||||||
|
@Override
|
||||||
|
public Block eval(Page page) {
|
||||||
|
try (Block block = eval.eval(page)) {
|
||||||
|
Vector vector = block.asVector();
|
||||||
|
return vector == null ? evalBlock(block) : evalVector(vector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block evalBlock(Block block) {
|
||||||
|
int positionCount = block.getPositionCount();
|
||||||
|
IntBlock intBlock = (IntBlock) block;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
|
||||||
|
.newAggregateMetricDoubleBlockBuilder(positionCount)
|
||||||
|
) {
|
||||||
|
CompensatedSum sum = new CompensatedSum();
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
int valueCount = intBlock.getValueCount(p);
|
||||||
|
int start = intBlock.getFirstValueIndex(p);
|
||||||
|
int end = start + valueCount;
|
||||||
|
if (valueCount == 0) {
|
||||||
|
builder.appendNull();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (valueCount == 1) {
|
||||||
|
double current = intBlock.getInt(start);
|
||||||
|
builder.min().appendDouble(current);
|
||||||
|
builder.max().appendDouble(current);
|
||||||
|
builder.sum().appendDouble(current);
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
double min = Double.POSITIVE_INFINITY;
|
||||||
|
double max = Double.NEGATIVE_INFINITY;
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
double current = intBlock.getInt(i);
|
||||||
|
min = Math.min(min, current);
|
||||||
|
max = Math.max(max, current);
|
||||||
|
sum.add(current);
|
||||||
|
}
|
||||||
|
builder.min().appendDouble(min);
|
||||||
|
builder.max().appendDouble(max);
|
||||||
|
builder.sum().appendDouble(sum.value());
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
sum.reset(0, 0);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block evalVector(Vector vector) {
|
||||||
|
int positionCount = vector.getPositionCount();
|
||||||
|
IntVector intVector = (IntVector) vector;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
|
||||||
|
positionCount,
|
||||||
|
context.blockFactory()
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
double value = intVector.getInt(p);
|
||||||
|
builder.appendValue(value);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
Releasables.closeExpectNoException(eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromIntEvaluator[field=" + eval + "]";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class LongFactory implements EvalOperator.ExpressionEvaluator.Factory {
|
||||||
|
private final Source source;
|
||||||
|
|
||||||
|
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
|
||||||
|
|
||||||
|
public LongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
|
||||||
|
this.fieldEvaluator = fieldEvaluator;
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromLongEvaluator[" + "field=" + fieldEvaluator + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
|
||||||
|
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
|
||||||
|
|
||||||
|
return new EvalOperator.ExpressionEvaluator() {
|
||||||
|
private Block evalBlock(Block block) {
|
||||||
|
int positionCount = block.getPositionCount();
|
||||||
|
LongBlock longBlock = (LongBlock) block;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
|
||||||
|
.newAggregateMetricDoubleBlockBuilder(positionCount)
|
||||||
|
) {
|
||||||
|
CompensatedSum sum = new CompensatedSum();
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
int valueCount = longBlock.getValueCount(p);
|
||||||
|
int start = longBlock.getFirstValueIndex(p);
|
||||||
|
int end = start + valueCount;
|
||||||
|
if (valueCount == 0) {
|
||||||
|
builder.appendNull();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (valueCount == 1) {
|
||||||
|
double current = longBlock.getLong(start);
|
||||||
|
builder.min().appendDouble(current);
|
||||||
|
builder.max().appendDouble(current);
|
||||||
|
builder.sum().appendDouble(current);
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
double min = Double.POSITIVE_INFINITY;
|
||||||
|
double max = Double.NEGATIVE_INFINITY;
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
double current = longBlock.getLong(i);
|
||||||
|
min = Math.min(min, current);
|
||||||
|
max = Math.max(max, current);
|
||||||
|
sum.add(current);
|
||||||
|
}
|
||||||
|
builder.min().appendDouble(min);
|
||||||
|
builder.max().appendDouble(max);
|
||||||
|
builder.sum().appendDouble(sum.value());
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
sum.reset(0, 0);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block evalVector(Vector vector) {
|
||||||
|
int positionCount = vector.getPositionCount();
|
||||||
|
LongVector longVector = (LongVector) vector;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
|
||||||
|
positionCount,
|
||||||
|
context.blockFactory()
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
double value = longVector.getLong(p);
|
||||||
|
builder.appendValue(value);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Block eval(Page page) {
|
||||||
|
try (Block block = eval.eval(page)) {
|
||||||
|
Vector vector = block.asVector();
|
||||||
|
return vector == null ? evalBlock(block) : evalVector(vector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
Releasables.closeExpectNoException(eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromLongEvaluator[field=" + eval + "]";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class UnsignedLongFactory implements EvalOperator.ExpressionEvaluator.Factory {
|
||||||
|
private final Source source;
|
||||||
|
|
||||||
|
private final EvalOperator.ExpressionEvaluator.Factory fieldEvaluator;
|
||||||
|
|
||||||
|
public UnsignedLongFactory(Source source, EvalOperator.ExpressionEvaluator.Factory fieldEvaluator) {
|
||||||
|
this.fieldEvaluator = fieldEvaluator;
|
||||||
|
this.source = source;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[" + "field=" + fieldEvaluator + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public EvalOperator.ExpressionEvaluator get(DriverContext context) {
|
||||||
|
final EvalOperator.ExpressionEvaluator eval = fieldEvaluator.get(context);
|
||||||
|
|
||||||
|
return new EvalOperator.ExpressionEvaluator() {
|
||||||
|
private Block evalBlock(Block block) {
|
||||||
|
int positionCount = block.getPositionCount();
|
||||||
|
LongBlock longBlock = (LongBlock) block;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleBlockBuilder builder = context.blockFactory()
|
||||||
|
.newAggregateMetricDoubleBlockBuilder(positionCount)
|
||||||
|
) {
|
||||||
|
CompensatedSum sum = new CompensatedSum();
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
int valueCount = longBlock.getValueCount(p);
|
||||||
|
int start = longBlock.getFirstValueIndex(p);
|
||||||
|
int end = start + valueCount;
|
||||||
|
if (valueCount == 0) {
|
||||||
|
builder.appendNull();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (valueCount == 1) {
|
||||||
|
double current = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p));
|
||||||
|
builder.min().appendDouble(current);
|
||||||
|
builder.max().appendDouble(current);
|
||||||
|
builder.sum().appendDouble(current);
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
double min = Double.POSITIVE_INFINITY;
|
||||||
|
double max = Double.NEGATIVE_INFINITY;
|
||||||
|
for (int i = start; i < end; i++) {
|
||||||
|
double current = EsqlDataTypeConverter.unsignedLongToDouble(longBlock.getLong(p));
|
||||||
|
min = Math.min(min, current);
|
||||||
|
max = Math.max(max, current);
|
||||||
|
sum.add(current);
|
||||||
|
}
|
||||||
|
builder.min().appendDouble(min);
|
||||||
|
builder.max().appendDouble(max);
|
||||||
|
builder.sum().appendDouble(sum.value());
|
||||||
|
builder.count().appendInt(valueCount);
|
||||||
|
sum.reset(0, 0);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Block evalVector(Vector vector) {
|
||||||
|
int positionCount = vector.getPositionCount();
|
||||||
|
LongVector longVector = (LongVector) vector;
|
||||||
|
try (
|
||||||
|
AggregateMetricDoubleVectorBuilder builder = new AggregateMetricDoubleVectorBuilder(
|
||||||
|
positionCount,
|
||||||
|
context.blockFactory()
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
for (int p = 0; p < positionCount; p++) {
|
||||||
|
double value = EsqlDataTypeConverter.unsignedLongToDouble(longVector.getLong(p));
|
||||||
|
builder.appendValue(value);
|
||||||
|
}
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Block eval(Page page) {
|
||||||
|
try (Block block = eval.eval(page)) {
|
||||||
|
Vector vector = block.asVector();
|
||||||
|
return vector == null ? evalBlock(block) : evalVector(vector);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
Releasables.closeExpectNoException(eval);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ToAggregateMetricDoubleFromUnsignedLongEvaluator[field=" + eval + "]";
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,9 +10,11 @@ package org.elasticsearch.xpack.esql;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
import org.elasticsearch.common.io.stream.ByteBufferStreamInput;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
|
||||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||||
import org.elasticsearch.index.query.ExistsQueryBuilder;
|
import org.elasticsearch.index.query.ExistsQueryBuilder;
|
||||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||||
|
@ -112,6 +114,13 @@ public class SerializationTestUtils {
|
||||||
entries.add(SingleValueQuery.ENTRY);
|
entries.add(SingleValueQuery.ENTRY);
|
||||||
entries.addAll(ExpressionWritables.getNamedWriteables());
|
entries.addAll(ExpressionWritables.getNamedWriteables());
|
||||||
entries.addAll(PlanWritables.getNamedWriteables());
|
entries.addAll(PlanWritables.getNamedWriteables());
|
||||||
|
entries.add(
|
||||||
|
new NamedWriteableRegistry.Entry(
|
||||||
|
GenericNamedWriteable.class,
|
||||||
|
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY.name,
|
||||||
|
AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new
|
||||||
|
)
|
||||||
|
);
|
||||||
return new NamedWriteableRegistry(entries);
|
return new NamedWriteableRegistry(entries);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,91 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||||
|
* 2.0.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.xpack.esql.expression.function.scalar.convert;
|
||||||
|
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||||
|
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||||
|
|
||||||
|
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
|
||||||
|
import org.elasticsearch.xpack.esql.core.expression.Expression;
|
||||||
|
import org.elasticsearch.xpack.esql.core.tree.Source;
|
||||||
|
import org.elasticsearch.xpack.esql.core.type.DataType;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTestCase;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.FunctionName;
|
||||||
|
import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
|
||||||
|
|
||||||
|
import java.math.BigInteger;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import static java.util.Collections.emptyList;
|
||||||
|
|
||||||
|
@FunctionName("to_aggregate_metric_double")
|
||||||
|
public class ToAggregateMetricDoubleTests extends AbstractScalarFunctionTestCase {
|
||||||
|
public ToAggregateMetricDoubleTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> testCaseSupplier) {
|
||||||
|
this.testCase = testCaseSupplier.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Expression build(Source source, List<Expression> args) {
|
||||||
|
if (args.get(0).dataType() == DataType.AGGREGATE_METRIC_DOUBLE) {
|
||||||
|
assumeTrue("Test sometimes wraps literals as fields", args.get(0).foldable());
|
||||||
|
}
|
||||||
|
return new ToAggregateMetricDouble(source, args.get(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParametersFactory
|
||||||
|
public static Iterable<Object[]> parameters() {
|
||||||
|
final String evaluatorStringLeft = "ToAggregateMetricDoubleFrom";
|
||||||
|
final String evaluatorStringRight = "Evaluator[field=Attribute[channel=0]]";
|
||||||
|
final List<TestCaseSupplier> suppliers = new ArrayList<>();
|
||||||
|
|
||||||
|
TestCaseSupplier.forUnaryInt(
|
||||||
|
suppliers,
|
||||||
|
evaluatorStringLeft + "Int" + evaluatorStringRight,
|
||||||
|
DataType.AGGREGATE_METRIC_DOUBLE,
|
||||||
|
i -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) i, (double) i, (double) i, 1),
|
||||||
|
Integer.MIN_VALUE,
|
||||||
|
Integer.MAX_VALUE,
|
||||||
|
emptyList()
|
||||||
|
);
|
||||||
|
TestCaseSupplier.forUnaryLong(
|
||||||
|
suppliers,
|
||||||
|
evaluatorStringLeft + "Long" + evaluatorStringRight,
|
||||||
|
DataType.AGGREGATE_METRIC_DOUBLE,
|
||||||
|
l -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral((double) l, (double) l, (double) l, 1),
|
||||||
|
Long.MIN_VALUE,
|
||||||
|
Long.MAX_VALUE,
|
||||||
|
emptyList()
|
||||||
|
);
|
||||||
|
TestCaseSupplier.forUnaryUnsignedLong(
|
||||||
|
suppliers,
|
||||||
|
evaluatorStringLeft + "UnsignedLong" + evaluatorStringRight,
|
||||||
|
DataType.AGGREGATE_METRIC_DOUBLE,
|
||||||
|
ul -> {
|
||||||
|
var newVal = ul.doubleValue();
|
||||||
|
return new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(newVal, newVal, newVal, 1);
|
||||||
|
},
|
||||||
|
BigInteger.ZERO,
|
||||||
|
UNSIGNED_LONG_MAX,
|
||||||
|
emptyList()
|
||||||
|
);
|
||||||
|
TestCaseSupplier.forUnaryDouble(
|
||||||
|
suppliers,
|
||||||
|
evaluatorStringLeft + "Double" + evaluatorStringRight,
|
||||||
|
DataType.AGGREGATE_METRIC_DOUBLE,
|
||||||
|
d -> new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(d, d, d, 1),
|
||||||
|
Double.NEGATIVE_INFINITY,
|
||||||
|
Double.POSITIVE_INFINITY,
|
||||||
|
emptyList()
|
||||||
|
);
|
||||||
|
|
||||||
|
return parameterSuppliersFromTypedDataWithDefaultChecksNoErrors(true, suppliers);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.plan.physical;
|
||||||
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder;
|
||||||
import org.elasticsearch.search.SearchModule;
|
import org.elasticsearch.search.SearchModule;
|
||||||
import org.elasticsearch.xpack.esql.core.tree.Node;
|
import org.elasticsearch.xpack.esql.core.tree.Node;
|
||||||
import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
|
import org.elasticsearch.xpack.esql.expression.ExpressionWritables;
|
||||||
|
@ -51,6 +52,7 @@ public abstract class AbstractPhysicalPlanSerializationTests<T extends PhysicalP
|
||||||
entries.addAll(ExpressionWritables.allExpressions());
|
entries.addAll(ExpressionWritables.allExpressions());
|
||||||
entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders
|
entries.addAll(new SearchModule(Settings.EMPTY, List.of()).getNamedWriteables()); // Query builders
|
||||||
entries.add(Add.ENTRY); // Used by the eval tests
|
entries.add(Add.ENTRY); // Used by the eval tests
|
||||||
|
entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY);
|
||||||
return new NamedWriteableRegistry(entries);
|
return new NamedWriteableRegistry(entries);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -609,3 +609,53 @@ _source:
|
||||||
rx: 530600088
|
rx: 530600088
|
||||||
tx: 1434577921
|
tx: 1434577921
|
||||||
uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9
|
uid: df3145b3-0563-4d3b-a0f7-897eb2876ea9
|
||||||
|
|
||||||
|
---
|
||||||
|
to_aggregate_metric_double with multi_values:
|
||||||
|
- requires:
|
||||||
|
test_runner_features: [ capabilities ]
|
||||||
|
capabilities:
|
||||||
|
- method: POST
|
||||||
|
path: /_query
|
||||||
|
parameters: [ ]
|
||||||
|
capabilities: [ aggregate_metric_double_convert_to ]
|
||||||
|
reason: "Support for to_aggregate_metric_double function"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: convert_test
|
||||||
|
body:
|
||||||
|
mappings:
|
||||||
|
properties:
|
||||||
|
"some_long_field":
|
||||||
|
type: long
|
||||||
|
"some_double_field":
|
||||||
|
type: double
|
||||||
|
"some_int_field":
|
||||||
|
type: integer
|
||||||
|
"some_unsigned_long_field":
|
||||||
|
type: unsigned_long
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
refresh: true
|
||||||
|
index: new_test
|
||||||
|
body:
|
||||||
|
- {"index": {}}
|
||||||
|
- {"some_long_field": [20385, 182941, -10958], "some_double_field": [195.1, 102.444], "some_int_field": [64, 121, 498, 1456], "some_unsigned_long_field": [13985, 19418924, 123]}
|
||||||
|
- do:
|
||||||
|
esql.query:
|
||||||
|
body:
|
||||||
|
query: 'FROM new_test | EVAL from_long=TO_AGGREGATE_METRIC_DOUBLE(some_long_field), from_double=TO_AGGREGATE_METRIC_DOUBLE(some_double_field), from_int=TO_AGGREGATE_METRIC_DOUBLE(some_int_field), from_ulong=TO_AGGREGATE_METRIC_DOUBLE(some_unsigned_long_field) | KEEP from_long, from_double, from_int, from_ulong | LIMIT 1'
|
||||||
|
|
||||||
|
- match: {columns.0.name: "from_long"}
|
||||||
|
- match: {columns.0.type: "aggregate_metric_double"}
|
||||||
|
- match: {columns.1.name: "from_double"}
|
||||||
|
- match: {columns.1.type: "aggregate_metric_double"}
|
||||||
|
- match: {columns.2.name: "from_int"}
|
||||||
|
- match: {columns.2.type: "aggregate_metric_double"}
|
||||||
|
- match: {columns.3.name: "from_ulong"}
|
||||||
|
- match: {columns.3.type: "aggregate_metric_double"}
|
||||||
|
- match: {values.0.0: '{"min":-10958.0,"max":182941.0,"sum":192368.0,"value_count":3}'}
|
||||||
|
- match: {values.0.1: '{"min":102.44400024414062,"max":195.10000610351562,"sum":297.54400634765625,"value_count":2}'}
|
||||||
|
- match: {values.0.2: '{"min":64.0,"max":1456.0,"sum":2139.0,"value_count":4}'}
|
||||||
|
- match: {values.0.3: '{"min":123.0,"max":1.9418924E7,"sum":1.9433032E7,"value_count":3}'}
|
||||||
|
|
|
@ -146,3 +146,108 @@ setup:
|
||||||
- match: {columns.0.name: "k8s.pod.network.rx"}
|
- match: {columns.0.name: "k8s.pod.network.rx"}
|
||||||
- match: {columns.0.type: "aggregate_metric_double"}
|
- match: {columns.0.type: "aggregate_metric_double"}
|
||||||
- match: {values.0.0: '{"min":530604.0,"max":530605.0,"sum":1061209.0,"value_count":2}'}
|
- match: {values.0.0: '{"min":530604.0,"max":530605.0,"sum":1061209.0,"value_count":2}'}
|
||||||
|
|
||||||
|
---
|
||||||
|
"Stats from downsampled and non-downsampled index simultaneously":
|
||||||
|
- requires:
|
||||||
|
test_runner_features: [capabilities]
|
||||||
|
capabilities:
|
||||||
|
- method: POST
|
||||||
|
path: /_query
|
||||||
|
parameters: []
|
||||||
|
capabilities: [aggregate_metric_double_convert_to]
|
||||||
|
reason: "Support for to_aggregate_metric_double function"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.downsample:
|
||||||
|
index: test
|
||||||
|
target_index: test-downsample
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"fixed_interval": "1h"
|
||||||
|
}
|
||||||
|
- is_true: acknowledged
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: test-2
|
||||||
|
body:
|
||||||
|
settings:
|
||||||
|
number_of_shards: 1
|
||||||
|
index:
|
||||||
|
mode: time_series
|
||||||
|
routing_path: [ metricset, k8s.pod.uid ]
|
||||||
|
time_series:
|
||||||
|
start_time: 2021-04-29T00:00:00Z
|
||||||
|
end_time: 2021-04-30T00:00:00Z
|
||||||
|
mappings:
|
||||||
|
properties:
|
||||||
|
"@timestamp":
|
||||||
|
type: date
|
||||||
|
metricset:
|
||||||
|
type: keyword
|
||||||
|
time_series_dimension: true
|
||||||
|
k8s:
|
||||||
|
properties:
|
||||||
|
pod:
|
||||||
|
properties:
|
||||||
|
uid:
|
||||||
|
type: keyword
|
||||||
|
time_series_dimension: true
|
||||||
|
name:
|
||||||
|
type: keyword
|
||||||
|
created_at:
|
||||||
|
type: date_nanos
|
||||||
|
running:
|
||||||
|
type: boolean
|
||||||
|
number_of_containers:
|
||||||
|
type: integer
|
||||||
|
ip:
|
||||||
|
type: ip
|
||||||
|
tags:
|
||||||
|
type: keyword
|
||||||
|
values:
|
||||||
|
type: integer
|
||||||
|
network:
|
||||||
|
properties:
|
||||||
|
tx:
|
||||||
|
type: long
|
||||||
|
time_series_metric: gauge
|
||||||
|
rx:
|
||||||
|
type: long
|
||||||
|
time_series_metric: gauge
|
||||||
|
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
refresh: true
|
||||||
|
index: test-2
|
||||||
|
body:
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-29T21:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001810, "rx": 802339}, "created_at": "2021-04-28T19:34:00.000Z", "running": false, "number_of_containers": 2, "tags": ["backend", "prod"], "values": [2, 3, 6]}}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-29T21:50:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.26", "network": {"tx": 2000177, "rx": 800479}, "created_at": "2021-04-28T19:35:00.000Z", "running": true, "number_of_containers": 2, "tags": ["backend", "prod", "us-west1"], "values": [1, 1, 3]}}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
|
||||||
|
- do:
|
||||||
|
esql.query:
|
||||||
|
body:
|
||||||
|
query: "FROM test-* |
|
||||||
|
WHERE k8s.pod.uid == \"947e4ced-1786-4e53-9e0c-5c447e959507\" |
|
||||||
|
EVAL rx = to_aggregate_metric_double(k8s.pod.network.rx) |
|
||||||
|
STATS max(rx), min(rx), sum(rx), count(rx) |
|
||||||
|
LIMIT 100"
|
||||||
|
|
||||||
|
- length: {values: 1}
|
||||||
|
- length: {values.0: 4}
|
||||||
|
- match: {columns.0.name: "max(rx)"}
|
||||||
|
- match: {columns.0.type: "double"}
|
||||||
|
- match: {columns.1.name: "min(rx)"}
|
||||||
|
- match: {columns.1.type: "double"}
|
||||||
|
- match: {columns.2.name: "sum(rx)"}
|
||||||
|
- match: {columns.2.type: "double"}
|
||||||
|
- match: {columns.3.name: "count(rx)"}
|
||||||
|
- match: {columns.3.type: "long"}
|
||||||
|
- match: {values.0.0: 803685.0}
|
||||||
|
- match: {values.0.1: 800479.0}
|
||||||
|
- match: {values.0.2: 4812452.0}
|
||||||
|
- match: {values.0.3: 6}
|
||||||
|
|
|
@ -93,7 +93,7 @@ setup:
|
||||||
- gt: {esql.functions.to_long: $functions_to_long}
|
- gt: {esql.functions.to_long: $functions_to_long}
|
||||||
- match: {esql.functions.coalesce: $functions_coalesce}
|
- match: {esql.functions.coalesce: $functions_coalesce}
|
||||||
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
|
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
|
||||||
- length: {esql.functions: 133} # check the "sister" test below for a likely update to the same esql.functions length check
|
- length: {esql.functions: 134} # check the "sister" test below for a likely update to the same esql.functions length check
|
||||||
|
|
||||||
---
|
---
|
||||||
"Basic ESQL usage output (telemetry) non-snapshot version":
|
"Basic ESQL usage output (telemetry) non-snapshot version":
|
||||||
|
@ -164,4 +164,4 @@ setup:
|
||||||
- match: {esql.functions.cos: $functions_cos}
|
- match: {esql.functions.cos: $functions_cos}
|
||||||
- gt: {esql.functions.to_long: $functions_to_long}
|
- gt: {esql.functions.to_long: $functions_to_long}
|
||||||
- match: {esql.functions.coalesce: $functions_coalesce}
|
- match: {esql.functions.coalesce: $functions_coalesce}
|
||||||
- length: {esql.functions: 130} # check the "sister" test above for a likely update to the same esql.functions length check
|
- length: {esql.functions: 131} # check the "sister" test above for a likely update to the same esql.functions length check
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue