ESQL: Begin optimizing Block#lookup (#108482)

This creates the infrastructure to allow optimizing the `lookup` method
when applied to `Vector`s and then implements that optimization for
constant vectors. Constant vectors now take one of six paths:
1. An empty positions `Block` yields an empty result set.
2. If `positions` is a `Block`, perform the un-optimized lookup.
3. If the `min` of the `positions` *Vector* is less than 0 then throw an
   exception.
4. If the `min` of the positions Vector is greater than the number of
   positions in the lookup block then return a single
   `ConstantNullBlock` because you are looking up outside the range.
5. If the `max` of the positions Vector is less than the number of
   positions in the lookup block then return a `Constant$Type$Block`
   with the same value as the lookup block. This is a lookup that's
   entirely within range.
6. Otherwise return the unoptimized lookup.

This is *fairly* simple but demonstrates how we can plug in more complex
optimizations later.
This commit is contained in:
Nik Everett 2024-05-10 13:45:42 -04:00 committed by GitHub
parent 2d14095ebf
commit 04d3b9989f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
37 changed files with 431 additions and 28 deletions

View file

@ -46,4 +46,30 @@ public interface ReleasableIterator<T> extends Releasable, Iterator<T> {
};
}
/**
 * Returns an iterator that never yields an element: {@code hasNext} is always
 * {@code false} and {@code close} does nothing.
 */
static <T extends Releasable> ReleasableIterator<T> empty() {
return new ReleasableIterator<>() {
@Override
public boolean hasNext() {
return false;
}
@Override
public T next() {
// Trips when assertions are enabled; with assertions disabled the caller gets null.
assert false : "hasNext is always false so next should never be called";
return null;
}
@Override
public void close() {}
@Override
public String toString() {
return "ReleasableIterator[<empty>]";
}
};
}
}

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
import java.util.stream.Collectors;
@ -91,6 +93,11 @@ final class BooleanArrayVector extends AbstractVector implements BooleanVector {
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link BooleanLookup}.
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new BooleanLookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing array.
 */
public static long ramBytesEstimated(boolean[] values) {
    long valuesBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + valuesBytes;
}

View file

@ -10,8 +10,10 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -87,6 +89,11 @@ public final class BooleanBigArrayVector extends AbstractVector implements Boole
return new BooleanBigArrayVector(filtered, positions.length, blockFactory);
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link BooleanLookup}.
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new BooleanLookup(self, positions, targetBlockSize);
}
@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link BitArray} is adjusted outside

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -27,6 +29,9 @@ public sealed interface BooleanVector extends Vector permits ConstantBooleanVect
// Covariant override of Vector#filter: narrows the return type to BooleanVector.
@Override
BooleanVector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to BooleanBlock.
@Override
ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BooleanVector, and both vectors are {@link #equals(BooleanVector, BooleanVector) equal}.

View file

@ -52,9 +52,8 @@ public final class BooleanVectorBlock extends AbstractVectorBlock implements Boo
}
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BooleanLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -11,7 +11,9 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import java.io.IOException;
@ -91,6 +93,11 @@ final class BytesRefArrayVector extends AbstractVector implements BytesRefVector
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link BytesRefLookup}.
 */
@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new BytesRefLookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing {@link BytesRefArray}.
 */
public static long ramBytesEstimated(BytesRefArray values) {
    long valuesBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + valuesBytes;
}

View file

