Fix channels in TimeSeriesAggregationOperator (#125736)

Fix the channels in TimeSeriesAggregationOperator: it constructed TimeSeriesBlockHash with timestampGroup.channel() passed for both the tsid and the timestamp channel arguments. Rather than just swapping the arguments, this commit always builds the generic BlockHash (leaving a TODO to use TimeSeriesBlockHash again when possible) and cleans up the bookkeeping inside TimeSeriesBlockHash; the planner now also matches the tsid group by name rather than by data type.

Relates #125537
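
For context, the mismatch is visible in the TimeSeriesAggregationOperator diff below: TimeSeriesBlockHash takes the tsid hash channel first and the timestamp channel second, but the removed call site passed timestampGroup.channel() for both arguments. A minimal sketch of the bug, with the constructor signature and the buggy call taken from the diff; the corrected call is only an illustration of the intended wiring, not code from this commit:

    // Constructor signature (from TimeSeriesBlockHash): tsid channel first, then timestamp.
    public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) { ... }

    // Removed call site: the timestamp channel was passed twice, so the hash would read
    // timestamp values where it expected tsid hashes.
    new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());

    // Presumably intended wiring (illustrative only; the commit instead falls back to the
    // generic BlockHash, with a TODO to bring TimeSeriesBlockHash back):
    new TimeSeriesBlockHash(tsidGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());
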
Nhat Nguyen, 2025-03-26 16:53:45 -07:00 (committed by GitHub)
commit b882e76a9a, parent cb44d7a727
3 changed files with 43 additions and 42 deletions

TimeSeriesBlockHash.java

@@ -8,7 +8,6 @@
 package org.elasticsearch.compute.aggregation.blockhash;
 
 import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
@@ -22,7 +21,6 @@ import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.BytesRefVector;
 import org.elasticsearch.compute.data.IntBlock;
 import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.LongBigArrayVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.LongVector;
 import org.elasticsearch.compute.data.OrdinalBytesRefBlock;
@@ -52,15 +50,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private int currentTimestampCount;
     private final IntArrayWithSize perTsidCountArray;
-    int groupOrdinal = -1;
 
     public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) {
         super(blockFactory);
         this.tsHashChannel = tsHashChannel;
         this.timestampIntervalChannel = timestampIntervalChannel;
-        this.tsidArray = new BytesRefArrayWithSize();
-        this.timestampArray = new LongArrayWithSize();
-        this.perTsidCountArray = new IntArrayWithSize();
+        this.tsidArray = new BytesRefArrayWithSize(blockFactory);
+        this.timestampArray = new LongArrayWithSize(blockFactory);
+        this.perTsidCountArray = new IntArrayWithSize(blockFactory);
     }
 
     @Override
@@ -90,8 +86,8 @@ public final class TimeSeriesBlockHash extends BlockHash {
     private int addOnePosition(BytesRef tsid, long timestamp) {
         boolean newGroup = false;
-        if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) {
-            assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
+        if (positionCount() == 0 || lastTsid.equals(tsid) == false) {
+            assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward ";
             endTsidGroup();
             tsidArray.append(tsid);
             tsidArray.get(tsidArray.count - 1, lastTsid);
@@ -101,10 +97,9 @@ public final class TimeSeriesBlockHash extends BlockHash {
             assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp;
             timestampArray.append(timestamp);
             lastTimestamp = timestamp;
-            groupOrdinal++;
             currentTimestampCount++;
         }
-        return groupOrdinal;
+        return positionCount() - 1;
     }
 
     private void endTsidGroup() {
@@ -177,7 +172,7 @@ public final class TimeSeriesBlockHash extends BlockHash {
     }
 
     private int positionCount() {
-        return groupOrdinal + 1;
+        return timestampArray.count;
     }
 
     @Override
@@ -197,15 +192,17 @@ public final class TimeSeriesBlockHash extends BlockHash {
             + "], LongKey[channel="
             + timestampIntervalChannel
             + "]], entries="
-            + groupOrdinal
+            + positionCount()
             + "b}";
     }
 
-    private class LongArrayWithSize implements Releasable {
+    private static class LongArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
         private LongArray array;
         private int count = 0;
 
-        LongArrayWithSize() {
+        LongArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = blockFactory.bigArrays().newLongArray(1, false);
         }
@@ -216,10 +213,12 @@ public final class TimeSeriesBlockHash extends BlockHash {
         }
 
         LongBlock toBlock() {
-            LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock();
-            blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array));
-            array = null;
-            return block;
+            try (var builder = blockFactory.newLongVectorFixedBuilder(count)) {
+                for (int i = 0; i < count; i++) {
+                    builder.appendLong(array.get(i));
+                }
+                return builder.build().asBlock();
+            }
         }
 
         @Override
@@ -228,11 +227,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
        }
     }
 
-    private class IntArrayWithSize implements Releasable {
+    private static class IntArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
         private IntArray array;
         private int count = 0;
 
-        IntArrayWithSize() {
+        IntArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = blockFactory.bigArrays().newIntArray(1, false);
         }
@@ -248,11 +249,13 @@ public final class TimeSeriesBlockHash extends BlockHash {
        }
     }
 
-    private class BytesRefArrayWithSize implements Releasable {
-        private final BytesRefArray array;
+    private static class BytesRefArrayWithSize implements Releasable {
+        private final BlockFactory blockFactory;
+        private BytesRefArray array;
         private int count = 0;
 
-        BytesRefArrayWithSize() {
+        BytesRefArrayWithSize(BlockFactory blockFactory) {
+            this.blockFactory = blockFactory;
             this.array = new BytesRefArray(1, blockFactory.bigArrays());
         }
@@ -266,8 +269,9 @@ public final class TimeSeriesBlockHash extends BlockHash {
        }
 
         BytesRefVector toVector() {
-            BytesRefVector vector = blockFactory.newBytesRefArrayVector(tsidArray.array, tsidArray.count);
-            blockFactory.adjustBreaker(vector.ramBytesUsed() - tsidArray.array.bigArraysRamBytesUsed());
+            BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count);
+            blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed());
+            array = null;
             return vector;
         }
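
A note on the LongArrayWithSize.toBlock() change above: the old code wrapped the backing LongArray directly in a LongBigArrayVector and adjusted the circuit breaker by hand, handing ownership of the big array to the block; the new code copies the buffered values into a fixed-size vector builder, which does its own memory accounting. A minimal sketch of the new pattern, using only APIs that appear in this diff (the helper's fields are passed as parameters here so the sketch stands alone):

    // Convert `count` longs buffered in a BigArrays-backed LongArray into a LongBlock.
    static LongBlock toBlock(BlockFactory blockFactory, LongArray array, int count) {
        // The fixed-size builder is pre-sized and tracks its own memory,
        // so no manual adjustBreaker() call is needed.
        try (var builder = blockFactory.newLongVectorFixedBuilder(count)) {
            for (int i = 0; i < count; i++) {
                builder.appendLong(array.get(i)); // copy each buffered value
            }
            return builder.build().asBlock();
        }
        // Note: unlike the old version, the array is not nulled out here;
        // it is still released when the enclosing Releasable holder is closed.
    }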

TimeSeriesAggregationOperator.java

@@ -11,7 +11,6 @@ import org.elasticsearch.compute.Describable;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash;
 
 import java.util.List;
 import java.util.function.Supplier;
@@ -32,18 +31,17 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator {
     ) implements OperatorFactory {
         @Override
         public Operator get(DriverContext driverContext) {
-            return new HashAggregationOperator(aggregators, () -> {
-                if (aggregatorMode.isInputPartial()) {
-                    return BlockHash.build(
-                        List.of(tsidGroup, timestampGroup),
-                        driverContext.blockFactory(),
-                        maxPageSize,
-                        true // we can enable optimizations as the inputs are vectors
-                    );
-                } else {
-                    return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory());
-                }
-            }, driverContext);
+            // TODO: use TimeSeriesBlockHash when possible
+            return new HashAggregationOperator(
+                aggregators,
+                () -> BlockHash.build(
+                    List.of(tsidGroup, timestampGroup),
+                    driverContext.blockFactory(),
+                    maxPageSize,
+                    true // we can enable optimizations as the inputs are vectors
+                ),
+                driverContext
+            );
         }
 
         @Override

AbstractPhysicalOperationProviders.java

@@ -27,9 +27,9 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute;
 import org.elasticsearch.xpack.esql.core.expression.Expression;
 import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FoldContext;
+import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
 import org.elasticsearch.xpack.esql.core.expression.NameId;
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
-import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.evaluator.EvalMapper;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction;
 import org.elasticsearch.xpack.esql.expression.function.aggregate.Count;
@@ -176,8 +176,7 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper
         // time-series aggregation
         if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator)
             && groupSpecs.size() == 2
-            && groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE
-            && groupSpecs.get(1).attribute.dataType() == DataType.LONG) {
+            && groupSpecs.get(0).attribute.name().equals(MetadataAttribute.TSID_FIELD)) {
             operatorFactory = new TimeSeriesAggregationOperator.Factory(
                 groupSpecs.get(0).toHashGroupSpec(),
                 groupSpecs.get(1).toHashGroupSpec(),