Add last_over_time (#126650)

This PR introduces a time-series aggregation function that collects the 
last value of each time series within each grouping.

For example:

TS index 
| STATS sum(last_over_time(memory_usage)) BY cluster, bucket(@timestamp, 1minute)
This commit is contained in:
Nhat Nguyen 2025-04-17 16:13:58 -07:00 committed by GitHub
parent aebb1de41e
commit 9d5df193ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 2033 additions and 3 deletions

View file

@ -862,4 +862,27 @@ tasks.named('stringTemplates').configure {
it.outputFile = "org/elasticsearch/xpack/compute/operator/lookup/EnrichResultBuilderForBoolean.java"
}
// TODO: add last_over_time for other types: boolean, bytes_refs
File lastOverTimeAggregatorInputFile = file("src/main/java/org/elasticsearch/compute/aggregation/X-LastOverTimeAggregator.java.st")
template {
it.properties = intProperties
it.inputFile = lastOverTimeAggregatorInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeIntAggregator.java"
}
template {
it.properties = longProperties
it.inputFile = lastOverTimeAggregatorInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeLongAggregator.java"
}
template {
it.properties = floatProperties
it.inputFile = lastOverTimeAggregatorInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeFloatAggregator.java"
}
template {
it.properties = doubleProperties
it.inputFile = lastOverTimeAggregatorInputFile
it.outputFile = "org/elasticsearch/compute/aggregation/LastOverTimeDoubleAggregator.java"
}
}

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
/**
* A time-series aggregation function that collects the most recent value of a time series in a specified interval.
* This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "DOUBLE_BLOCK") }
)
public class LastOverTimeDoubleAggregator {
public static GroupingState initGrouping(DriverContext driverContext) {
return new GroupingState(driverContext.bigArrays());
}
// TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
// we can read the first encountered value for each group of `_tsid` and time bucket.
public static void combine(GroupingState current, int groupId, long timestamp, double value) {
current.collectValue(groupId, timestamp, value);
}
public static void combineIntermediate(
GroupingState current,
int groupId,
LongBlock timestamps, // stylecheck
DoubleBlock values,
int otherPosition
) {
int valueCount = values.getValueCount(otherPosition);
if (valueCount > 0) {
long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
int firstIndex = values.getFirstValueIndex(otherPosition);
for (int i = 0; i < valueCount; i++) {
current.collectValue(groupId, timestamp, values.getDouble(firstIndex + i));
}
}
}
public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
var timestamp = otherState.timestamps.get(otherGroupId);
var value = otherState.values.get(otherGroupId);
current.collectValue(currentGroupId, timestamp, value);
}
}
public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}
public static final class GroupingState extends AbstractArrayState {
private final BigArrays bigArrays;
private LongArray timestamps;
private DoubleArray values;
GroupingState(BigArrays bigArrays) {
super(bigArrays);
this.bigArrays = bigArrays;
boolean success = false;
LongArray timestamps = null;
try {
timestamps = bigArrays.newLongArray(1, false);
this.timestamps = timestamps;
this.values = bigArrays.newDoubleArray(1, false);
success = true;
} finally {
if (success == false) {
Releasables.close(timestamps, values, super::close);
}
}
}
void collectValue(int groupId, long timestamp, double value) {
if (groupId < timestamps.size()) {
// TODO: handle multiple values?
if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
} else {
timestamps = bigArrays.grow(timestamps, groupId + 1);
values = bigArrays.grow(values, groupId + 1);
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
trackGroupId(groupId);
}
@Override
public void close() {
Releasables.close(timestamps, values, super::close);
}
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
try (
var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
var valuesBuilder = driverContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())
) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
timestampsBuilder.appendLong(timestamps.get(group));
valuesBuilder.appendDouble(values.get(group));
} else {
timestampsBuilder.appendNull();
valuesBuilder.appendNull();
}
}
blocks[offset] = timestampsBuilder.build();
blocks[offset + 1] = valuesBuilder.build();
}
}
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
try (var builder = evalContext.blockFactory().newDoubleBlockBuilder(selected.getPositionCount())) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
builder.appendDouble(values.get(group));
} else {
builder.appendNull();
}
}
return builder.build();
}
}
}
}

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.FloatArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
/**
* A time-series aggregation function that collects the most recent value of a time series in a specified interval.
* This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "FLOAT_BLOCK") }
)
public class LastOverTimeFloatAggregator {
public static GroupingState initGrouping(DriverContext driverContext) {
return new GroupingState(driverContext.bigArrays());
}
// TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
// we can read the first encountered value for each group of `_tsid` and time bucket.
public static void combine(GroupingState current, int groupId, long timestamp, float value) {
current.collectValue(groupId, timestamp, value);
}
public static void combineIntermediate(
GroupingState current,
int groupId,
LongBlock timestamps, // stylecheck
FloatBlock values,
int otherPosition
) {
int valueCount = values.getValueCount(otherPosition);
if (valueCount > 0) {
long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
int firstIndex = values.getFirstValueIndex(otherPosition);
for (int i = 0; i < valueCount; i++) {
current.collectValue(groupId, timestamp, values.getFloat(firstIndex + i));
}
}
}
public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
var timestamp = otherState.timestamps.get(otherGroupId);
var value = otherState.values.get(otherGroupId);
current.collectValue(currentGroupId, timestamp, value);
}
}
public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}
public static final class GroupingState extends AbstractArrayState {
private final BigArrays bigArrays;
private LongArray timestamps;
private FloatArray values;
GroupingState(BigArrays bigArrays) {
super(bigArrays);
this.bigArrays = bigArrays;
boolean success = false;
LongArray timestamps = null;
try {
timestamps = bigArrays.newLongArray(1, false);
this.timestamps = timestamps;
this.values = bigArrays.newFloatArray(1, false);
success = true;
} finally {
if (success == false) {
Releasables.close(timestamps, values, super::close);
}
}
}
void collectValue(int groupId, long timestamp, float value) {
if (groupId < timestamps.size()) {
// TODO: handle multiple values?
if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
} else {
timestamps = bigArrays.grow(timestamps, groupId + 1);
values = bigArrays.grow(values, groupId + 1);
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
trackGroupId(groupId);
}
@Override
public void close() {
Releasables.close(timestamps, values, super::close);
}
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
try (
var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
var valuesBuilder = driverContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())
) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
timestampsBuilder.appendLong(timestamps.get(group));
valuesBuilder.appendFloat(values.get(group));
} else {
timestampsBuilder.appendNull();
valuesBuilder.appendNull();
}
}
blocks[offset] = timestampsBuilder.build();
blocks[offset + 1] = valuesBuilder.build();
}
}
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
try (var builder = evalContext.blockFactory().newFloatBlockBuilder(selected.getPositionCount())) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
builder.appendFloat(values.get(group));
} else {
builder.appendNull();
}
}
return builder.build();
}
}
}
}

View file

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
/**
* A time-series aggregation function that collects the most recent value of a time series in a specified interval.
* This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "INT_BLOCK") }
)
public class LastOverTimeIntAggregator {
public static GroupingState initGrouping(DriverContext driverContext) {
return new GroupingState(driverContext.bigArrays());
}
// TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
// we can read the first encountered value for each group of `_tsid` and time bucket.
public static void combine(GroupingState current, int groupId, long timestamp, int value) {
current.collectValue(groupId, timestamp, value);
}
public static void combineIntermediate(
GroupingState current,
int groupId,
LongBlock timestamps, // stylecheck
IntBlock values,
int otherPosition
) {
int valueCount = values.getValueCount(otherPosition);
if (valueCount > 0) {
long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
int firstIndex = values.getFirstValueIndex(otherPosition);
for (int i = 0; i < valueCount; i++) {
current.collectValue(groupId, timestamp, values.getInt(firstIndex + i));
}
}
}
public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
var timestamp = otherState.timestamps.get(otherGroupId);
var value = otherState.values.get(otherGroupId);
current.collectValue(currentGroupId, timestamp, value);
}
}
public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}
public static final class GroupingState extends AbstractArrayState {
private final BigArrays bigArrays;
private LongArray timestamps;
private IntArray values;
GroupingState(BigArrays bigArrays) {
super(bigArrays);
this.bigArrays = bigArrays;
boolean success = false;
LongArray timestamps = null;
try {
timestamps = bigArrays.newLongArray(1, false);
this.timestamps = timestamps;
this.values = bigArrays.newIntArray(1, false);
success = true;
} finally {
if (success == false) {
Releasables.close(timestamps, values, super::close);
}
}
}
void collectValue(int groupId, long timestamp, int value) {
if (groupId < timestamps.size()) {
// TODO: handle multiple values?
if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
} else {
timestamps = bigArrays.grow(timestamps, groupId + 1);
values = bigArrays.grow(values, groupId + 1);
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
trackGroupId(groupId);
}
@Override
public void close() {
Releasables.close(timestamps, values, super::close);
}
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
try (
var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
var valuesBuilder = driverContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())
) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
timestampsBuilder.appendLong(timestamps.get(group));
valuesBuilder.appendInt(values.get(group));
} else {
timestampsBuilder.appendNull();
valuesBuilder.appendNull();
}
}
blocks[offset] = timestampsBuilder.build();
blocks[offset + 1] = valuesBuilder.build();
}
}
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
try (var builder = evalContext.blockFactory().newIntBlockBuilder(selected.getPositionCount())) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
builder.appendInt(values.get(group));
} else {
builder.appendNull();
}
}
return builder.build();
}
}
}
}

View file

@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
/**
* A time-series aggregation function that collects the most recent value of a time series in a specified interval.
* This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "LONG_BLOCK") }
)
public class LastOverTimeLongAggregator {
public static GroupingState initGrouping(DriverContext driverContext) {
return new GroupingState(driverContext.bigArrays());
}
// TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
// we can read the first encountered value for each group of `_tsid` and time bucket.
public static void combine(GroupingState current, int groupId, long timestamp, long value) {
current.collectValue(groupId, timestamp, value);
}
public static void combineIntermediate(
GroupingState current,
int groupId,
LongBlock timestamps, // stylecheck
LongBlock values,
int otherPosition
) {
int valueCount = values.getValueCount(otherPosition);
if (valueCount > 0) {
long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
int firstIndex = values.getFirstValueIndex(otherPosition);
for (int i = 0; i < valueCount; i++) {
current.collectValue(groupId, timestamp, values.getLong(firstIndex + i));
}
}
}
public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
var timestamp = otherState.timestamps.get(otherGroupId);
var value = otherState.values.get(otherGroupId);
current.collectValue(currentGroupId, timestamp, value);
}
}
public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}
public static final class GroupingState extends AbstractArrayState {
private final BigArrays bigArrays;
private LongArray timestamps;
private LongArray values;
GroupingState(BigArrays bigArrays) {
super(bigArrays);
this.bigArrays = bigArrays;
boolean success = false;
LongArray timestamps = null;
try {
timestamps = bigArrays.newLongArray(1, false);
this.timestamps = timestamps;
this.values = bigArrays.newLongArray(1, false);
success = true;
} finally {
if (success == false) {
Releasables.close(timestamps, values, super::close);
}
}
}
void collectValue(int groupId, long timestamp, long value) {
if (groupId < timestamps.size()) {
// TODO: handle multiple values?
if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
} else {
timestamps = bigArrays.grow(timestamps, groupId + 1);
values = bigArrays.grow(values, groupId + 1);
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
trackGroupId(groupId);
}
@Override
public void close() {
Releasables.close(timestamps, values, super::close);
}
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
try (
var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
var valuesBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())
) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
timestampsBuilder.appendLong(timestamps.get(group));
valuesBuilder.appendLong(values.get(group));
} else {
timestampsBuilder.appendNull();
valuesBuilder.appendNull();
}
}
blocks[offset] = timestampsBuilder.build();
blocks[offset + 1] = valuesBuilder.build();
}
}
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
try (var builder = evalContext.blockFactory().newLongBlockBuilder(selected.getPositionCount())) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
builder.appendLong(values.get(group));
} else {
builder.appendNull();
}
}
return builder.build();
}
}
}
}

View file

@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeDoubleAggregator}.
* This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
*/
public final class LastOverTimeDoubleAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
public LastOverTimeDoubleAggregatorFunctionSupplier() {
}
@Override
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
return LastOverTimeDoubleGroupingAggregatorFunction.intermediateStateDesc();
}
@Override
public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public LastOverTimeDoubleGroupingAggregatorFunction groupingAggregator(
DriverContext driverContext, List<Integer> channels) {
return LastOverTimeDoubleGroupingAggregatorFunction.create(channels, driverContext);
}
@Override
public String describe() {
return "last_over_time of doubles";
}
}

