This commit updates the low-level bounds checks in JDKVectorLibrary and add benchmark, so that we can more easily bench the low-level operations.
Note: I added the mr-jar gradle plugin to the benchmarks so that we can compile with preview features in Java 21, namely MemorySegment.
Keep better track of shard contexts using RefCounted, so they can be released more aggressively during operator processing. For example, during TopN, we can potentially release some contexts if they don't pass the limit filter.
This is done in preparation of TopN fetch optimization, which will delay the fetching of additional columns to the data node coordinator, instead of doing it in each individual worker, thereby reducing IO. Since the node coordinator would need to maintain the shard contexts for a potentially longer duration, it is important we try to release what we can eariler.
An even more advanced optimization is to delay fetching to the main cluster coordinator, but that would be more involved, since we need to first figure out how to transport the shard contexts between nodes.
Summary of main changes:
DocVector now maintains a RefCounted instance per shard.
Things which can build or release DocVectors (e.g., LuceneSourceOperator, TopNOperator), can also hold RefCounted instances, so they can pass them to DocVector and also ensure contexts aren't released if they can still be potentially used later.
Driver's main loop iteration (runSingleLoopIteration), now closes its operators even between different operator processing. This is extra aggressive, and was mostly done to improve testability.
Added a couple of tests to TopNOperator and a new integration test EsqlTopNShardManagementIT, which uses the pausable plugin framework to check that TopNOperator releases things as early as possible..
- Add a new `LongTopNBlockHash` implementation taking care of skipping unused values.
- Add a `TopNUniqueSet` to take care of storing the top N values (without nulls).
- Add a `TopNMultivalueDedupeLong` class helping with it (An adaptation of the existing `MultivalueDedupeLong`).
- Add some tests to `HashAggregationOperator`. It wasn't changed much, but helps a bit with the E2E.
- Add MicroBenchmarks for TopN groupings, to ensure we're actually improving things with this.
When parsing documents, we receive the document as UTF-8 encoded data which
we then parse and convert the fields to java-native UTF-16 encoded Strings.
We then convert these strings back to UTF-8 for storage in lucene.
This patch skips the redundant conversion, instead passing lucene a
direct reference to the received UTF-8 bytes when possible.
This introduces an optimization to pushdown to Lucense those language constructs that aim at case-insensitive regular expression matching, used with `LIKE` and `RLIKE` operators, such as:
* `| WHERE TO_LOWER(field) LIKE "abc*"`
* `| WHERE TO_UPPER(field) RLIKE "ABC.*"`
These are now pushed as case-insensitive `wildcard` and `regexp` respectively queries down to Lucene.
Closes#127479
Creates a `ROUND_TO` function that rounds it's input to one of the
provided values. Like so:
```
ROUND_TO(v, 0, 5000, 10000, 20000, 40000, 100000)
v | ROUND_TO
0 | 0
100 | 0
6000 | 5000
45001 | 40000
999999 | 100000
```
For some sequences of numbers you could do this with the `/` operator -
but for arbitrary sequences of numbers you needed `CASE` which is quite
slow. And hard to read!
Rewriting the example above would look like:
```
CASE (
v < 5000, 0,
v < 10000, 5000,
v < 20000, 10000,
v < 40000, 20000,
v < 100000, 40000,
100000
)
```
Even better, this is *fast*:
```
(operation) Mode Cnt Score Error Units
round_to_4_via_case avgt 7 138.124 ± 0.738 ns/op
round_to_4 avgt 7 0.805 ± 0.011 ns/op
round_to_3 avgt 7 0.739 ± 0.011 ns/op
round_to_2 avgt 7 0.651 ± 0.009 ns/op
date_trunc avgt 7 2.425 ± 0.018 ns/op
```
I've included a comparison to `DATE_TRUNC` above because we should be
able to rewrite `DATE_TRUNC` into `ROUND_TO` when we know the date range
of the index. This doesn't do it now, but it should be possible.
This change updates the code to always create SourceProvider instances via MappingLookup, avoiding direct exposure to the underlying source format (synthetic or stored).
It also aligns source filtering behaviour between SourceProvider and SourceLoader, ensuring consistent application of filters.
This change is needed to enable source filtering to occur earlier in the fetch phase, for example, when constructing a synthetic source.
Currently, time-series aggregations use the `values` aggregation to
collect dimension values. While we might introduce a specialized
aggregation for this in the future, for now, we are using `values`, and
the inputs are likely ordinal blocks. This change speeds up the `values`
aggregation when the inputs are ordinal-based.
Execution time reduced from 461ms to 192ms for 1000 groups.
```
ValuesAggregatorBenchmark.run BytesRef 10000 avgt 7 461.938 ± 6.089 ms/op
ValuesAggregatorBenchmark.run BytesRef 10000 avgt 7 192.898 ± 1.781 ms/op
```
This removes all non-test usage of
Metadata.Builder.put(IndexMetadata, boolean)
And replaces it with appropriate calls to the equivalent method on
`ProjectMetadata.Builder`
In most cases this _does not_ make the code project aware, but does
reduce the number of deprecated methods in use.
This speeds up loading from stored fields by opting more blocks into the
"sequential" strategy. This really kicks in when loading stored fields
like `text`. And when you need less than 100% of documents, but more than,
say, 10%. This is most useful when you need 99.9% of field documents.
That sort of thing. Here's the perf numbers:
```
%100.0 {"took": 403 -> 401,"documents_found":1000000}
%099.9 {"took":3990 -> 436,"documents_found": 999000}
%099.0 {"took":4069 -> 440,"documents_found": 990000}
%090.0 {"took":3468 -> 421,"documents_found": 900000}
%030.0 {"took":1213 -> 152,"documents_found": 300000}
%020.0 {"took": 766 -> 104,"documents_found": 200000}
%010.0 {"took": 397 -> 55,"documents_found": 100000}
%009.0 {"took": 352 -> 375,"documents_found": 90000}
%008.0 {"took": 304 -> 317,"documents_found": 80000}
%007.0 {"took": 273 -> 287,"documents_found": 70000}
%005.0 {"took": 199 -> 204,"documents_found": 50000}
%001.0 {"took": 46 -> 46,"documents_found": 10000}
```
Let's explain this with an example. First, jump to `main` and load a
million documents:
```
rm -f /tmp/bulk
for a in {1..1000}; do
echo '{"index":{}}' >> /tmp/bulk
echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk
done
curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test
for a in {1..1000}; do
echo -n $a:
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors
done
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh
echo
```
Now query them all. Run this a few times until it's stable:
```
echo -n "%100.0 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
"query": "FROM test | STATS SUM(LENGTH(text))",
"pragma": {
"data_partitioning": "shard"
}
}' | jq -c '{took, documents_found}'
```
Now fetch 99.9% of documents:
```
echo -n "%099.9 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
"query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
"pragma": {
"data_partitioning": "shard"
}
}' | jq -c '{took, documents_found}'
```
This should spit out something like:
```
%100.0 { "took":403,"documents_found":1000000}
%099.9 {"took":4098, "documents_found":999000}
```
We're loading *fewer* documents but it's slower! What in the world?!
If you dig into the profile you'll see that it's value loading:
```
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
"query": "FROM test | STATS SUM(LENGTH(text))",
"pragma": {
"data_partitioning": "shard"
},
"profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
"operator": "ValuesSourceReaderOperator[fields = [text]]",
"status": {
"readers_built": {
"stored_fields[requires_source:true, fields:0, sequential: true]": 222,
"text:column_at_a_time:null": 222,
"text:row_stride:BlockSourceReader.Bytes": 1
},
"values_loaded": 1000000,
"process_nanos": 370687157,
"pages_processed": 222,
"rows_received": 1000000,
"rows_emitted": 1000000
}
}
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
"query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
"pragma": {
"data_partitioning": "shard"
},
"profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
"operator": "ValuesSourceReaderOperator[fields = [text]]",
"status": {
"readers_built": {
"stored_fields[requires_source:true, fields:0, sequential: false]": 222,
"text:column_at_a_time:null": 222,
"text:row_stride:BlockSourceReader.Bytes": 1
},
"values_loaded": 999000,
"process_nanos": 3965803793,
"pages_processed": 222,
"rows_received": 999000,
"rows_emitted": 999000
}
}
```
It jumps from 370ms to almost four seconds! Loading fewer values! The
second big difference is in the `stored_fields` marker. In the second on
it's `sequential: false` and in the first `sequential: true`.
`sequential: true` uses Lucene's "merge" stored fields reader instead of
the default one. It's much more optimized at decoding sequences of
documents.
Previously we only enabled this reader when loading compact sequences of
documents - when the entire block looks like
```
1, 2, 3, 4, 5, ... 1230, 1231
```
If there are any gaps we wouldn't enable it. That was a very
conservative thing we did long ago without doing any experiments. We
knew it was faster without any gaps, but not otherwise. It turns out
it's a lot faster in a lot more cases. I've measured it as faster for
99% gaps, at least on simple documents. I'm a bit worried that this is
too aggressive, so I've set made it configurable and made the default
being to use the "merge" loader with 10% gaps. So we'd use the merge
loader with a block like:
```
1, 11, 21, 31, ..., 1231, 1241
```
Adds a simple script to run benchmarks for ESQL and collect their
results. The script has a `--test` mode which takes about ten minutes.
Running without `--test` takes a four hours fifteen minutes.
To speed up `--test` I reworked the "self test" that each benchmark runs
to be optional and disabled in `--test` mode.
Build jump table (disi) while iterating over SortedNumericDocValues for encoding the values, instead of separately iterating over SortedNumericDocValues just to build the jump table.
In case when indexing sorting is active, this requires an additional merge sort. Follow up from #125403
to use benchmark mode single shot time.
Which makes more sense for benchmarking force merge. The sample time mode would invoke the benchmark methods many times, which in case of force merge is a noop.
This removes all non-test usage of
Metadata.Builder.put(IndexMetadata.Builder)
And replaces it with appropriate calls to the equivalent method on
`ProjectMetadata.Builder`
In most cases this _does not_ make the code project aware, but does
reduce the number of deprecated methods in use
The doc values codec iterates a few times over the doc value instance that needs to be written to disk. In case when merging and index sorting is enabled, this is much more expensive, as each time the doc values instance is iterated a merge sorting is performed (in order to get the doc ids of new segment in order of index sorting).
There are several reasons why the doc value instance is iterated multiple times:
* To compute stats (num values, number of docs with value) required for writing values to disk.
* To write bitset that indicate which documents have a value. (indexed disi, jump table)
* To write the actual values to disk.
* To write the addresses to disk (in case docs have multiple values)
This applies for numeric doc values, but also for the ordinals of sorted (set) doc values.
This PR addresses solving the first reason why doc value instance needs to be iterated. This is done only when in case of merging and when the segments to be merged with are also of type es87 doc values, codec version is the same and there are no deletes. Note this optimized merged is behind a feature flag for now.
Speed up the TO_IP method by converting directly from utf-8 encoded
strings to the ip encoding. Previously we did:
```
utf-8 -> String -> INetAddress -> ip encoding
```
In a step towards solving #125460 this creates three IP parsing
functions, one the rejects leading zeros, one that interprets leading
zeros as decimal numbers, and one the interprets leading zeros as octal
numbers. IPs have historically been parsed in all three of those ways.
This plugs the "rejects leading zeros" parser into `TO_IP` because
that's the behavior it had before.
Here is the performance:
```
Benchmark Score Error Units
leadingZerosAreDecimal 14.007 ± 0.093 ns/op
leadingZerosAreOctal 15.020 ± 0.373 ns/op
leadingZerosRejected 14.176 ± 3.861 ns/op
original 32.950 ± 1.062 ns/op
```
So this is roughly 45% faster than what we had.
Make the conversion functions that process `BytesRef`s into `BytesRefs`
keep the `OrdinalBytesRefVector`s when processing. Let's use `TO_LOWER`
as an example. First, the performance numbers:
```
(operation) Mode Score Error -> Score Error Units
to_lower 30.662 ± 6.163 -> 30.048 ± 0.479 ns/op
to_lower_ords 30.773 ± 0.370 -> 0.025 ± 0.001 ns/op
to_upper 33.552 ± 0.529 -> 35.775 ± 1.799 ns/op
to_upper_ords 35.791 ± 0.658 -> 0.027 ± 0.001 ns/op
```
The test has a 8192 positions containing alternating `foo` and `bar`.
Running `TO_LOWER` via ordinals is super duper faster. No longer
`O(positions)` and now `O(unique_values)`.
Let's paint some pictures! `OrdinalBytesRefVector` is a lookup table.
Like this:
```
+-------+----------+
| bytes | ordinals |
| ----- | -------- |
| FOO | 0 |
| BAR | 1 |
| BAZ | 2 |
+-------+ 1 |
| 1 |
| 0 |
+----------+
```
That lookup table is one block. When you read it you look up the
`ordinal` and match it to the `bytes`. Previously `TO_LOWER` would
process each value one at a time and make:
```
bytes
-----
foo
bar
baz
bar
bar
foo
```
So it'd run `TO_LOWER` once per `ordinal` and it'd make an ordinal
non-lookup table. With this change `TO_LOWER` will now make:
```
+-------+----------+
| bytes | ordinals |
| ----- | -------- |
| foo | 0 |
| bar | 1 |
| baz | 2 |
+-------+ 1 |
| 1 |
| 0 |
+----------+
```
We don't even have to copy the `ordinals` - we can reuse those from the
input and just bump the reference count. That's why this goes from
`O(positions)` to `O(unique_values)`.
Fix the benchmark for `EVAL` which was failing because of a strange
logging error. The benchmarks really didn't want to run when we use
commons-logging. That's fine - we can use the ES logging facade thing. I
also added a test to the benchmarks which should run the self-tests for
`EVAL` on `gradle check`.
Speeds up the VALUES agg when collecting from many buckets.
Specifically, this speeds up the algorithm used to `finish` the
aggregation. Most specifically, this makes the algorithm more tollerant
to large numbers of groups being collected. The old algorithm was
`O(n^2)` with the number of groups. The new one is `O(n)`
```
(groups)
1 219.683 ± 1.069 -> 223.477 ± 1.990 ms/op
1000 426.323 ± 75.963 -> 463.670 ± 7.275 ms/op
100000 36690.871 ± 4656.350 -> 7800.332 ± 2775.869 ms/op
200000 89422.113 ± 2972.606 -> 21920.288 ± 3427.962 ms/op
400000 timed out at 10 minutes -> 40051.524 ± 2011.706 ms/op
```
The `1` group version was not changed at all. That's just noise in the
measurement. The small bump in the `1000` case is almost certainly worth
it and real. The huge drop in the `100000` case is quite real.
To avoid having AggregateMapper find aggregators based on their names with reflection, I'm doing some changes:
- Make the suppliers have methods returning the intermediate states
- To allow this, the suppliers constructor won't receive the chanells as params. Instead, its methods will ask for them
- Most changes in this PR are because of this
- After those changes, I'm leaving AggregateMapper still there, as it still converts AggregateFunctions to its NamedExpressions
```
before after
(operation) Score Error Score Error Units
coalesce_2_noop 75.949 ± 3.961 -> 0.010 ± 0.001 ns/op 99.9%
coalesce_2_eager 99.299 ± 6.959 -> 4.292 ± 0.227 ns/op 95.7%
coalesce_2_lazy 113.118 ± 5.747 -> 26.746 ± 0.954 ns/op 76.4%
```
We tend to advise folks that "COALESCE is faster than CASE", but, as of
8.16.0/https://github.com/elastic/elasticsearch/pull/112295 that wasn't the true. I was working with someone a few
days ago to port a scripted_metric aggregation to ESQL and we saw
COALESCE taking ~60% of the time. That won't do.
The trouble is that CASE and COALESCE have to be *lazy*, meaning that
operations like:
```
COALESCE(a, 1 / b)
```
should never emit a warning if `a` is not `null`, even if `b` is `0`. In
8.16/https://github.com/elastic/elasticsearch/pull/112295 CASE grew an optimization where it could operate non-lazily
if it was flagged as "safe". This brings a similar optimization to
COALESCE, see it above as "case_2_eager", a 95.7% improvement.
It also brings and arguably more important optimization - entire-block
execution for COALESCE. The schort version is that, if the first
parameter of COALESCE returns no nulls we can return it without doing
anything lazily. There are a few more cases, but the upshot is that
COALESCE is pretty much *free* in cases where long strings of results
are `null` or not `null`. That's the `coalesce_2_noop` line.
Finally, when there mixed null and non-null values we were using a
single builder with some fairly inefficient paths. This specializes them
per type and skips some slow null-checking where possible. That's the
`coalesce_2_lazy` result, a more modest 76.4%.
NOTE: These %s of improvements on COALESCE itself, or COALESCE with some load-overhead operators like `+`. If COALESCE isn't taking a *ton* time in your query don't get particularly excited about this. It's fun though.
Closes#119953