@ -11,6 +11,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -34,6 +36,9 @@ public sealed interface BytesRefVector extends Vector permits ConstantBytesRefVe
// Covariant override of Vector#filter: narrows the return type to BytesRefVector.
@Override
BytesRefVector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to BytesRefBlock.
@Override
ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a BytesRefVector, and both vectors are {@link #equals(BytesRefVector, BytesRefVector) equal}.

View file

@ -63,9 +63,8 @@ public final class BytesRefVectorBlock extends AbstractVectorBlock implements By
}
@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new BytesRefLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant boolean value.
@ -39,6 +41,28 @@ final class ConstantBooleanVector extends AbstractVector implements BooleanVecto
return blockFactory().newConstantBooleanVector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link BooleanLookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<BooleanBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new BooleanLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((BooleanBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstantBooleanBlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new BooleanLookup(asBlock(), positions, targetBlockSize);
}
@Override
public ElementType elementType() {
return ElementType.BOOLEAN;

View file

@ -9,6 +9,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant BytesRef value.
@ -45,6 +47,28 @@ final class ConstantBytesRefVector extends AbstractVector implements BytesRefVec
return blockFactory().newConstantBytesRefVector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link BytesRefLookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new BytesRefLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((BytesRefBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstantBytesRefBlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new BytesRefLookup(asBlock(), positions, targetBlockSize);
}
@Override
public ElementType elementType() {
return ElementType.BYTES_REF;

View file

@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant double value.
@ -39,6 +41,28 @@ final class ConstantDoubleVector extends AbstractVector implements DoubleVector
return blockFactory().newConstantDoubleVector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link DoubleLookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new DoubleLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((DoubleBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstantDoubleBlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new DoubleLookup(asBlock(), positions, targetBlockSize);
}
@Override
public ElementType elementType() {
return ElementType.DOUBLE;

View file

@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant int value.
@ -39,6 +41,28 @@ final class ConstantIntVector extends AbstractVector implements IntVector {
return blockFactory().newConstantIntVector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link IntLookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new IntLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((IntBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstantIntBlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new IntLookup(asBlock(), positions, targetBlockSize);
}
/**
* The minimum value in the block.
*/

View file

@ -8,6 +8,8 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant long value.
@ -39,6 +41,28 @@ final class ConstantLongVector extends AbstractVector implements LongVector {
return blockFactory().newConstantLongVector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link LongLookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new LongLookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single((LongBlock) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstantLongBlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new LongLookup(asBlock(), positions, targetBlockSize);
}
@Override
public ElementType elementType() {
return ElementType.LONG;

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
import java.util.stream.Collectors;
@ -90,6 +92,11 @@ final class DoubleArrayVector extends AbstractVector implements DoubleVector {
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link DoubleLookup}.
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new DoubleLookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing array.
 */
public static long ramBytesEstimated(double[] values) {
    long valuesBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + valuesBytes;
}

View file

@ -10,8 +10,10 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.DoubleArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -86,6 +88,11 @@ public final class DoubleBigArrayVector extends AbstractVector implements Double
return new DoubleBigArrayVector(filtered, positions.length, blockFactory);
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link DoubleLookup}.
 */
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new DoubleLookup(self, positions, targetBlockSize);
}
@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link DoubleArray} is adjusted outside

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -27,6 +29,9 @@ public sealed interface DoubleVector extends Vector permits ConstantDoubleVector
// Covariant override of Vector#filter: narrows the return type to DoubleVector.
@Override
DoubleVector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to DoubleBlock.
@Override
ReleasableIterator<? extends DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a DoubleVector, and both vectors are {@link #equals(DoubleVector, DoubleVector) equal}.

View file

@ -52,9 +52,8 @@ public final class DoubleVectorBlock extends AbstractVectorBlock implements Doub
}
@Override
public ReleasableIterator<DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new DoubleLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends DoubleBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
import java.util.stream.Collectors;
@ -100,6 +102,11 @@ final class IntArrayVector extends AbstractVector implements IntVector {
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link IntLookup}.
 */
@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new IntLookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing array.
 */
public static long ramBytesEstimated(int[] values) {
    long valuesBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + valuesBytes;
}

View file

@ -10,8 +10,10 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.IntArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -126,6 +128,11 @@ public final class IntBigArrayVector extends AbstractVector implements IntVector
return new IntBigArrayVector(filtered, positions.length, blockFactory);
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link IntLookup}.
 */
@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new IntLookup(self, positions, targetBlockSize);
}
@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link IntArray} is adjusted outside

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -27,6 +29,9 @@ public sealed interface IntVector extends Vector permits ConstantIntVector, IntA
// Covariant override of Vector#filter: narrows the return type to IntVector.
@Override
IntVector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to IntBlock.
@Override
ReleasableIterator<? extends IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* The minimum value in the Vector. An empty Vector will return {@link Integer#MAX_VALUE}.
*/

View file

@ -52,9 +52,8 @@ public final class IntVectorBlock extends AbstractVectorBlock implements IntBloc
}
@Override
public ReleasableIterator<IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new IntLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends IntBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
import java.util.stream.Collectors;
@ -90,6 +92,11 @@ final class LongArrayVector extends AbstractVector implements LongVector {
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link LongLookup}.
 */
@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new LongLookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing array.
 */
public static long ramBytesEstimated(long[] values) {
    long valuesBytes = RamUsageEstimator.sizeOf(values);
    return BASE_RAM_BYTES_USED + valuesBytes;
}

View file

@ -10,8 +10,10 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.LongArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -86,6 +88,11 @@ public final class LongBigArrayVector extends AbstractVector implements LongVect
return new LongBigArrayVector(filtered, positions.length, blockFactory);
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link LongLookup}.
 */
@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new LongLookup(self, positions, targetBlockSize);
}
@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link LongArray} is adjusted outside

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -27,6 +29,9 @@ public sealed interface LongVector extends Vector permits ConstantLongVector, Lo
// Covariant override of Vector#filter: narrows the return type to LongVector.
@Override
LongVector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to LongBlock.
@Override
ReleasableIterator<? extends LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* Compares the given object with this vector for equality. Returns {@code true} if and only if the
* given object is a LongVector, and both vectors are {@link #equals(LongVector, LongVector) equal}.

View file

@ -52,9 +52,8 @@ public final class LongVectorBlock extends AbstractVectorBlock implements LongBl
}
@Override
public ReleasableIterator<LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new LongLookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends LongBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -139,19 +139,19 @@ public interface Block extends Accountable, BlockLoader.Block, NamedWriteable, R
* same number of {@link #getPositionCount() positions} as the {@code positions}
* parameter.
* <p>
* For example, this this block contained {@code [a, b, [b, c]]}
* For example, if this block contained {@code [a, b, [b, c]]}
* and were called with the block {@code [0, 1, 1, [1, 2]]} then the
* result would be {@code [a, b, b, [b, b, c]]}.
* </p>
* <p>
* This process produces {@code count(this) * count(positions)} values per
* positions which could be quite quite large. Instead of returning a single
* positions which could be quite large. Instead of returning a single
* Block, this returns an Iterator of Blocks containing all of the promised
* values.
* </p>
* <p>
* The returned {@link ReleasableIterator} may retain a reference to {@link Block}s
* inside the {@link Page}. Close it to release those references.
* The returned {@link ReleasableIterator} may retain a reference to the
* {@code positions} parameter. Close it to release those references.
* </p>
* <p>
* This block is built using the same {@link BlockFactory} as was used to

View file

@ -9,6 +9,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -45,6 +47,12 @@ public final class ConstantNullVector extends AbstractVector implements BooleanV
throw new UnsupportedOperationException("null vector");
}
// Lookup is not supported on the null vector: trips an assertion when assertions
// are enabled, and otherwise throws UnsupportedOperationException.
@Override
public ReleasableIterator<ConstantNullBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
assert false : "null vector";
throw new UnsupportedOperationException("null vector");
}
@Override
public boolean getBoolean(int position) {
assert false : "null vector";

View file

@ -52,7 +52,7 @@ public class DocBlock extends AbstractVectorBlock implements Block {
@Override
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("can't lookup values from DocBlock");
}
@Override

View file

@ -9,6 +9,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.IntroSorter;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import java.util.Objects;
@ -235,6 +237,11 @@ public final class DocVector extends AbstractVector implements Vector {
}
}
// Lookup is not supported on a DocVector: always throws UnsupportedOperationException.
@Override
public ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
throw new UnsupportedOperationException("can't lookup values from DocVector");
}
@Override
public ElementType elementType() {
return ElementType.DOC;

View file

@ -10,6 +10,8 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import java.io.IOException;
@ -120,6 +122,11 @@ public final class OrdinalBytesRefVector extends AbstractNonThreadSafeRefCounted
}
}
/**
 * No ordinal-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link BytesRefLookup}.
 */