View file

@ -0,0 +1,228 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.DoubleVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeDoubleAggregator}.
* This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
*/
public final class LastOverTimeDoubleGroupingAggregatorFunction implements GroupingAggregatorFunction {
private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("timestamps", ElementType.LONG),
new IntermediateStateDesc("values", ElementType.DOUBLE) );
private final LastOverTimeDoubleAggregator.GroupingState state;
private final List<Integer> channels;
private final DriverContext driverContext;
public LastOverTimeDoubleGroupingAggregatorFunction(List<Integer> channels,
LastOverTimeDoubleAggregator.GroupingState state, DriverContext driverContext) {
this.channels = channels;
this.state = state;
this.driverContext = driverContext;
}
public static LastOverTimeDoubleGroupingAggregatorFunction create(List<Integer> channels,
DriverContext driverContext) {
return new LastOverTimeDoubleGroupingAggregatorFunction(channels, LastOverTimeDoubleAggregator.initGrouping(driverContext), driverContext);
}
public static List<IntermediateStateDesc> intermediateStateDesc() {
return INTERMEDIATE_STATE_DESC;
}
@Override
public int intermediateBlockCount() {
return INTERMEDIATE_STATE_DESC.size();
}
@Override
public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds,
Page page) {
DoubleBlock valuesBlock = page.getBlock(channels.get(0));
DoubleVector valuesVector = valuesBlock.asVector();
LongBlock timestampsBlock = page.getBlock(channels.get(1));
LongVector timestampsVector = timestampsBlock.asVector();
if (timestampsVector == null) {
throw new IllegalStateException("expected @timestamp vector; but got a block");
}
if (valuesVector == null) {
if (valuesBlock.mayHaveNulls()) {
state.enableGroupIdTracking(seenGroupIds);
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void close() {
}
};
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void close() {
}
};
}
private void addRawInput(int positionOffset, IntVector groups, DoubleBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v));
}
}
}
private void addRawInput(int positionOffset, IntVector groups, DoubleVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
var valuePosition = groupPosition + positionOffset;
LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition));
}
}
private void addRawInput(int positionOffset, IntBlock groups, DoubleBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(v), values.getDouble(v));
}
}
}
}
private void addRawInput(int positionOffset, IntBlock groups, DoubleVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
var valuePosition = groupPosition + positionOffset;
LastOverTimeDoubleAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getDouble(valuePosition));
}
}
}
@Override
public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
state.enableGroupIdTracking(seenGroupIds);
}
@Override
public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
state.enableGroupIdTracking(new SeenGroupIds.Empty());
assert channels.size() == intermediateBlockCount();
Block timestampsUncast = page.getBlock(channels.get(0));
if (timestampsUncast.areAllValuesNull()) {
return;
}
LongBlock timestamps = (LongBlock) timestampsUncast;
Block valuesUncast = page.getBlock(channels.get(1));
if (valuesUncast.areAllValuesNull()) {
return;
}
DoubleBlock values = (DoubleBlock) valuesUncast;
assert timestamps.getPositionCount() == values.getPositionCount();
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
LastOverTimeDoubleAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset);
}
}
@Override
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
if (input.getClass() != getClass()) {
throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
}
LastOverTimeDoubleAggregator.GroupingState inState = ((LastOverTimeDoubleGroupingAggregatorFunction) input).state;
state.enableGroupIdTracking(new SeenGroupIds.Empty());
LastOverTimeDoubleAggregator.combineStates(state, groupId, inState, position);
}
@Override
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
state.toIntermediate(blocks, offset, selected, driverContext);
}
@Override
public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
GroupingAggregatorEvaluationContext evaluatorContext) {
blocks[offset] = LastOverTimeDoubleAggregator.evaluateFinal(state, selected, evaluatorContext);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName()).append("[");
sb.append("channels=").append(channels);
sb.append("]");
return sb.toString();
}
@Override
public void close() {
state.close();
}
}

