Mirror of https://github.com/elastic/elasticsearch.git, synced 2025-06-28 09:28:55 -04:00
ESQL: Fix a bug in TOP (#121552)
Fix a bug in TOP which surfaces when merging results from ordinals. We weren't always accounting for oversized arrays when checking whether we'd ever seen a field. This change makes the oversizing itself always land on a bucket boundary. The test for this required a random `bucketSize`; without that, the oversizing frequently wouldn't cause trouble.
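The heart of the change is the new `grow(int bucket)` in each sort below: instead of growing the backing array to an arbitrary oversized length, it oversizes and then rounds up to a whole number of buckets, so the array's capacity always lands on a bucket boundary. A minimal sketch of that rounding, written against the double-typed variant; the concrete numbers here are illustrative, not taken from the code:

import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;

class BucketAlignedGrowSketch {
    public static void main(String[] args) {
        int bucketSize = 7; // hypothetical slots per bucket
        int bucket = 3;     // hypothetical highest bucket that must fit
        // Oversize for at least buckets 0..bucket, exactly as the new grow() does.
        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.DOUBLE_PAGE_SIZE, Double.BYTES);
        // Round up to the next full bucket so the capacity stays a multiple of bucketSize.
        newSize = (newSize + bucketSize - 1) / bucketSize;
        long slots = newSize * bucketSize;
        // Before this change the array could end mid-bucket, which broke the
        // "have we ever seen this bucket" check when merging results from ordinals.
        System.out.println("bucket-aligned capacity: " + slots + " slots");
    }
}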
parent 85da28d919
commit 1e12b547ca
9 changed files with 146 additions and 40 deletions

docs/changelog/121552.yaml (new file)
@@ -0,0 +1,5 @@
+pr: 121552
+summary: Fix a bug in TOP
+area: ES|QL
+type: bug
+issues: []

DoubleBucketedSort.java
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.DoubleArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class DoubleBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = rootIndex + bucketSize;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         assert 0 <= next && next < bucketSize
@@ -257,19 +258,25 @@ public class DoubleBucketedSort implements Releasable {
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.DOUBLE_PAGE_SIZE, Double.BYTES);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketSize - 1) / bucketSize;
+        values = bigArrays.resize(values, newSize * bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = getBucketSize() - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
             setNextGatherOffset(bucketRoot, nextOffset);

FloatBucketedSort.java
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.FloatArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class FloatBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = rootIndex + bucketSize;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         assert 0 <= next && next < bucketSize
@@ -257,19 +258,25 @@ public class FloatBucketedSort implements Releasable {
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.FLOAT_PAGE_SIZE, Float.BYTES);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketSize - 1) / bucketSize;
+        values = bigArrays.resize(values, newSize * bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = getBucketSize() - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
             setNextGatherOffset(bucketRoot, nextOffset);

IntBucketedSort.java
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.IntArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class IntBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = rootIndex + bucketSize;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         assert 0 <= next && next < bucketSize
@@ -257,19 +258,25 @@ public class IntBucketedSort implements Releasable {
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.INT_PAGE_SIZE, Integer.BYTES);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketSize - 1) / bucketSize;
+        values = bigArrays.resize(values, newSize * bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = getBucketSize() - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
             setNextGatherOffset(bucketRoot, nextOffset);

LongBucketedSort.java
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.LongArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class LongBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = rootIndex + bucketSize;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         assert 0 <= next && next < bucketSize
@@ -257,19 +258,25 @@ public class LongBucketedSort implements Releasable {
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.LONG_PAGE_SIZE, Long.BYTES);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketSize - 1) / bucketSize;
+        values = bigArrays.resize(values, newSize * bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = getBucketSize() - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
             setNextGatherOffset(bucketRoot, nextOffset);

BytesRefBucketedSort.java
@@ -8,10 +8,12 @@
 package org.elasticsearch.compute.data.sort;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteUtils;
 import org.elasticsearch.common.util.ObjectArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -29,6 +31,11 @@ import java.util.stream.LongStream;
 /**
  * Aggregates the top N variable length {@link BytesRef} values per bucket.
  * See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link IpBucketedSort} because
+ * this has to handle variable length byte strings. To do that it allocates
+ * a heap of {@link BreakingBytesRefBuilder}s.
+ * </p>
  */
 public class BytesRefBucketedSort implements Releasable {
     private final BucketedSortCommon common;
@@ -123,7 +130,7 @@ public class BytesRefBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = common.endIndex(rootIndex);
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         common.assertValidNextOffset(next);
@@ -271,13 +278,23 @@ public class BytesRefBucketedSort implements Releasable {
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long requiredSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = common.bigArrays.grow(values, requiredSize);
+        assert oldMax % common.bucketSize == 0;
+
+        long newSize = BigArrays.overSize(
+            ((long) bucket + 1) * common.bucketSize,
+            PageCacheRecycler.OBJECT_PAGE_SIZE,
+            RamUsageEstimator.NUM_BYTES_OBJECT_REF
+        );
+        // Round up to the next full bucket.
+        newSize = (newSize + common.bucketSize - 1) / common.bucketSize;
+        values = common.bigArrays.resize(values, newSize * common.bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        fillGatherOffsets(oldMax - (oldMax % common.bucketSize));
+        fillGatherOffsets(oldMax);
     }
 
     /**
@@ -296,6 +313,7 @@ public class BytesRefBucketedSort implements Releasable {
             bytes.grow(Integer.BYTES);
             bytes.setLength(Integer.BYTES);
             ByteUtils.writeIntLE(nextOffset, bytes.bytes(), 0);
+            checkInvariant(Math.toIntExact(bucketRoot / common.bucketSize));
         }
     }
 
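In the last hunk above, an empty slot's bytes builder is reused to stash the bucket-relative "next gather offset" as a little-endian int. A minimal sketch of that encoding, using a plain byte[] in place of the BreakingBytesRefBuilder and assuming `ByteUtils.readIntLE` as the read-side counterpart:

import org.elasticsearch.common.util.ByteUtils;

class GatherOffsetEncodingSketch {
    public static void main(String[] args) {
        int nextOffset = 9;                    // hypothetical offset within a bucket
        byte[] slot = new byte[Integer.BYTES]; // stands in for the slot's bytes builder
        // Write the offset the same way the diff does: four bytes, little-endian, at position 0.
        ByteUtils.writeIntLE(nextOffset, slot, 0);
        // Reading it back tells the sort where the next gathered value for this bucket goes.
        int decoded = ByteUtils.readIntLE(slot, 0);
        System.out.println(decoded); // 9
    }
}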

IpBucketedSort.java
@@ -11,6 +11,7 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.ByteUtils;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -26,6 +27,11 @@ import java.util.stream.IntStream;
 /**
  * Aggregates the top N IP values per bucket.
  * See {@link BucketedSort} for more information.
+ * <p>
+ * This is substantially different from {@link BytesRefBucketedSort} because
+ * this takes advantage of IPs having a fixed length and allocates a dense
+ * storage for them.
+ * </p>
  */
 public class IpBucketedSort implements Releasable {
     private static final int IP_LENGTH = 16; // Bytes. It's ipv6.
@@ -101,7 +107,7 @@ public class IpBucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = common.endIndex(rootIndex) * IP_LENGTH;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         common.assertValidNextOffset(next);
@@ -268,17 +274,23 @@ public class IpBucketedSort implements Releasable {
      * Allocate storage for more buckets and store the "next gather offset"
      * for those new buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size() / IP_LENGTH;
-        values = common.bigArrays.grow(values, minSize);
+        assert oldMax % common.bucketSize == 0;
+
+        int bucketBytes = common.bucketSize * IP_LENGTH;
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketBytes, PageCacheRecycler.BYTE_PAGE_SIZE, 1);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketBytes - 1) / bucketBytes;
+        values = common.bigArrays.resize(values, newSize * bucketBytes);
        // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % common.bucketSize));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = common.bucketSize - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size() / IP_LENGTH; bucketRoot += common.bucketSize) {
             setNextGatherOffset(bucketRoot, nextOffset);
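IpBucketedSort sizes its byte array in bytes but tracks buckets in 16-byte IP slots, which is why `grow` above divides `values.size()` by IP_LENGTH before filling offsets. A small sketch of that unit conversion; only IP_LENGTH = 16 comes from the code, the other sizes are hypothetical:

class IpSlotMathSketch {
    public static void main(String[] args) {
        int ipLength = 16;                           // bytes per stored IP (ipv6-sized), as in IpBucketedSort
        int bucketSize = 5;                          // hypothetical IPs per bucket
        int bucketBytes = bucketSize * ipLength;     // 80 bytes per whole bucket
        long byteCapacity = 4L * bucketBytes;        // hypothetical capacity: 4 whole buckets of bytes
        long slotCapacity = byteCapacity / ipLength; // 20 IP slots; this is what grow() calls oldMax
        // Because grow() now resizes in multiples of bucketBytes, the slot capacity is always
        // a multiple of bucketSize, i.e. the array ends exactly on a bucket boundary.
        System.out.println(slotCapacity % bucketSize == 0); // true
    }
}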

$Type$BucketedSort (template source for the per-type sorts above)
@@ -10,6 +10,7 @@ package org.elasticsearch.compute.data.sort;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.common.util.$Type$Array;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.IntVector;
@@ -101,7 +102,7 @@ public class $Type$BucketedSort implements Releasable {
         // Gathering mode
         long requiredSize = rootIndex + bucketSize;
         if (values.size() < requiredSize) {
-            grow(requiredSize);
+            grow(bucket);
         }
         int next = getNextGatherOffset(rootIndex);
         assert 0 <= next && next < bucketSize
@@ -261,19 +262,25 @@ $endif$
 
     /**
      * Allocate storage for more buckets and store the "next gather offset"
-     * for those new buckets.
+     * for those new buckets. We always grow the storage by whole bucket's
+     * worth of slots at a time. We never allocate space for partial buckets.
      */
-    private void grow(long minSize) {
+    private void grow(int bucket) {
         long oldMax = values.size();
-        values = bigArrays.grow(values, minSize);
+        assert oldMax % bucketSize == 0;
+
+        long newSize = BigArrays.overSize(((long) bucket + 1) * bucketSize, PageCacheRecycler.$TYPE$_PAGE_SIZE, $BYTES$);
+        // Round up to the next full bucket.
+        newSize = (newSize + bucketSize - 1) / bucketSize;
+        values = bigArrays.resize(values, newSize * bucketSize);
         // Set the next gather offsets for all newly allocated buckets.
-        setNextGatherOffsets(oldMax - (oldMax % getBucketSize()));
+        fillGatherOffsets(oldMax);
     }
 
     /**
      * Maintain the "next gather offsets" for newly allocated buckets.
      */
-    private void setNextGatherOffsets(long startingAt) {
+    private void fillGatherOffsets(long startingAt) {
         int nextOffset = getBucketSize() - 1;
         for (long bucketRoot = startingAt; bucketRoot < values.size(); bucketRoot += getBucketSize()) {
             setNextGatherOffset(bucketRoot, nextOffset);

BucketedSortTestCase.java
@@ -409,6 +409,42 @@ public abstract class BucketedSortTestCase<T extends Releasable, V extends Compa
         }
     }
+
+    public final void testMergePastEnd() {
+        int buckets = 10000;
+        int bucketSize = between(1, 1000);
+        int target = between(0, buckets);
+        List<V> values = randomList(buckets, buckets, this::randomValue);
+        Collections.sort(values);
+        try (T sort = build(SortOrder.ASC, bucketSize)) {
+            // Add a single value to the main sort.
+            for (int b = 0; b < buckets; b++) {
+                collect(sort, values.get(b), b);
+            }
+
+            try (T other = build(SortOrder.ASC, bucketSize)) {
+                // Add *all* values to the target bucket of the secondary sort.
+                for (int i = 0; i < values.size(); i++) {
+                    if (i != target) {
+                        collect(other, values.get(i), target);
+                    }
+                }
+
+                // Merge all buckets pairwise. Most of the secondary ones are empty.
+                for (int b = 0; b < buckets; b++) {
+                    merge(sort, b, other, b);
+                }
+            }
+
+            for (int b = 0; b < buckets; b++) {
+                if (b == target) {
+                    assertBlock(sort, b, values.subList(0, bucketSize));
+                } else {
+                    assertBlock(sort, b, List.of(values.get(b)));
+                }
+            }
+        }
+    }
 
     protected void assertBlock(T sort, int groupId, List<V> values) {
         var blockFactory = TestBlockFactory.getNonBreakingInstance();
 