@Override
public ReleasableIterator<? extends BytesRefBlock> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new BytesRefLookup(self, positions, targetBlockSize);
}
@Override
public ElementType elementType() {
return bytes.elementType();

View file

@ -8,8 +8,10 @@
package org.elasticsearch.compute.data;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
/**
* A dense Vector of single values.
@ -35,6 +37,33 @@ public interface Vector extends Accountable, RefCounted, Releasable {
*/
Vector filter(int... positions);
/**
 * Builds an Iterator of new {@link Block}s with the same {@link #elementType}
 * as this {@link Vector} whose values are copied from positions in this Vector.
 * The result has the same number of {@link #getPositionCount() positions} as the
 * {@code positions} parameter.
 * <p>
 * For example, if this vector contained {@code [a, b, c]}
 * and were called with the block {@code [0, 1, 1, [1, 2]]} then the
 * result would be {@code [a, b, b, [b, c]]}.
 * </p>
 * <p>
 * This process produces {@code count(positions)} values per
 * position, which could be quite large. Instead of returning a single
 * Block, this returns an Iterator of Blocks containing all of the promised
 * values.
 * </p>
 * <p>
 * The returned {@link ReleasableIterator} may retain a reference to the
 * {@code positions} parameter. Close it to release those references.
 * </p>
 * <p>
 * The returned blocks are built using the same {@link BlockFactory} as was
 * used to build the {@code positions} parameter.
 * </p>
 *
 * @param positions the positions in this Vector to copy values from
 * @param targetBlockSize presumably the size each returned Block should aim for;
 *        TODO confirm against the lookup implementations
 * @return an iterator over the looked-up Blocks; the caller must close it
 */
ReleasableIterator<? extends Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
/**
* {@return the element type of this vector}
*/

View file

@ -12,7 +12,9 @@ import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BytesRefArray;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import java.io.IOException;
@ -21,6 +23,8 @@ $else$
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
import java.util.stream.Collectors;
@ -168,6 +172,11 @@ $endif$
}
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link $Type$Lookup}.
 */