View file

@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeFloatAggregator}.
* This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
*/
public final class LastOverTimeFloatAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
public LastOverTimeFloatAggregatorFunctionSupplier() {
}
@Override
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
return LastOverTimeFloatGroupingAggregatorFunction.intermediateStateDesc();
}
@Override
public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public LastOverTimeFloatGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
List<Integer> channels) {
return LastOverTimeFloatGroupingAggregatorFunction.create(channels, driverContext);
}
@Override
public String describe() {
return "last_over_time of floats";
}
}

View file

@ -0,0 +1,228 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.FloatVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeFloatAggregator}.
* This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
*/
public final class LastOverTimeFloatGroupingAggregatorFunction implements GroupingAggregatorFunction {
private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("timestamps", ElementType.LONG),
new IntermediateStateDesc("values", ElementType.FLOAT) );
private final LastOverTimeFloatAggregator.GroupingState state;
private final List<Integer> channels;
private final DriverContext driverContext;
public LastOverTimeFloatGroupingAggregatorFunction(List<Integer> channels,
LastOverTimeFloatAggregator.GroupingState state, DriverContext driverContext) {
this.channels = channels;
this.state = state;
this.driverContext = driverContext;
}
public static LastOverTimeFloatGroupingAggregatorFunction create(List<Integer> channels,
DriverContext driverContext) {
return new LastOverTimeFloatGroupingAggregatorFunction(channels, LastOverTimeFloatAggregator.initGrouping(driverContext), driverContext);
}
public static List<IntermediateStateDesc> intermediateStateDesc() {
return INTERMEDIATE_STATE_DESC;
}
@Override
public int intermediateBlockCount() {
return INTERMEDIATE_STATE_DESC.size();
}
@Override
public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds,
Page page) {
FloatBlock valuesBlock = page.getBlock(channels.get(0));
FloatVector valuesVector = valuesBlock.asVector();
LongBlock timestampsBlock = page.getBlock(channels.get(1));
LongVector timestampsVector = timestampsBlock.asVector();
if (timestampsVector == null) {
throw new IllegalStateException("expected @timestamp vector; but got a block");
}
if (valuesVector == null) {
if (valuesBlock.mayHaveNulls()) {
state.enableGroupIdTracking(seenGroupIds);
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void close() {
}
};
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void close() {
}
};
}
private void addRawInput(int positionOffset, IntVector groups, FloatBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v));
}
}
}
private void addRawInput(int positionOffset, IntVector groups, FloatVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
var valuePosition = groupPosition + positionOffset;
LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition));
}
}
private void addRawInput(int positionOffset, IntBlock groups, FloatBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(v), values.getFloat(v));
}
}
}
}
private void addRawInput(int positionOffset, IntBlock groups, FloatVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
var valuePosition = groupPosition + positionOffset;
LastOverTimeFloatAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getFloat(valuePosition));
}
}
}
@Override
public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
state.enableGroupIdTracking(seenGroupIds);
}
@Override
public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
state.enableGroupIdTracking(new SeenGroupIds.Empty());
assert channels.size() == intermediateBlockCount();
Block timestampsUncast = page.getBlock(channels.get(0));
if (timestampsUncast.areAllValuesNull()) {
return;
}
LongBlock timestamps = (LongBlock) timestampsUncast;
Block valuesUncast = page.getBlock(channels.get(1));
if (valuesUncast.areAllValuesNull()) {
return;
}
FloatBlock values = (FloatBlock) valuesUncast;
assert timestamps.getPositionCount() == values.getPositionCount();
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
LastOverTimeFloatAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset);
}
}
@Override
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
if (input.getClass() != getClass()) {
throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
}
LastOverTimeFloatAggregator.GroupingState inState = ((LastOverTimeFloatGroupingAggregatorFunction) input).state;
state.enableGroupIdTracking(new SeenGroupIds.Empty());
LastOverTimeFloatAggregator.combineStates(state, groupId, inState, position);
}
@Override
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
state.toIntermediate(blocks, offset, selected, driverContext);
}
@Override
public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
GroupingAggregatorEvaluationContext evaluatorContext) {
blocks[offset] = LastOverTimeFloatAggregator.evaluateFinal(state, selected, evaluatorContext);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName()).append("[");
sb.append("channels=").append(channels);
sb.append("]");
return sb.toString();
}
@Override
public void close() {
state.close();
}
}

