diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java index 55d46078830c..573bb4d723aa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/TimeSeriesBlockHash.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation.blockhash; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.RamUsageEstimator; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; @@ -22,7 +21,6 @@ import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; -import org.elasticsearch.compute.data.LongBigArrayVector; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.OrdinalBytesRefBlock; @@ -52,15 +50,13 @@ public final class TimeSeriesBlockHash extends BlockHash { private int currentTimestampCount; private final IntArrayWithSize perTsidCountArray; - int groupOrdinal = -1; - public TimeSeriesBlockHash(int tsHashChannel, int timestampIntervalChannel, BlockFactory blockFactory) { super(blockFactory); this.tsHashChannel = tsHashChannel; this.timestampIntervalChannel = timestampIntervalChannel; - this.tsidArray = new BytesRefArrayWithSize(); - this.timestampArray = new LongArrayWithSize(); - this.perTsidCountArray = new IntArrayWithSize(); + this.tsidArray = new BytesRefArrayWithSize(blockFactory); + this.timestampArray = new LongArrayWithSize(blockFactory); + this.perTsidCountArray = new IntArrayWithSize(blockFactory); } @Override @@ -90,8 +86,8 @@ public final class TimeSeriesBlockHash extends BlockHash { private int addOnePosition(BytesRef tsid, long timestamp) { boolean newGroup = false; - if (groupOrdinal == -1 || lastTsid.equals(tsid) == false) { - assert groupOrdinal == -1 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward "; + if (positionCount() == 0 || lastTsid.equals(tsid) == false) { + assert positionCount() == 0 || lastTsid.compareTo(tsid) < 0 : "tsid goes backward "; endTsidGroup(); tsidArray.append(tsid); tsidArray.get(tsidArray.count - 1, lastTsid); @@ -101,10 +97,9 @@ public final class TimeSeriesBlockHash extends BlockHash { assert newGroup || lastTimestamp >= timestamp : "@timestamp goes backward " + lastTimestamp + " < " + timestamp; timestampArray.append(timestamp); lastTimestamp = timestamp; - groupOrdinal++; currentTimestampCount++; } - return groupOrdinal; + return positionCount() - 1; } private void endTsidGroup() { @@ -177,7 +172,7 @@ public final class TimeSeriesBlockHash extends BlockHash { } private int positionCount() { - return groupOrdinal + 1; + return timestampArray.count; } @Override @@ -197,15 +192,17 @@ public final class TimeSeriesBlockHash extends BlockHash { + "], LongKey[channel=" + timestampIntervalChannel + "]], entries=" - + groupOrdinal + + positionCount() + "b}"; } - private class LongArrayWithSize implements Releasable { + private static class LongArrayWithSize implements Releasable { + private final BlockFactory blockFactory; private LongArray array; private int count = 0; - LongArrayWithSize() { + LongArrayWithSize(BlockFactory blockFactory) { + this.blockFactory = blockFactory; this.array = blockFactory.bigArrays().newLongArray(1, false); } @@ -216,10 +213,12 @@ public final class TimeSeriesBlockHash extends BlockHash { } LongBlock toBlock() { - LongBlock block = new LongBigArrayVector(array, count, blockFactory).asBlock(); - blockFactory.adjustBreaker(block.ramBytesUsed() - RamUsageEstimator.sizeOf(array)); - array = null; - return block; + try (var builder = blockFactory.newLongVectorFixedBuilder(count)) { + for (int i = 0; i < count; i++) { + builder.appendLong(array.get(i)); + } + return builder.build().asBlock(); + } } @Override @@ -228,11 +227,13 @@ public final class TimeSeriesBlockHash extends BlockHash { } } - private class IntArrayWithSize implements Releasable { + private static class IntArrayWithSize implements Releasable { + private final BlockFactory blockFactory; private IntArray array; private int count = 0; - IntArrayWithSize() { + IntArrayWithSize(BlockFactory blockFactory) { + this.blockFactory = blockFactory; this.array = blockFactory.bigArrays().newIntArray(1, false); } @@ -248,11 +249,13 @@ public final class TimeSeriesBlockHash extends BlockHash { } } - private class BytesRefArrayWithSize implements Releasable { - private final BytesRefArray array; + private static class BytesRefArrayWithSize implements Releasable { + private final BlockFactory blockFactory; + private BytesRefArray array; private int count = 0; - BytesRefArrayWithSize() { + BytesRefArrayWithSize(BlockFactory blockFactory) { + this.blockFactory = blockFactory; this.array = new BytesRefArray(1, blockFactory.bigArrays()); } @@ -266,8 +269,9 @@ public final class TimeSeriesBlockHash extends BlockHash { } BytesRefVector toVector() { - BytesRefVector vector = blockFactory.newBytesRefArrayVector(tsidArray.array, tsidArray.count); - blockFactory.adjustBreaker(vector.ramBytesUsed() - tsidArray.array.bigArraysRamBytesUsed()); + BytesRefVector vector = blockFactory.newBytesRefArrayVector(array, count); + blockFactory.adjustBreaker(vector.ramBytesUsed() - array.bigArraysRamBytesUsed()); + array = null; return vector; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java index 69e9e0fd6948..73147332590c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java @@ -11,7 +11,6 @@ import org.elasticsearch.compute.Describable; import org.elasticsearch.compute.aggregation.AggregatorMode; import org.elasticsearch.compute.aggregation.GroupingAggregator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; -import org.elasticsearch.compute.aggregation.blockhash.TimeSeriesBlockHash; import java.util.List; import java.util.function.Supplier; @@ -32,18 +31,17 @@ public class TimeSeriesAggregationOperator extends HashAggregationOperator { ) implements OperatorFactory { @Override public Operator get(DriverContext driverContext) { - return new HashAggregationOperator(aggregators, () -> { - if (aggregatorMode.isInputPartial()) { - return BlockHash.build( - List.of(tsidGroup, timestampGroup), - driverContext.blockFactory(), - maxPageSize, - true // we can enable optimizations as the inputs are vectors - ); - } else { - return new TimeSeriesBlockHash(timestampGroup.channel(), timestampGroup.channel(), driverContext.blockFactory()); - } - }, driverContext); + // TODO: use TimeSeriesBlockHash when possible + return new HashAggregationOperator( + aggregators, + () -> BlockHash.build( + List.of(tsidGroup, timestampGroup), + driverContext.blockFactory(), + maxPageSize, + true // we can enable optimizations as the inputs are vectors + ), + driverContext + ); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index 2cf05dbd4efb..954a8d73d7da 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -27,9 +27,9 @@ import org.elasticsearch.xpack.esql.core.expression.Attribute; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Expressions; import org.elasticsearch.xpack.esql.core.expression.FoldContext; +import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute; import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.evaluator.EvalMapper; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; @@ -176,8 +176,7 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper // time-series aggregation if (Expressions.anyMatch(aggregates, a -> a instanceof ToTimeSeriesAggregator) && groupSpecs.size() == 2 - && groupSpecs.get(0).attribute.dataType() == DataType.TSID_DATA_TYPE - && groupSpecs.get(1).attribute.dataType() == DataType.LONG) { + && groupSpecs.get(0).attribute.name().equals(MetadataAttribute.TSID_FIELD)) { operatorFactory = new TimeSeriesAggregationOperator.Factory( groupSpecs.get(0).toHashGroupSpec(), groupSpecs.get(1).toHashGroupSpec(),