@Override
public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new $Type$Lookup(self, positions, targetBlockSize);
}
/**
 * Estimates the memory footprint: {@code BASE_RAM_BYTES_USED} plus
 * {@link RamUsageEstimator#sizeOf} of the backing values. The template picks
 * {@code BytesRefArray} as the parameter type for BytesRef and a primitive
 * array otherwise.
 */
public static long ramBytesEstimated($if(BytesRef)$BytesRefArray$else$$type$[]$endif$ values) {
return BASE_RAM_BYTES_USED + RamUsageEstimator.sizeOf(values);
}

View file

@ -10,8 +10,10 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.$Array$;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -148,6 +150,11 @@ $endif$
return new $Type$BigArrayVector(filtered, positions.length, blockFactory);
}
/**
 * No vector-specific shortcut here: wraps this vector with {@code asBlock()}
 * and hands it, together with the arguments, to {@link $Type$Lookup}.
 */
@Override
public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    var self = asBlock();
    return new $Type$Lookup(self, positions, targetBlockSize);
}
@Override
public void closeInternal() {
// The circuit breaker that tracks the values {@link $if(boolean)$Bit$else$$Type$$endif$Array} is adjusted outside

View file

@ -11,6 +11,8 @@ $if(BytesRef)$
import org.apache.lucene.util.BytesRef;
$endif$
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
/**
* Vector implementation that stores a constant $type$ value.
@ -58,6 +60,28 @@ $endif$
return blockFactory().newConstant$Type$Vector(value, positions.length);
}
/**
 * Looks up this vector's constant value at each of the given {@code positions}.
 * <p>
 * Every position of this vector holds the same value, so when {@code positions}
 * can be viewed as an {@link IntVector} the answer often needs no copying:
 * </p>
 * <ul>
 * <li>no positions at all yields an empty iterator;</li>
 * <li>positions that are all at or beyond {@link #getPositionCount()} yield a single all-null block;</li>
 * <li>positions that are all below {@link #getPositionCount()} yield a single constant block.</li>
 * </ul>
 * <p>
 * Every other case is delegated to {@link $Type$Lookup}.
 * </p>
 *
 * @throws IllegalArgumentException if {@code positions} is viewable as an {@link IntVector}
 *         and its smallest entry is negative
 */
@Override
public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
    if (positions.getPositionCount() == 0) {
        return ReleasableIterator.empty();
    }
    IntVector positionsVector = positions.asVector();
    if (positionsVector == null) {
        // positions can't be viewed as a vector, so min/max aren't available: take the general path.
        return new $Type$Lookup(asBlock(), positions, targetBlockSize);
    }
    int min = positionsVector.min();
    if (min < 0) {
        throw new IllegalArgumentException("invalid position [" + min + "]");
    }
    // Valid positions are [0, getPositionCount()), so min == getPositionCount() is already
    // entirely out of range. Using >= (not >) keeps that boundary case on the fast path.
    if (min >= getPositionCount()) {
        return ReleasableIterator.single(($Type$Block) positions.blockFactory().newConstantNullBlock(positions.getPositionCount()));
    }
    if (positionsVector.max() < getPositionCount()) {
        // Entirely in range: every looked-up position has the same constant value.
        return ReleasableIterator.single(positions.blockFactory().newConstant$Type$BlockWith(value, positions.getPositionCount()));
    }
    // Mixed in-range and out-of-range positions.
    return new $Type$Lookup(asBlock(), positions, targetBlockSize);
}
$if(int)$
/**
* The minimum value in the block.

View file

@ -13,6 +13,8 @@ $endif$
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.ReleasableIterator;
import java.io.IOException;
@ -54,6 +56,9 @@ $endif$
// Covariant override of Vector#filter: narrows the return type to $Type$Vector.
@Override
$Type$Vector filter(int... positions);
// Covariant override of Vector#lookup: narrows the element type of the returned
// iterator from Block to $Type$Block.
@Override
ReleasableIterator<? extends $Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize);
$if(int)$
/**
* The minimum value in the Vector. An empty Vector will return {@link Integer#MAX_VALUE}.

View file

@ -72,9 +72,8 @@ $endif$
}
@Override
public ReleasableIterator<$Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
// TODO optimizations
return new $Type$Lookup(this, positions, targetBlockSize);
public ReleasableIterator<? extends $Type$Block> lookup(IntBlock positions, ByteSizeValue targetBlockSize) {
return vector.lookup(positions, targetBlockSize);
}
@Override

View file

@ -35,6 +35,7 @@ import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@ -283,8 +284,19 @@ public class BasicBlockTests extends ESTestCase {
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
List.of(List.of(value), List.of(value), List.of(value, value))
);
assertLookup(
block,
positions(blockFactory, 1, 2),
List.of(List.of(value), List.of(value)),
b -> assertThat(b.asVector(), instanceOf(ConstantIntVector.class))
);
}
assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null));
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
assertEmptyLookup(blockFactory, block);
assertThat(block.asVector().min(), equalTo(value));
assertThat(block.asVector().max(), equalTo(value));
@ -365,8 +377,19 @@ public class BasicBlockTests extends ESTestCase {
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
List.of(List.of(value), List.of(value), List.of(value, value))
);
assertLookup(
block,
positions(blockFactory, 1, 2),
List.of(List.of(value), List.of(value)),
b -> assertThat(b.asVector(), instanceOf(ConstantLongVector.class))
);
}
assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null));
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
assertEmptyLookup(blockFactory, block);
releaseAndAssertBreaker(block);
}
@ -447,8 +470,19 @@ public class BasicBlockTests extends ESTestCase {
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
List.of(List.of(value), List.of(value), List.of(value, value))
);
assertLookup(
block,
positions(blockFactory, 1, 2),
List.of(List.of(value), List.of(value)),
b -> assertThat(b.asVector(), instanceOf(ConstantDoubleVector.class))
);
}
assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null));
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
assertEmptyLookup(blockFactory, block);
releaseAndAssertBreaker(block);
}
@ -605,8 +639,19 @@ public class BasicBlockTests extends ESTestCase {
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
List.of(List.of(value), List.of(value), List.of(value, value))
);
assertLookup(
block,
positions(blockFactory, 1, 2),
List.of(List.of(value), List.of(value)),
b -> assertThat(b.asVector(), instanceOf(ConstantBytesRefVector.class))
);
}
assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null));
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
assertEmptyLookup(blockFactory, block);
releaseAndAssertBreaker(block);
}
@ -689,8 +734,19 @@ public class BasicBlockTests extends ESTestCase {
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
List.of(List.of(value), List.of(value), List.of(value, value))
);
assertLookup(
block,
positions(blockFactory, 1, 2),
List.of(List.of(value), List.of(value)),
b -> assertThat(b.asVector(), instanceOf(ConstantBooleanVector.class))
);
}
assertLookup(block, positions(blockFactory, positionCount + 1000), singletonList(null));
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
assertEmptyLookup(blockFactory, block);
releaseAndAssertBreaker(block);
}
@ -716,6 +772,24 @@ public class BasicBlockTests extends ESTestCase {
assertThat(positionCount, is(block.getPositionCount()));
assertThat(block.getPositionCount(), is(positionCount));
assertThat(block.isNull(randomPosition(positionCount)), is(true));
if (positionCount > 2) {
List<List<Object>> expected = new ArrayList<>();
expected.add(null);
expected.add(null);
expected.add(null);
assertLookup(
block,
positions(blockFactory, 1, 2, new int[] { 1, 2 }),
expected,
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
}
assertLookup(
block,
positions(blockFactory, positionCount + 1000),
singletonList(null),
b -> assertThat(b, instanceOf(ConstantNullBlock.class))
);
releaseAndAssertBreaker(block);
}
}
@ -1544,11 +1618,16 @@ public class BasicBlockTests extends ESTestCase {
}
/**
 * Asserts the result of looking up {@code positions} in {@code block} without any
 * additional per-block verification.
 */
static void assertLookup(Block block, IntBlock positions, List<List<Object>> expected) {
    // Nothing extra to verify: give the four-argument overload a check that does nothing.
    Consumer<Block> noExtraChecks = ignored -> {};
    assertLookup(block, positions, expected, noExtraChecks);
}
static void assertLookup(Block block, IntBlock positions, List<List<Object>> expected, Consumer<Block> extra) {
try (positions; ReleasableIterator<? extends Block> lookup = block.lookup(positions, ByteSizeValue.ofKb(100))) {
assertThat(lookup.hasNext(), equalTo(true));
try (Block b = lookup.next()) {
assertThat(valuesAtPositions(b, 0, b.getPositionCount()), equalTo(expected));
assertThat(b.blockFactory(), sameInstance(positions.blockFactory()));
extra.accept(b);
}
assertThat(lookup.hasNext(), equalTo(false));
}