View file

@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeIntAggregator}.
* This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
*/
public final class LastOverTimeIntAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
public LastOverTimeIntAggregatorFunctionSupplier() {
}
@Override
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
return LastOverTimeIntGroupingAggregatorFunction.intermediateStateDesc();
}
@Override
public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public LastOverTimeIntGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
List<Integer> channels) {
return LastOverTimeIntGroupingAggregatorFunction.create(channels, driverContext);
}
@Override
public String describe() {
return "last_over_time of ints";
}
}

View file

@ -0,0 +1,226 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeIntAggregator}.
* This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
*/
public final class LastOverTimeIntGroupingAggregatorFunction implements GroupingAggregatorFunction {
private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("timestamps", ElementType.LONG),
new IntermediateStateDesc("values", ElementType.INT) );
private final LastOverTimeIntAggregator.GroupingState state;
private final List<Integer> channels;
private final DriverContext driverContext;
public LastOverTimeIntGroupingAggregatorFunction(List<Integer> channels,
LastOverTimeIntAggregator.GroupingState state, DriverContext driverContext) {
this.channels = channels;
this.state = state;
this.driverContext = driverContext;
}
public static LastOverTimeIntGroupingAggregatorFunction create(List<Integer> channels,
DriverContext driverContext) {
return new LastOverTimeIntGroupingAggregatorFunction(channels, LastOverTimeIntAggregator.initGrouping(driverContext), driverContext);
}
public static List<IntermediateStateDesc> intermediateStateDesc() {
return INTERMEDIATE_STATE_DESC;
}
@Override
public int intermediateBlockCount() {
return INTERMEDIATE_STATE_DESC.size();
}
@Override
public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds,
Page page) {
IntBlock valuesBlock = page.getBlock(channels.get(0));
IntVector valuesVector = valuesBlock.asVector();
LongBlock timestampsBlock = page.getBlock(channels.get(1));
LongVector timestampsVector = timestampsBlock.asVector();
if (timestampsVector == null) {
throw new IllegalStateException("expected @timestamp vector; but got a block");
}
if (valuesVector == null) {
if (valuesBlock.mayHaveNulls()) {
state.enableGroupIdTracking(seenGroupIds);
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void close() {
}
};
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void close() {
}
};
}
private void addRawInput(int positionOffset, IntVector groups, IntBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v));
}
}
}
private void addRawInput(int positionOffset, IntVector groups, IntVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
var valuePosition = groupPosition + positionOffset;
LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition));
}
}
private void addRawInput(int positionOffset, IntBlock groups, IntBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(v), values.getInt(v));
}
}
}
}
private void addRawInput(int positionOffset, IntBlock groups, IntVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
var valuePosition = groupPosition + positionOffset;
LastOverTimeIntAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getInt(valuePosition));
}
}
}
@Override
public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
state.enableGroupIdTracking(seenGroupIds);
}
@Override
public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
state.enableGroupIdTracking(new SeenGroupIds.Empty());
assert channels.size() == intermediateBlockCount();
Block timestampsUncast = page.getBlock(channels.get(0));
if (timestampsUncast.areAllValuesNull()) {
return;
}
LongBlock timestamps = (LongBlock) timestampsUncast;
Block valuesUncast = page.getBlock(channels.get(1));
if (valuesUncast.areAllValuesNull()) {
return;
}
IntBlock values = (IntBlock) valuesUncast;
assert timestamps.getPositionCount() == values.getPositionCount();
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
LastOverTimeIntAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset);
}
}
@Override
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
if (input.getClass() != getClass()) {
throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
}
LastOverTimeIntAggregator.GroupingState inState = ((LastOverTimeIntGroupingAggregatorFunction) input).state;
state.enableGroupIdTracking(new SeenGroupIds.Empty());
LastOverTimeIntAggregator.combineStates(state, groupId, inState, position);
}
@Override
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
state.toIntermediate(blocks, offset, selected, driverContext);
}
@Override
public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
GroupingAggregatorEvaluationContext evaluatorContext) {
blocks[offset] = LastOverTimeIntAggregator.evaluateFinal(state, selected, evaluatorContext);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName()).append("[");
sb.append("channels=").append(channels);
sb.append("]");
return sb.toString();
}
@Override
public void close() {
state.close();
}
}

