Merge branch 'main' into index_settings_serverless

shainaraskas 2025-06-26 14:34:13 -04:00 committed by GitHub
commit acb4ec8197
227 changed files with 5900 additions and 1011 deletions


@ -1,9 +1,9 @@
{
"upstream" : "elastic/elasticsearch",
"targetBranchChoices" : [ "main", "9.0", "8.19", "8.18", "8.17", "8.16", "8.15", "8.14", "8.13", "8.12", "8.11", "8.10", "8.9", "8.8", "8.7", "8.6", "8.5", "8.4", "8.3", "8.2", "8.1", "8.0", "7.17", "6.8" ],
"targetBranchChoices" : [ "main", "9.1", "9.0", "8.19", "8.18", "8.17", "8.16", "8.15", "8.14", "8.13", "8.12", "8.11", "8.10", "8.9", "8.8", "8.7", "8.6", "8.5", "8.4", "8.3", "8.2", "8.1", "8.0", "7.17", "6.8" ],
"targetPRLabels" : [ "backport" ],
"branchLabelMapping" : {
"^v9.1.0$" : "main",
"^v9.2.0$" : "main",
"^v(\\d+).(\\d+).\\d+(?:-(?:alpha|beta|rc)\\d+)?$" : "$1.$2"
}
}


@ -65,7 +65,7 @@ steps:
timeout_in_minutes: 300
matrix:
setup:
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0"]
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0", "9.2.0"]
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2404


@ -382,6 +382,22 @@ steps:
env:
BWC_VERSION: 9.1.0
- label: "{{matrix.image}} / 9.2.0 / packaging-tests-upgrade"
command: ./.ci/scripts/packaging-test.sh -Dbwc.checkout.align=true destructiveDistroUpgradeTest.v9.2.0
timeout_in_minutes: 300
matrix:
setup:
image:
- rocky-8
- ubuntu-2404
agents:
provider: gcp
image: family/elasticsearch-{{matrix.image}}
machineType: custom-16-32768
buildDirectory: /dev/shm/bk
env:
BWC_VERSION: 9.2.0
- group: packaging-tests-windows
steps:
- label: "{{matrix.image}} / packaging-tests-windows"


@ -420,6 +420,25 @@ steps:
- signal_reason: agent_stop
limit: 3
- label: 9.2.0 / bwc
command: .ci/scripts/run-gradle.sh -Dbwc.checkout.align=true v9.2.0#bwcTest
timeout_in_minutes: 300
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2404
machineType: n1-standard-32
buildDirectory: /dev/shm/bk
preemptible: true
env:
BWC_VERSION: 9.2.0
retry:
automatic:
- exit_status: "-1"
limit: 3
signal_reason: none
- signal_reason: agent_stop
limit: 3
- label: concurrent-search-tests
command: .ci/scripts/run-gradle.sh -Dbwc.checkout.align=true -Dtests.jvm.argline=-Des.concurrent_search=true -Des.concurrent_search=true functionalTests
timeout_in_minutes: 420
@ -487,7 +506,7 @@ steps:
setup:
ES_RUNTIME_JAVA:
- openjdk21
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0"]
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0", "9.2.0"]
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2404
@ -531,7 +550,7 @@ steps:
ES_RUNTIME_JAVA:
- openjdk21
- openjdk23
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0"]
BWC_VERSION: ["8.17.9", "8.18.4", "8.19.0", "9.0.4", "9.1.0", "9.2.0"]
agents:
provider: gcp
image: family/elasticsearch-ubuntu-2404


@ -21,3 +21,4 @@ BWC_VERSION:
- "8.19.0"
- "9.0.4"
- "9.1.0"
- "9.2.0"


@ -6,7 +6,8 @@ strip_version() {
}
fetch_build() {
curl -sS https://artifacts-$1.elastic.co/$2/latest/$3.json \
>&2 echo "Checking for build id: https://artifacts-$1.elastic.co/$2/latest/$3.json"
curl -sSf https://artifacts-$1.elastic.co/$2/latest/$3.json \
| jq -r '.build_id'
}
@ -15,7 +16,15 @@ BRANCH="${BRANCH:-$2}"
ES_VERSION="${ES_VERSION:-$3}"
WORKFLOW=${WORKFLOW:-$4}
LATEST_BUILD=$(fetch_build $WORKFLOW $ARTIFACT $BRANCH)
if [[ "$WORKFLOW" == "staging" ]]; then
LATEST_BUILD=$(fetch_build $WORKFLOW $ARTIFACT $ES_VERSION)
elif [[ "$WORKFLOW" == "snapshot" ]]; then
LATEST_BUILD=$(fetch_build $WORKFLOW $ARTIFACT $BRANCH)
else
echo "Unknown workflow: $WORKFLOW"
exit 1
fi
LATEST_VERSION=$(strip_version $LATEST_BUILD)
# If the latest artifact version doesn't match what we expect, try the corresponding version branch.


@ -4,3 +4,4 @@ BWC_VERSION:
- "8.19.0"
- "9.0.4"
- "9.1.0"
- "9.2.0"


@ -40,6 +40,7 @@ import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.IOUtils;
@ -477,6 +478,7 @@ public class ValuesSourceReaderBenchmark {
pages.add(
new Page(
new DocVector(
ShardRefCounted.ALWAYS_REFERENCED,
blockFactory.newConstantIntBlockWith(0, end - begin).asVector(),
blockFactory.newConstantIntBlockWith(ctx.ord, end - begin).asVector(),
docs.build(),
@ -512,7 +514,14 @@ public class ValuesSourceReaderBenchmark {
if (size >= BLOCK_LENGTH) {
pages.add(
new Page(
new DocVector(blockFactory.newConstantIntVector(0, size), leafs.build(), docs.build(), null).asBlock()
new DocVector(
ShardRefCounted.ALWAYS_REFERENCED,
blockFactory.newConstantIntVector(0, size),
leafs.build(),
docs.build(),
null
).asBlock()
)
);
docs = blockFactory.newIntVectorBuilder(BLOCK_LENGTH);
@ -525,6 +534,8 @@ public class ValuesSourceReaderBenchmark {
pages.add(
new Page(
new DocVector(
ShardRefCounted.ALWAYS_REFERENCED,
blockFactory.newConstantIntBlockWith(0, size).asVector(),
leafs.build().asBlock().asVector(),
docs.build(),
@ -551,6 +562,8 @@ public class ValuesSourceReaderBenchmark {
pages.add(
new Page(
new DocVector(
ShardRefCounted.ALWAYS_REFERENCED,
blockFactory.newConstantIntVector(0, 1),
blockFactory.newConstantIntVector(next.ord, 1),
blockFactory.newConstantIntVector(next.itr.nextInt(), 1),


@ -37,6 +37,7 @@
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]TsdbIT.java" checks="LineLength" />
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]TsdbIndexingRollingUpgradeIT.java" checks="LineLength" />
<suppress files="qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]LogsdbIndexingRollingUpgradeIT.java" checks="LineLength" />
<suppress files="plugin[/\\]logsdb[/\\]qa[/\\]rolling-upgrade[/\\]src[/\\]javaRestTest[/\\]java[/\\]org[/\\]elasticsearch[/\\]upgrades[/\\]MatchOnlyTextRollingUpgradeIT.java" checks="LineLength" />
<!-- Gradle requires inputs to be serializable -->
<suppress files="build-tools-internal[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gradle[/\\]internal[/\\]precommit[/\\]TestingConventionRule.java" checks="RegexpSinglelineJava" />


@ -1,4 +1,4 @@
elasticsearch = 9.1.0
elasticsearch = 9.2.0
lucene = 10.2.2
bundled_jdk_vendor = openjdk


@ -0,0 +1,5 @@
pr: 122497
summary: Check if index patterns conform to valid format before validation
area: CCS
type: enhancement
issues: []


@ -0,0 +1,7 @@
pr: 129370
summary: Avoid dropping aggregate groupings in local plans
area: ES|QL
type: bug
issues:
- 129811
- 128054


@ -0,0 +1,5 @@
pr: 129454
summary: Aggressive release of shard contexts
area: ES|QL
type: enhancement
issues: []


@ -0,0 +1,6 @@
pr: 129967
summary: Support returning default `index_options` for `semantic_text` fields when
`include_defaults` is true
area: Search
type: bug
issues: []


@ -0,0 +1,6 @@
pr: 130027
summary: "Fix: prevent duplication of \"invalid index name\" string in the final exception\
\ error message"
area: ES|QL
type: bug
issues: []


@ -0,0 +1,12 @@
pr: 130032
summary: ES|QL cross-cluster querying is now generally available
area: ES|QL
type: feature
issues: []
highlight:
title: ES|QL cross-cluster querying is now generally available
body: |-
The ES|QL Cross-Cluster querying feature has been in technical preview since 8.13.
As of releases 8.19.0 and 9.1.0, this is now generally available.
This feature allows you to run ES|QL queries across multiple clusters.
notable: true


@ -0,0 +1,5 @@
pr: 130083
summary: Fix timeout bug in DBQ deletion of unused and orphan ML data
area: Machine Learning
type: bug
issues: []


@ -33,7 +33,7 @@ $$$search-throttled$$$`search_throttled`
: For analyze requests. Thread pool type is `fixed` with a size of `1`, queue size of `16`.
`write`
: For write operations and ingest processors. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
: For write operations and ingest processors. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `max(10000, (`[`# of allocated processors`](#node.processors)`* 750))`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
`write_coordination`
: For bulk request coordination operations. Thread pool type is `fixed` with a size of [`# of allocated processors`](#node.processors), queue_size of `10000`. The maximum size for this pool is `1 + `[`# of allocated processors`](#node.processors).
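To make the revised `write` queue sizing above concrete, here is a minimal sketch of the formula with sample values (a hypothetical helper, not code from this change):

```java
// Hypothetical sketch: queue_size = max(10000, allocated_processors * 750).
static int writeQueueSize(int allocatedProcessors) {
    return Math.max(10_000, allocatedProcessors * 750);
}
// writeQueueSize(8)  == 10_000  (8 * 750 = 6_000, so the 10_000 floor applies)
// writeQueueSize(32) == 24_000  (32 * 750 = 24_000 exceeds the floor)
```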


@ -13,7 +13,7 @@ applies_to:
The slow log records database searching and indexing events that have execution durations above specified thresholds. You can use these logs to investigate, analyze, or troubleshoot your cluster's historical search and indexing performance.
Slow logs report task duration at the shard level for searches, and at the index level for indexing, but might not encompass the full task execution time observed on the client. For example, slow logs don't surface HTTP network delays or the impact of [task queues](docs-content://troubleshoot/elasticsearch/task-queue-backlog.md).
Slow logs report task duration at the shard level for searches, and at the index level for indexing, but might not encompass the full task execution time observed on the client. For example, slow logs don't surface HTTP network delays or the impact of [task queues](docs-content://troubleshoot/elasticsearch/task-queue-backlog.md). For more information about the higher-level operations affecting response times, refer to [Reading and writing documents](docs-content://deploy-manage/distributed-architecture/reading-and-writing-documents.md).
Events that meet the specified threshold are emitted into [{{es}} logging](docs-content://deploy-manage/monitor/logging-configuration/update-elasticsearch-logging-levels.md) under the `fileset.name` of `slowlog`. These logs can be viewed in the following locations:


@ -112,32 +112,11 @@ to create the endpoint. If not specified, the {{infer}} endpoint defined by
`inference_id` will be used at both index and query time.
`index_options`
: (Optional, string) Specifies the index options to override default values
: (Optional, object) Specifies the index options to override default values
for the field. Currently, `dense_vector` index options are supported.
For text embeddings, `index_options` may match any allowed
[dense_vector index options](/reference/elasticsearch/mapping-reference/dense-vector.md#dense-vector-index-options).
An example of how to set index_options for a `semantic_text` field:
```console
PUT my-index-000004
{
"mappings": {
"properties": {
"inference_field": {
"type": "semantic_text",
"inference_id": "my-text-embedding-endpoint",
"index_options": {
"dense_vector": {
"type": "int4_flat"
}
}
}
}
}
}
```
`chunking_settings`
: (Optional, object) Settings for chunking text into smaller passages.
If specified, these will override the chunking settings set in the {{infer-cap}}
@ -165,7 +144,7 @@ To completely disable chunking, use the `none` chunking strategy.
or `1`. Required for `sentence` type chunking settings
::::{warning}
If the input exceeds the maximum token limit of the underlying model, some
When using the `none` chunking strategy, if the input exceeds the maximum token limit of the underlying model, some
services (such as OpenAI) may return an
error. In contrast, the `elastic` and `elasticsearch` services will
automatically truncate the input to fit within the
@ -315,18 +294,38 @@ specified. It enables you to quickstart your semantic search by providing
automatic {{infer}} and a dedicated query so you don't need to provide further
details.
In case you want to customize data indexing, use the [
`sparse_vector`](/reference/elasticsearch/mapping-reference/sparse-vector.md)
or [`dense_vector`](/reference/elasticsearch/mapping-reference/dense-vector.md)
field types and create an ingest pipeline with
an [{{infer}} processor](/reference/enrich-processor/inference-processor.md) to
generate the
embeddings. [This tutorial](docs-content://solutions/search/semantic-search/semantic-search-inference.md)
walks you through the process. In these cases - when you use `sparse_vector` or
`dense_vector` field types instead of the `semantic_text` field type to
customize indexing - using the [
`semantic_query`](/reference/query-languages/query-dsl/query-dsl-semantic-query.md)
is not supported for querying the field data.
If you want to override those defaults and customize the embeddings that
`semantic_text` indexes, you can do so by modifying <<semantic-text-params,
parameters>>:
- Use `index_options` to specify alternate index options such as specific
`dense_vector` quantization methods
- Use `chunking_settings` to override the chunking strategy associated with the
{{infer}} endpoint, or completely disable chunking using the `none` type
Here is an example of how to set these parameters for a text embedding endpoint:
```console
PUT my-index-000004
{
"mappings": {
"properties": {
"inference_field": {
"type": "semantic_text",
"inference_id": "my-text-embedding-endpoint",
"index_options": {
"dense_vector": {
"type": "int4_flat"
}
},
"chunking_settings": {
"type": "none"
}
}
}
}
}
```
## Updates to `semantic_text` fields [update-script]


@ -12,23 +12,15 @@ If you are migrating from a version prior to version 9.0, you must first upgrade
% ## Next version [elasticsearch-nextversion-breaking-changes]
```{applies_to}
stack: coming 9.0.3
```
## 9.0.3 [elasticsearch-9.0.3-breaking-changes]
No breaking changes in this version.
```{applies_to}
stack: coming 9.0.2
```
## 9.0.2 [elasticsearch-9.0.2-breaking-changes]
Snapshot/Restore:
* Make S3 custom query parameter optional [#128043](https://github.com/elastic/elasticsearch/pull/128043)
## 9.0.1 [elasticsearch-9.0.1-breaking-changes]
No breaking changes in this version.


@ -1,6 +1,6 @@
version: 9.0.3
released: false
generated: 2025-06-21T00:06:16.346021604Z
released: true
generated: 2025-06-24T15:19:29.859630035Z
changelogs:
- pr: 120869
summary: Threadpool merge scheduler


@ -16,19 +16,11 @@ To give you insight into what deprecated features you're using, {{es}}:
% ## Next version [elasticsearch-nextversion-deprecations]
```{applies_to}
stack: coming 9.0.3
```
## 9.0.3 [elasticsearch-9.0.3-deprecations]
Engine:
* Deprecate `indices.merge.scheduler.use_thread_pool` setting [#129464](https://github.com/elastic/elasticsearch/pull/129464)
```{applies_to}
stack: coming 9.0.2
```
## 9.0.2 [elasticsearch-9.0.2-deprecations]
No deprecations in this version.


@ -21,9 +21,6 @@ To check for security updates, go to [Security announcements for the Elastic sta
% *
## 9.0.3 [elasticsearch-9.0.3-release-notes]
```{applies_to}
stack: coming 9.0.3
```
### Features and enhancements [elasticsearch-9.0.3-features-enhancements]
@ -92,7 +89,6 @@ Searchable Snapshots:
Security:
* Fix error message when changing the password for a user in the file realm [#127621](https://github.com/elastic/elasticsearch/pull/127621)
## 9.0.2 [elasticsearch-9.0.2-release-notes]
### Features and enhancements [elasticsearch-9.0.2-features-enhancements]


@ -1153,16 +1153,16 @@
<sha256 value="88ac9fd1bb51f82bcc664cc1eb9c225c90dc4389d660231b4cc737bebfe7d0aa" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.squareup.okhttp3" name="okhttp" version="4.11.0">
<artifact name="okhttp-4.11.0.jar">
<sha256 value="ee8f6bd6cd1257013d748330f4ca147638a9fbcb52fb388d5ac93cf53408745d" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.squareup.okhttp3" name="logging-interceptor" version="4.12.0">
<artifact name="logging-interceptor-4.12.0.jar">
<sha256 value="f3e8d5f0903c250c2b55d2f47fcfe008e80634385da8385161c7a63aaed0c74c" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.squareup.okhttp3" name="okhttp" version="4.11.0">
<artifact name="okhttp-4.11.0.jar">
<sha256 value="ee8f6bd6cd1257013d748330f4ca147638a9fbcb52fb388d5ac93cf53408745d" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="com.squareup.okhttp3" name="okhttp" version="4.12.0">
<artifact name="okhttp-4.12.0.jar">
<sha256 value="b1050081b14bb7a3a7e55a4d3ef01b5dcfabc453b4573a4fc019767191d5f4e0" origin="Generated by Gradle"/>
@ -4601,6 +4601,11 @@
<sha256 value="8cadd43ac5eb6d09de05faecca38b917a040bb9139c7edeb4cc81c740b713281" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm" version="9.8">
<artifact name="asm-9.8.jar">
<sha256 value="876eab6a83daecad5ca67eb9fcabb063c97b5aeb8cf1fca7a989ecde17522051" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-analysis" version="7.2">
<artifact name="asm-analysis-7.2.jar">
<sha256 value="be922aae60ff1ff1768e8e6544a38a7f92bd0a6d6b0b9791f94955d1bd453de2" origin="Generated by Gradle"/>
@ -4621,6 +4626,11 @@
<sha256 value="85b29371884ba31bb76edf22323c2c24e172c3267a67152eba3d1ccc2e041ef2" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-analysis" version="9.8">
<artifact name="asm-analysis-9.8.jar">
<sha256 value="e640732fbcd3c6271925a504f125e38384688f4dfbbf92c8622dfcee0d09edb9" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-commons" version="7.2">
<artifact name="asm-commons-7.2.jar">
<sha256 value="0e86b8b179c5fb223d1a880a0ff4960b6978223984b94e62e71135f2d8ea3558" origin="Generated by Gradle"/>
@ -4686,6 +4696,11 @@
<sha256 value="9929881f59eb6b840e86d54570c77b59ce721d104e6dfd7a40978991c2d3b41f" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-tree" version="9.8">
<artifact name="asm-tree-9.8.jar">
<sha256 value="14b7880cb7c85eed101e2710432fc3ffb83275532a6a894dc4c4095d49ad59f1" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-util" version="7.2">
<artifact name="asm-util-7.2.jar">
<sha256 value="6e24913b021ffacfe8e7e053d6e0ccc731941148cfa078d4f1ed3d96904530f8" origin="Generated by Gradle"/>
@ -4696,6 +4711,11 @@
<sha256 value="f885be71b5c90556f5f1ad1c4f9276b29b96057c497d46666fe4ddbec3cb43c6" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.ow2.asm" name="asm-util" version="9.8">
<artifact name="asm-util-9.8.jar">
<sha256 value="8ba0460ecb28fd0e2980e5f3ef3433a513a457bc077f81a53bdc75b587a08d15" origin="Generated by Gradle"/>
</artifact>
</component>
<component group="org.reactivestreams" name="reactive-streams" version="1.0.4">
<artifact name="reactive-streams-1.0.4.jar">
<sha256 value="f75ca597789b3dac58f61857b9ac2e1034a68fa672db35055a8fb4509e325f28" origin="Generated by Gradle"/>


@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.core;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Annotation to identify a block of code (a whole class, a method, a field, or a local variable) that is intentionally not fully
* project-aware because it's not intended to be used in a serverless environment. Some features are unavailable in serverless and are
* thus not worth the investment to make fully project-aware. This annotation makes it easier to identify blocks of code that require
* attention in case those features are revisited from a multi-project POV.
*/
@Retention(RetentionPolicy.SOURCE)
@Target(
{ ElementType.LOCAL_VARIABLE, ElementType.CONSTRUCTOR, ElementType.FIELD, ElementType.METHOD, ElementType.TYPE, ElementType.MODULE }
)
public @interface NotMultiProjectCapable {
/**
* Some explanation on why the block of code would not work in a multi-project context and/or what would need to be done to make it
* properly project-aware.
*/
String description() default "";
}
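As an illustration of how the annotation might be applied (the annotated method and description below are hypothetical, not part of this change):

```java
// Hypothetical usage sketch of @NotMultiProjectCapable.
@NotMultiProjectCapable(description = "This feature is unavailable in serverless, so assuming the default project is acceptable")
private void refreshFeatureState() {
    // single-project logic that would need rework for multi-project support
}
```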


@ -202,6 +202,11 @@ public enum Releasables {
}
}
/** Creates a {@link Releasable} that calls {@link RefCounted#decRef()} when closed. */
public static Releasable fromRefCounted(RefCounted refCounted) {
return () -> refCounted.decRef();
}
private static class ReleaseOnce extends AtomicReference<Releasable> implements Releasable {
ReleaseOnce(Releasable releasable) {
super(releasable);
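A minimal usage sketch for the new helper, assuming the caller already holds a reference (`acquire()` below is hypothetical):

```java
// Hypothetical: releases exactly one reference when the try block exits.
RefCounted resource = acquire();
try (Releasable ignored = Releasables.fromRefCounted(resource)) {
    // work with the resource; decRef() runs on close
}
```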


@ -13,10 +13,10 @@ dependencies {
compileOnly project(':libs:entitlement')
compileOnly project(':libs:core')
compileOnly project(':libs:logging')
implementation 'org.ow2.asm:asm:9.7.1'
implementation 'org.ow2.asm:asm-util:9.7.1'
implementation 'org.ow2.asm:asm-tree:9.7.1'
implementation 'org.ow2.asm:asm-analysis:9.7.1'
implementation 'org.ow2.asm:asm:9.8'
implementation 'org.ow2.asm:asm-util:9.8'
implementation 'org.ow2.asm:asm-tree:9.8'
implementation 'org.ow2.asm:asm-analysis:9.8'
testImplementation project(":test:framework")
testImplementation project(":libs:entitlement:bridge")
}


@ -98,6 +98,10 @@ public class EntitlementBootstrap {
);
exportInitializationToAgent();
loadAgent(findAgentJar(), EntitlementInitialization.class.getName());
if (EntitlementInitialization.getError() != null) {
throw EntitlementInitialization.getError();
}
}
private static Path getUserHome() {


@ -117,6 +117,10 @@ class DynamicInstrumentation {
// We should have failed already in the loop above, but just in case we did not, rethrow.
throw e;
}
if (transformer.hadErrors()) {
throw new RuntimeException("Failed to transform JDK classes for entitlements");
}
}
private static Map<MethodKey, CheckMethod> getMethodsToInstrument(Class<?> checkerInterface) throws ClassNotFoundException,


@ -16,11 +16,14 @@ import org.elasticsearch.entitlement.runtime.policy.PathLookup;
import org.elasticsearch.entitlement.runtime.policy.PolicyChecker;
import org.elasticsearch.entitlement.runtime.policy.PolicyCheckerImpl;
import org.elasticsearch.entitlement.runtime.policy.PolicyManager;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import java.lang.instrument.Instrumentation;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Objects.requireNonNull;
@ -32,17 +35,26 @@ import static java.util.Objects.requireNonNull;
* to begin injecting our instrumentation.
*/
public class EntitlementInitialization {
private static final Logger logger = LogManager.getLogger(EntitlementInitialization.class);
private static final Module ENTITLEMENTS_MODULE = PolicyManager.class.getModule();
public static InitializeArgs initializeArgs;
private static ElasticsearchEntitlementChecker checker;
private static AtomicReference<RuntimeException> error = new AtomicReference<>();
// Note: referenced by bridge reflectively
public static EntitlementChecker checker() {
return checker;
}
/**
* Return any exception that occurred during initialization
*/
public static RuntimeException getError() {
return error.get();
}
/**
* Initializes the Entitlement system:
* <ol>
@ -62,10 +74,16 @@ public class EntitlementInitialization {
*
* @param inst the JVM instrumentation class instance
*/
public static void initialize(Instrumentation inst) throws Exception {
// the checker _MUST_ be set before _any_ instrumentation is done
checker = initChecker(initializeArgs.policyManager());
initInstrumentation(inst);
public static void initialize(Instrumentation inst) {
try {
// the checker _MUST_ be set before _any_ instrumentation is done
checker = initChecker(initializeArgs.policyManager());
initInstrumentation(inst);
} catch (Exception e) {
// exceptions thrown within the agent will be swallowed, so capture it here
// instead so that it can be retrieved by bootstrap
error.set(new RuntimeException("Failed to initialize entitlements", e));
}
}
/**


@ -9,16 +9,22 @@
package org.elasticsearch.entitlement.instrumentation;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import java.lang.instrument.ClassFileTransformer;
import java.security.ProtectionDomain;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A {@link ClassFileTransformer} that applies an {@link Instrumenter} to the appropriate classes.
*/
public class Transformer implements ClassFileTransformer {
private static final Logger logger = LogManager.getLogger(Transformer.class);
private final Instrumenter instrumenter;
private final Set<String> classesToTransform;
private final AtomicBoolean hadErrors = new AtomicBoolean(false);
private boolean verifyClasses;
@ -33,6 +39,10 @@ public class Transformer implements ClassFileTransformer {
this.verifyClasses = true;
}
public boolean hadErrors() {
return hadErrors.get();
}
@Override
public byte[] transform(
ClassLoader loader,
@ -42,13 +52,19 @@ public class Transformer implements ClassFileTransformer {
byte[] classfileBuffer
) {
if (classesToTransform.contains(className)) {
// System.out.println("Transforming " + className);
return instrumenter.instrumentClass(className, classfileBuffer, verifyClasses);
logger.debug("Transforming " + className);
try {
return instrumenter.instrumentClass(className, classfileBuffer, verifyClasses);
} catch (Throwable t) {
hadErrors.set(true);
logger.error("Failed to instrument class " + className, t);
// throwing an exception from a transformer results in the exception being swallowed,
// effectively the same as returning null anyway, so we log the full failure here instead
return null;
}
} else {
// System.out.println("Not transforming " + className);
logger.trace("Not transforming " + className);
return null;
}
}
// private static final Logger LOGGER = LogManager.getLogger(Transformer.class);
}


@ -54,6 +54,7 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
@ -261,13 +262,17 @@ public class TransportGetDataStreamsAction extends TransportLocalProjectMetadata
Settings settings = dataStream.getEffectiveSettings(state.metadata());
ilmPolicyName = settings.get(IndexMetadata.LIFECYCLE_NAME);
if (indexMode == null && state.metadata().templatesV2().get(indexTemplate) != null) {
indexMode = resolveMode(
state,
indexSettingProviders,
dataStream,
settings,
dataStream.getEffectiveIndexTemplate(state.metadata())
);
try {
indexMode = resolveMode(
state,
indexSettingProviders,
dataStream,
settings,
dataStream.getEffectiveIndexTemplate(state.metadata())
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
indexTemplatePreferIlmValue = PREFER_ILM_SETTING.get(settings);
} else {


@ -258,6 +258,7 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
2,
ds2.getMetadata(),
ds2.getSettings(),
ds2.getMappings(),
ds2.isHidden(),
ds2.isReplicated(),
ds2.isSystem(),


@ -22,3 +22,8 @@ dependencies {
tasks.withType(Test).configureEach {
it.systemProperty "tests.multi_project.enabled", true
}
// Exclude multi-project tests from release build
tasks.named { it == "javaRestTest" || it == "yamlRestTest" }.configureEach {
it.onlyIf("snapshot build") { buildParams.snapshotBuild }
}


@ -14,4 +14,6 @@ module org.elasticsearch.mapper.extras {
requires org.apache.lucene.core;
requires org.apache.lucene.memory;
requires org.apache.lucene.queries;
exports org.elasticsearch.index.mapper.extras;
}


@ -173,7 +173,7 @@ public class MatchOnlyTextFieldMapper extends FieldMapper {
super(name, true, false, false, tsi, meta);
this.indexAnalyzer = Objects.requireNonNull(indexAnalyzer);
this.textFieldType = new TextFieldType(name, isSyntheticSource);
this.originalName = isSyntheticSource ? name() + "._original" : null;
this.originalName = isSyntheticSource ? name + "._original" : null;
}
public MatchOnlyTextFieldType(String name) {
@ -362,10 +362,38 @@ public class MatchOnlyTextFieldMapper extends FieldMapper {
return toQuery(query, queryShardContext);
}
private static class BytesFromMixedStringsBytesRefBlockLoader extends BlockStoredFieldsReader.StoredFieldsBlockLoader {
BytesFromMixedStringsBytesRefBlockLoader(String field) {
super(field);
}
@Override
public Builder builder(BlockFactory factory, int expectedCount) {
return factory.bytesRefs(expectedCount);
}
@Override
public RowStrideReader rowStrideReader(LeafReaderContext context) throws IOException {
return new BlockStoredFieldsReader.Bytes(field) {
private final BytesRef scratch = new BytesRef();
@Override
protected BytesRef toBytesRef(Object v) {
if (v instanceof BytesRef b) {
return b;
} else {
assert v instanceof String;
return BlockSourceReader.toBytesRef(scratch, v.toString());
}
}
};
}
}
@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (textFieldType.isSyntheticSource()) {
return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(storedFieldNameForSyntheticSource());
return new BytesFromMixedStringsBytesRefBlockLoader(storedFieldNameForSyntheticSource());
}
SourceValueFetcher fetcher = SourceValueFetcher.toString(blContext.sourcePaths(name()));
// MatchOnlyText never has norms, so we have to use the field names field
@ -386,7 +414,12 @@ public class MatchOnlyTextFieldMapper extends FieldMapper {
) {
@Override
protected BytesRef storedToBytesRef(Object stored) {
return (BytesRef) stored;
if (stored instanceof BytesRef storedBytes) {
return storedBytes;
} else {
assert stored instanceof String;
return new BytesRef(stored.toString());
}
}
};
}
@ -477,7 +510,12 @@ public class MatchOnlyTextFieldMapper extends FieldMapper {
() -> new StringStoredFieldFieldLoader(fieldType().storedFieldNameForSyntheticSource(), fieldType().name(), leafName()) {
@Override
protected void write(XContentBuilder b, Object value) throws IOException {
b.value(((BytesRef) value).utf8ToString());
if (value instanceof BytesRef valueBytes) {
b.value(valueBytes.utf8ToString());
} else {
assert value instanceof String;
b.value(value.toString());
}
}
}
);


@ -10,6 +10,9 @@
package org.elasticsearch.index.mapper.extras;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValuesType;
import org.apache.lucene.index.IndexOptions;
@ -21,6 +24,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.CannedTokenStream;
import org.apache.lucene.tests.analysis.Token;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexSettings;
@ -350,4 +354,29 @@ public class MatchOnlyTextFieldMapperTests extends MapperTestCase {
assertThat(fields, empty());
}
}
public void testLoadSyntheticSourceFromStringOrBytesRef() throws IOException {
DocumentMapper mapper = createSytheticSourceMapperService(mapping(b -> {
b.startObject("field1").field("type", "match_only_text").endObject();
b.startObject("field2").field("type", "match_only_text").endObject();
})).documentMapper();
try (Directory directory = newDirectory()) {
RandomIndexWriter iw = indexWriterForSyntheticSource(directory);
LuceneDocument document = new LuceneDocument();
document.add(new StringField("field1", "foo", Field.Store.NO));
document.add(new StoredField("field1._original", "foo"));
document.add(new StringField("field2", "bar", Field.Store.NO));
document.add(new StoredField("field2._original", new BytesRef("bar")));
iw.addDocument(document);
iw.close();
try (DirectoryReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
String syntheticSource = syntheticSource(mapper, null, indexReader, 0);
assertEquals("{\"field1\":\"foo\",\"field2\":\"bar\"}", syntheticSource);
}
}
}
}


@ -3,8 +3,6 @@ setup:
indices.create:
index: test
body:
settings:
number_of_replicas: 0
mappings:
properties:
pagerank:


@ -10,8 +10,6 @@
indices.create:
index: test2
body:
settings:
number_of_replicas: 0
mappings:
properties:
pagerank:
@ -29,8 +27,6 @@
indices.create:
index: test1
body:
settings:
number_of_replicas: 0
mappings:
properties:
pagerank:


@ -3,8 +3,6 @@ setup:
indices.create:
index: test
body:
settings:
number_of_replicas: 0
mappings:
properties:
tags:


@ -3,8 +3,6 @@ setup:
indices.create:
index: test
body:
settings:
number_of_replicas: 0
mappings:
"properties":
"number":


@ -7,8 +7,6 @@ setup:
indices.create:
index: test
body:
settings:
number_of_replicas: 0
mappings:
properties:
a_field:


@ -182,9 +182,6 @@ tests:
- class: org.elasticsearch.blocks.SimpleBlocksIT
method: testConcurrentAddBlock
issue: https://github.com/elastic/elasticsearch/issues/122324
- class: org.elasticsearch.xpack.ilm.TimeSeriesLifecycleActionsIT
method: testHistoryIsWrittenWithFailure
issue: https://github.com/elastic/elasticsearch/issues/123203
- class: org.elasticsearch.packaging.test.DockerTests
method: test151MachineDependentHeapWithSizeOverride
issue: https://github.com/elastic/elasticsearch/issues/123437
@ -281,9 +278,6 @@ tests:
- class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
method: testSearchWithRandomDisconnects
issue: https://github.com/elastic/elasticsearch/issues/122707
- class: org.elasticsearch.index.engine.ThreadPoolMergeSchedulerTests
method: testSchedulerCloseWaitsForRunningMerge
issue: https://github.com/elastic/elasticsearch/issues/125236
- class: org.elasticsearch.packaging.test.DockerTests
method: test020PluginsListWithNoPlugins
issue: https://github.com/elastic/elasticsearch/issues/126232
@ -473,12 +467,6 @@ tests:
- class: org.elasticsearch.packaging.test.DockerTests
method: test081SymlinksAreFollowedWithEnvironmentVariableFiles
issue: https://github.com/elastic/elasticsearch/issues/128867
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
method: testAvailableDiskSpaceMonitorWhenFileSystemStatErrors
issue: https://github.com/elastic/elasticsearch/issues/129149
- class: org.elasticsearch.index.engine.ThreadPoolMergeExecutorServiceDiskSpaceTests
method: testUnavailableBudgetBlocksNewMergeTasksFromStartingExecution
issue: https://github.com/elastic/elasticsearch/issues/129148
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeForkIT
method: test {lookup-join.EnrichLookupStatsBug ASYNC}
issue: https://github.com/elastic/elasticsearch/issues/129228
@ -536,9 +524,6 @@ tests:
- class: org.elasticsearch.search.query.VectorIT
method: testFilteredQueryStrategy
issue: https://github.com/elastic/elasticsearch/issues/129517
- class: org.elasticsearch.test.apmintegration.TracesApmIT
method: testApmIntegration
issue: https://github.com/elastic/elasticsearch/issues/129651
- class: org.elasticsearch.snapshots.SnapshotShutdownIT
method: testSnapshotShutdownProgressTracker
issue: https://github.com/elastic/elasticsearch/issues/129752
@ -548,9 +533,6 @@ tests:
- class: org.elasticsearch.qa.verify_version_constants.VerifyVersionConstantsIT
method: testLuceneVersionConstant
issue: https://github.com/elastic/elasticsearch/issues/125638
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/129819
- class: org.elasticsearch.index.store.FsDirectoryFactoryTests
method: testPreload
issue: https://github.com/elastic/elasticsearch/issues/129852
@ -564,7 +546,26 @@ tests:
method: "builds distribution from branches via archives extractedAssemble [bwcDistVersion: 8.2.1, bwcProject: bugfix, expectedAssembleTaskName:
extractedAssemble, #2]"
issue: https://github.com/elastic/elasticsearch/issues/119871
- class: org.elasticsearch.xpack.inference.qa.mixed.CohereServiceMixedIT
method: testRerank
issue: https://github.com/elastic/elasticsearch/issues/130009
- class: org.elasticsearch.xpack.inference.qa.mixed.CohereServiceMixedIT
method: testCohereEmbeddings
issue: https://github.com/elastic/elasticsearch/issues/130010
- class: org.elasticsearch.xpack.esql.qa.multi_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/130067
- class: geoip.GeoIpMultiProjectIT
issue: https://github.com/elastic/elasticsearch/issues/130073
- class: org.elasticsearch.xpack.esql.qa.single_node.GenerativeIT
method: test
issue: https://github.com/elastic/elasticsearch/issues/130067
- class: org.elasticsearch.xpack.esql.action.EnrichIT
method: testTopN
issue: https://github.com/elastic/elasticsearch/issues/130122
- class: org.elasticsearch.action.support.ThreadedActionListenerTests
method: testRejectionHandling
issue: https://github.com/elastic/elasticsearch/issues/130129
# Examples:
#


@ -23,6 +23,7 @@ apply plugin: 'elasticsearch.rest-resources'
dependencies {
restTestConfig project(path: ':modules:aggregations', configuration: 'restTests')
restTestConfig project(path: ':modules:mapper-extras', configuration: 'restTests')
}
restResources {


@ -208,6 +208,7 @@ public class TransportVersions {
public static final TransportVersion SPARSE_VECTOR_FIELD_PRUNING_OPTIONS_8_19 = def(8_841_0_58);
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@ -323,6 +324,7 @@ public class TransportVersions {
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED = def(9_109_00_0);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION = def(9_110_0_00);
public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
/*
* STOP! READ THIS FIRST! No, really,


@ -217,7 +217,8 @@ public class Version implements VersionId<Version>, ToXContentFragment {
public static final Version V_9_0_3 = new Version(9_00_03_99);
public static final Version V_9_0_4 = new Version(9_00_04_99);
public static final Version V_9_1_0 = new Version(9_01_00_99);
public static final Version CURRENT = V_9_1_0;
public static final Version V_9_2_0 = new Version(9_02_00_99);
public static final Version CURRENT = V_9_2_0;
private static final NavigableMap<Integer, Version> VERSION_IDS;
private static final Map<String, Version> VERSION_STRINGS;


@ -19,18 +19,24 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -51,6 +57,14 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
private static final ParseField ALLOW_AUTO_CREATE = new ParseField("allow_auto_create");
private static final ParseField IGNORE_MISSING_COMPONENT_TEMPLATES = new ParseField("ignore_missing_component_templates");
private static final ParseField DEPRECATED = new ParseField("deprecated");
public static final CompressedXContent EMPTY_MAPPINGS;
static {
try {
EMPTY_MAPPINGS = new CompressedXContent(Map.of());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<ComposableIndexTemplate, Void> PARSER = new ConstructingObjectParser<>(
@ -338,6 +352,64 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
return mergedIndexTemplateBuilder.build();
}
public ComposableIndexTemplate mergeMappings(CompressedXContent mappings) throws IOException {
Objects.requireNonNull(mappings);
if (Mapping.EMPTY.toCompressedXContent().equals(mappings) && this.template() != null && this.template().mappings() != null) {
return this;
}
ComposableIndexTemplate.Builder mergedIndexTemplateBuilder = this.toBuilder();
Template.Builder mergedTemplateBuilder;
CompressedXContent templateMappings;
if (this.template() == null) {
mergedTemplateBuilder = Template.builder();
templateMappings = null;
} else {
mergedTemplateBuilder = Template.builder(this.template());
templateMappings = this.template().mappings();
}
mergedTemplateBuilder.mappings(templateMappings == null ? mappings : merge(templateMappings, mappings));
mergedIndexTemplateBuilder.template(mergedTemplateBuilder);
return mergedIndexTemplateBuilder.build();
}
@SuppressWarnings("unchecked")
private CompressedXContent merge(CompressedXContent originalMapping, CompressedXContent mappingAddition) throws IOException {
Map<String, Object> mappingAdditionMap = XContentHelper.convertToMap(mappingAddition.uncompressed(), true, XContentType.JSON).v2();
Map<String, Object> combinedMappingMap = new HashMap<>();
if (originalMapping != null) {
Map<String, Object> originalMappingMap = XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON)
.v2();
if (originalMappingMap.containsKey(MapperService.SINGLE_MAPPING_NAME)) {
combinedMappingMap.putAll((Map<String, ?>) originalMappingMap.get(MapperService.SINGLE_MAPPING_NAME));
} else {
combinedMappingMap.putAll(originalMappingMap);
}
}
XContentHelper.update(combinedMappingMap, mappingAdditionMap, true);
return convertMappingMapToXContent(combinedMappingMap);
}
private static CompressedXContent convertMappingMapToXContent(Map<String, Object> rawAdditionalMapping) throws IOException {
CompressedXContent compressedXContent;
if (rawAdditionalMapping.isEmpty()) {
compressedXContent = EMPTY_MAPPINGS;
} else {
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) {
compressedXContent = mappingFromXContent(parser);
}
}
return compressedXContent;
}
private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException {
XContentParser.Token token = parser.nextToken();
if (token == XContentParser.Token.START_OBJECT) {
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}
@Override
public int hashCode() {
return Objects.hash(
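For intuition, here is a self-contained sketch of the deep-merge semantics the `merge` method above relies on (an illustration of recursive map merging, not the actual `XContentHelper.update` implementation):

```java
import java.util.Map;

// Illustrative deep merge: nested maps merge recursively; otherwise the addition wins.
@SuppressWarnings("unchecked")
static void deepMerge(Map<String, Object> base, Map<String, Object> addition) {
    for (Map.Entry<String, Object> entry : addition.entrySet()) {
        Object existing = base.get(entry.getKey());
        if (existing instanceof Map && entry.getValue() instanceof Map) {
            deepMerge((Map<String, Object>) existing, (Map<String, Object>) entry.getValue());
        } else {
            base.put(entry.getKey(), entry.getValue());
        }
    }
}
// deepMerge of {properties={@timestamp={type=date}}} with {properties={message={type=text}}}
// leaves base holding both fields under "properties".
```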


@ -29,6 +29,7 @@ import org.elasticsearch.cluster.metadata.DataStreamLifecycle.DownsamplingRound;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
@ -47,9 +48,11 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
@ -58,6 +61,7 @@ import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -70,6 +74,7 @@ import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.IndexSettings.LIFECYCLE_ORIGINATION_DATE;
@ -89,6 +94,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
public static final String FAILURE_STORE_PREFIX = ".fs-";
public static final DateFormatter DATE_FORMATTER = DateFormatter.forPattern("uuuu.MM.dd");
public static final String TIMESTAMP_FIELD_NAME = "@timestamp";
// Timeseries indices' leaf readers should be sorted by desc order of their timestamp field, as it allows search time optimizations
public static final Comparator<LeafReader> TIMESERIES_LEAF_READERS_SORTER = Comparator.comparingLong((LeafReader r) -> {
try {
@ -120,6 +126,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
@Nullable
private final Map<String, Object> metadata;
private final Settings settings;
private final CompressedXContent mappings;
private final boolean hidden;
private final boolean replicated;
private final boolean system;
@ -156,6 +163,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
generation,
metadata,
Settings.EMPTY,
EMPTY_MAPPINGS,
hidden,
replicated,
system,
@ -176,6 +184,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
long generation,
Map<String, Object> metadata,
Settings settings,
CompressedXContent mappings,
boolean hidden,
boolean replicated,
boolean system,
@ -192,6 +201,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,
@ -210,6 +220,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
long generation,
Map<String, Object> metadata,
Settings settings,
CompressedXContent mappings,
boolean hidden,
boolean replicated,
boolean system,
@ -225,6 +236,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
this.generation = generation;
this.metadata = metadata;
this.settings = Objects.requireNonNull(settings);
this.mappings = Objects.requireNonNull(mappings);
assert system == false || hidden; // system indices must be hidden
this.hidden = hidden;
this.replicated = replicated;
@ -286,11 +298,18 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
} else {
settings = Settings.EMPTY;
}
CompressedXContent mappings;
if (in.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
mappings = CompressedXContent.readCompressedString(in);
} else {
mappings = EMPTY_MAPPINGS;
}
return new DataStream(
name,
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,
@ -381,8 +400,8 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
return backingIndices.rolloverOnWrite;
}
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) {
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings);
public ComposableIndexTemplate getEffectiveIndexTemplate(ProjectMetadata projectMetadata) throws IOException {
return getMatchingIndexTemplate(projectMetadata).mergeSettings(settings).mergeMappings(mappings);
}
public Settings getEffectiveSettings(ProjectMetadata projectMetadata) {
@ -391,6 +410,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
return templateSettings.merge(settings);
}
public CompressedXContent getEffectiveMappings(ProjectMetadata projectMetadata) throws IOException {
return getMatchingIndexTemplate(projectMetadata).mergeMappings(mappings).template().mappings();
}
private ComposableIndexTemplate getMatchingIndexTemplate(ProjectMetadata projectMetadata) {
return lookupTemplateForDataStream(name, projectMetadata);
}
@ -510,6 +533,10 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
return settings;
}
public CompressedXContent getMappings() {
return mappings;
}
@Override
public boolean isHidden() {
return hidden;
@ -1354,6 +1381,9 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
|| out.getTransportVersion().isPatchFrom(TransportVersions.SETTINGS_IN_DATA_STREAMS_8_19)) {
settings.writeTo(out);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.MAPPINGS_IN_DATA_STREAMS)) {
mappings.writeTo(out);
}
}
public static final ParseField NAME_FIELD = new ParseField("name");
@ -1376,6 +1406,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");
public static final ParseField SETTINGS_FIELD = new ParseField("settings");
public static final ParseField MAPPINGS_FIELD = new ParseField("mappings");
@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>(
@ -1385,6 +1416,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
(Long) args[2],
(Map<String, Object>) args[3],
args[17] == null ? Settings.EMPTY : (Settings) args[17],
args[18] == null ? EMPTY_MAPPINGS : (CompressedXContent) args[18],
args[4] != null && (boolean) args[4],
args[5] != null && (boolean) args[5],
args[6] != null && (boolean) args[6],
@ -1456,6 +1488,18 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
DATA_STREAM_OPTIONS_FIELD
);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> Settings.fromXContent(p), SETTINGS_FIELD);
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> {
XContentParser.Token token = p.currentToken();
if (token == XContentParser.Token.VALUE_STRING) {
return new CompressedXContent(Base64.getDecoder().decode(p.text()));
} else if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
return new CompressedXContent(p.binaryValue());
} else if (token == XContentParser.Token.START_OBJECT) {
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(p.mapOrdered())));
} else {
throw new IllegalArgumentException("Unexpected token: " + token);
}
}, MAPPINGS_FIELD, ObjectParser.ValueType.VALUE_OBJECT_ARRAY);
}
public static DataStream fromXContent(XContentParser parser) throws IOException {
@ -1520,6 +1564,20 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
builder.startObject(SETTINGS_FIELD.getPreferredName());
this.settings.toXContent(builder, params);
builder.endObject();
String context = params.param(Metadata.CONTEXT_MODE_PARAM, Metadata.CONTEXT_MODE_API);
boolean binary = params.paramAsBoolean("binary", false);
if (Metadata.CONTEXT_MODE_API.equals(context) || binary == false) {
Map<String, Object> uncompressedMapping = XContentHelper.convertToMap(this.mappings.uncompressed(), true, XContentType.JSON)
.v2();
if (uncompressedMapping.isEmpty() == false) {
builder.field(MAPPINGS_FIELD.getPreferredName());
builder.map(uncompressedMapping);
}
} else {
builder.field(MAPPINGS_FIELD.getPreferredName(), mappings.compressed());
}
builder.endObject();
return builder;
}
@ -1864,6 +1922,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
@Nullable
private Map<String, Object> metadata = null;
private Settings settings = Settings.EMPTY;
private CompressedXContent mappings = EMPTY_MAPPINGS;
private boolean hidden = false;
private boolean replicated = false;
private boolean system = false;
@ -1892,6 +1951,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
generation = dataStream.generation;
metadata = dataStream.metadata;
settings = dataStream.settings;
mappings = dataStream.mappings;
hidden = dataStream.hidden;
replicated = dataStream.replicated;
system = dataStream.system;
@ -1928,6 +1988,11 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
return this;
}
public Builder setMappings(CompressedXContent mappings) {
this.mappings = mappings;
return this;
}
public Builder setHidden(boolean hidden) {
this.hidden = hidden;
return this;
@ -1989,6 +2054,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
generation,
metadata,
settings,
mappings,
hidden,
replicated,
system,


@ -332,6 +332,7 @@ public class MetadataCreateDataStreamService {
initialGeneration,
template.metadata() != null ? Map.copyOf(template.metadata()) : null,
Settings.EMPTY,
ComposableIndexTemplate.EMPTY_MAPPINGS,
hidden,
false,
isSystem,


@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.RatioValue;
import org.elasticsearch.core.TimeValue;
/**
* Settings definitions for the write load allocation decider and associated infrastructure
*/
public class WriteLoadConstraintSettings {
private static final String SETTING_PREFIX = "cluster.routing.allocation.write_load_decider.";
public enum WriteLoadDeciderStatus {
/**
* The decider is disabled
*/
DISABLED,
/**
* Only the low-threshold is enabled (write-load will not trigger rebalance)
*/
LOW_ONLY,
/**
* The decider is enabled
*/
ENABLED
}
public static final Setting<WriteLoadDeciderStatus> WRITE_LOAD_DECIDER_ENABLED_SETTING = Setting.enumSetting(
WriteLoadDeciderStatus.class,
SETTING_PREFIX + "enabled",
WriteLoadDeciderStatus.DISABLED,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* The threshold over which we consider write thread pool utilization "high"
*/
public static final Setting<RatioValue> WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING = new Setting<>(
SETTING_PREFIX + "high_utilization_threshold",
"90%",
RatioValue::parseRatioValue,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* The duration for which we need to see "high" utilization before we consider the low threshold exceeded
*/
public static final Setting<TimeValue> WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING = Setting.timeSetting(
SETTING_PREFIX + "high_utilization_duration",
TimeValue.timeValueMinutes(10),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* When the decider is {@link WriteLoadDeciderStatus#ENABLED}, the write-load monitor will call
* {@link RerouteService#reroute(String, Priority, ActionListener)} when we see tasks being delayed by this amount of time
* (but no more often than {@link #WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING})
*/
public static final Setting<TimeValue> WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING = Setting.timeSetting(
SETTING_PREFIX + "queue_latency_threshold",
TimeValue.timeValueSeconds(30),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* How often the data node calculates the write-loads for the individual shards
*/
public static final Setting<TimeValue> WRITE_LOAD_DECIDER_SHARD_WRITE_LOAD_POLLING_INTERVAL_SETTING = Setting.timeSetting(
SETTING_PREFIX + "shard_write_load_polling_interval",
TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
/**
* The minimum amount of time between successive calls to reroute to address write load hot-spots
*/
public static final Setting<TimeValue> WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING = Setting.timeSetting(
SETTING_PREFIX + "reroute_interval",
TimeValue.timeValueSeconds(60),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
}
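
These settings resolve like any other dynamic node-scope Setting. A minimal standalone sketch of reading them via the standard Setting#get accessor (the class name and the values below are illustrative, not part of this change):

import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.common.settings.Settings;

public class WriteLoadSettingsSketch {
    public static void main(String[] args) {
        // Values are arbitrary; anything left unset falls back to the defaults declared above.
        Settings nodeSettings = Settings.builder()
            .put("cluster.routing.allocation.write_load_decider.enabled", "LOW_ONLY")
            .put("cluster.routing.allocation.write_load_decider.high_utilization_threshold", "80%")
            .build();
        var status = WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.get(nodeSettings);
        var threshold = WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING.get(nodeSettings);
        var duration = WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING.get(nodeSettings);
        System.out.println(status + ", " + threshold.getAsPercent() + "%, " + duration); // LOW_ONLY, 80.0%, 10m
    }
}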

View file

@ -46,6 +46,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.allocation.DataTier;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.cluster.routing.allocation.allocator.AllocationBalancingRoundSummaryService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceComputer;
@ -643,6 +644,12 @@ public final class ClusterSettings extends AbstractScopedSettings {
DataStreamFailureStoreSettings.DATA_STREAM_FAILURE_STORED_ENABLED_SETTING,
IndexingStatsSettings.RECENT_WRITE_LOAD_HALF_LIFE_SETTING,
SearchStatsSettings.RECENT_READ_LOAD_HALF_LIFE_SETTING,
TransportGetAllocationStatsAction.CACHE_TTL_SETTING
TransportGetAllocationStatsAction.CACHE_TTL_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_HIGH_UTILIZATION_DURATION_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_QUEUE_LATENCY_THRESHOLD_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_SHARD_WRITE_LOAD_POLLING_INTERVAL_SETTING,
WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_REROUTE_INTERVAL_SETTING
);
}

View file

@ -2199,7 +2199,7 @@ public abstract class Engine implements Closeable {
awaitPendingClose();
}
private void awaitPendingClose() {
protected final void awaitPendingClose() {
try {
closedLatch.await();
} catch (InterruptedException e) {

View file

@ -469,7 +469,7 @@ public abstract class BlockSourceReader implements BlockLoader.RowStrideReader {
/**
* Convert a {@link String} into a utf-8 {@link BytesRef}.
*/
static BytesRef toBytesRef(BytesRef scratch, String v) {
public static BytesRef toBytesRef(BytesRef scratch, String v) {
int len = UnicodeUtil.maxUTF8Length(v.length());
if (scratch.bytes.length < len) {
scratch.bytes = new byte[len];

View file

@ -35,10 +35,10 @@ public abstract class BlockStoredFieldsReader implements BlockLoader.RowStrideRe
return true;
}
private abstract static class StoredFieldsBlockLoader implements BlockLoader {
public abstract static class StoredFieldsBlockLoader implements BlockLoader {
protected final String field;
StoredFieldsBlockLoader(String field) {
public StoredFieldsBlockLoader(String field) {
this.field = field;
}
@ -112,10 +112,10 @@ public abstract class BlockStoredFieldsReader implements BlockLoader.RowStrideRe
}
}
private abstract static class Bytes extends BlockStoredFieldsReader {
public abstract static class Bytes extends BlockStoredFieldsReader {
private final String field;
Bytes(String field) {
public Bytes(String field) {
this.field = field;
}

View file

@ -124,7 +124,8 @@ public class HighlightPhase implements FetchSubPhase {
if (fieldNameContainsWildcards) {
if (fieldType.typeName().equals(TextFieldMapper.CONTENT_TYPE) == false
&& fieldType.typeName().equals(KeywordFieldMapper.CONTENT_TYPE) == false
&& fieldType.typeName().equals("match_only_text") == false) {
&& fieldType.typeName().equals("match_only_text") == false
&& fieldType.typeName().equals("patterned_text") == false) {
continue;
}
if (highlighter.canHighlight(fieldType) == false) {

View file

@ -115,6 +115,10 @@ public abstract class SearchContext implements Releasable {
closeFuture.onResponse(null);
}
public final boolean isClosed() {
return closeFuture.isDone();
}
/**
* Should be called before executing the main query and after all other parameters have been set.
*/

View file

@ -54,7 +54,8 @@ public class DefaultBuiltInExecutorBuilders implements BuiltInExecutorBuilders {
settings,
ThreadPool.Names.WRITE,
allocatedProcessors,
10000,
// Keep the historical 10,000 for smaller nodes; scale at 750 per allocated processor once that exceeds 10,000 (14 or more cores).
Math.max(allocatedProcessors * 750, 10000),
new EsExecutors.TaskTrackingConfig(true, indexAutoscalingEWMA)
)
);
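// Worked examples of the formula above (illustrative core counts):
//  8 cores -> Math.max( 6_000, 10_000) = 10_000 (the floor applies)
// 16 cores -> Math.max(12_000, 10_000) = 12_000
// 32 cores -> Math.max(24_000, 10_000) = 24_000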

View file

@ -29,6 +29,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.cluster.metadata.ComposableIndexTemplate.EMPTY_MAPPINGS;
import static org.elasticsearch.cluster.metadata.DataStream.TIMESTAMP_FIELD_NAME;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
@ -283,9 +284,7 @@ public class ComposableIndexTemplateTests extends SimpleDiffableSerializationTes
}
public void testMergeEmptySettingsIntoTemplateWithNonEmptySettings() {
// We only have settings from the template, so the effective template will just be the original template
Settings templateSettings = randomSettings();
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(randomMappings(null));
// Attempting to merge in null settings ought to fail
ComposableIndexTemplate indexTemplate = randomInstance();
expectThrows(NullPointerException.class, () -> indexTemplate.mergeSettings(null));
assertThat(indexTemplate.mergeSettings(Settings.EMPTY), equalTo(indexTemplate));
@ -325,12 +324,14 @@ public class ComposableIndexTemplateTests extends SimpleDiffableSerializationTes
.put("index.setting3", "templateValue")
.put("index.setting4", "templateValue")
.build();
List<String> componentTemplates = List.of("component_template_1");
CompressedXContent templateMappings = randomMappings(randomDataStreamTemplate());
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.componentTemplates(componentTemplates)
.build();
Settings mergedSettings = Settings.builder()
.put("index.setting1", "dataStreamValue")
@ -342,7 +343,67 @@ public class ComposableIndexTemplateTests extends SimpleDiffableSerializationTes
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(expectedTemplateBuilder)
.componentTemplates(componentTemplates)
.build();
assertThat(indexTemplate.mergeSettings(dataStreamSettings), equalTo(expectedEffectiveTemplate));
}
public void testMergeEmptyMappingsIntoTemplateWithNonEmptyMappings() throws IOException {
// Attempting to merge in null mappings ought to fail
ComposableIndexTemplate indexTemplate = randomInstance();
expectThrows(NullPointerException.class, () -> indexTemplate.mergeMappings(null));
ComposableIndexTemplate mergedTemplate = indexTemplate.mergeMappings(EMPTY_MAPPINGS);
if (indexTemplate.template() == null || indexTemplate.template().mappings() == null) {
assertThat(mergedTemplate.template().mappings(), equalTo(EMPTY_MAPPINGS));
} else {
assertThat(mergedTemplate, equalTo(indexTemplate));
}
assertThat(indexTemplate.mergeSettings(Settings.EMPTY), equalTo(indexTemplate));
}
public void testMergeNonEmptyMappingsIntoTemplateWithEmptyMappings() throws IOException {
// We only have mappings from the data stream, so we expect to get only those back in the effective template
CompressedXContent dataStreamMappings = randomMappings(randomDataStreamTemplate());
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
Settings templateSettings = Settings.EMPTY;
CompressedXContent templateMappings = new CompressedXContent(Map.of("_doc", Map.of()));
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.build();
Template.Builder expectedTemplateBuilder = Template.builder().settings(templateSettings).mappings(dataStreamMappings);
ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(expectedTemplateBuilder)
.build();
assertThat(indexTemplate.mergeMappings(dataStreamMappings), equalTo(expectedEffectiveTemplate));
}
public void testMergeMappings() throws IOException {
// Here we have mappings from both the template and the data stream, so we expect the data stream mappings to take precedence
CompressedXContent dataStreamMappings = new CompressedXContent(Map.of());
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
CompressedXContent templateMappings = new CompressedXContent(Map.of("_doc", Map.of()));
Settings templateSettings = randomSettings();
List<String> componentTemplates = List.of("component_template_1");
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.componentTemplates(componentTemplates)
.build();
Template.Builder expectedTemplateBuilder = Template.builder().settings(templateSettings).mappings(EMPTY_MAPPINGS);
ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(expectedTemplateBuilder)
.componentTemplates(componentTemplates)
.build();
ComposableIndexTemplate merged = indexTemplate.mergeMappings(dataStreamMappings);
assertThat(merged, equalTo(expectedEffectiveTemplate));
}
}
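
The two merge methods compose into an effective template. Reusing the names from testMergeMappings above, a minimal sketch of that chaining (illustrative only; the tests do not assert this exact composition, and dataStreamSettings stands in for settings held on the data stream):

ComposableIndexTemplate effective = indexTemplate
    .mergeSettings(dataStreamSettings)    // data stream settings override template settings
    .mergeMappings(dataStreamMappings);   // data stream mappings override template mappings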

View file

@ -53,7 +53,6 @@ import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultFailureStoreName;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance;
@ -98,6 +97,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
var generation = instance.getGeneration();
var metadata = instance.getMetadata();
var settings = instance.getSettings();
var mappings = instance.getMappings();
var isHidden = instance.isHidden();
var isReplicated = instance.isReplicated();
var isSystem = instance.isSystem();
@ -110,7 +110,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
var autoShardingEvent = instance.getAutoShardingEvent();
var failureRolloverOnWrite = instance.getFailureComponent().isRolloverOnWrite();
var failureAutoShardingEvent = instance.getDataComponent().getAutoShardingEvent();
switch (between(0, 16)) {
switch (between(0, 17)) {
case 0 -> name = randomAlphaOfLength(10);
case 1 -> indices = randomNonEmptyIndexInstances();
case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10);
@ -179,6 +179,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
? null
: new DataStreamAutoShardingEvent(indices.getLast().getName(), randomIntBetween(1, 10), randomMillisUpToYear9999());
case 16 -> settings = randomValueOtherThan(settings, DataStreamTestHelper::randomSettings);
case 17 -> mappings = randomValueOtherThan(mappings, ComponentTemplateTests::randomMappings);
}
return new DataStream(
@ -186,6 +187,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
generation,
metadata,
settings,
mappings,
isHidden,
isReplicated,
isSystem,
@ -1948,6 +1950,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
generation,
metadata,
randomSettings(),
randomMappings(),
isSystem,
randomBoolean(),
isSystem,
@ -2141,6 +2144,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2160,6 +2164,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2186,6 +2191,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2211,6 +2217,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2234,6 +2241,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2266,6 +2274,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
randomNonNegativeInt(),
null,
randomSettings(),
randomMappings(),
hidden,
replicated,
system,
@ -2529,13 +2538,31 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.componentTemplates(List.of("component-template-1"))
.build();
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault())
.indexTemplates(Map.of(dataStream.getName(), indexTemplate));
.indexTemplates(Map.of(dataStream.getName(), indexTemplate))
.componentTemplates(
Map.of(
"component-template-1",
new ComponentTemplate(
Template.builder()
.settings(
Settings.builder()
.put("index.setting1", "componentTemplateValue")
.put("index.setting5", "componentTemplateValue")
)
.build(),
1L,
Map.of()
)
)
);
Settings mergedSettings = Settings.builder()
.put("index.setting1", "dataStreamValue")
.put("index.setting2", "dataStreamValue")
.put("index.setting4", "templateValue")
.put("index.setting5", "componentTemplateValue")
.build();
assertThat(dataStream.getEffectiveSettings(projectMetadataBuilder.build()), equalTo(mergedSettings));
}
@ -2547,28 +2574,40 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
assertThrows(IllegalArgumentException.class, () -> dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()));
}
public void testGetEffectiveIndexTemplateTemplateSettingsOnly() {
// We only have settings from the template, so the effective template will just be the original template
DataStream dataStream = createDataStream(Settings.EMPTY);
public void testGetEffectiveIndexTemplateTemplateNoOverrides() throws IOException {
// We only have settings and mappings from the template, so the effective template will just be the original template
DataStream dataStream = createDataStream(Settings.EMPTY, ComposableIndexTemplate.EMPTY_MAPPINGS);
Settings templateSettings = randomSettings();
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(randomMappings());
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.componentTemplates(List.of("component-template-1"))
.build();
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault())
.indexTemplates(Map.of(dataStream.getName(), indexTemplate));
.indexTemplates(Map.of(dataStream.getName(), indexTemplate))
.componentTemplates(
Map.of(
"component-template-1",
new ComponentTemplate(
Template.builder().settings(Settings.builder().put("index.setting5", "componentTemplateValue")).build(),
1L,
Map.of()
)
)
);
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(indexTemplate));
}
public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() {
public void testGetEffectiveIndexTemplateDataStreamSettingsOnly() throws IOException {
// We only have settings from the data stream, so we expect to get only those back in the effective template
Settings dataStreamSettings = randomSettings();
DataStream dataStream = createDataStream(dataStreamSettings);
DataStream dataStream = createDataStream(dataStreamSettings, ComposableIndexTemplate.EMPTY_MAPPINGS);
Settings templateSettings = Settings.EMPTY;
CompressedXContent templateMappings = randomMappings();
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
@ -2585,20 +2624,80 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
}
public void testGetEffectiveIndexTemplate() {
public void testGetEffectiveIndexTemplate() throws IOException {
// Here we have settings from both the template and the data stream, so we expect the data stream settings to take precedence
Settings dataStreamSettings = Settings.builder()
.put("index.setting1", "dataStreamValue")
.put("index.setting2", "dataStreamValue")
.put("index.setting3", (String) null) // This one gets removed from the effective settings
.build();
DataStream dataStream = createDataStream(dataStreamSettings);
CompressedXContent dataStreamMappings = new CompressedXContent(
Map.of("properties", Map.of("field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")))
);
DataStream dataStream = createDataStream(dataStreamSettings, dataStreamMappings);
Settings templateSettings = Settings.builder()
.put("index.setting1", "templateValue")
.put("index.setting3", "templateValue")
.put("index.setting4", "templateValue")
.build();
CompressedXContent templateMappings = randomMappings();
CompressedXContent templateMappings = new CompressedXContent(
Map.of("_doc", Map.of("properties", Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword"))))
);
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
List<String> componentTemplates = List.of("component-template-1");
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(templateBuilder)
.componentTemplates(componentTemplates)
.build();
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault())
.indexTemplates(Map.of(dataStream.getName(), indexTemplate))
.componentTemplates(
Map.of(
"component-template-1",
new ComponentTemplate(
Template.builder().settings(Settings.builder().put("index.setting5", "componentTemplateValue")).build(),
1L,
Map.of()
)
)
);
Settings mergedSettings = Settings.builder()
.put("index.setting1", "dataStreamValue")
.put("index.setting2", "dataStreamValue")
.put("index.setting4", "templateValue")
.build();
CompressedXContent mergedMappings = new CompressedXContent(
Map.of(
"properties",
Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "text"), "field3", Map.of("type", "keyword"))
)
);
Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(mergedMappings);
ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(expectedTemplateBuilder)
.componentTemplates(componentTemplates)
.build();
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
}
public void testGetEffectiveMappingsNoMatchingTemplate() {
// No matching template, so we expect an IllegalArgumentException
DataStream dataStream = createTestInstance();
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault());
assertThrows(IllegalArgumentException.class, () -> dataStream.getEffectiveMappings(projectMetadataBuilder.build()));
}
public void testGetEffectiveIndexTemplateDataStreamMappingsOnly() throws IOException {
// We only have mappings from the data stream, so we expect to get only those back in the effective template
CompressedXContent dataStreamMappings = randomMappings();
DataStream dataStream = createDataStream(Settings.EMPTY, dataStreamMappings);
Settings templateSettings = Settings.EMPTY;
CompressedXContent templateMappings = new CompressedXContent(Map.of("_doc", Map.of()));
Template.Builder templateBuilder = Template.builder().settings(templateSettings).mappings(templateMappings);
ComposableIndexTemplate indexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
@ -2607,12 +2706,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
.build();
ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(randomProjectIdOrDefault())
.indexTemplates(Map.of(dataStream.getName(), indexTemplate));
Settings mergedSettings = Settings.builder()
.put("index.setting1", "dataStreamValue")
.put("index.setting2", "dataStreamValue")
.put("index.setting4", "templateValue")
.build();
Template.Builder expectedTemplateBuilder = Template.builder().settings(mergedSettings).mappings(templateMappings);
Template.Builder expectedTemplateBuilder = Template.builder().settings(templateSettings).mappings(dataStreamMappings);
ComposableIndexTemplate expectedEffectiveTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream.getName()))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
@ -2621,11 +2715,30 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
assertThat(dataStream.getEffectiveIndexTemplate(projectMetadataBuilder.build()), equalTo(expectedEffectiveTemplate));
}
private static CompressedXContent randomMappings() {
try {
return new CompressedXContent("{\"_doc\": {\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}}");
} catch (IOException e) {
fail("got an IO exception creating fake mappings: " + e);
return null;
}
}
private DataStream createDataStream(Settings settings) {
DataStream dataStream = createTestInstance();
return dataStream.copy().setSettings(settings).build();
}
private DataStream createDataStream(CompressedXContent mappings) {
DataStream dataStream = createTestInstance();
return dataStream.copy().setMappings(mappings).build();
}
private DataStream createDataStream(Settings settings, CompressedXContent mappings) {
DataStream dataStream = createTestInstance();
return dataStream.copy().setSettings(settings).setMappings(mappings).build();
}
private record DataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis, Long originationTimeInMillis) {
public static DataStreamMetadata dataStreamMetadata(Long creationTimeInMillis, Long rolloverTimeInMillis) {
return new DataStreamMetadata(creationTimeInMillis, rolloverTimeInMillis, null);
@ -2669,4 +2782,19 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
builder.put(im, false);
}
}
@Override
protected ToXContent.Params getToXContentParams() {
if (randomBoolean()) {
return ToXContent.EMPTY_PARAMS;
}
return new ToXContent.MapParams(
Map.of(
"binary",
randomFrom("true", "false"),
Metadata.CONTEXT_MODE_PARAM,
randomFrom(Metadata.XContentContext.values()).toString()
)
);
}
}

View file

@ -324,10 +324,19 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
}
public void testAvailableDiskSpaceMonitorWhenFileSystemStatErrors() throws Exception {
aFileStore.usableSpace = randomLongBetween(1L, 100L);
aFileStore.totalSpace = randomLongBetween(1L, 100L);
bFileStore.usableSpace = randomLongBetween(1L, 100L);
bFileStore.totalSpace = randomLongBetween(1L, 100L);
long aUsableSpace;
long bUsableSpace;
do {
aFileStore.usableSpace = randomLongBetween(1L, 1000L);
aFileStore.totalSpace = randomLongBetween(1L, 1000L);
bFileStore.usableSpace = randomLongBetween(1L, 1000L);
bFileStore.totalSpace = randomLongBetween(1L, 1000L);
// the default 5% (same as flood stage level)
aUsableSpace = Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L);
bUsableSpace = Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L);
} while (aUsableSpace == bUsableSpace); // they must be different in order to distinguish the available disk space updates
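// Worked example of the head-room computation above (illustrative numbers):
// totalSpace = 1_000 reserves totalSpace / 20 = 50 bytes (the default 5% flood-stage level),
// so usableSpace = 120 leaves max(120 - 50, 0) = 70 bytes available for merge tasks,
// while usableSpace = 30 leaves max(30 - 50, 0) = 0.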
long finalBUsableSpace = bUsableSpace;
long finalAUsableSpace = aUsableSpace;
boolean aErrorsFirst = randomBoolean();
if (aErrorsFirst) {
// the "a" file system will error when collecting stats
@ -355,18 +364,10 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
assertThat(availableDiskSpaceUpdates.size(), is(1));
if (aErrorsFirst) {
// uses the stats from "b"
assertThat(
availableDiskSpaceUpdates.getLast().getBytes(),
// the default 5% (same as flood stage level)
is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L))
);
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(finalBUsableSpace));
} else {
// uses the stats from "a"
assertThat(
availableDiskSpaceUpdates.getLast().getBytes(),
// the default 5% (same as flood stage level)
is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L))
);
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(finalAUsableSpace));
}
}
});
@ -393,21 +394,14 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
}
assertBusy(() -> {
synchronized (availableDiskSpaceUpdates) {
// the updates are different values
assertThat(availableDiskSpaceUpdates.size(), is(3));
if (aErrorsFirst) {
// uses the stats from "a"
assertThat(
availableDiskSpaceUpdates.getLast().getBytes(),
// the default 5% (same as flood stage level)
is(Math.max(aFileStore.usableSpace - aFileStore.totalSpace / 20, 0L))
);
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(finalAUsableSpace));
} else {
// uses the stats from "b"
assertThat(
availableDiskSpaceUpdates.getLast().getBytes(),
// the default 5% (same as flood stage level)
is(Math.max(bFileStore.usableSpace - bFileStore.totalSpace / 20, 0L))
);
assertThat(availableDiskSpaceUpdates.getLast().getBytes(), is(finalBUsableSpace));
}
}
});
@ -847,6 +841,7 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
when(mergeTask2.schedule()).thenReturn(RUN);
boolean task1Runs = randomBoolean();
long currentAvailableBudget = expectedAvailableBudget.get();
// the over-budget here can be larger than the total initial available budget
long overBudget = randomLongBetween(currentAvailableBudget + 1L, currentAvailableBudget + 100L);
long underBudget = randomLongBetween(0L, currentAvailableBudget);
if (task1Runs) {
@ -882,11 +877,18 @@ public class ThreadPoolMergeExecutorServiceDiskSpaceTests extends ESTestCase {
// update the expected budget given that one task now finished
expectedAvailableBudget.set(expectedAvailableBudget.get() + completedMergeTask.estimatedRemainingMergeSize());
}
// let the test finish cleanly
assertBusy(() -> {
assertThat(threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(), is(aHasMoreSpace ? 112_500L : 103_000L));
assertThat(threadPoolMergeExecutorService.allDone(), is(true));
});
assertBusy(
() -> assertThat(
threadPoolMergeExecutorService.getDiskSpaceAvailableForNewMergeTasks(),
is(aHasMoreSpace ? 112_500L : 103_000L)
)
);
// let the test finish cleanly (some tasks can be over budget even if all the other tasks finished running)
aFileStore.totalSpace = Long.MAX_VALUE;
bFileStore.totalSpace = Long.MAX_VALUE;
aFileStore.usableSpace = Long.MAX_VALUE;
bFileStore.usableSpace = Long.MAX_VALUE;
assertBusy(() -> assertThat(threadPoolMergeExecutorService.allDone(), is(true)));
}
if (setThreadPoolMergeSchedulerSetting) {
assertWarnings(

View file

@ -612,11 +612,12 @@ public class ThreadPoolMergeSchedulerTests extends ESTestCase {
fail(e);
}
});
// test expects that there definitely is a running merge before closing the merge scheduler
mergeRunningLatch.await();
// closes the merge scheduler
t.start();
try {
assertTrue(t.isAlive());
// wait for the merge to actually run
mergeRunningLatch.await();
// ensure the merge scheduler is effectively "closed"
assertBusy(() -> {
MergeSource mergeSource2 = mock(MergeSource.class);

View file

@ -15,7 +15,6 @@ import com.sun.net.httpserver.HttpServer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.xcontent.spi.XContentProvider;
import org.junit.rules.ExternalResource;
import java.io.BufferedReader;
@ -25,7 +24,6 @@ import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -35,14 +33,12 @@ import java.util.function.Consumer;
public class RecordingApmServer extends ExternalResource {
private static final Logger logger = LogManager.getLogger(RecordingApmServer.class);
private static final XContentProvider.FormatProvider XCONTENT = XContentProvider.provider().getJsonXContent();
final ArrayBlockingQueue<String> received = new ArrayBlockingQueue<>(1000);
private static HttpServer server;
private final Thread messageConsumerThread = consumerThread();
private volatile Consumer<String> consumer;
private volatile boolean consumerRunning = true;
private volatile boolean running = true;
@Override
protected void before() throws Throwable {
@ -56,7 +52,7 @@ public class RecordingApmServer extends ExternalResource {
private Thread consumerThread() {
return new Thread(() -> {
while (consumerRunning) {
while (running) {
if (consumer != null) {
try {
String msg = received.poll(1L, TimeUnit.SECONDS);
@ -74,28 +70,38 @@ public class RecordingApmServer extends ExternalResource {
@Override
protected void after() {
running = false;
server.stop(1);
consumerRunning = false;
consumer = null;
}
private void handle(HttpExchange exchange) throws IOException {
try (exchange) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
if (running) {
try {
try (InputStream requestBody = exchange.getRequestBody()) {
if (requestBody != null) {
var read = readJsonMessages(requestBody);
received.addAll(read);
}
}
}
} catch (RuntimeException e) {
logger.warn("failed to parse request", e);
} catch (Throwable t) {
// The lifetime of HttpServer makes message handling "brittle": we need to start handling and recording received
// messages before the test starts running. We should also stop handling them before the test ends (and the test
// cluster is torn down), or we may run into IOException as the communication channel is interrupted.
// Coordinating the lifecycle of the mock HttpServer and of the test ES cluster is difficult and error-prone, so
// we just catch Throwable and log it without failing: if we have an error in communicating to/from
// the mock server while the test is running, the test would fail anyway as the expected messages will not arrive, and
// if we have an error outside the test scope (before or after) that is OK.
logger.warn("failed to parse request", t);
}
}
exchange.sendResponseHeaders(201, 0);
}
}
private List<String> readJsonMessages(InputStream input) throws IOException {
private List<String> readJsonMessages(InputStream input) {
// parse NDJSON
return new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8)).lines().toList();
}
@ -104,14 +110,7 @@ public class RecordingApmServer extends ExternalResource {
return server.getAddress().getPort();
}
public List<String> getMessages() {
List<String> list = new ArrayList<>(received.size());
received.drainTo(list);
return list;
}
public void addMessageConsumer(Consumer<String> messageConsumer) {
this.consumer = messageConsumer;
}
}

View file

@ -91,7 +91,8 @@ public class TracesApmIT extends ESRestTestCase {
client().performRequest(nodeStatsRequest);
finished.await(30, TimeUnit.SECONDS);
var completed = finished.await(30, TimeUnit.SECONDS);
assertTrue("Timeout when waiting for assertions to complete", completed);
assertThat(assertions, equalTo(Collections.emptySet()));
}
@ -143,5 +144,4 @@ public class TracesApmIT extends ESRestTestCase {
return Collections.emptyMap();
}
}
}

View file

@ -17,6 +17,7 @@ import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedFunction;
@ -58,6 +59,7 @@ import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
@ -85,6 +87,7 @@ import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.elasticsearch.test.ESTestCase.randomMap;
import static org.elasticsearch.test.ESTestCase.randomMillisUpToYear9999;
import static org.elasticsearch.test.ESTestCase.randomPositiveTimeValue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -365,6 +368,7 @@ public final class DataStreamTestHelper {
generation,
metadata,
randomSettings(),
randomMappings(),
system ? true : randomBoolean(),
replicated,
system,
@ -400,6 +404,15 @@ public final class DataStreamTestHelper {
);
}
private static CompressedXContent randomMappings() {
try {
return new CompressedXContent("{\"properties\":{\"" + randomAlphaOfLength(5) + "\":{\"type\":\"keyword\"}}}");
} catch (IOException e) {
fail("got an IO exception creating fake mappings: " + e);
return null;
}
}
public static DataStreamAlias randomAliasInstance() {
List<String> dataStreams = List.of(generateRandomStringArray(5, 5, false, false));
return new DataStreamAlias(

View file

@ -152,7 +152,12 @@ public class MockSearchService extends SearchService {
@Override
public SearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException {
SearchContext searchContext = super.createSearchContext(request, timeout);
onPutContext.accept(searchContext.readerContext());
try {
onCreateSearchContext.accept(searchContext);
} catch (Exception e) {
searchContext.close();
throw e;
}
searchContext.addReleasable(() -> onRemoveContext.accept(searchContext.readerContext()));
return searchContext;
}

View file

@ -14,6 +14,7 @@ import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.autoscaling.LocalStateAutoscaling;
import org.elasticsearch.xpack.autoscaling.action.GetAutoscalingCapacityAction;
@ -39,7 +40,10 @@ public abstract class AutoscalingStorageIntegTestCase extends DiskUsageIntegTest
builder.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey(), LOW_WATERMARK_BYTES + "b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey(), HIGH_WATERMARK_BYTES + "b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey(), "0b")
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms");
.put(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.getKey(), "0ms")
// the periodicity for the checker for the available disk space as well as the merge tasks' aborting status
// the default of 5 seconds might timeout some tests
.put(ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING.getKey(), "100ms");
return builder.build();
}

View file

@ -80,6 +80,11 @@ public class LifecycleOperationMetadata implements Metadata.ProjectCustom {
);
}
@Deprecated(forRemoval = true)
public static OperationMode currentSLMMode(final ClusterState state) {
return currentSLMMode(state.metadata().getProject());
}
/**
* Returns the current SLM mode based on the given project metadata. It first checks the newer
* storage mechanism ({@link LifecycleOperationMetadata#getSLMOperationMode()}) before falling
@ -87,9 +92,9 @@ public class LifecycleOperationMetadata implements Metadata.ProjectCustom {
* value for an empty state is used.
*/
@SuppressWarnings("deprecation")
public static OperationMode currentSLMMode(final ClusterState state) {
SnapshotLifecycleMetadata oldMetadata = state.metadata().getProject().custom(SnapshotLifecycleMetadata.TYPE);
LifecycleOperationMetadata currentMetadata = state.metadata().getProject().custom(LifecycleOperationMetadata.TYPE);
public static OperationMode currentSLMMode(ProjectMetadata project) {
SnapshotLifecycleMetadata oldMetadata = project.custom(SnapshotLifecycleMetadata.TYPE);
LifecycleOperationMetadata currentMetadata = project.custom(LifecycleOperationMetadata.TYPE);
return Optional.ofNullable(currentMetadata)
.map(LifecycleOperationMetadata::getSLMOperationMode)
.orElse(

View file

@ -14,7 +14,10 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
@ -29,6 +32,8 @@ import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.curren
*/
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
private final ProjectId projectId;
@Nullable
private final OperationMode ilmMode;
@Nullable
@ -47,18 +52,21 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
};
}
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
private OperationModeUpdateTask(Priority priority, ProjectId projectId, OperationMode ilmMode, OperationMode slmMode) {
super(priority);
this.projectId = projectId;
this.ilmMode = ilmMode;
this.slmMode = slmMode;
}
public static OperationModeUpdateTask ilmMode(OperationMode mode) {
return new OperationModeUpdateTask(getPriority(mode), mode, null);
public static OperationModeUpdateTask ilmMode(ProjectId projectId, OperationMode mode) {
return new OperationModeUpdateTask(getPriority(mode), projectId, mode, null);
}
public static OperationModeUpdateTask slmMode(OperationMode mode) {
return new OperationModeUpdateTask(getPriority(mode), null, mode);
@FixForMultiProject // Use non-default ID when SLM has been made project-aware
final var projectId = ProjectId.DEFAULT;
return new OperationModeUpdateTask(getPriority(mode), projectId, null, mode);
}
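// Hypothetical caller-side usage of the project-scoped factory (the source string is illustrative):
//   clusterService.submitUnbatchedStateUpdateTask("ilm_operation_mode_update",
//       OperationModeUpdateTask.ilmMode(projectId, OperationMode.STOPPING));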
private static Priority getPriority(OperationMode mode) {
@ -79,22 +87,24 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterState newState = currentState;
newState = updateILMState(newState);
newState = updateSLMState(newState);
return newState;
}
private ClusterState updateILMState(final ClusterState currentState) {
if (ilmMode == null) {
ProjectMetadata oldProject = currentState.metadata().getProject(projectId);
ProjectMetadata newProject = updateILMState(oldProject);
newProject = updateSLMState(newProject);
if (newProject == oldProject) {
return currentState;
}
return ClusterState.builder(currentState).putProjectMetadata(newProject).build();
}
final var project = currentState.metadata().getProject();
final OperationMode currentMode = currentILMMode(project);
private ProjectMetadata updateILMState(final ProjectMetadata currentProject) {
if (ilmMode == null) {
return currentProject;
}
final OperationMode currentMode = currentILMMode(currentProject);
if (currentMode.equals(ilmMode)) {
// No need for a new state
return currentState;
return currentProject;
}
final OperationMode newMode;
@ -102,24 +112,23 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
newMode = ilmMode;
} else {
// The transition is invalid, return the current state
return currentState;
return currentProject;
}
logger.info("updating ILM operation mode to {}", newMode);
final var updatedMetadata = new LifecycleOperationMetadata(newMode, currentSLMMode(currentState));
return currentState.copyAndUpdateProject(project.id(), b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
final var updatedMetadata = new LifecycleOperationMetadata(newMode, currentSLMMode(currentProject));
return currentProject.copyAndUpdate(b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
}
private ClusterState updateSLMState(final ClusterState currentState) {
private ProjectMetadata updateSLMState(final ProjectMetadata currentProject) {
if (slmMode == null) {
return currentState;
return currentProject;
}
final var project = currentState.metadata().getProject();
final OperationMode currentMode = currentSLMMode(currentState);
final OperationMode currentMode = currentSLMMode(currentProject);
if (currentMode.equals(slmMode)) {
// No need for a new state
return currentState;
return currentProject;
}
final OperationMode newMode;
@ -127,12 +136,12 @@ public class OperationModeUpdateTask extends ClusterStateUpdateTask {
newMode = slmMode;
} else {
// The transition is invalid, return the current state
return currentState;
return currentProject;
}
logger.info("updating SLM operation mode to {}", newMode);
final var updatedMetadata = new LifecycleOperationMetadata(currentILMMode(project), newMode);
return currentState.copyAndUpdateProject(project.id(), b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
final var updatedMetadata = new LifecycleOperationMetadata(currentILMMode(currentProject), newMode);
return currentProject.copyAndUpdate(b -> b.putCustom(LifecycleOperationMetadata.TYPE, updatedMetadata));
}
@Override

View file

@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.ilm;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
@ -102,23 +103,23 @@ public class OperationModeUpdateTaskTests extends ESTestCase {
currentMode,
new SnapshotLifecycleStats()
);
Metadata.Builder metadata = Metadata.builder().persistentSettings(settings(IndexVersion.current()).build());
ProjectMetadata.Builder project = ProjectMetadata.builder(randomProjectIdOrDefault());
if (metadataInstalled) {
metadata.projectCustoms(
project.customs(
Map.of(IndexLifecycleMetadata.TYPE, indexLifecycleMetadata, SnapshotLifecycleMetadata.TYPE, snapshotLifecycleMetadata)
);
}
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(metadata).build();
OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(requestMode);
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
OperationModeUpdateTask task = OperationModeUpdateTask.ilmMode(project.getId(), requestMode);
ClusterState newState = task.execute(state);
if (assertSameClusterState) {
assertSame("expected the same state instance but they were different", state, newState);
} else {
assertThat("expected a different state instance but they were the same", state, not(equalTo(newState)));
}
LifecycleOperationMetadata newMetadata = newState.metadata().getProject().custom(LifecycleOperationMetadata.TYPE);
LifecycleOperationMetadata newMetadata = newState.metadata().getProject(project.getId()).custom(LifecycleOperationMetadata.TYPE);
IndexLifecycleMetadata oldMetadata = newState.metadata()
.getProject()
.getProject(project.getId())
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
return Optional.ofNullable(newMetadata)
.map(LifecycleOperationMetadata::getILMOperationMode)

View file

@ -9,6 +9,8 @@ package org.elasticsearch.compute.data;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
@ -17,7 +19,7 @@ import java.io.IOException;
/**
* Wrapper around {@link DocVector} to make a valid {@link Block}.
*/
public class DocBlock extends AbstractVectorBlock implements Block {
public class DocBlock extends AbstractVectorBlock implements Block, RefCounted {
private final DocVector vector;
@ -96,6 +98,12 @@ public class DocBlock extends AbstractVectorBlock implements Block {
private final IntVector.Builder shards;
private final IntVector.Builder segments;
private final IntVector.Builder docs;
private ShardRefCounted shardRefCounters = ShardRefCounted.ALWAYS_REFERENCED;
public Builder setShardRefCounted(ShardRefCounted shardRefCounters) {
this.shardRefCounters = shardRefCounters;
return this;
}
private Builder(BlockFactory blockFactory, int estimatedSize) {
IntVector.Builder shards = null;
@ -183,7 +191,7 @@ public class DocBlock extends AbstractVectorBlock implements Block {
shards = this.shards.build();
segments = this.segments.build();
docs = this.docs.build();
result = new DocVector(shards, segments, docs, null);
result = new DocVector(shardRefCounters, shards, segments, docs, null);
return result.asBlock();
} finally {
if (result == null) {

View file

@ -10,10 +10,13 @@ package org.elasticsearch.compute.data;
import org.apache.lucene.util.IntroSorter;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.ReleasableIterator;
import org.elasticsearch.core.Releasables;
import java.util.Objects;
import java.util.function.Consumer;
/**
* {@link Vector} where each entry references a lucene document.
@ -48,8 +51,21 @@ public final class DocVector extends AbstractVector implements Vector {
*/
private int[] shardSegmentDocMapBackwards;
public DocVector(IntVector shards, IntVector segments, IntVector docs, Boolean singleSegmentNonDecreasing) {
private final ShardRefCounted shardRefCounters;
public ShardRefCounted shardRefCounted() {
return shardRefCounters;
}
public DocVector(
ShardRefCounted shardRefCounters,
IntVector shards,
IntVector segments,
IntVector docs,
Boolean singleSegmentNonDecreasing
) {
super(shards.getPositionCount(), shards.blockFactory());
this.shardRefCounters = shardRefCounters;
this.shards = shards;
this.segments = segments;
this.docs = docs;
@ -65,10 +81,19 @@ public final class DocVector extends AbstractVector implements Vector {
);
}
blockFactory().adjustBreaker(BASE_RAM_BYTES_USED);
forEachShardRefCounter(RefCounted::mustIncRef);
}
public DocVector(IntVector shards, IntVector segments, IntVector docs, int[] docMapForwards, int[] docMapBackwards) {
this(shards, segments, docs, null);
public DocVector(
ShardRefCounted shardRefCounters,
IntVector shards,
IntVector segments,
IntVector docs,
int[] docMapForwards,
int[] docMapBackwards
) {
this(shardRefCounters, shards, segments, docs, null);
this.shardSegmentDocMapForwards = docMapForwards;
this.shardSegmentDocMapBackwards = docMapBackwards;
}
@ -238,7 +263,7 @@ public final class DocVector extends AbstractVector implements Vector {
filteredShards = shards.filter(positions);
filteredSegments = segments.filter(positions);
filteredDocs = docs.filter(positions);
result = new DocVector(filteredShards, filteredSegments, filteredDocs, null);
result = new DocVector(shardRefCounters, filteredShards, filteredSegments, filteredDocs, null);
return result;
} finally {
if (result == null) {
@ -317,5 +342,20 @@ public final class DocVector extends AbstractVector implements Vector {
segments,
docs
);
forEachShardRefCounter(RefCounted::decRef);
}
private void forEachShardRefCounter(Consumer<RefCounted> consumer) {
switch (shards) {
case ConstantIntVector constantIntVector -> consumer.accept(shardRefCounters.get(constantIntVector.getInt(0)));
case ConstantNullVector ignored -> {
// Noop
}
default -> {
for (int i = 0; i < shards.getPositionCount(); i++) {
consumer.accept(shardRefCounters.get(shards.getInt(i)));
}
}
}
}
}
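
The new ref-counting contract is the standard incRef-on-create / decRef-on-close discipline. A minimal standalone sketch of that pairing, using only the core RefCounted utilities and nothing DocVector-specific (the class name and print statements are illustrative):

import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.RefCounted;

public class ShardRefCountSketch {
    public static void main(String[] args) {
        // Stand-in for a per-shard context; the callback fires when the last reference is dropped.
        RefCounted shard = AbstractRefCounted.of(() -> System.out.println("shard context released"));
        shard.mustIncRef();   // DocVector constructor: forEachShardRefCounter(RefCounted::mustIncRef)
        shard.decRef();       // DocVector close path: forEachShardRefCounter(RefCounted::decRef)
        shard.decRef();       // owner drops its original reference; only now is the shard released
    }
}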

View file

@ -18,6 +18,7 @@ import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import java.io.IOException;
@ -40,6 +41,7 @@ public class LuceneCountOperator extends LuceneOperator {
private final LeafCollector leafCollector;
public static class Factory extends LuceneOperator.Factory {
private final List<? extends RefCounted> shardRefCounters;
public Factory(
List<? extends ShardContext> contexts,
@ -58,11 +60,12 @@ public class LuceneCountOperator extends LuceneOperator {
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.shardRefCounters = contexts;
}
@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneCountOperator(driverContext.blockFactory(), sliceQueue, limit);
return new LuceneCountOperator(shardRefCounters, driverContext.blockFactory(), sliceQueue, limit);
}
@Override
@ -71,8 +74,13 @@ public class LuceneCountOperator extends LuceneOperator {
}
}
public LuceneCountOperator(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int limit) {
super(blockFactory, PAGE_SIZE, sliceQueue);
public LuceneCountOperator(
List<? extends RefCounted> shardRefCounters,
BlockFactory blockFactory,
LuceneSliceQueue sliceQueue,
int limit
) {
super(shardRefCounters, blockFactory, PAGE_SIZE, sliceQueue);
this.remainingDocs = limit;
this.leafCollector = new LeafCollector() {
@Override

View file

@ -108,6 +108,7 @@ public final class LuceneMaxFactory extends LuceneOperator.Factory {
abstract long bytesToLong(byte[] bytes);
}
private final List<? extends ShardContext> contexts;
private final String fieldName;
private final NumberType numberType;
@ -130,13 +131,14 @@ public final class LuceneMaxFactory extends LuceneOperator.Factory {
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.contexts = contexts;
this.fieldName = fieldName;
this.numberType = numberType;
}
@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
return new LuceneMinMaxOperator(contexts, driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MIN_VALUE);
}
@Override

View file

@ -16,6 +16,7 @@ import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
@ -108,6 +109,7 @@ public final class LuceneMinFactory extends LuceneOperator.Factory {
abstract long bytesToLong(byte[] bytes);
}
private final List<? extends RefCounted> shardRefCounters;
private final String fieldName;
private final NumberType numberType;
@ -130,13 +132,22 @@ public final class LuceneMinFactory extends LuceneOperator.Factory {
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.shardRefCounters = contexts;
this.fieldName = fieldName;
this.numberType = numberType;
}
@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneMinMaxOperator(driverContext.blockFactory(), sliceQueue, fieldName, numberType, limit, Long.MAX_VALUE);
return new LuceneMinMaxOperator(
shardRefCounters,
driverContext.blockFactory(),
sliceQueue,
fieldName,
numberType,
limit,
Long.MAX_VALUE
);
}
@Override

View file

@ -20,10 +20,12 @@ import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.search.MultiValueMode;
import java.io.IOException;
import java.util.List;
/**
* Operator that finds the min or max value of a field using Lucene searches
@ -65,6 +67,7 @@ final class LuceneMinMaxOperator extends LuceneOperator {
private final String fieldName;
LuceneMinMaxOperator(
List<? extends RefCounted> shardRefCounters,
BlockFactory blockFactory,
LuceneSliceQueue sliceQueue,
String fieldName,
@ -72,7 +75,7 @@ final class LuceneMinMaxOperator extends LuceneOperator {
int limit,
long initialResult
) {
super(blockFactory, PAGE_SIZE, sliceQueue);
super(shardRefCounters, blockFactory, PAGE_SIZE, sliceQueue);
this.remainingDocs = limit;
this.numberType = numberType;
this.fieldName = fieldName;

View file

@ -25,6 +25,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
@ -52,6 +53,7 @@ public abstract class LuceneOperator extends SourceOperator {
public static final int NO_LIMIT = Integer.MAX_VALUE;
protected final List<? extends RefCounted> shardContextCounters;
protected final BlockFactory blockFactory;
/**
@ -77,7 +79,14 @@ public abstract class LuceneOperator extends SourceOperator {
*/
long rowsEmitted;
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
protected LuceneOperator(
List<? extends RefCounted> shardContextCounters,
BlockFactory blockFactory,
int maxPageSize,
LuceneSliceQueue sliceQueue
) {
this.shardContextCounters = shardContextCounters;
shardContextCounters.forEach(RefCounted::mustIncRef);
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.sliceQueue = sliceQueue;
@ -138,7 +147,12 @@ public abstract class LuceneOperator extends SourceOperator {
protected abstract Page getCheckedOutput() throws IOException;
@Override
public void close() {}
public final void close() {
shardContextCounters.forEach(RefCounted::decRef);
additionalClose();
}
protected void additionalClose() { /* Override this method to add any additional cleanup logic if needed */ }
LuceneScorer getCurrentOrLoadNextScorer() {
while (currentScorer == null || currentScorer.isDone()) {
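
The constructor/close pairing above is the heart of this change: every LuceneOperator now pins its shard contexts for its whole lifetime, and subclasses release their own resources through a hook instead of overriding close(). A minimal, self-contained sketch of the pattern, using a toy counter in place of org.elasticsearch.core.RefCounted; all class names here are illustrative, not the real ones:

import java.util.List;

interface SimpleRefCounted {
    void mustIncRef();
    void decRef();
}

abstract class RefCountingOperator implements AutoCloseable {
    private final List<? extends SimpleRefCounted> shardRefCounters;

    protected RefCountingOperator(List<? extends SimpleRefCounted> shardRefCounters) {
        this.shardRefCounters = shardRefCounters;
        // Pin every shard context up front so none of them can be released
        // while this operator may still read from its index readers.
        shardRefCounters.forEach(SimpleRefCounted::mustIncRef);
    }

    @Override
    public final void close() {
        // Release the references taken in the constructor, then let subclasses
        // clean up their own resources via the non-final hook.
        shardRefCounters.forEach(SimpleRefCounted::decRef);
        additionalClose();
    }

    protected void additionalClose() { /* override for extra cleanup */ }
}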

View file

@ -28,6 +28,7 @@ import org.elasticsearch.compute.lucene.LuceneSliceQueue.PartitioningStrategy;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Limiter;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
@ -59,7 +60,7 @@ public class LuceneSourceOperator extends LuceneOperator {
private final int minPageSize;
public static class Factory extends LuceneOperator.Factory {
private final List<? extends RefCounted> contexts;
private final int maxPageSize;
private final Limiter limiter;
@ -82,6 +83,7 @@ public class LuceneSourceOperator extends LuceneOperator {
needsScore,
needsScore ? COMPLETE : COMPLETE_NO_SCORES
);
this.contexts = contexts;
this.maxPageSize = maxPageSize;
// TODO: use a single limiter for multiple stage execution
this.limiter = limit == NO_LIMIT ? Limiter.NO_LIMIT : new Limiter(limit);
@ -89,7 +91,7 @@ public class LuceneSourceOperator extends LuceneOperator {
@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneSourceOperator(driverContext.blockFactory(), maxPageSize, sliceQueue, limit, limiter, needsScore);
return new LuceneSourceOperator(contexts, driverContext.blockFactory(), maxPageSize, sliceQueue, limit, limiter, needsScore);
}
public int maxPageSize() {
@ -216,6 +218,7 @@ public class LuceneSourceOperator extends LuceneOperator {
@SuppressWarnings("this-escape")
public LuceneSourceOperator(
List<? extends RefCounted> shardContextCounters,
BlockFactory blockFactory,
int maxPageSize,
LuceneSliceQueue sliceQueue,
@ -223,7 +226,7 @@ public class LuceneSourceOperator extends LuceneOperator {
Limiter limiter,
boolean needsScore
) {
super(blockFactory, maxPageSize, sliceQueue);
super(shardContextCounters, blockFactory, maxPageSize, sliceQueue);
this.minPageSize = Math.max(1, maxPageSize / 2);
this.remainingDocs = limit;
this.limiter = limiter;
@ -324,12 +327,14 @@ public class LuceneSourceOperator extends LuceneOperator {
Block[] blocks = new Block[1 + (scoreBuilder == null ? 0 : 1) + scorer.tags().size()];
currentPagePos -= discardedDocs;
try {
shard = blockFactory.newConstantIntVector(scorer.shardContext().index(), currentPagePos);
int shardId = scorer.shardContext().index();
shard = blockFactory.newConstantIntVector(shardId, currentPagePos);
leaf = blockFactory.newConstantIntVector(scorer.leafReaderContext().ord, currentPagePos);
docs = buildDocsVector(currentPagePos);
docsBuilder = blockFactory.newIntVectorBuilder(Math.min(remainingDocs, maxPageSize));
int b = 0;
blocks[b++] = new DocVector(shard, leaf, docs, true).asBlock();
ShardRefCounted refCounted = ShardRefCounted.single(shardId, shardContextCounters.get(shardId));
blocks[b++] = new DocVector(refCounted, shard, leaf, docs, true).asBlock();
shard = null;
leaf = null;
docs = null;
@ -387,7 +392,7 @@ public class LuceneSourceOperator extends LuceneOperator {
}
@Override
public void close() {
public void additionalClose() {
Releasables.close(docsBuilder, scoreBuilder);
}

View file

@ -53,6 +53,7 @@ import static org.apache.lucene.search.ScoreMode.TOP_DOCS_WITH_SCORES;
public final class LuceneTopNSourceOperator extends LuceneOperator {
public static class Factory extends LuceneOperator.Factory {
private final List<? extends ShardContext> contexts;
private final int maxPageSize;
private final List<SortBuilder<?>> sorts;
@ -76,13 +77,14 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
needsScore,
needsScore ? TOP_DOCS_WITH_SCORES : TOP_DOCS
);
this.contexts = contexts;
this.maxPageSize = maxPageSize;
this.sorts = sorts;
}
@Override
public SourceOperator get(DriverContext driverContext) {
return new LuceneTopNSourceOperator(driverContext.blockFactory(), maxPageSize, sorts, limit, sliceQueue, needsScore);
return new LuceneTopNSourceOperator(contexts, driverContext.blockFactory(), maxPageSize, sorts, limit, sliceQueue, needsScore);
}
public int maxPageSize() {
@ -116,11 +118,13 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
private int offset = 0;
private PerShardCollector perShardCollector;
private final List<? extends ShardContext> contexts;
private final List<SortBuilder<?>> sorts;
private final int limit;
private final boolean needsScore;
public LuceneTopNSourceOperator(
List<? extends ShardContext> contexts,
BlockFactory blockFactory,
int maxPageSize,
List<SortBuilder<?>> sorts,
@ -128,7 +132,8 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
LuceneSliceQueue sliceQueue,
boolean needsScore
) {
super(blockFactory, maxPageSize, sliceQueue);
super(contexts, blockFactory, maxPageSize, sliceQueue);
this.contexts = contexts;
this.sorts = sorts;
this.limit = limit;
this.needsScore = needsScore;
@ -236,10 +241,12 @@ public final class LuceneTopNSourceOperator extends LuceneOperator {
}
}
shard = blockFactory.newConstantIntBlockWith(perShardCollector.shardContext.index(), size);
int shardId = perShardCollector.shardContext.index();
shard = blockFactory.newConstantIntBlockWith(shardId, size);
segments = currentSegmentBuilder.build();
docs = currentDocsBuilder.build();
docBlock = new DocVector(shard.asVector(), segments, docs, null).asBlock();
ShardRefCounted shardRefCounted = ShardRefCounted.single(shardId, contexts.get(shardId));
docBlock = new DocVector(shardRefCounted, shard.asVector(), segments, docs, null).asBlock();
shard = null;
segments = null;
docs = null;

View file

@ -9,6 +9,7 @@ package org.elasticsearch.compute.lucene;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.SourceLoader;
@ -22,7 +23,7 @@ import java.util.Optional;
/**
* Context of each shard we're operating against.
*/
public interface ShardContext {
public interface ShardContext extends RefCounted {
/**
* The index of this shard in the list of shards being processed.
*/

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.compute.lucene;
import org.elasticsearch.core.RefCounted;
import java.util.List;
/** Manages reference counting for {@link ShardContext}. */
public interface ShardRefCounted {
/**
* @param shardId The shard index used by {@link org.elasticsearch.compute.data.DocVector}.
* @return the {@link RefCounted} for the given shard. In production, this will almost always be a {@link ShardContext}.
*/
RefCounted get(int shardId);
static ShardRefCounted fromList(List<? extends RefCounted> refCounters) {
return shardId -> refCounters.get(shardId);
}
static ShardRefCounted fromShardContext(ShardContext shardContext) {
return single(shardContext.index(), shardContext);
}
static ShardRefCounted single(int index, RefCounted refCounted) {
return shardId -> {
if (shardId != index) {
throw new IllegalArgumentException("Invalid shardId: " + shardId + ", expected: " + index);
}
return refCounted;
};
}
ShardRefCounted ALWAYS_REFERENCED = shardId -> RefCounted.ALWAYS_REFERENCED;
}
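
To see the contract in action, here is a minimal runnable sketch mirroring fromList and single, with a toy counter standing in for org.elasticsearch.core.RefCounted; every name in it is illustrative:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ToyRefCounted {
    private final AtomicInteger refs = new AtomicInteger(1);
    void mustIncRef() { refs.incrementAndGet(); }
    void decRef() { refs.decrementAndGet(); }
    int refs() { return refs.get(); }
}

interface ToyShardRefCounted {
    ToyRefCounted get(int shardId);

    // Mirrors fromList(): shard ids are indices into the backing list.
    static ToyShardRefCounted fromList(List<ToyRefCounted> counters) {
        return shardId -> counters.get(shardId);
    }

    // Mirrors single(): a one-shard view that rejects any other shard id.
    static ToyShardRefCounted single(int index, ToyRefCounted counted) {
        return shardId -> {
            if (shardId != index) {
                throw new IllegalArgumentException("Invalid shardId: " + shardId + ", expected: " + index);
            }
            return counted;
        };
    }
}

class ShardRefCountedDemo {
    public static void main(String[] args) {
        List<ToyRefCounted> contexts = List.of(new ToyRefCounted(), new ToyRefCounted());
        ToyShardRefCounted byList = ToyShardRefCounted.fromList(contexts);
        byList.get(1).mustIncRef();                  // pin shard 1 while a page references it
        System.out.println(contexts.get(1).refs()); // prints 2
        byList.get(1).decRef();                      // release once the page is closed
    }
}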

View file

@ -36,12 +36,12 @@ import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
public final class TimeSeriesSourceOperator extends LuceneOperator {
private final int maxPageSize;
private final BlockFactory blockFactory;
private final LuceneSliceQueue sliceQueue;
@ -55,8 +55,14 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
private DocIdCollector docCollector;
private long tsidsLoaded;
TimeSeriesSourceOperator(BlockFactory blockFactory, LuceneSliceQueue sliceQueue, int maxPageSize, int limit) {
super(blockFactory, maxPageSize, sliceQueue);
TimeSeriesSourceOperator(
List<? extends ShardContext> contexts,
BlockFactory blockFactory,
LuceneSliceQueue sliceQueue,
int maxPageSize,
int limit
) {
super(contexts, blockFactory, maxPageSize, sliceQueue);
this.maxPageSize = maxPageSize;
this.blockFactory = blockFactory;
this.remainingDocs = limit;
@ -131,7 +137,7 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
}
@Override
public void close() {
public void additionalClose() {
Releasables.closeExpectNoException(timestampsBuilder, tsHashesBuilder, docCollector);
}
@ -382,7 +388,7 @@ public final class TimeSeriesSourceOperator extends LuceneOperator {
segments = segmentsBuilder.build();
segmentsBuilder = null;
shards = blockFactory.newConstantIntVector(shardContext.index(), docs.getPositionCount());
docVector = new DocVector(shards, segments, docs, segments.isConstant());
docVector = new DocVector(ShardRefCounted.fromShardContext(shardContext), shards, segments, docs, segments.isConstant());
return docVector;
} finally {
if (docVector == null) {

View file

@ -27,7 +27,7 @@ import java.util.function.Function;
* in order to read tsdb indices in parallel.
*/
public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
private final List<? extends ShardContext> contexts;
private final int maxPageSize;
private TimeSeriesSourceOperatorFactory(
@ -47,12 +47,13 @@ public class TimeSeriesSourceOperatorFactory extends LuceneOperator.Factory {
false,
ScoreMode.COMPLETE_NO_SCORES
);
this.contexts = contexts;
this.maxPageSize = maxPageSize;
}
@Override
public SourceOperator get(DriverContext driverContext) {
return new TimeSeriesSourceOperator(driverContext.blockFactory(), sliceQueue, maxPageSize, limit);
return new TimeSeriesSourceOperator(contexts, driverContext.blockFactory(), sliceQueue, maxPageSize, limit);
}
@Override

View file

@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
@ -47,6 +48,7 @@ import java.util.function.IntFunction;
import java.util.function.Supplier;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
/**
* Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
@ -529,7 +531,7 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
}
private LeafReaderContext ctx(int shard, int segment) {
return shardContexts.get(shard).reader.leaves().get(segment);
return shardContexts.get(shard).reader().leaves().get(segment);
}
@Override
@ -617,18 +619,23 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
Status(StreamInput in) throws IOException {
super(in);
readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(readersBuilt, StreamOutput::writeVInt);
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsValuesLoaded(out.getTransportVersion())) {
out.writeVLong(valuesLoaded);
}
}
private static boolean supportsValuesLoaded(TransportVersion version) {
return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
|| version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
}
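
The helper accepts either mainline versions at or after the constant that introduced the field, or patch releases of the 8.19 line that carry the backport. A rough standalone model of that gating, with made-up integer codes standing in for TransportVersion constants:

final class VersionGateSketch {
    // Hypothetical integer codes, not real TransportVersion values.
    static final int MAINLINE_INTRODUCED = 9_001_000;  // ~ ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED
    static final int BACKPORT_8_19_FIRST = 8_019_002;  // ~ ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19
    static final int NEXT_MINOR_AFTER_8_19 = 8_020_000;

    static boolean supportsValuesLoaded(int wireVersion) {
        boolean onOrAfterMainline = wireVersion >= MAINLINE_INTRODUCED;  // onOrAfter(...) analogue
        boolean isBackportPatch = wireVersion >= BACKPORT_8_19_FIRST     // isPatchFrom(...) analogue:
            && wireVersion < NEXT_MINOR_AFTER_8_19;                      // same 8.19 line, at or above the backport
        return onOrAfterMainline || isBackportPatch;
    }
}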
@Override
public String getWriteableName() {
return ENTRY.name;

View file

@ -24,6 +24,7 @@ import org.elasticsearch.tasks.TaskCancelledException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@ -75,7 +76,7 @@ public class Driver implements Releasable, Describable {
private final long startNanos;
private final DriverContext driverContext;
private final Supplier<String> description;
private final List<Operator> activeOperators;
private List<Operator> activeOperators;
private final List<OperatorStatus> statusOfCompletedOperators = new ArrayList<>();
private final Releasable releasable;
private final long statusNanos;
@ -184,7 +185,7 @@ public class Driver implements Releasable, Describable {
assert driverContext.assertBeginRunLoop();
isBlocked = runSingleLoopIteration();
} catch (DriverEarlyTerminationException unused) {
closeEarlyFinishedOperators();
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
assert isFinished() : "not finished after early termination";
} finally {
assert driverContext.assertEndRunLoop();
@ -251,9 +252,13 @@ public class Driver implements Releasable, Describable {
driverContext.checkForEarlyTermination();
boolean movedPage = false;
for (int i = 0; i < activeOperators.size() - 1; i++) {
Operator op = activeOperators.get(i);
Operator nextOp = activeOperators.get(i + 1);
ListIterator<Operator> iterator = activeOperators.listIterator();
while (iterator.hasNext()) {
Operator op = iterator.next();
if (iterator.hasNext() == false) {
break;
}
Operator nextOp = activeOperators.get(iterator.nextIndex());
// skip blocked operator
if (op.isBlocked().listener().isDone() == false) {
@ -262,6 +267,7 @@ public class Driver implements Releasable, Describable {
if (op.isFinished() == false && nextOp.needsInput()) {
driverContext.checkForEarlyTermination();
assert nextOp.isFinished() == false : "next operator should not be finished yet: " + nextOp;
Page page = op.getOutput();
if (page == null) {
// No result, just move to the next iteration
@ -283,11 +289,15 @@ public class Driver implements Releasable, Describable {
if (op.isFinished()) {
driverContext.checkForEarlyTermination();
nextOp.finish();
var originalIndex = iterator.previousIndex();
var index = closeEarlyFinishedOperators(iterator);
if (index >= 0) {
iterator = new ArrayList<>(activeOperators).listIterator(originalIndex - index);
}
}
}
closeEarlyFinishedOperators();
closeEarlyFinishedOperators(activeOperators.listIterator(activeOperators.size()));
if (movedPage == false) {
return oneOf(
@ -300,22 +310,24 @@ public class Driver implements Releasable, Describable {
return Operator.NOT_BLOCKED;
}
private void closeEarlyFinishedOperators() {
for (int index = activeOperators.size() - 1; index >= 0; index--) {
if (activeOperators.get(index).isFinished()) {
// Returns the index of the last operator that was closed, or -1 if no operator was closed.
private int closeEarlyFinishedOperators(ListIterator<Operator> operators) {
var iterator = activeOperators.listIterator(operators.nextIndex());
while (iterator.hasPrevious()) {
if (iterator.previous().isFinished()) {
var index = iterator.nextIndex();
/*
* Close and remove this operator and all source operators in the
* most paranoid possible way. Closing operators shouldn't throw,
* but if it does, this will make sure we don't try to close any
* that succeed twice.
*/
List<Operator> finishedOperators = this.activeOperators.subList(0, index + 1);
Iterator<Operator> itr = finishedOperators.iterator();
while (itr.hasNext()) {
Operator op = itr.next();
Iterator<Operator> finishedOperators = this.activeOperators.subList(0, index + 1).iterator();
while (finishedOperators.hasNext()) {
Operator op = finishedOperators.next();
statusOfCompletedOperators.add(new OperatorStatus(op.toString(), op.status()));
op.close();
itr.remove();
finishedOperators.remove();
}
// Finish the next operator, which is now the first operator.
@ -323,9 +335,10 @@ public class Driver implements Releasable, Describable {
Operator newRootOperator = activeOperators.get(0);
newRootOperator.finish();
}
break;
return index;
}
}
return -1;
}
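
The reworked loop is subtle: closing a finished prefix mutates activeOperators while a ListIterator over it is live, so the loop re-creates the iterator over a snapshot at a position adjusted by how many operators were removed. An illustrative, compilable model of the same control flow (not the Driver itself; all names are made up):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;

class PipelineSketch {
    interface Op { boolean isFinished(); void close(); }

    private List<Op> active = new ArrayList<>();

    // Returns the index of the last operator that was closed, or -1 if none was.
    private int closeFinishedPrefix(ListIterator<Op> upTo) {
        ListIterator<Op> it = active.listIterator(upTo.nextIndex());
        while (it.hasPrevious()) {
            if (it.previous().isFinished()) {
                int index = it.nextIndex();
                Iterator<Op> finished = active.subList(0, index + 1).iterator();
                while (finished.hasNext()) {
                    finished.next().close();
                    finished.remove(); // removes from 'active' through the subList view
                }
                return index;
            }
        }
        return -1;
    }

    void runOnce() {
        ListIterator<Op> it = active.listIterator();
        while (it.hasNext()) {
            Op op = it.next();
            if (it.hasNext() == false) {
                break; // the sink has no downstream operator to feed
            }
            if (op.isFinished()) {
                int original = it.previousIndex();
                int closed = closeFinishedPrefix(it);
                if (closed >= 0) {
                    // 'active' was mutated, so re-seat the iterator over a snapshot,
                    // shifted by the number of operators removed ahead of us.
                    it = new ArrayList<>(active).listIterator(original - closed);
                }
            }
        }
    }
}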
public void cancel(String reason) {

View file

@ -33,6 +33,7 @@ import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.BlockLoader;
@ -136,6 +137,7 @@ public class OrdinalsGroupingOperator implements Operator {
requireNonNull(page, "page is null");
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
final int shardIndex = docVector.shards().getInt(0);
RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
final var blockLoader = blockLoaders.apply(shardIndex);
boolean pagePassed = false;
try {
@ -150,7 +152,8 @@ public class OrdinalsGroupingOperator implements Operator {
driverContext.blockFactory(),
this::createGroupingAggregators,
() -> blockLoader.ordinals(shardContexts.get(k.shardIndex).reader().leaves().get(k.segmentIndex)),
driverContext.bigArrays()
driverContext.bigArrays(),
shardRefCounter
);
} catch (IOException e) {
throw new UncheckedIOException(e);
@ -343,15 +346,19 @@ public class OrdinalsGroupingOperator implements Operator {
private final List<GroupingAggregator> aggregators;
private final CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier;
private final BitArray visitedOrds;
private final RefCounted shardRefCounted;
private BlockOrdinalsReader currentReader;
OrdinalSegmentAggregator(
BlockFactory blockFactory,
Supplier<List<GroupingAggregator>> aggregatorsSupplier,
CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier,
BigArrays bigArrays
BigArrays bigArrays,
RefCounted shardRefCounted
) throws IOException {
boolean success = false;
this.shardRefCounted = shardRefCounted;
this.shardRefCounted.mustIncRef();
List<GroupingAggregator> groupingAggregators = null;
BitArray bitArray = null;
try {
@ -368,6 +375,9 @@ public class OrdinalsGroupingOperator implements Operator {
if (success == false) {
if (bitArray != null) Releasables.close(bitArray);
if (groupingAggregators != null) Releasables.close(groupingAggregators);
// There is no danger of double decRef here, since this decRef is called only if the constructor throws, so it would be
// impossible to call close on the instance.
shardRefCounted.decRef();
}
}
}
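
The constructor uses the standard exception-safety idiom: acquire the reference first, flip a success flag last, and release in finally only when the flag was never set; this cannot double-decrement because close() never runs on an instance whose constructor threw. A generic sketch of the idiom, with a plain AtomicInteger standing in for the shard's ref counter:

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedHolder implements Closeable {
    private final AtomicInteger shardRefs; // stands in for the shard's RefCounted
    private final Closeable resource;

    GuardedHolder(AtomicInteger shardRefs, Closeable resource) throws IOException {
        this.shardRefs = shardRefs;
        shardRefs.incrementAndGet(); // mustIncRef()
        boolean success = false;
        try {
            this.resource = resource; // any allocation that may throw goes here
            success = true;
        } finally {
            if (success == false) {
                // The constructor threw, so close() can never run on this
                // instance: releasing here cannot double-decrement.
                shardRefs.decrementAndGet();
            }
        }
    }

    @Override
    public void close() throws IOException {
        resource.close();
        shardRefs.decrementAndGet(); // decRef()
    }
}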
@ -447,7 +457,7 @@ public class OrdinalsGroupingOperator implements Operator {
@Override
public void close() {
Releasables.close(visitedOrds, () -> Releasables.close(aggregators));
Releasables.close(visitedOrds, () -> Releasables.close(aggregators), Releasables.fromRefCounted(shardRefCounted));
}
}

View file

@ -21,6 +21,8 @@ import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.lucene.ShardContext;
import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.Warnings;
import org.elasticsearch.core.Releasables;
@ -37,6 +39,7 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
private final BlockFactory blockFactory;
private final QueryList queryList;
private int queryPosition = -1;
private final ShardContext shardContext;
private final IndexReader indexReader;
private final IndexSearcher searcher;
private final Warnings warnings;
@ -49,14 +52,16 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
BlockFactory blockFactory,
int maxPageSize,
QueryList queryList,
IndexReader indexReader,
ShardContext shardContext,
Warnings warnings
) {
this.blockFactory = blockFactory;
this.maxPageSize = maxPageSize;
this.queryList = queryList;
this.indexReader = indexReader;
this.searcher = new IndexSearcher(indexReader);
this.shardContext = shardContext;
this.shardContext.incRef();
this.searcher = shardContext.searcher();
this.indexReader = searcher.getIndexReader();
this.warnings = warnings;
}
@ -142,7 +147,10 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
segmentsVector = segmentsBuilder.build();
}
docsVector = docsBuilder.build();
page = new Page(new DocVector(shardsVector, segmentsVector, docsVector, null).asBlock(), positionsVector.asBlock());
page = new Page(
new DocVector(ShardRefCounted.fromShardContext(shardContext), shardsVector, segmentsVector, docsVector, null).asBlock(),
positionsVector.asBlock()
);
} finally {
if (page == null) {
Releasables.close(positionsBuilder, segmentsVector, docsBuilder, positionsVector, shardsVector, docsVector);
@ -185,6 +193,6 @@ public final class EnrichQuerySourceOperator extends SourceOperator {
@Override
public void close() {
this.shardContext.decRef();
}
}

View file

@ -11,6 +11,8 @@ import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
/**
@ -33,6 +35,12 @@ interface ResultBuilder extends Releasable {
*/
void decodeValue(BytesRef values);
/**
* Sets the RefCounted value, which was extracted by {@link ValueExtractor#getRefCountedForShard(int)}. By default, this is a no-op,
* since most builders do not need the shard ref counter.
*/
default void setNextRefCounted(@Nullable RefCounted nextRefCounted) { /* no-op */ }
/**
* Build the result block.
*/

View file

@ -12,14 +12,22 @@ import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.lucene.ShardRefCounted;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasables;
import java.util.HashMap;
import java.util.Map;
class ResultBuilderForDoc implements ResultBuilder {
private final BlockFactory blockFactory;
private final int[] shards;
private final int[] segments;
private final int[] docs;
private int position;
private @Nullable RefCounted nextRefCounted;
private final Map<Integer, RefCounted> refCounted = new HashMap<>();
ResultBuilderForDoc(BlockFactory blockFactory, int positions) {
// TODO use fixed length builders
@ -34,12 +42,24 @@ class ResultBuilderForDoc implements ResultBuilder {
throw new AssertionError("_doc can't be a key");
}
@Override
public void setNextRefCounted(RefCounted nextRefCounted) {
this.nextRefCounted = nextRefCounted;
// Since rows can be closed before build is called, we need to increment the ref count to ensure the shard context isn't closed.
this.nextRefCounted.mustIncRef();
}
@Override
public void decodeValue(BytesRef values) {
if (nextRefCounted == null) {
throw new IllegalStateException("setNextRefCounted must be set before each decodeValue call");
}
shards[position] = TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values);
segments[position] = TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values);
docs[position] = TopNEncoder.DEFAULT_UNSORTABLE.decodeInt(values);
refCounted.putIfAbsent(shards[position], nextRefCounted);
position++;
nextRefCounted = null;
}
@Override
@ -51,16 +71,26 @@ class ResultBuilderForDoc implements ResultBuilder {
shardsVector = blockFactory.newIntArrayVector(shards, position);
segmentsVector = blockFactory.newIntArrayVector(segments, position);
var docsVector = blockFactory.newIntArrayVector(docs, position);
var docsBlock = new DocVector(shardsVector, segmentsVector, docsVector, null).asBlock();
var docsBlock = new DocVector(new ShardRefCountedMap(refCounted), shardsVector, segmentsVector, docsVector, null).asBlock();
success = true;
return docsBlock;
} finally {
// The DocVector constructor already incremented the relevant RefCounted instances, so we can now release the
// references we took in setNextRefCounted.
refCounted.values().forEach(RefCounted::decRef);
if (success == false) {
Releasables.closeExpectNoException(shardsVector, segmentsVector);
}
}
}
private record ShardRefCountedMap(Map<Integer, RefCounted> refCounters) implements ShardRefCounted {
@Override
public RefCounted get(int shardId) {
return refCounters.get(shardId);
}
}
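
The accounting here is a hand-off: rows pin shards while they sit buffered in the builder, the built DocVector takes its own references, and the builder then drops the ones it held, so a shard stays pinned exactly as long as some block needs it. A simplified model with toy counters (the real builder's bookkeeping is per decoded row; one reference per shard is enough to show the balance):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class DocRefHandoffSketch {
    private final Map<Integer, AtomicInteger> pinnedShards = new HashMap<>();

    // setNextRefCounted + decodeValue, simplified: pin each shard the first
    // time one of its rows is decoded.
    void onRow(int shardId, AtomicInteger shardRefs) {
        if (pinnedShards.putIfAbsent(shardId, shardRefs) == null) {
            shardRefs.incrementAndGet();
        }
    }

    // build(), simplified: the new DocVector takes its own reference on every
    // shard it mentions, after which the builder releases the ones it held.
    void build() {
        pinnedShards.values().forEach(AtomicInteger::incrementAndGet); // DocVector's refs
        pinnedShards.values().forEach(AtomicInteger::decrementAndGet); // builder's refs
    }
}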
@Override
public String toString() {
return "ValueExtractorForDoc";

View file

@ -15,11 +15,14 @@ import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.compute.operator.DriverContext;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
@ -71,6 +74,21 @@ public class TopNOperator implements Operator, Accountable {
*/
final BreakingBytesRefBuilder values;
/**
* Reference counter for the shard this row belongs to, used for rows containing a {@link DocVector} to ensure that the shard
* context is not closed before we build the final result.
*/
@Nullable
RefCounted shardRefCounter;
void setShardRefCountersAndShard(RefCounted shardRefCounter) {
if (this.shardRefCounter != null) {
this.shardRefCounter.decRef();
}
this.shardRefCounter = shardRefCounter;
this.shardRefCounter.mustIncRef();
}
Row(CircuitBreaker breaker, List<SortOrder> sortOrders, int preAllocatedKeysSize, int preAllocatedValueSize) {
boolean success = false;
try {
@ -92,8 +110,16 @@ public class TopNOperator implements Operator, Accountable {
@Override
public void close() {
clearRefCounters();
Releasables.closeExpectNoException(keys, values, bytesOrder);
}
public void clearRefCounters() {
if (shardRefCounter != null) {
shardRefCounter.decRef();
}
shardRefCounter = null;
}
}
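
Row's reference handling has swap semantics: setting a new shard counter releases the previously pinned one first, and clearing is safe to call when nothing is pinned. A toy sketch of just that state machine (names and the AtomicInteger stand-in are illustrative):

import java.util.concurrent.atomic.AtomicInteger;

class RowRefSketch {
    private AtomicInteger shardRefs; // null when the row pins no shard

    void setShardRef(AtomicInteger next) {
        if (shardRefs != null) {
            shardRefs.decrementAndGet(); // release the previously pinned shard
        }
        shardRefs = next;
        shardRefs.incrementAndGet();     // pin the new one
    }

    void clearRef() {
        if (shardRefs != null) {
            shardRefs.decrementAndGet();
        }
        shardRefs = null;                // idempotent: a second call is a no-op
    }
}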
static final class BytesOrder implements Releasable, Accountable {
@ -174,7 +200,7 @@ public class TopNOperator implements Operator, Accountable {
*/
void row(int position, Row destination) {
writeKey(position, destination);
writeValues(position, destination.values);
writeValues(position, destination);
}
private void writeKey(int position, Row row) {
@ -187,9 +213,13 @@ public class TopNOperator implements Operator, Accountable {
}
}
private void writeValues(int position, BreakingBytesRefBuilder values) {
private void writeValues(int position, Row destination) {
for (ValueExtractor e : valueExtractors) {
e.writeValue(values, position);
var refCounted = e.getRefCountedForShard(position);
if (refCounted != null) {
destination.setShardRefCountersAndShard(refCounted);
}
e.writeValue(destination.values, position);
}
}
}
@ -376,6 +406,7 @@ public class TopNOperator implements Operator, Accountable {
} else {
spare.keys.clear();
spare.values.clear();
spare.clearRefCounters();
}
rowFiller.row(i, spare);
@ -456,6 +487,7 @@ public class TopNOperator implements Operator, Accountable {
BytesRef values = row.values.bytesRefView();
for (ResultBuilder builder : builders) {
builder.setNextRefCounted(row.shardRefCounter);
builder.decodeValue(values);
}
if (values.length != 0) {
@ -463,7 +495,6 @@ public class TopNOperator implements Operator, Accountable {
}
list.set(i, null);
row.close();
p++;
if (p == size) {
@ -481,6 +512,8 @@ public class TopNOperator implements Operator, Accountable {
Releasables.closeExpectNoException(builders);
builders = null;
}
// It's important to close the row only after we build the new block, so we don't pre-release any shard counter.
row.close();
}
assert builders == null;
success = true;

View file

@ -18,6 +18,8 @@ import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.RefCounted;
/**
* Extracts values into a {@link BreakingBytesRefBuilder}.
@ -25,6 +27,15 @@ import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
interface ValueExtractor {
void writeValue(BreakingBytesRefBuilder values, int position);
/**
* This should return a non-null value if the row is supposed to hold a temporary reference to a shard (including incrementing and
* decrementing it) in between encoding and decoding the row values.
*/
@Nullable
default RefCounted getRefCountedForShard(int position) {
return null;
}
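
The default method makes shard pinning opt-in: extractors return null unless they are backed by a DocVector, in which case rows must hold the shard's counter between encode and decode. A small illustrative sketch of the contract (all names here are hypothetical):

import java.util.concurrent.atomic.AtomicInteger;

interface ExtractorSketch {
    void writeValue(StringBuilder out, int position);

    // Most extractors carry no shard reference, so the hook defaults to null.
    default AtomicInteger refForShard(int position) {
        return null;
    }
}

final class DocExtractorSketch implements ExtractorSketch {
    private final AtomicInteger[] shardRefs;  // one counter per shard
    private final int[] shardPerPosition;     // which shard each row came from

    DocExtractorSketch(AtomicInteger[] shardRefs, int[] shardPerPosition) {
        this.shardRefs = shardRefs;
        this.shardPerPosition = shardPerPosition;
    }

    @Override
    public AtomicInteger refForShard(int position) {
        // Non-null: the row must pin this shard between encoding and decoding.
        return shardRefs[shardPerPosition[position]];
    }

    @Override
    public void writeValue(StringBuilder out, int position) {
        out.append(shardPerPosition[position]).append(';');
    }
}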
static ValueExtractor extractorFor(ElementType elementType, TopNEncoder encoder, boolean inKey, Block block) {
if (false == (elementType == block.elementType() || ElementType.NULL == block.elementType())) {
// While this maybe should be an IllegalArgumentException, it's important to throw an exception that causes a 500 response.

View file

@ -9,15 +9,25 @@ package org.elasticsearch.compute.operator.topn;
import org.elasticsearch.compute.data.DocVector;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.core.RefCounted;
class ValueExtractorForDoc implements ValueExtractor {
private final DocVector vector;
@Override
public RefCounted getRefCountedForShard(int position) {
return vector().shardRefCounted().get(vector().shards().getInt(position));
}
ValueExtractorForDoc(TopNEncoder encoder, DocVector vector) {
assert encoder == TopNEncoder.DEFAULT_UNSORTABLE;
this.vector = vector;
}
DocVector vector() {
return vector;
}
@Override
public void writeValue(BreakingBytesRefBuilder values, int position) {
TopNEncoder.DEFAULT_UNSORTABLE.encodeInt(vector.shards().getInt(position), values);

View file

@ -14,7 +14,7 @@ import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.List;
@ -36,7 +36,7 @@ public class CountDistinctLongGroupingAggregatorFunctionTests extends GroupingAg
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomGroupId(size), randomLongBetween(0, 100_000)))
);

View file

@ -15,7 +15,7 @@ import org.elasticsearch.compute.data.LongVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.LongDoubleTupleBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.List;
@ -37,7 +37,7 @@ public class CountGroupingAggregatorFunctionTests extends GroupingAggregatorFunc
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
if (randomBoolean()) {
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLong()))
);

View file

@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.List;
@ -34,7 +34,7 @@ public class MaxLongGroupingAggregatorFunctionTests extends GroupingAggregatorFu
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLong()))
);

View file

@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.ArrayList;
@ -42,7 +42,7 @@ public class MedianAbsoluteDeviationLongGroupingAggregatorFunctionTests extends
values.add(Tuple.tuple((long) i, v));
}
}
return new TupleBlockSourceOperator(blockFactory, values.subList(0, Math.min(values.size(), end)));
return new TupleLongLongBlockSourceOperator(blockFactory, values.subList(0, Math.min(values.size(), end)));
}
@Override

View file

@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.List;
@ -34,7 +34,7 @@ public class MinLongGroupingAggregatorFunctionTests extends GroupingAggregatorFu
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLong()))
);

View file

@ -13,7 +13,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.junit.Before;
@ -45,7 +45,7 @@ public class PercentileLongGroupingAggregatorFunctionTests extends GroupingAggre
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
long max = randomLongBetween(1, Long.MAX_VALUE / size / 5);
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLongBetween(-0, max)))
);

View file

@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.List;
@ -34,7 +34,7 @@ public class SumLongGroupingAggregatorFunctionTests extends GroupingAggregatorFu
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
long max = randomLongBetween(1, Long.MAX_VALUE / size / 5);
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLongBetween(-max, max)))
);

View file

@ -12,7 +12,7 @@ import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.TupleBlockSourceOperator;
import org.elasticsearch.compute.operator.TupleLongLongBlockSourceOperator;
import org.elasticsearch.core.Tuple;
import java.util.Arrays;
@ -38,7 +38,7 @@ public class ValuesLongGroupingAggregatorFunctionTests extends GroupingAggregato
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new TupleBlockSourceOperator(
return new TupleLongLongBlockSourceOperator(
blockFactory,
LongStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), randomLong()))
);

Some files were not shown because too many files have changed in this diff.