View file

@ -0,0 +1,46 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.util.List;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link AggregatorFunctionSupplier} implementation for {@link LastOverTimeLongAggregator}.
* This class is generated. Edit {@code AggregatorFunctionSupplierImplementer} instead.
*/
public final class LastOverTimeLongAggregatorFunctionSupplier implements AggregatorFunctionSupplier {
public LastOverTimeLongAggregatorFunctionSupplier() {
}
@Override
public List<IntermediateStateDesc> nonGroupingIntermediateStateDesc() {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public List<IntermediateStateDesc> groupingIntermediateStateDesc() {
return LastOverTimeLongGroupingAggregatorFunction.intermediateStateDesc();
}
@Override
public AggregatorFunction aggregator(DriverContext driverContext, List<Integer> channels) {
throw new UnsupportedOperationException("non-grouping aggregator is not supported");
}
@Override
public LastOverTimeLongGroupingAggregatorFunction groupingAggregator(DriverContext driverContext,
List<Integer> channels) {
return LastOverTimeLongGroupingAggregatorFunction.create(channels, driverContext);
}
@Override
public String describe() {
return "last_over_time of longs";
}
}

View file

@ -0,0 +1,226 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License
// 2.0; you may not use this file except in compliance with the Elastic License
// 2.0.
package org.elasticsearch.compute.aggregation;
import java.lang.Integer;
import java.lang.Override;
import java.lang.String;
import java.lang.StringBuilder;
import java.util.List;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
/**
* {@link GroupingAggregatorFunction} implementation for {@link LastOverTimeLongAggregator}.
* This class is generated. Edit {@code GroupingAggregatorImplementer} instead.
*/
public final class LastOverTimeLongGroupingAggregatorFunction implements GroupingAggregatorFunction {
private static final List<IntermediateStateDesc> INTERMEDIATE_STATE_DESC = List.of(
new IntermediateStateDesc("timestamps", ElementType.LONG),
new IntermediateStateDesc("values", ElementType.LONG) );
private final LastOverTimeLongAggregator.GroupingState state;
private final List<Integer> channels;
private final DriverContext driverContext;
public LastOverTimeLongGroupingAggregatorFunction(List<Integer> channels,
LastOverTimeLongAggregator.GroupingState state, DriverContext driverContext) {
this.channels = channels;
this.state = state;
this.driverContext = driverContext;
}
public static LastOverTimeLongGroupingAggregatorFunction create(List<Integer> channels,
DriverContext driverContext) {
return new LastOverTimeLongGroupingAggregatorFunction(channels, LastOverTimeLongAggregator.initGrouping(driverContext), driverContext);
}
public static List<IntermediateStateDesc> intermediateStateDesc() {
return INTERMEDIATE_STATE_DESC;
}
@Override
public int intermediateBlockCount() {
return INTERMEDIATE_STATE_DESC.size();
}
@Override
public GroupingAggregatorFunction.AddInput prepareProcessPage(SeenGroupIds seenGroupIds,
Page page) {
LongBlock valuesBlock = page.getBlock(channels.get(0));
LongVector valuesVector = valuesBlock.asVector();
LongBlock timestampsBlock = page.getBlock(channels.get(1));
LongVector timestampsVector = timestampsBlock.asVector();
if (timestampsVector == null) {
throw new IllegalStateException("expected @timestamp vector; but got a block");
}
if (valuesVector == null) {
if (valuesBlock.mayHaveNulls()) {
state.enableGroupIdTracking(seenGroupIds);
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesBlock, timestampsVector);
}
@Override
public void close() {
}
};
}
return new GroupingAggregatorFunction.AddInput() {
@Override
public void add(int positionOffset, IntBlock groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void add(int positionOffset, IntVector groupIds) {
addRawInput(positionOffset, groupIds, valuesVector, timestampsVector);
}
@Override
public void close() {
}
};
}
private void addRawInput(int positionOffset, IntVector groups, LongBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v));
}
}
}
private void addRawInput(int positionOffset, IntVector groups, LongVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
var valuePosition = groupPosition + positionOffset;
LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition));
}
}
private void addRawInput(int positionOffset, IntBlock groups, LongBlock values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
if (values.isNull(groupPosition + positionOffset)) {
continue;
}
int valuesStart = values.getFirstValueIndex(groupPosition + positionOffset);
int valuesEnd = valuesStart + values.getValueCount(groupPosition + positionOffset);
for (int v = valuesStart; v < valuesEnd; v++) {
LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(v), values.getLong(v));
}
}
}
}
private void addRawInput(int positionOffset, IntBlock groups, LongVector values,
LongVector timestamps) {
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
if (groups.isNull(groupPosition)) {
continue;
}
int groupStart = groups.getFirstValueIndex(groupPosition);
int groupEnd = groupStart + groups.getValueCount(groupPosition);
for (int g = groupStart; g < groupEnd; g++) {
int groupId = groups.getInt(g);
var valuePosition = groupPosition + positionOffset;
LastOverTimeLongAggregator.combine(state, groupId, timestamps.getLong(valuePosition), values.getLong(valuePosition));
}
}
}
@Override
public void selectedMayContainUnseenGroups(SeenGroupIds seenGroupIds) {
state.enableGroupIdTracking(seenGroupIds);
}
@Override
public void addIntermediateInput(int positionOffset, IntVector groups, Page page) {
state.enableGroupIdTracking(new SeenGroupIds.Empty());
assert channels.size() == intermediateBlockCount();
Block timestampsUncast = page.getBlock(channels.get(0));
if (timestampsUncast.areAllValuesNull()) {
return;
}
LongBlock timestamps = (LongBlock) timestampsUncast;
Block valuesUncast = page.getBlock(channels.get(1));
if (valuesUncast.areAllValuesNull()) {
return;
}
LongBlock values = (LongBlock) valuesUncast;
assert timestamps.getPositionCount() == values.getPositionCount();
for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) {
int groupId = groups.getInt(groupPosition);
LastOverTimeLongAggregator.combineIntermediate(state, groupId, timestamps, values, groupPosition + positionOffset);
}
}
@Override
public void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position) {
if (input.getClass() != getClass()) {
throw new IllegalArgumentException("expected " + getClass() + "; got " + input.getClass());
}
LastOverTimeLongAggregator.GroupingState inState = ((LastOverTimeLongGroupingAggregatorFunction) input).state;
state.enableGroupIdTracking(new SeenGroupIds.Empty());
LastOverTimeLongAggregator.combineStates(state, groupId, inState, position);
}
@Override
public void evaluateIntermediate(Block[] blocks, int offset, IntVector selected) {
state.toIntermediate(blocks, offset, selected, driverContext);
}
@Override
public void evaluateFinal(Block[] blocks, int offset, IntVector selected,
GroupingAggregatorEvaluationContext evaluatorContext) {
blocks[offset] = LastOverTimeLongAggregator.evaluateFinal(state, selected, evaluatorContext);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName()).append("[");
sb.append("channels=").append(channels);
sb.append("]");
return sb.toString();
}
@Override
public void close() {
state.close();
}
}

View file

@ -0,0 +1,153 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.aggregation;
import org.elasticsearch.common.util.BigArrays;
$if(int||double||float)$
import org.elasticsearch.common.util.$Type$Array;
$endif$
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.compute.ann.GroupingAggregator;
import org.elasticsearch.compute.ann.IntermediateState;
import org.elasticsearch.compute.data.Block;
$if(int||double||float)$
import org.elasticsearch.compute.data.$Type$Block;
$endif$
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.core.Releasables;
/**
* A time-series aggregation function that collects the most recent value of a time series in a specified interval.
* This class is generated. Edit `X-LastOverTimeAggregator.java.st` instead.
*/
@GroupingAggregator(
timeseries = true,
value = { @IntermediateState(name = "timestamps", type = "LONG_BLOCK"), @IntermediateState(name = "values", type = "$TYPE$_BLOCK") }
)
public class LastOverTime$Type$Aggregator {
public static GroupingState initGrouping(DriverContext driverContext) {
return new GroupingState(driverContext.bigArrays());
}
// TODO: Since data in data_streams is sorted by `_tsid` and timestamp in descending order,
// we can read the first encountered value for each group of `_tsid` and time bucket.
public static void combine(GroupingState current, int groupId, long timestamp, $type$ value) {
current.collectValue(groupId, timestamp, value);
}
public static void combineIntermediate(
GroupingState current,
int groupId,
LongBlock timestamps, // stylecheck
$Type$Block values,
int otherPosition
) {
int valueCount = values.getValueCount(otherPosition);
if (valueCount > 0) {
long timestamp = timestamps.getLong(timestamps.getFirstValueIndex(otherPosition));
int firstIndex = values.getFirstValueIndex(otherPosition);
for (int i = 0; i < valueCount; i++) {
current.collectValue(groupId, timestamp, values.get$Type$(firstIndex + i));
}
}
}
public static void combineStates(GroupingState current, int currentGroupId, GroupingState otherState, int otherGroupId) {
if (otherGroupId < otherState.timestamps.size() && otherState.hasValue(otherGroupId)) {
var timestamp = otherState.timestamps.get(otherGroupId);
var value = otherState.values.get(otherGroupId);
current.collectValue(currentGroupId, timestamp, value);
}
}
public static Block evaluateFinal(GroupingState state, IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
return state.evaluateFinal(selected, evalContext);
}
public static final class GroupingState extends AbstractArrayState {
private final BigArrays bigArrays;
private LongArray timestamps;
private $Type$Array values;
GroupingState(BigArrays bigArrays) {
super(bigArrays);
this.bigArrays = bigArrays;
boolean success = false;
LongArray timestamps = null;
try {
timestamps = bigArrays.newLongArray(1, false);
this.timestamps = timestamps;
this.values = bigArrays.new$Type$Array(1, false);
success = true;
} finally {
if (success == false) {
Releasables.close(timestamps, values, super::close);
}
}
}
void collectValue(int groupId, long timestamp, $type$ value) {
if (groupId < timestamps.size()) {
// TODO: handle multiple values?
if (hasValue(groupId) == false || timestamps.get(groupId) < timestamp) {
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
} else {
timestamps = bigArrays.grow(timestamps, groupId + 1);
values = bigArrays.grow(values, groupId + 1);
timestamps.set(groupId, timestamp);
values.set(groupId, value);
}
trackGroupId(groupId);
}
@Override
public void close() {
Releasables.close(timestamps, values, super::close);
}
@Override
public void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) {
try (
var timestampsBuilder = driverContext.blockFactory().newLongBlockBuilder(selected.getPositionCount());
var valuesBuilder = driverContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())
) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
timestampsBuilder.appendLong(timestamps.get(group));
valuesBuilder.append$Type$(values.get(group));
} else {
timestampsBuilder.appendNull();
valuesBuilder.appendNull();
}
}
blocks[offset] = timestampsBuilder.build();
blocks[offset + 1] = valuesBuilder.build();
}
}
Block evaluateFinal(IntVector selected, GroupingAggregatorEvaluationContext evalContext) {
try (var builder = evalContext.blockFactory().new$Type$BlockBuilder(selected.getPositionCount())) {
for (int p = 0; p < selected.getPositionCount(); p++) {
int group = selected.getInt(p);
if (group < timestamps.size() && hasValue(group)) {
builder.append$Type$(values.get(group));
} else {
builder.appendNull();
}
}
return builder.build();
}
}
}
}

View file

@ -219,3 +219,22 @@ avg_cost:double | cluster:keyword | time_bucket:datetime
8.71875 | prod | 2024-05-10T00:22:00.000Z
8.5625 | qa | 2024-05-10T00:22:00.000Z
;
max_of_last_over_time
required_capability: metrics_command
required_capability: last_over_time
TS k8s | STATS max_cost=max(last_over_time(network.cost)) BY cluster, time_bucket = bucket(@timestamp,1minute) | SORT max_cost DESC, time_bucket DESC, cluster | LIMIT 10;
max_cost:double | cluster:keyword | time_bucket:datetime
12.5 | staging | 2024-05-10T00:09:00.000Z
12.375 | prod | 2024-05-10T00:17:00.000Z
12.375 | qa | 2024-05-10T00:06:00.000Z
12.375 | qa | 2024-05-10T00:01:00.000Z
12.25 | prod | 2024-05-10T00:19:00.000Z
12.125 | qa | 2024-05-10T00:17:00.000Z
12.125 | prod | 2024-05-10T00:00:00.000Z
12.0 | prod | 2024-05-10T00:08:00.000Z
12.0 | qa | 2024-05-10T00:08:00.000Z
11.875 | qa | 2024-05-10T00:21:00.000Z
;

View file

@ -1017,7 +1017,12 @@ public class EsqlCapabilities {
* During resolution (pre-analysis) we have to consider that joins or enriches can override EVALuated values
* https://github.com/elastic/elasticsearch/issues/126419
*/
FIX_JOIN_MASKING_EVAL;
FIX_JOIN_MASKING_EVAL,
/**
* Support last_over_time aggregation that gets evaluated per time-series
*/
LAST_OVER_TIME(Build.current().isSnapshot());
private final boolean enabled;

View file

@ -22,6 +22,7 @@ import org.elasticsearch.xpack.esql.expression.function.aggregate.Avg;
import org.elasticsearch.xpack.esql.expression.function.aggregate.AvgOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Median;
@ -444,6 +445,7 @@ public class EsqlFunctionRegistry {
def(Rate.class, Rate::withUnresolvedTimestamp, "rate"),
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"),
def(LastOverTime.class, LastOverTime::withUnresolvedTimestamp, "last_over_time"),
def(Term.class, bi(Term::new), "term") } };
}

View file

@ -32,6 +32,7 @@ public class AggregateWritables {
Values.ENTRY,
MaxOverTime.ENTRY,
AvgOverTime.ENTRY,
LastOverTime.ENTRY,
// internal functions
ToPartial.ENTRY,
FromPartial.ENTRY,

View file

@ -0,0 +1,141 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.esql.expression.function.aggregate;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastOverTimeDoubleAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastOverTimeFloatAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastOverTimeIntAggregatorFunctionSupplier;
import org.elasticsearch.compute.aggregation.LastOverTimeLongAggregatorFunctionSupplier;
import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
import org.elasticsearch.xpack.esql.core.expression.UnresolvedAttribute;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
import org.elasticsearch.xpack.esql.expression.function.FunctionType;
import org.elasticsearch.xpack.esql.expression.function.OptionalArgument;
import org.elasticsearch.xpack.esql.expression.function.Param;
import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
import org.elasticsearch.xpack.esql.planner.ToAggregator;
import java.io.IOException;
import java.util.List;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
public class LastOverTime extends TimeSeriesAggregateFunction implements OptionalArgument, ToAggregator {
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
Expression.class,
"LastOverTime",
LastOverTime::new
);
private final Expression timestamp;
@FunctionInfo(
returnType = { "int", "double", "integer", "long" },
description = "Collect the most recent value of a time-series in the specified interval. Available with TS command only",
type = FunctionType.AGGREGATE
)
public LastOverTime(
Source source,
@Param(name = "field", type = { "long|int|double|float" }, description = "field") Expression field,
Expression timestamp
) {
this(source, field, Literal.TRUE, timestamp);
}
// compatibility constructor used when reading from the stream
private LastOverTime(Source source, Expression field, Expression filter, List<Expression> children) {
this(source, field, filter, children.getFirst());
}
private LastOverTime(Source source, Expression field, Expression filter, Expression timestamp) {
super(source, field, filter, List.of(timestamp));
this.timestamp = timestamp;
}
public LastOverTime(StreamInput in) throws IOException {
this(
Source.readFrom((PlanStreamInput) in),
in.readNamedWriteable(Expression.class),
in.readNamedWriteable(Expression.class),
in.readNamedWriteableCollectionAsList(Expression.class)
);
}
@Override
public String getWriteableName() {
return ENTRY.name;
}
public static LastOverTime withUnresolvedTimestamp(Source source, Expression field) {
return new LastOverTime(source, field, new UnresolvedAttribute(source, "@timestamp"));
}
@Override
protected NodeInfo<LastOverTime> info() {
return NodeInfo.create(this, LastOverTime::new, field(), timestamp);
}
@Override
public LastOverTime replaceChildren(List<Expression> newChildren) {
if (newChildren.size() != 3) {
assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren;
throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren);
}
return new LastOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
}
@Override
public LastOverTime withFilter(Expression filter) {
return new LastOverTime(source(), field(), filter, timestamp);
}
@Override
public DataType dataType() {
return field().dataType();
}
@Override
protected TypeResolution resolveType() {
return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long");
}
@Override
public AggregatorFunctionSupplier supplier() {
final DataType type = field().dataType();
return switch (type) {
case LONG -> new LastOverTimeLongAggregatorFunctionSupplier();
case INTEGER -> new LastOverTimeIntAggregatorFunctionSupplier();
case DOUBLE -> new LastOverTimeDoubleAggregatorFunctionSupplier();
case FLOAT -> new LastOverTimeFloatAggregatorFunctionSupplier();
default -> throw EsqlIllegalArgumentException.illegalDataType(type);
};
}
@Override
public LastOverTime perTimeSeriesAggregation() {
return this;
}
@Override
public String toString() {
return "last_over_time(" + field() + ")";
}
Expression timestamp() {
return timestamp;
}
}

View file

@ -33,7 +33,7 @@ setup:
path: /_query
parameters: []
# A snapshot function was removed in match_function_options, it can't work on mixed cluster tests otherwise.
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, avg_over_time]
capabilities: [ snapshot_test_for_telemetry, fn_byte_length, match_function_options, last_over_time]
reason: "Test that should only be executed on snapshot versions"
- do: {xpack.usage: {}}
@ -121,7 +121,7 @@ setup:
- match: {esql.functions.coalesce: $functions_coalesce}
- gt: {esql.functions.categorize: $functions_categorize}
# Testing for the entire function set isn't feasbile, so we just check that we return the correct count as an approximation.
- length: {esql.functions: 137} # check the "sister" test below for a likely update to the same esql.functions length check
- length: {esql.functions: 138} # check the "sister" test below for a likely update to the same esql.functions length check
---
"Basic ESQL usage output (telemetry) non-snapshot version":