Merge revision 0d57e1025e into multi-project

This commit is contained in:
Tim Vernum 2024-12-11 15:25:03 +11:00
commit c57d41d0ff
70 changed files with 1705 additions and 428 deletions

View file

@ -13,8 +13,8 @@ package org.elasticsearch.gradle.internal;
* This class models the different Docker base images that are used to build Docker distributions of Elasticsearch.
*/
public enum DockerBase {
// "latest" here is intentional, since the image name specifies "8"
DEFAULT("docker.elastic.co/ubi8/ubi-minimal:latest", "", "microdnf"),
// "latest" here is intentional, since the image name specifies "9"
DEFAULT("docker.elastic.co/ubi9/ubi-minimal:latest", "", "microdnf"),
// The Iron Bank base image is UBI (albeit hardened), but we are required to parameterize the Docker build
IRON_BANK("${BASE_REGISTRY}/${BASE_IMAGE}:${BASE_TAG}", "-ironbank", "yum"),

View file

@ -70,10 +70,10 @@ tasks.register("generateDependenciesReport", ConcatFilesTask) {
// Explicitly add the dependency on the RHEL UBI Docker base image
String[] rhelUbiFields = [
'Red Hat Universal Base Image minimal',
'8',
'https://catalog.redhat.com/software/containers/ubi8/ubi-minimal/5c359a62bed8bd75a2c3fba8',
'9',
'https://catalog.redhat.com/software/containers/ubi9-minimal/61832888c0d15aff4912fe0d',
'Custom;https://www.redhat.com/licenses/EULA_Red_Hat_Universal_Base_Image_English_20190422.pdf',
'https://oss-dependencies.elastic.co/red-hat-universal-base-image-minimal/8/ubi-minimal-8-source.tar.gz'
'https://oss-dependencies.elastic.co/red-hat-universal-base-image-minimal/9/ubi-minimal-9-source.tar.gz'
]
additionalLines << rhelUbiFields.join(',')
}

View file

@ -76,6 +76,12 @@ https://platform.openai.com/api-keys[API keys section].
include::inference-shared.asciidoc[tag=api-key-admonition]
--
`dimensions`:::
(Optional, integer)
The number of dimensions the resulting output embeddings should have.
Only supported in `text-embedding-3` and later models.
If not set the OpenAI defined default for the model is used.
`model_id`:::
(Required, string)
The name of the model to use for the {infer} task.
@ -134,8 +140,8 @@ Specifies the user issuing the request, which can be used for abuse detection.
[[inference-example-openai]]
==== OpenAI service example
The following example shows how to create an {infer} endpoint called
`openai-embeddings` to perform a `text_embedding` task type.
The following example shows how to create an {infer} endpoint called `openai-embeddings` to perform a `text_embedding` task type.
The embeddings created by requests to this endpoint will have 128 dimensions.
[source,console]
------------------------------------------------------------
@ -144,14 +150,14 @@ PUT _inference/text_embedding/openai-embeddings
"service": "openai",
"service_settings": {
"api_key": "<api_key>",
"model_id": "text-embedding-ada-002"
"model_id": "text-embedding-3-small",
"dimensions": 128
}
}
------------------------------------------------------------
// TEST[skip:TBD]
The next example shows how to create an {infer} endpoint called
`openai-completion` to perform a `completion` task type.
The next example shows how to create an {infer} endpoint called `openai-completion` to perform a `completion` task type.
[source,console]
------------------------------------------------------------

View file

@ -42,6 +42,34 @@ if (buildParams.isSnapshotBuild() == false) {
tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("data_stream/10_basic/Create hidden data stream", "warning does not exist for compatibility")
// Failure store configuration changed on 8.18 (earlier versions behind feature flag)
task.skipTest("data_stream/10_basic/Create data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/10_basic/Delete data stream with failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/10_basic/Delete data stream with failure store uninitialized", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/30_auto_create_data_stream/Don't initialize failure store during data stream auto-creation on successful index", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/150_tsdb/TSDB failures go to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/170_modify_data_stream/Modify a data stream's failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if final pipeline changes target", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Ensure failure is redirected to correct failure store after a reroute processor", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Test failure store status with bulk request", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Redirect ingest failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when pipeline loop is detected", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to correct failure store when index loop is detected", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Failure redirects to original failure store during index change if self referenced", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Redirect shard failure in data stream to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/190_failure_store_redirection/Version conflicts are not redirected to failure store", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after an ingest failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/A failure store marked for lazy rollover should only be rolled over when there is a failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store without conditions", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Lazily roll over a data stream's failure store after a shard failure", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Don't roll over a data stream's failure store when conditions aren't met", "Configuring the failure store via data stream templates is not supported anymore.")
task.skipTest("data_stream/200_rollover_failure_store/Roll over a data stream's failure store with conditions", "Configuring the failure store via data stream templates is not supported anymore.")
})
configurations {

View file

@ -67,6 +67,7 @@ import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadataStats;
import org.elasticsearch.cluster.metadata.IndexWriteLoad;
@ -2447,9 +2448,10 @@ public class DataStreamIT extends ESIntegTestCase {
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.aliases(aliases)
.lifecycle(lifecycle)
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();

View file

@ -21,6 +21,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
@ -165,8 +166,8 @@ public class FailureStoreMetricsWithIncrementalBulkIT extends ESIntegTestCase {
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(DATA_STREAM_NAME + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
.template(new Template(null, new CompressedXContent("""
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().mappings(new CompressedXContent("""
{
"dynamic": false,
"properties": {
@ -177,7 +178,7 @@ public class FailureStoreMetricsWithIncrementalBulkIT extends ESIntegTestCase {
"type": "long"
}
}
}"""), null))
}""")).dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.build()
);
assertAcked(safeGet(client().execute(TransportPutComposableIndexTemplateAction.TYPE, request)));

View file

@ -23,6 +23,7 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.compress.CompressedXContent;
@ -283,8 +284,8 @@ public class IngestFailureStoreMetricsIT extends ESIntegTestCase {
request.indexTemplate(
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStream + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, failureStore))
.template(new Template(null, new CompressedXContent("""
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().mappings(new CompressedXContent("""
{
"dynamic": false,
"properties": {
@ -295,7 +296,7 @@ public class IngestFailureStoreMetricsIT extends ESIntegTestCase {
"type": "long"
}
}
}"""), null))
}""")).dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(failureStore)))
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();

View file

@ -40,6 +40,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAction;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -1226,9 +1227,10 @@ public class DataStreamLifecycleServiceIT extends ESIntegTestCase {
.settings(settings)
.mappings(mappings == null ? null : CompressedXContent.fromJSON(mappings))
.lifecycle(lifecycle)
.dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(withFailureStore))
)
.metadata(metadata)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, withFailureStore))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();

View file

@ -11,12 +11,14 @@ package org.elasticsearch.datastreams;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
@ -40,10 +42,14 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"data_stream": {
"failure_store": true
}
}
""");
@ -59,12 +65,63 @@ public class DataStreamOptionsIT extends DisabledSecurityDataStreamTestCase {
assertThat(dataStreams.size(), is(1));
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
assertThat(dataStream.get("name"), equalTo(DATA_STREAM_NAME));
assertThat(((Map<String, Object>) dataStream.get("failure_store")).get("enabled"), is(true));
List<String> backingIndices = getIndices(dataStream);
assertThat(backingIndices.size(), is(1));
List<String> failureStore = getFailureStore(dataStream);
assertThat(failureStore.size(), is(1));
}
public void testExplicitlyResetDataStreamOptions() throws IOException {
Request putComponentTemplateRequest = new Request("POST", "/_component_template/with-options");
putComponentTemplateRequest.setJsonEntity("""
{
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
}
}
""");
assertOK(client().performRequest(putComponentTemplateRequest));
Request invalidRequest = new Request("POST", "/_index_template/other-template");
invalidRequest.setJsonEntity("""
{
"index_patterns": ["something-else"],
"composed_of" : ["with-options"],
"template": {
"settings": {
"number_of_replicas": 0
}
}
}
""");
Exception error = expectThrows(ResponseException.class, () -> client().performRequest(invalidRequest));
assertThat(
error.getMessage(),
containsString("specifies data stream options that can only be used in combination with a data stream")
);
// Check that when we nullify the data stream options we can create use any component template in a non data stream template
Request otherRequest = new Request("POST", "/_index_template/other-template");
otherRequest.setJsonEntity("""
{
"index_patterns": ["something-else"],
"composed_of" : ["with-options"],
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": null
}
}
""");
assertOK(client().performRequest(otherRequest));
}
public void testEnableDisableFailureStore() throws IOException {
{
assertAcknowledged(client().performRequest(new Request("DELETE", "/_data_stream/" + DATA_STREAM_NAME + "/_options")));

View file

@ -42,10 +42,14 @@ public class FailureStoreQueryParamIT extends DisabledSecurityDataStreamTestCase
"template": {
"settings": {
"number_of_replicas": 0
},
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
},
"data_stream": {
"failure_store": true
}
}
""");

View file

@ -212,8 +212,12 @@ setup:
---
"Create data stream with failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores default settings changed in 8.15+"
test_runner_features: [ capabilities, allowed_warnings ]
reason: "Data stream failure stores config in templates was added in 8.18+"
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
ingest.put_pipeline:
@ -256,8 +260,7 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1, failure-data-stream2 ]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
@ -269,6 +272,9 @@ setup:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:
@ -632,8 +638,12 @@ setup:
---
"Delete data stream with failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ allowed_warnings, capabilities ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
@ -642,8 +652,7 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true
data_stream: {}
template:
mappings:
properties:
@ -651,6 +660,9 @@ setup:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:
@ -722,8 +734,12 @@ setup:
---
"Delete data stream with failure store uninitialized":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
@ -732,8 +748,11 @@ setup:
name: my-template4
body:
index_patterns: [ failure-data-stream1 ]
data_stream:
failure_store: true
data_stream: {}
template:
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:

View file

@ -185,9 +185,12 @@ index without timestamp:
---
TSDB failures go to failure store:
- requires:
cluster_features: ["data_stream.failure_store.tsdb_fix"]
reason: "tests tsdb failure store fixes in 8.16.0 that catch timestamp errors that happen earlier in the process and redirect them to the failure store."
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [fs-k8s*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
@ -195,8 +198,7 @@ TSDB failures go to failure store:
name: my-template2
body:
index_patterns: [ "fs-k8s*" ]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
@ -207,6 +209,9 @@ TSDB failures go to failure store:
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
data_stream_options:
failure_store:
enabled: true
mappings:
properties:
"@timestamp":

View file

@ -92,9 +92,12 @@
---
"Modify a data stream's failure store":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [ "allowed_warnings" ]
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
@ -103,8 +106,7 @@
name: my-template
body:
index_patterns: [data-*]
data_stream:
failure_store: true
data_stream: {}
template:
mappings:
properties:
@ -112,6 +114,9 @@
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:

View file

@ -1,3 +1,15 @@
setup:
- requires:
reason: "Data stream options was added in 8.18+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
---
teardown:
- do:
@ -32,13 +44,6 @@ teardown:
---
"Redirect ingest failure in data stream to failure store":
- requires:
reason: "Failure store status was added in 8.16+"
test_runner_features: [capabilities, allowed_warnings, contains]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
@ -78,14 +83,16 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "parent_failing_pipeline"
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -147,14 +154,6 @@ teardown:
---
"Redirect shard failure in data stream to failure store":
- requires:
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
@ -162,8 +161,7 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
@ -174,7 +172,9 @@ teardown:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -231,13 +231,6 @@ teardown:
---
"Ensure failure is redirected to correct failure store after a reroute processor":
- requires:
test_runner_features: [allowed_warnings, capabilities]
reason: "Failure store status was added in 8.16+"
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
@ -262,14 +255,16 @@ teardown:
name: destination_template
body:
index_patterns: destination-data-stream
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "failing_pipeline"
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:
@ -331,11 +326,6 @@ teardown:
---
"Failure redirects to original failure store during index change if self referenced":
- requires:
cluster_features: [ "gte_v8.15.0" ]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [ allowed_warnings, contains ]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
@ -365,14 +355,16 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "failing_pipeline"
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -430,14 +422,6 @@ teardown:
---
"Failure redirects to original failure store during index change if final pipeline changes target":
- requires:
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "change_index_pipeline"
@ -462,14 +446,16 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
final_pipeline: "change_index_pipeline"
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -526,14 +512,6 @@ teardown:
---
"Failure redirects to correct failure store when index loop is detected":
- requires:
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "send_to_destination"
@ -575,14 +553,16 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "send_to_destination"
data_stream_options:
failure_store:
enabled: true
- do:
allowed_warnings:
@ -591,14 +571,16 @@ teardown:
name: destination_logs_template
body:
index_patterns: destination-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "send_back_to_original"
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -657,14 +639,6 @@ teardown:
---
"Failure redirects to correct failure store when pipeline loop is detected":
- requires:
reason: "Failure store status was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings, contains ]
capabilities:
- method: POST
path: /{index}/_doc
capabilities: [ 'failure_store_status' ]
- do:
ingest.put_pipeline:
id: "step_1"
@ -706,14 +680,16 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
index:
default_pipeline: "step_1"
data_stream_options:
failure_store:
enabled: true
- do:
index:
@ -773,9 +749,6 @@ teardown:
---
"Version conflicts are not redirected to failure store":
- requires:
test_runner_features: [ allowed_warnings]
- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
@ -783,8 +756,7 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
@ -795,6 +767,9 @@ teardown:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
bulk:
@ -812,17 +787,6 @@ teardown:
---
"Test failure store status with bulk request":
- requires:
test_runner_features: [ allowed_warnings, capabilities ]
reason: "Failure store status was added in 8.16+"
capabilities:
- method: POST
path: /_bulk
capabilities: [ 'failure_store_status' ]
- method: PUT
path: /_bulk
capabilities: [ 'failure_store_status' ]
- do:
allowed_warnings:
- "index template [generic_logs_template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [generic_logs_template] will take precedence during new index creation"
@ -830,8 +794,7 @@ teardown:
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
@ -842,6 +805,9 @@ teardown:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
allowed_warnings:
- "index template [no-fs] has index patterns [no-fs*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [no-fs] will take precedence during new index creation"
@ -849,8 +815,7 @@ teardown:
name: no-fs
body:
index_patterns: no-fs*
data_stream:
failure_store: false
data_stream: {}
template:
settings:
number_of_shards: 1
@ -861,6 +826,9 @@ teardown:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: false
- do:

View file

@ -1,9 +1,15 @@
---
setup:
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: [allowed_warnings, contains, capabilities]
reason: "Data stream failure stores config in templates was added in 8.16+"
test_runner_features: [ capabilities, allowed_warnings ]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- method: POST
path: /{index}/_rollover
capabilities: [ 'lazy-rollover-failure-store' ]
- do:
allowed_warnings:
@ -12,8 +18,7 @@ setup:
name: my-template
body:
index_patterns: [data-*]
data_stream:
failure_store: true
data_stream: {}
template:
mappings:
properties:
@ -21,6 +26,9 @@ setup:
type: date
count:
type: long
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:
@ -145,14 +153,6 @@ teardown:
---
"Lazily roll over a data stream's failure store after a shard failure":
- requires:
reason: "data stream failure store lazy rollover only supported in 8.15+"
test_runner_features: [allowed_warnings, capabilities]
capabilities:
- method: POST
path: /{index}/_rollover
capabilities: [lazy-rollover-failure-store]
# Initialize failure store
- do:
index:
@ -215,14 +215,6 @@ teardown:
---
"Lazily roll over a data stream's failure store after an ingest failure":
- requires:
reason: "data stream failure store lazy rollover only supported in 8.15+"
test_runner_features: [allowed_warnings, capabilities]
capabilities:
- method: POST
path: /{index}/_rollover
capabilities: [lazy-rollover-failure-store]
- do:
ingest.put_pipeline:
id: "failing_pipeline"
@ -246,12 +238,14 @@ teardown:
name: my-template
body:
index_patterns: [data-*]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
default_pipeline: "failing_pipeline"
data_stream_options:
failure_store:
enabled: true
- do:
indices.create_data_stream:

View file

@ -50,9 +50,12 @@
---
"Don't initialize failure store during data stream auto-creation on successful index":
- requires:
cluster_features: ["gte_v8.15.0"]
reason: "data stream failure stores REST structure changed in 8.15+"
test_runner_features: allowed_warnings
reason: "Data stream failure stores config in templates was added in 8.18+"
test_runner_features: [allowed_warnings, capabilities]
capabilities:
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
- do:
allowed_warnings:
@ -61,12 +64,14 @@
name: generic_logs_template
body:
index_patterns: logs-*
data_stream:
failure_store: true
data_stream: {}
template:
settings:
number_of_shards: 1
number_of_replicas: 1
data_stream_options:
failure_store:
enabled: true
- do:
index:

View file

@ -16,7 +16,14 @@ import org.elasticsearch.internal.VersionExtension;
import org.elasticsearch.plugins.ExtensionLoader;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Represents the version of the wire protocol used to communicate between a pair of ES nodes.
@ -56,8 +63,14 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
return fromId(in.readVInt());
}
/**
* Finds a {@code TransportVersion} by its id.
* If a transport version with the specified ID does not exist,
* this method creates and returns a new instance of {@code TransportVersion} with the specified ID.
* The new instance is not registered in {@code TransportVersion.getAllVersions}.
*/
public static TransportVersion fromId(int id) {
TransportVersion known = TransportVersions.VERSION_IDS.get(id);
TransportVersion known = VersionsHolder.ALL_VERSIONS_MAP.get(id);
if (known != null) {
return known;
}
@ -95,7 +108,14 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
* This should be the transport version with the highest id.
*/
public static TransportVersion current() {
return CurrentHolder.CURRENT;
return VersionsHolder.CURRENT;
}
/**
* Sorted list of all defined transport versions
*/
public static List<TransportVersion> getAllVersions() {
return VersionsHolder.ALL_VERSIONS;
}
public static TransportVersion fromString(String str) {
@ -139,16 +159,25 @@ public record TransportVersion(int id) implements VersionId<TransportVersion> {
return Integer.toString(id);
}
private static class CurrentHolder {
private static final TransportVersion CURRENT = findCurrent();
private static class VersionsHolder {
private static final List<TransportVersion> ALL_VERSIONS;
private static final Map<Integer, TransportVersion> ALL_VERSIONS_MAP;
private static final TransportVersion CURRENT;
// finds the pluggable current version
private static TransportVersion findCurrent() {
var version = ExtensionLoader.loadSingleton(ServiceLoader.load(VersionExtension.class))
.map(e -> e.getCurrentTransportVersion(TransportVersions.LATEST_DEFINED))
.orElse(TransportVersions.LATEST_DEFINED);
assert version.onOrAfter(TransportVersions.LATEST_DEFINED);
return version;
static {
Collection<TransportVersion> extendedVersions = ExtensionLoader.loadSingleton(ServiceLoader.load(VersionExtension.class))
.map(VersionExtension::getTransportVersions)
.orElse(Collections.emptyList());
if (extendedVersions.isEmpty()) {
ALL_VERSIONS = TransportVersions.DEFINED_VERSIONS;
} else {
ALL_VERSIONS = Stream.concat(TransportVersions.DEFINED_VERSIONS.stream(), extendedVersions.stream()).sorted().toList();
}
ALL_VERSIONS_MAP = ALL_VERSIONS.stream().collect(Collectors.toUnmodifiableMap(TransportVersion::id, Function.identity()));
CURRENT = ALL_VERSIONS.getLast();
}
}
}

View file

@ -14,13 +14,12 @@ import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.core.UpdateForV9;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.IntFunction;
@ -141,6 +140,7 @@ public class TransportVersions {
public static final TransportVersion SOURCE_MODE_TELEMETRY = def(8_802_00_0);
public static final TransportVersion NEW_REFRESH_CLUSTER_BLOCK = def(8_803_00_0);
public static final TransportVersion RETRIES_AND_OPERATIONS_IN_BLOBSTORE_STATS = def(8_804_00_0);
public static final TransportVersion ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES = def(8_805_00_0);
/*
* WARNING: DO NOT MERGE INTO MAIN!
@ -220,21 +220,24 @@ public class TransportVersions {
*/
public static final TransportVersion MINIMUM_CCS_VERSION = V_8_15_0;
static final NavigableMap<Integer, TransportVersion> VERSION_IDS = getAllVersionIds(TransportVersions.class);
/**
* Sorted list of all versions defined in this class
*/
static final List<TransportVersion> DEFINED_VERSIONS = collectAllVersionIdsDefinedInClass(TransportVersions.class);
// the highest transport version constant defined in this file, used as a fallback for TransportVersion.current()
// the highest transport version constant defined
static final TransportVersion LATEST_DEFINED;
static {
LATEST_DEFINED = VERSION_IDS.lastEntry().getValue();
LATEST_DEFINED = DEFINED_VERSIONS.getLast();
// see comment on IDS field
// now we're registered all the transport versions, we can clear the map
IDS = null;
}
public static NavigableMap<Integer, TransportVersion> getAllVersionIds(Class<?> cls) {
public static List<TransportVersion> collectAllVersionIdsDefinedInClass(Class<?> cls) {
Map<Integer, String> versionIdFields = new HashMap<>();
NavigableMap<Integer, TransportVersion> builder = new TreeMap<>();
List<TransportVersion> definedTransportVersions = new ArrayList<>();
Set<String> ignore = Set.of("ZERO", "CURRENT", "MINIMUM_COMPATIBLE", "MINIMUM_CCS_VERSION");
@ -251,7 +254,7 @@ public class TransportVersions {
} catch (IllegalAccessException e) {
throw new AssertionError(e);
}
builder.put(version.id(), version);
definedTransportVersions.add(version);
if (Assertions.ENABLED) {
// check the version number is unique
@ -268,11 +271,9 @@ public class TransportVersions {
}
}
return Collections.unmodifiableNavigableMap(builder);
}
Collections.sort(definedTransportVersions);
static Collection<TransportVersion> getAllVersions() {
return VERSION_IDS.values();
return List.copyOf(definedTransportVersions);
}
static final IntFunction<String> VERSION_LOOKUP = ReleaseVersions.generateVersionsLookup(TransportVersions.class, LATEST_DEFINED.id());

View file

@ -13,6 +13,7 @@ import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.rollover.RolloverConfiguration;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention;
import org.elasticsearch.cluster.metadata.ResettableValue;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -111,9 +112,9 @@ public class SimulateIndexTemplateResponse extends ActionResponse implements ToX
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (this.resolvedTemplate != null) {
if (resolvedTemplate != null) {
builder.field(TEMPLATE.getPreferredName());
this.resolvedTemplate.toXContent(builder, params, rolloverConfiguration);
resolvedTemplate.toXContent(builder, ResettableValue.hideResetValues(params), rolloverConfiguration);
}
if (this.overlappingTemplates != null) {
builder.startArray(OVERLAPPING.getPreferredName());

View file

@ -62,6 +62,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.isDataStrea
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findConflictingV1Templates;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findConflictingV2Templates;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveDataStreamOptions;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveLifecycle;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.resolveSettings;
@ -352,7 +353,13 @@ public class TransportSimulateIndexTemplateAction extends TransportMasterNodeRea
if (template.getDataStreamTemplate() != null && lifecycle == null && isDslOnlyMode) {
lifecycle = DataStreamLifecycle.DEFAULT;
}
return new Template(settings, mergedMapping, aliasesByName, lifecycle);
return new Template(
settings,
mergedMapping,
aliasesByName,
lifecycle,
resolveDataStreamOptions(simulatedState.metadata(), matchingTemplate)
);
}
private static IndexLongFieldRange getEventIngestedRange(String indexName, ClusterState simulatedState) {

View file

@ -35,6 +35,7 @@ import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
@ -43,6 +44,7 @@ import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
@ -667,12 +669,16 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
}
/**
* Determines if an index name is associated with an index template that has a data stream failure store enabled.
* Determines if an index name is associated with an index template that has a data stream failure store enabled. Since failure store is
* a data stream feature, the method returns true/false only if it is a data stream template, otherwise null.
* @param indexName The index name to check.
* @param projectMetadata ProjectMetadata from the cluster state.
* @return true if the given index name corresponds to an index template with a data stream failure store enabled.
* @return true the associated index template has failure store enabled, false if the failure store is disabled or it's not specified,
* and null if the template is not a data stream template.
* Visible for testing
*/
private static Boolean resolveFailureStoreFromTemplate(String indexName, ProjectMetadata projectMetadata) {
@Nullable
static Boolean resolveFailureStoreFromTemplate(String indexName, ProjectMetadata projectMetadata) {
if (indexName == null) {
return null;
}
@ -685,7 +691,11 @@ public class TransportBulkAction extends TransportAbstractBulkAction {
ComposableIndexTemplate composableIndexTemplate = projectMetadata.templatesV2().get(template);
if (composableIndexTemplate.getDataStreamTemplate() != null) {
// Check if the data stream has the failure store enabled
return composableIndexTemplate.getDataStreamTemplate().hasFailureStore();
DataStreamOptions dataStreamOptions = MetadataIndexTemplateService.resolveDataStreamOptions(
composableIndexTemplate,
metadata.componentTemplates()
).mapAndGet(DataStreamOptions.Template::toDataStreamOptions);
return dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled();
}
}

View file

@ -19,6 +19,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xcontent.ConstructingObjectParser;
@ -372,16 +373,14 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
private static final ParseField HIDDEN = new ParseField("hidden");
private static final ParseField ALLOW_CUSTOM_ROUTING = new ParseField("allow_custom_routing");
// Remove this after this PR gets backported
@UpdateForV9(owner = UpdateForV9.Owner.DATA_MANAGEMENT)
private static final ParseField FAILURE_STORE = new ParseField("failure_store");
public static final ConstructingObjectParser<DataStreamTemplate, Void> PARSER = new ConstructingObjectParser<>(
"data_stream_template",
false,
args -> new DataStreamTemplate(
args[0] != null && (boolean) args[0],
args[1] != null && (boolean) args[1],
DataStream.isFailureStoreFeatureFlagEnabled() && args[2] != null && (boolean) args[2]
)
args -> new DataStreamTemplate(args[0] != null && (boolean) args[0], args[1] != null && (boolean) args[1])
);
static {
@ -394,20 +393,14 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
private final boolean hidden;
private final boolean allowCustomRouting;
private final boolean failureStore;
public DataStreamTemplate() {
this(false, false, false);
this(false, false);
}
public DataStreamTemplate(boolean hidden, boolean allowCustomRouting) {
this(hidden, allowCustomRouting, false);
}
public DataStreamTemplate(boolean hidden, boolean allowCustomRouting, boolean failureStore) {
this.hidden = hidden;
this.allowCustomRouting = allowCustomRouting;
this.failureStore = failureStore;
}
DataStreamTemplate(StreamInput in) throws IOException {
@ -425,10 +418,9 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
boolean value = in.readBoolean();
assert value == false : "expected false, because this used to be an optional enum that never got set";
}
if (in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
failureStore = in.readBoolean();
} else {
failureStore = false;
if (in.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) {
in.readBoolean();
}
}
@ -458,10 +450,6 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
return allowCustomRouting;
}
public boolean hasFailureStore() {
return failureStore;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(hidden);
@ -472,8 +460,11 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
// See comment in constructor.
out.writeBoolean(false);
}
if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
out.writeBoolean(failureStore);
if (out.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) {
// Previous versions expect the failure store to be configured via the DataStreamTemplate. We add it here, so we don't break
// the serialisation, but we do not care to preserve the value because this feature is still behind a feature flag.
out.writeBoolean(false);
}
}
@ -482,9 +473,6 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
builder.startObject();
builder.field("hidden", hidden);
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
builder.field(FAILURE_STORE.getPreferredName(), failureStore);
}
builder.endObject();
return builder;
}
@ -494,12 +482,12 @@ public class ComposableIndexTemplate implements SimpleDiffable<ComposableIndexTe
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DataStreamTemplate that = (DataStreamTemplate) o;
return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting && failureStore == that.failureStore;
return hidden == that.hidden && allowCustomRouting == that.allowCustomRouting;
}
@Override
public int hashCode() {
return Objects.hash(hidden, allowCustomRouting, failureStore);
return Objects.hash(hidden, allowCustomRouting);
}
}

View file

@ -424,7 +424,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
* @return true, if the user has explicitly enabled the failure store.
*/
public boolean isFailureStoreEnabled() {
return dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().isExplicitlyEnabled();
return dataStreamOptions.isFailureStoreEnabled();
}
@Nullable
@ -1189,6 +1189,7 @@ public final class DataStream implements SimpleDiffable<DataStream>, ToXContentO
);
// The fields behind the feature flag should always be last.
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
// Should be removed after backport
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), FAILURE_STORE_FIELD);
PARSER.declareObjectArray(
ConstructingObjectParser.optionalConstructorArg(),

View file

@ -14,7 +14,9 @@ import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -27,11 +29,13 @@ import java.io.IOException;
* supports the following configurations only explicitly enabling or disabling the failure store
*/
public record DataStreamFailureStore(Boolean enabled) implements SimpleDiffable<DataStreamFailureStore>, ToXContentObject {
public static final String FAILURE_STORE = "failure_store";
public static final String ENABLED = "enabled";
public static final ParseField ENABLED_FIELD = new ParseField("enabled");
public static final ParseField ENABLED_FIELD = new ParseField(ENABLED);
public static final ConstructingObjectParser<DataStreamFailureStore, Void> PARSER = new ConstructingObjectParser<>(
"failure_store",
FAILURE_STORE,
false,
(args, unused) -> new DataStreamFailureStore((Boolean) args[0])
);
@ -59,13 +63,6 @@ public record DataStreamFailureStore(Boolean enabled) implements SimpleDiffable<
return SimpleDiffable.readDiffFrom(DataStreamFailureStore::new, in);
}
/**
* @return iff the user has explicitly enabled the failure store
*/
public boolean isExplicitlyEnabled() {
return enabled != null && enabled;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(enabled);
@ -89,4 +86,80 @@ public record DataStreamFailureStore(Boolean enabled) implements SimpleDiffable<
public static DataStreamFailureStore fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
/**
* This class is only used in template configuration. It wraps the fields of {@link DataStreamFailureStore} with {@link ResettableValue}
* to allow a user to signal when they want to reset any previously encountered values during template composition. Furthermore, it
* provides the method {@link #merge(Template, Template)} that dictates how two templates can be composed.
*/
public record Template(ResettableValue<Boolean> enabled) implements Writeable, ToXContentObject {
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Template, Void> PARSER = new ConstructingObjectParser<>(
"failure_store_template",
false,
(args, unused) -> new Template(args[0] == null ? ResettableValue.undefined() : (ResettableValue<Boolean>) args[0])
);
static {
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL
? ResettableValue.reset()
: ResettableValue.create(p.booleanValue()),
ENABLED_FIELD,
ObjectParser.ValueType.BOOLEAN_OR_NULL
);
}
public Template {
if (enabled.get() == null) {
throw new IllegalArgumentException("Failure store configuration should have at least one non-null configuration value.");
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
ResettableValue.write(out, enabled, StreamOutput::writeBoolean);
}
public static Template read(StreamInput in) throws IOException {
ResettableValue<Boolean> enabled = ResettableValue.read(in, StreamInput::readBoolean);
return new Template(enabled);
}
/**
* Converts the template to XContent, depending on the XContent.Params set by {@link ResettableValue#hideResetValues(Params)}
* it may or may not display any explicit nulls when the value is to be reset.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
enabled.toXContent(builder, params, ENABLED_FIELD.getPreferredName());
builder.endObject();
return builder;
}
public static Template fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
/**
* Returns a template which has the value of the initial template updated with the values of the update.
* Note: for now it's a trivial composition because we have only one non-null field.
* @return the composed template
*/
public static Template merge(Template ignored, Template update) {
return update;
}
public DataStreamFailureStore toFailureStore() {
return new DataStreamFailureStore(enabled.get());
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
}

View file

@ -14,9 +14,9 @@ import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
@ -24,6 +24,8 @@ import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import static org.elasticsearch.cluster.metadata.DataStreamFailureStore.FAILURE_STORE;
/**
* Holds data stream dedicated configuration options such as failure store, (in the future lifecycle). Currently, it
* supports the following configurations:
@ -34,10 +36,10 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore)
SimpleDiffable<DataStreamOptions>,
ToXContentObject {
public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store");
public static final ParseField FAILURE_STORE_FIELD = new ParseField(FAILURE_STORE);
public static final DataStreamOptions FAILURE_STORE_ENABLED = new DataStreamOptions(new DataStreamFailureStore(true));
public static final DataStreamOptions FAILURE_STORE_DISABLED = new DataStreamOptions(new DataStreamFailureStore(false));
public static final DataStreamOptions EMPTY = new DataStreamOptions();
public static final DataStreamOptions EMPTY = new DataStreamOptions(null);
public static final ConstructingObjectParser<DataStreamOptions, Void> PARSER = new ConstructingObjectParser<>(
"options",
@ -46,18 +48,13 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore)
);
static {
PARSER.declareField(
PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataStreamFailureStore.fromXContent(p),
FAILURE_STORE_FIELD,
ObjectParser.ValueType.OBJECT_OR_NULL
FAILURE_STORE_FIELD
);
}
public DataStreamOptions() {
this(null);
}
public static DataStreamOptions read(StreamInput in) throws IOException {
return new DataStreamOptions(in.readOptionalWriteable(DataStreamFailureStore::new));
}
@ -66,8 +63,21 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore)
return SimpleDiffable.readDiffFrom(DataStreamOptions::read, in);
}
/**
* @return true if none of the options are defined
*/
public boolean isEmpty() {
return this.equals(EMPTY);
return failureStore == null;
}
/**
* Determines if this data stream has its failure store enabled or not. Currently, the failure store
* is enabled only when a user has explicitly requested it.
*
* @return true, if the user has explicitly enabled the failure store.
*/
public boolean isFailureStoreEnabled() {
return failureStore != null && Boolean.TRUE.equals(failureStore.enabled());
}
@Override
@ -93,4 +103,100 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore)
public static DataStreamOptions fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
/**
* This class is only used in template configuration. It wraps the fields of {@link DataStreamOptions} with {@link ResettableValue}
* to allow a user to signal when they want to reset any previously encountered values during template composition. Furthermore, it
* provides the {@link Template.Builder} that dictates how two templates can be composed.
*/
public record Template(ResettableValue<DataStreamFailureStore.Template> failureStore) implements Writeable, ToXContentObject {
public static final Template EMPTY = new Template(ResettableValue.undefined());
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Template, Void> PARSER = new ConstructingObjectParser<>(
"data_stream_options_template",
false,
(args, unused) -> new Template(
args[0] == null ? ResettableValue.undefined() : (ResettableValue<DataStreamFailureStore.Template>) args[0]
)
);
static {
PARSER.declareObjectOrNull(
ConstructingObjectParser.optionalConstructorArg(),
(p, s) -> ResettableValue.create(DataStreamFailureStore.Template.fromXContent(p)),
ResettableValue.reset(),
FAILURE_STORE_FIELD
);
}
public Template {
assert failureStore != null : "Template does not accept null values, please use Resettable.undefined()";
}
@Override
public void writeTo(StreamOutput out) throws IOException {
ResettableValue.write(out, failureStore, (o, v) -> v.writeTo(o));
}
public static Template read(StreamInput in) throws IOException {
ResettableValue<DataStreamFailureStore.Template> failureStore = ResettableValue.read(in, DataStreamFailureStore.Template::read);
return new Template(failureStore);
}
public static Template fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
/**
* Converts the template to XContent, depending on the {@param params} set by {@link ResettableValue#hideResetValues(Params)}
* it may or may not display any explicit nulls when the value is to be reset.
*/
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
failureStore.toXContent(builder, params, FAILURE_STORE_FIELD.getPreferredName());
builder.endObject();
return builder;
}
public DataStreamOptions toDataStreamOptions() {
return new DataStreamOptions(failureStore.mapAndGet(DataStreamFailureStore.Template::toFailureStore));
}
public static Builder builder(Template template) {
return new Builder(template);
}
/**
* Builds and composes a data stream template.
*/
public static class Builder {
private ResettableValue<DataStreamFailureStore.Template> failureStore = ResettableValue.undefined();
public Builder(Template template) {
if (template != null) {
failureStore = template.failureStore();
}
}
/**
* Updates the current failure store configuration with the provided value. This is not a replacement necessarily, if both
* instance contain data the configurations are merged.
*/
public Builder updateFailureStore(ResettableValue<DataStreamFailureStore.Template> newFailureStore) {
failureStore = ResettableValue.merge(failureStore, newFailureStore, DataStreamFailureStore.Template::merge);
return this;
}
public Template build() {
return new Template(failureStore);
}
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
}

View file

@ -264,11 +264,16 @@ public class MetadataCreateDataStreamService {
// This is not a problem as both have different prefixes (`.ds-` vs `.fs-`) and both will be using the same `generation` field
// when rolling over in the future.
final long initialGeneration = 1;
ResettableValue<DataStreamOptions.Template> dataStreamOptionsTemplate = isSystem
? MetadataIndexTemplateService.resolveDataStreamOptions(template, systemDataStreamDescriptor.getComponentTemplates())
: MetadataIndexTemplateService.resolveDataStreamOptions(template, metadata.componentTemplates());
final DataStreamOptions dataStreamOptions = dataStreamOptionsTemplate.mapAndGet(DataStreamOptions.Template::toDataStreamOptions);
var isFailureStoreEnabled = dataStreamOptions != null && dataStreamOptions.isFailureStoreEnabled();
// If we need to create a failure store, do so first. Do not reroute during the creation since we will do
// that as part of creating the backing index if required.
IndexMetadata failureStoreIndex = null;
if (template.getDataStreamTemplate().hasFailureStore() && initializeFailureStore) {
if (isFailureStoreEnabled && initializeFailureStore) {
if (isSystem) {
throw new IllegalArgumentException("Failure stores are not supported on system data streams");
}
@ -307,7 +312,7 @@ public class MetadataCreateDataStreamService {
}
assert writeIndex != null;
assert writeIndex.mapping() != null : "no mapping found for backing index [" + writeIndex.getIndex().getName() + "]";
assert template.getDataStreamTemplate().hasFailureStore() == false || initializeFailureStore == false || failureStoreIndex != null
assert isFailureStoreEnabled == false || initializeFailureStore == false || failureStoreIndex != null
: "failure store should have an initial index";
assert failureStoreIndex == null || failureStoreIndex.mapping() != null
: "no mapping found for failure store [" + failureStoreIndex.getIndex().getName() + "]";
@ -334,7 +339,7 @@ public class MetadataCreateDataStreamService {
template.getDataStreamTemplate().isAllowCustomRouting(),
indexMode,
lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle,
template.getDataStreamTemplate().hasFailureStore() ? DataStreamOptions.FAILURE_STORE_ENABLED : DataStreamOptions.EMPTY,
dataStreamOptions,
new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, dsBackingIndices, false, null),
// If the failure store shouldn't be initialized on data stream creation, we're marking it for "lazy rollover", which will
// initialize the failure store on first write.

View file

@ -317,12 +317,7 @@ public class MetadataIndexTemplateService {
}
}
final Template finalTemplate = new Template(
finalSettings,
wrappedMappings,
template.template().aliases(),
template.template().lifecycle()
);
final Template finalTemplate = Template.builder(template.template()).settings(finalSettings).mappings(wrappedMappings).build();
final ComponentTemplate finalComponentTemplate = new ComponentTemplate(
finalTemplate,
template.version(),
@ -353,6 +348,7 @@ public class MetadataIndexTemplateService {
composableTemplate,
globalRetentionSettings.get()
);
validateDataStreamOptions(tempStateWithComponentTemplateAdded.metadata(), composableTemplateName, composableTemplate);
validateIndexTemplateV2(
tempProjectWithComponentTemplateAdded,
projectState.cluster().getMinTransportVersion(),
@ -643,7 +639,7 @@ public class MetadataIndexTemplateService {
// adjusted (to add _doc) and it should be validated
CompressedXContent mappings = innerTemplate.mappings();
CompressedXContent wrappedMappings = wrapMappingsIfNecessary(mappings, xContentRegistry);
final Template finalTemplate = new Template(finalSettings, wrappedMappings, innerTemplate.aliases(), innerTemplate.lifecycle());
final Template finalTemplate = Template.builder(innerTemplate).settings(finalSettings).mappings(wrappedMappings).build();
finalIndexTemplate = template.toBuilder().template(finalTemplate).build();
}
@ -704,7 +700,8 @@ public class MetadataIndexTemplateService {
return overlaps;
}
private void validateIndexTemplateV2(
// Visibility for testing
void validateIndexTemplateV2(
ProjectMetadata projectMetadata,
TransportVersion minTransportVersion,
String name,
@ -744,6 +741,7 @@ public class MetadataIndexTemplateService {
validate(name, templateToValidate);
validateDataStreamsStillReferenced(projectMetadata, name, templateToValidate);
validateLifecycle(projectMetadata, name, templateToValidate, globalRetentionSettings.get());
validateDataStreamOptions(currentState.metadata(), name, templateToValidate);
if (templateToValidate.isDeprecated() == false) {
validateUseOfDeprecatedComponentTemplates(name, templateToValidate, projectMetadata.componentTemplates());
@ -845,6 +843,20 @@ public class MetadataIndexTemplateService {
}
}
// Visible for testing
static void validateDataStreamOptions(Metadata metadata, String indexTemplateName, ComposableIndexTemplate template) {
ResettableValue<DataStreamOptions.Template> dataStreamOptions = resolveDataStreamOptions(template, metadata.componentTemplates());
if (dataStreamOptions.get() != null) {
if (template.getDataStreamTemplate() == null) {
throw new IllegalArgumentException(
"index template ["
+ indexTemplateName
+ "] specifies data stream options that can only be used in combination with a data stream"
);
}
}
}
/**
* Validate that by changing or adding {@code newTemplate}, there are
* no unreferenced data streams. Note that this scenario is still possible
@ -1614,7 +1626,7 @@ public class MetadataIndexTemplateService {
public static DataStreamLifecycle resolveLifecycle(final Metadata metadata, final String templateName) {
final ComposableIndexTemplate template = metadata.getProject().templatesV2().get(templateName);
assert template != null
: "attempted to resolve settings for a template [" + templateName + "] that did not exist in the cluster state";
: "attempted to resolve lifecycle for a template [" + templateName + "] that did not exist in the cluster state";
if (template == null) {
return null;
}
@ -1706,6 +1718,81 @@ public class MetadataIndexTemplateService {
return builder == null ? null : builder.build();
}
/**
* Resolve the given v2 template into a {@link ResettableValue<DataStreamOptions>} object
*/
public static ResettableValue<DataStreamOptions.Template> resolveDataStreamOptions(final Metadata metadata, final String templateName) {
final ComposableIndexTemplate template = metadata.templatesV2().get(templateName);
assert template != null
: "attempted to resolve data stream options for a template [" + templateName + "] that did not exist in the cluster state";
if (template == null) {
return ResettableValue.undefined();
}
return resolveDataStreamOptions(template, metadata.componentTemplates());
}
/**
* Resolve the provided v2 template and component templates into a {@link ResettableValue<DataStreamOptions>} object
*/
public static ResettableValue<DataStreamOptions.Template> resolveDataStreamOptions(
ComposableIndexTemplate template,
Map<String, ComponentTemplate> componentTemplates
) {
Objects.requireNonNull(template, "attempted to resolve data stream for a null template");
Objects.requireNonNull(componentTemplates, "attempted to resolve data stream options with null component templates");
List<ResettableValue<DataStreamOptions.Template>> dataStreamOptionsList = new ArrayList<>();
for (String componentTemplateName : template.composedOf()) {
if (componentTemplates.containsKey(componentTemplateName) == false) {
continue;
}
ResettableValue<DataStreamOptions.Template> dataStreamOptions = componentTemplates.get(componentTemplateName)
.template()
.resettableDataStreamOptions();
if (dataStreamOptions.isDefined()) {
dataStreamOptionsList.add(dataStreamOptions);
}
}
// The actual index template's data stream options have the highest precedence.
if (template.template() != null && template.template().resettableDataStreamOptions().isDefined()) {
dataStreamOptionsList.add(template.template().resettableDataStreamOptions());
}
return composeDataStreamOptions(dataStreamOptionsList);
}
/**
* This method composes a series of data streams options to a final one. Since currently the data stream options
* contains only the failure store configuration which also contains only one field, the composition is a bit trivial.
* But we introduce the mechanics that will help extend it really easily.
* @param dataStreamOptionsList a sorted list of data stream options in the order that they will be composed
* @return the final data stream option configuration
*/
public static ResettableValue<DataStreamOptions.Template> composeDataStreamOptions(
List<ResettableValue<DataStreamOptions.Template>> dataStreamOptionsList
) {
if (dataStreamOptionsList.isEmpty()) {
return ResettableValue.undefined();
}
DataStreamOptions.Template.Builder builder = null;
for (ResettableValue<DataStreamOptions.Template> current : dataStreamOptionsList) {
if (current.isDefined() == false) {
continue;
}
if (current.shouldReset()) {
builder = null;
} else {
DataStreamOptions.Template currentTemplate = current.get();
if (builder == null) {
builder = DataStreamOptions.Template.builder(currentTemplate);
} else {
// Currently failure store has only one field that needs to be defined so the composing of the failure store is trivial
builder.updateFailureStore(currentTemplate.failureStore());
}
}
}
return builder == null ? ResettableValue.undefined() : ResettableValue.create(builder.build());
}
/**
* Given a state and a composable template, validate that the final composite template
* generated by the composable template and all of its component templates contains valid

View file

@ -0,0 +1,216 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
/**
* This class holds a value of type @{param T} that can be in one of 3 states:
* - It has a concrete value, or
* - It is missing, or
* - It is meant to reset any other when it is composed with it.
* It is mainly used in template composition to capture the case when the user wished to reset any previous values.
* @param <T>
*/
public class ResettableValue<T> {
private static final ResettableValue<?> RESET = new ResettableValue<>(true, null);
private static final ResettableValue<?> UNDEFINED = new ResettableValue<>(false, null);
private static final String DISPLAY_RESET_VALUES = "display_reset";
private static final Map<String, String> HIDE_RESET_VALUES_PARAMS = Map.of(DISPLAY_RESET_VALUES, "false");
private final T value;
private final boolean isDefined;
/**
* @return the reset state, meaning that this value is explicitly requested to be reset
*/
public static <T> ResettableValue<T> reset() {
@SuppressWarnings("unchecked")
ResettableValue<T> t = (ResettableValue<T>) RESET;
return t;
}
/**
* @return the undefined state, meaning that this value has not been specified
*/
public static <T> ResettableValue<T> undefined() {
@SuppressWarnings("unchecked")
ResettableValue<T> t = (ResettableValue<T>) UNDEFINED;
return t;
}
/**
* Wraps a value, if the value is null, it returns {@link #undefined()}
*/
public static <T> ResettableValue<T> create(T value) {
if (value == null) {
return undefined();
}
return new ResettableValue<>(true, value);
}
private ResettableValue(boolean isDefined, T value) {
this.isDefined = isDefined;
this.value = value;
}
/**
* @return true if the state of this is reset
*/
public boolean shouldReset() {
return isDefined && value == null;
}
/**
* @return true when the value is defined, either with a concrete value or reset.
*/
public boolean isDefined() {
return isDefined;
}
/**
* @return the concrete value or null if it is in undefined or reset states.
*/
@Nullable
public T get() {
return value;
}
/**
* Writes a single optional explicitly nullable value. This method is in direct relation with the
* {@link #read(StreamInput, Writeable.Reader)} which reads the respective value. It's the
* responsibility of the caller to preserve order of the fields and their backwards compatibility.
*
* @throws IOException
*/
static <T> void write(StreamOutput out, ResettableValue<T> value, Writeable.Writer<T> writer) throws IOException {
out.writeBoolean(value.isDefined);
if (value.isDefined) {
out.writeBoolean(value.shouldReset());
if (value.shouldReset() == false) {
writer.write(out, value.get());
}
}
}
/**
* Reads a single optional and explicitly nullable value. This method is in direct relation with the
* {@link #write(StreamOutput, ResettableValue, Writeable.Writer)} which writes the respective value. It's the
* responsibility of the caller to preserve order of the fields and their backwards compatibility.
*
* @throws IOException
*/
@Nullable
static <T> ResettableValue<T> read(StreamInput in, Writeable.Reader<T> reader) throws IOException {
boolean isDefined = in.readBoolean();
if (isDefined == false) {
return ResettableValue.undefined();
}
boolean shouldReset = in.readBoolean();
if (shouldReset) {
return ResettableValue.reset();
}
T value = reader.read(in);
return ResettableValue.create(value);
}
/**
* Gets the value and applies the function {@param f} when the value is not null. Slightly more efficient than
* <code>this.map(f).get()</code>.
*/
public <U> U mapAndGet(Function<? super T, ? extends U> f) {
if (isDefined() == false || shouldReset()) {
return null;
} else {
return f.apply(value);
}
}
public <U> ResettableValue<U> map(Function<? super T, ? extends U> mapper) {
Objects.requireNonNull(mapper);
if (isDefined == false) {
return ResettableValue.undefined();
}
if (shouldReset()) {
return reset();
}
return ResettableValue.create(mapper.apply(value));
}
/**
* Ιt merges the values of the ResettableValue's when they are defined using the provided mergeFunction.
*/
public static <T> ResettableValue<T> merge(ResettableValue<T> initial, ResettableValue<T> update, BiFunction<T, T, T> mergeFunction) {
if (update.shouldReset()) {
return undefined();
}
if (update.isDefined() == false) {
return initial;
}
if (initial.isDefined() == false || initial.shouldReset()) {
return update;
}
// Because we checked that's defined and not in reset state, we can directly apply the merge function.
return ResettableValue.create(mergeFunction.apply(initial.value, update.value));
}
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params, String field) throws IOException {
return toXContent(builder, params, field, Function.identity());
}
public <U> XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params, String field, Function<T, U> transformValue)
throws IOException {
if (isDefined) {
if (value != null) {
builder.field(field, transformValue.apply(value));
} else if (ResettableValue.shouldDisplayResetValue(params)) {
builder.nullField(field);
}
}
return builder;
}
public static boolean shouldDisplayResetValue(ToXContent.Params params) {
return params.paramAsBoolean(DISPLAY_RESET_VALUES, true);
}
public static ToXContent.Params hideResetValues(ToXContent.Params params) {
return new ToXContent.DelegatingMapParams(HIDE_RESET_VALUES_PARAMS, params);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ResettableValue<?> that = (ResettableValue<?>) o;
return isDefined == that.isDefined && Objects.equals(value, that.value);
}
@Override
public int hashCode() {
return Objects.hash(value, isDefined);
}
@Override
public String toString() {
return "ResettableValue{" + "value=" + value + ", isDefined=" + isDefined + '}';
}
}

View file

@ -47,12 +47,19 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
private static final ParseField MAPPINGS = new ParseField("mappings");
private static final ParseField ALIASES = new ParseField("aliases");
private static final ParseField LIFECYCLE = new ParseField("lifecycle");
private static final ParseField DATA_STREAM_OPTIONS = new ParseField("data_stream_options");
@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<Template, Void> PARSER = new ConstructingObjectParser<>(
"template",
false,
a -> new Template((Settings) a[0], (CompressedXContent) a[1], (Map<String, AliasMetadata>) a[2], (DataStreamLifecycle) a[3])
a -> new Template(
(Settings) a[0],
(CompressedXContent) a[1],
(Map<String, AliasMetadata>) a[2],
(DataStreamLifecycle) a[3],
a[4] == null ? ResettableValue.undefined() : (ResettableValue<DataStreamOptions.Template>) a[4]
)
);
static {
@ -82,6 +89,12 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
return aliasMap;
}, ALIASES);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> DataStreamLifecycle.fromXContent(p), LIFECYCLE);
PARSER.declareObjectOrNull(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> ResettableValue.create(DataStreamOptions.Template.fromXContent(p)),
ResettableValue.reset(),
DATA_STREAM_OPTIONS
);
}
@Nullable
@ -93,21 +106,25 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
@Nullable
private final DataStreamLifecycle lifecycle;
private final ResettableValue<DataStreamOptions.Template> dataStreamOptions;
public Template(
@Nullable Settings settings,
@Nullable CompressedXContent mappings,
@Nullable Map<String, AliasMetadata> aliases,
@Nullable DataStreamLifecycle lifecycle
@Nullable DataStreamLifecycle lifecycle,
ResettableValue<DataStreamOptions.Template> dataStreamOptions
) {
this.settings = settings;
this.mappings = mappings;
this.aliases = aliases;
this.lifecycle = lifecycle;
assert dataStreamOptions != null : "Template does not accept null values, please use Resettable.undefined()";
this.dataStreamOptions = dataStreamOptions;
}
public Template(@Nullable Settings settings, @Nullable CompressedXContent mappings, @Nullable Map<String, AliasMetadata> aliases) {
this(settings, mappings, aliases, null);
this(settings, mappings, aliases, null, ResettableValue.undefined());
}
public Template(StreamInput in) throws IOException {
@ -138,6 +155,12 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
} else {
this.lifecycle = null;
}
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) {
dataStreamOptions = ResettableValue.read(in, DataStreamOptions.Template::read);
} else {
// We default to no data stream options since failure store is behind a feature flag up to this version
this.dataStreamOptions = ResettableValue.undefined();
}
}
@Nullable
@ -160,6 +183,15 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
return lifecycle;
}
@Nullable
public DataStreamOptions.Template dataStreamOptions() {
return dataStreamOptions.get();
}
public ResettableValue<DataStreamOptions.Template> resettableDataStreamOptions() {
return dataStreamOptions;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (this.settings == null) {
@ -189,11 +221,14 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
out.writeOptionalWriteable(lifecycle);
}
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS_TO_TEMPLATES)) {
ResettableValue.write(out, dataStreamOptions, (o, v) -> v.writeTo(o));
}
}
@Override
public int hashCode() {
return Objects.hash(settings, mappings, aliases, lifecycle);
return Objects.hash(settings, mappings, aliases, lifecycle, dataStreamOptions);
}
@Override
@ -208,7 +243,8 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
return Objects.equals(settings, other.settings)
&& mappingsEquals(this.mappings, other.mappings)
&& Objects.equals(aliases, other.aliases)
&& Objects.equals(lifecycle, other.lifecycle);
&& Objects.equals(lifecycle, other.lifecycle)
&& Objects.equals(dataStreamOptions, other.dataStreamOptions);
}
@Override
@ -222,7 +258,9 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
}
/**
* Converts the template to XContent and passes the RolloverConditions, when provided, to the lifecycle.
* Converts the template to XContent and passes the RolloverConditions, when provided, to the lifecycle. Depending on the
* {@param params} set by {@link ResettableValue#hideResetValues(Params)} it may or may not display <code>null</code> when the value
* is to be reset.
*/
public XContentBuilder toXContent(XContentBuilder builder, Params params, @Nullable RolloverConfiguration rolloverConfiguration)
throws IOException {
@ -257,6 +295,9 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
builder.field(LIFECYCLE.getPreferredName());
lifecycle.toXContent(builder, params, rolloverConfiguration, null, false);
}
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
dataStreamOptions.toXContent(builder, params, DATA_STREAM_OPTIONS.getPreferredName());
}
builder.endObject();
return builder;
}
@ -305,6 +346,7 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
private CompressedXContent mappings = null;
private Map<String, AliasMetadata> aliases = null;
private DataStreamLifecycle lifecycle = null;
private ResettableValue<DataStreamOptions.Template> dataStreamOptions = ResettableValue.undefined();
private Builder() {}
@ -313,6 +355,7 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
mappings = template.mappings;
aliases = template.aliases;
lifecycle = template.lifecycle;
dataStreamOptions = template.dataStreamOptions;
}
public Builder settings(Settings settings) {
@ -345,8 +388,21 @@ public class Template implements SimpleDiffable<Template>, ToXContentObject {
return this;
}
/**
* When the value passed is null it considers the value as undefined.
*/
public Builder dataStreamOptions(@Nullable DataStreamOptions.Template dataStreamOptions) {
this.dataStreamOptions = ResettableValue.create(dataStreamOptions);
return this;
}
public Builder dataStreamOptions(ResettableValue<DataStreamOptions.Template> dataStreamOptions) {
this.dataStreamOptions = dataStreamOptions;
return this;
}
public Template build() {
return new Template(settings, mappings, aliases, lifecycle);
return new Template(settings, mappings, aliases, lifecycle, dataStreamOptions);
}
}
}

View file

@ -76,7 +76,7 @@ public class DocumentMapper {
if (logger.isDebugEnabled()) {
logger.debug("Error while parsing document: " + ex.getMessage(), ex);
} else if (IntervalThrottler.DOCUMENT_PARSING_FAILURE.accept()) {
logger.error("Error while parsing document: " + ex.getMessage(), ex);
logger.info("Error while parsing document: " + ex.getMessage(), ex);
}
}

View file

@ -12,17 +12,16 @@ package org.elasticsearch.internal;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.index.IndexVersion;
import java.util.List;
/**
* Allows plugging in current version elements.
*/
public interface VersionExtension {
/**
* Returns the {@link TransportVersion} that Elasticsearch should use.
* <p>
* This must be at least as high as the given fallback.
* @param fallback The latest transport version from server
* Returns list of {@link TransportVersion} defined by extension
*/
TransportVersion getCurrentTransportVersion(TransportVersion fallback);
List<TransportVersion> getTransportVersions();
/**
* Returns the {@link IndexVersion} that Elasticsearch should use.

View file

@ -12,6 +12,7 @@ package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.template.put.PutComponentTemplateAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
@ -20,6 +21,7 @@ import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
@ -28,6 +30,9 @@ import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
@ServerlessScope(Scope.PUBLIC)
public class RestPutComponentTemplateAction extends BaseRestHandler {
public static final String SUPPORTS_FAILURE_STORE = "failure_store_in_template";
private static final Set<String> capabilities = Set.of(SUPPORTS_FAILURE_STORE);
@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_component_template/{name}"), new Route(PUT, "/_component_template/{name}"));
@ -51,4 +56,9 @@ public class RestPutComponentTemplateAction extends BaseRestHandler {
return channel -> client.execute(PutComponentTemplateAction.INSTANCE, putRequest, new RestToXContentListener<>(channel));
}
@Override
public Set<String> supportedCapabilities() {
return DataStream.isFailureStoreFeatureFlagEnabled() ? capabilities : Set.of();
}
}

View file

@ -12,6 +12,7 @@ package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.Scope;
@ -20,14 +21,18 @@ import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestRequest.Method.PUT;
import static org.elasticsearch.rest.RestUtils.getMasterNodeTimeout;
import static org.elasticsearch.rest.action.admin.indices.RestPutComponentTemplateAction.SUPPORTS_FAILURE_STORE;
@ServerlessScope(Scope.PUBLIC)
public class RestPutComposableIndexTemplateAction extends BaseRestHandler {
private static final Set<String> capabilities = Set.of(SUPPORTS_FAILURE_STORE);
@Override
public List<Route> routes() {
return List.of(new Route(POST, "/_index_template/{name}"), new Route(PUT, "/_index_template/{name}"));
@ -53,4 +58,9 @@ public class RestPutComposableIndexTemplateAction extends BaseRestHandler {
return channel -> client.execute(TransportPutComposableIndexTemplateAction.TYPE, putRequest, new RestToXContentListener<>(channel));
}
@Override
public Set<String> supportedCapabilities() {
return DataStream.isFailureStoreFeatureFlagEnabled() ? capabilities : Set.of();
}
}

View file

@ -15,7 +15,7 @@ import org.elasticsearch.test.TransportVersionUtils;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
@ -70,21 +70,20 @@ public class TransportVersionTests extends ESTestCase {
public void testStaticTransportVersionChecks() {
assertThat(
TransportVersions.getAllVersionIds(CorrectFakeVersion.class),
TransportVersions.collectAllVersionIdsDefinedInClass(CorrectFakeVersion.class),
equalTo(
Map.of(
199,
CorrectFakeVersion.V_0_00_01,
2,
List.of(
CorrectFakeVersion.V_0_000_002,
3,
CorrectFakeVersion.V_0_000_003,
4,
CorrectFakeVersion.V_0_000_004
CorrectFakeVersion.V_0_000_004,
CorrectFakeVersion.V_0_00_01
)
)
);
AssertionError e = expectThrows(AssertionError.class, () -> TransportVersions.getAllVersionIds(DuplicatedIdFakeVersion.class));
AssertionError e = expectThrows(
AssertionError.class,
() -> TransportVersions.collectAllVersionIdsDefinedInClass(DuplicatedIdFakeVersion.class)
);
assertThat(e.getMessage(), containsString("have the same version number"));
}
@ -187,7 +186,7 @@ public class TransportVersionTests extends ESTestCase {
}
public void testCURRENTIsLatest() {
assertThat(Collections.max(TransportVersions.getAllVersions()), is(TransportVersion.current()));
assertThat(Collections.max(TransportVersion.getAllVersions()), is(TransportVersion.current()));
}
public void testToReleaseVersion() {
@ -212,7 +211,7 @@ public class TransportVersionTests extends ESTestCase {
public void testDenseTransportVersions() {
Set<Integer> missingVersions = new TreeSet<>();
TransportVersion previous = null;
for (var tv : TransportVersions.getAllVersions()) {
for (var tv : TransportVersion.getAllVersions()) {
if (tv.before(TransportVersions.V_8_16_0)) {
continue;
}

View file

@ -14,6 +14,8 @@ import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComponentTemplate;
import org.elasticsearch.cluster.metadata.ComponentTemplateTests;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.ResettableValue;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
@ -65,6 +67,7 @@ public class GetComponentTemplateResponseTests extends AbstractWireSerializingTe
CompressedXContent mappings = null;
Map<String, AliasMetadata> aliases = null;
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
ResettableValue<DataStreamOptions.Template> dataStreamOptions = ResettableValue.undefined();
if (randomBoolean()) {
settings = randomSettings();
}
@ -74,9 +77,12 @@ public class GetComponentTemplateResponseTests extends AbstractWireSerializingTe
if (randomBoolean()) {
aliases = randomAliases();
}
if (randomBoolean()) {
dataStreamOptions = ComponentTemplateTests.randomDataStreamOptionsTemplate();
}
var template = new ComponentTemplate(
new Template(settings, mappings, aliases, lifecycle),
new Template(settings, mappings, aliases, lifecycle, dataStreamOptions),
randomBoolean() ? null : randomNonNegativeLong(),
null,
false

View file

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
@ -149,12 +150,14 @@ public class BulkOperationTests extends ESTestCase {
"ds-template",
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, false))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(false)))
.build(),
"ds-template-with-failure-store",
ComposableIndexTemplate.builder()
.indexPatterns(List.of(fsDataStreamName, fsRolloverDataStreamName))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.build()
)
)

View file

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
@ -329,7 +330,8 @@ public class TransportBulkActionIngestTests extends ESTestCase {
WITH_FAILURE_STORE_ENABLED,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(WITH_FAILURE_STORE_ENABLED + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
)
)

View file

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
import org.elasticsearch.cluster.project.ProjectResolver;
@ -514,12 +515,14 @@ public class TransportBulkActionTests extends ESTestCase {
dsTemplateWithFailureStore,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dsTemplateWithFailureStore + "-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
dsTemplateWithoutFailureStore,
ComposableIndexTemplate.builder()
.indexPatterns(List.of(dsTemplateWithoutFailureStore + "-*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, false))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(false)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build(),
indexTemplate,
ComposableIndexTemplate.builder().indexPatterns(List.of(indexTemplate + "-*")).build()
@ -617,6 +620,38 @@ public class TransportBulkActionTests extends ESTestCase {
assertNull(bulkRequest.requests.get(2));
}
public void testFailureStoreFromTemplateResolution() {
Metadata metadata = Metadata.builder()
.indexTemplates(
Map.of(
"my-index-template",
ComposableIndexTemplate.builder().indexPatterns(List.of("my-index*")).build(),
"my-enabled-fs-template",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("my-enabled*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.build(),
"my-disabled-fs-template",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("my-disabled*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(false)))
.build(),
"my-no-fs-template",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("my-no*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
)
)
.build();
assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-index", metadata), nullValue());
assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-enabled-fs", metadata), equalTo(true));
assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-disabled-fs", metadata), equalTo(false));
assertThat(TransportBulkAction.resolveFailureStoreFromTemplate("my-no-fs", metadata), equalTo(false));
}
private BulkRequest buildBulkRequest(List<String> indices) {
BulkRequest request = new BulkRequest();
for (String index : indices) {

View file

@ -78,24 +78,24 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
return randomInstance(lifecycleAllowed, randomOptionalBoolean());
}
public static ComponentTemplate randomInstance(boolean lifecycleAllowed, Boolean deprecated) {
Settings settings = null;
CompressedXContent mappings = null;
Map<String, AliasMetadata> aliases = null;
DataStreamLifecycle lifecycle = null;
public static ComponentTemplate randomInstance(boolean supportsDataStreams, Boolean deprecated) {
Template.Builder templateBuilder = Template.builder();
if (randomBoolean()) {
settings = randomSettings();
templateBuilder.settings(randomSettings());
}
if (randomBoolean()) {
mappings = randomMappings();
templateBuilder.mappings(randomMappings());
}
if (randomBoolean()) {
aliases = randomAliases();
templateBuilder.aliases(randomAliases());
}
if (randomBoolean() && lifecycleAllowed) {
lifecycle = DataStreamLifecycleTests.randomLifecycle();
if (randomBoolean() && supportsDataStreams) {
templateBuilder.lifecycle(DataStreamLifecycleTests.randomLifecycle());
}
Template template = new Template(settings, mappings, aliases, lifecycle);
if (randomBoolean() && supportsDataStreams) {
templateBuilder.dataStreamOptions(randomDataStreamOptionsTemplate());
}
Template template = templateBuilder.build();
Map<String, Object> meta = null;
if (randomBoolean()) {
@ -104,6 +104,15 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
return new ComponentTemplate(template, randomBoolean() ? null : randomNonNegativeLong(), meta, deprecated);
}
public static ResettableValue<DataStreamOptions.Template> randomDataStreamOptionsTemplate() {
return switch (randomIntBetween(0, 2)) {
case 0 -> ResettableValue.undefined();
case 1 -> ResettableValue.reset();
case 2 -> ResettableValue.create(DataStreamOptionsTemplateTests.randomDataStreamOptions());
default -> throw new IllegalArgumentException("Illegal randomisation branch");
};
}
public static Map<String, AliasMetadata> randomAliases() {
String aliasName = randomAlphaOfLength(5);
AliasMetadata aliasMeta = AliasMetadata.builder(aliasName)
@ -152,7 +161,7 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
return switch (randomIntBetween(0, 3)) {
case 0 -> {
Template ot = orig.template();
yield switch (randomIntBetween(0, 3)) {
yield switch (randomIntBetween(0, 4)) {
case 0 -> new ComponentTemplate(
Template.builder(ot).settings(randomValueOtherThan(ot.settings(), ComponentTemplateTests::randomSettings)).build(),
orig.version(),
@ -179,6 +188,16 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
orig.metadata(),
orig.deprecated()
);
case 4 -> new ComponentTemplate(
Template.builder(ot)
.dataStreamOptions(
randomValueOtherThan(ot.dataStreamOptions(), DataStreamOptionsTemplateTests::randomDataStreamOptions)
)
.build(),
orig.version(),
orig.metadata(),
orig.deprecated()
);
default -> throw new IllegalStateException("illegal randomization branch");
};
}
@ -254,6 +273,7 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
Settings settings = null;
CompressedXContent mappings = null;
Map<String, AliasMetadata> aliases = null;
ResettableValue<DataStreamOptions.Template> dataStreamOptions = ResettableValue.undefined();
if (randomBoolean()) {
settings = randomSettings();
}
@ -263,9 +283,12 @@ public class ComponentTemplateTests extends SimpleDiffableSerializationTestCase<
if (randomBoolean()) {
aliases = randomAliases();
}
if (randomBoolean()) {
dataStreamOptions = randomDataStreamOptionsTemplate();
}
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
ComponentTemplate template = new ComponentTemplate(
new Template(settings, mappings, aliases, lifecycle),
new Template(settings, mappings, aliases, lifecycle, dataStreamOptions),
randomNonNegativeLong(),
null
);

View file

@ -219,6 +219,7 @@ public class ComposableIndexTemplateTests extends SimpleDiffableSerializationTes
Settings settings = null;
CompressedXContent mappings = null;
Map<String, AliasMetadata> aliases = null;
ResettableValue<DataStreamOptions.Template> dataStreamOptions = ResettableValue.undefined();
ComposableIndexTemplate.DataStreamTemplate dataStreamTemplate = randomDataStreamTemplate();
if (randomBoolean()) {
settings = randomSettings();
@ -229,9 +230,12 @@ public class ComposableIndexTemplateTests extends SimpleDiffableSerializationTes
if (randomBoolean()) {
aliases = randomAliases();
}
if (randomBoolean()) {
dataStreamOptions = ComponentTemplateTests.randomDataStreamOptionsTemplate();
}
// We use the empty lifecycle so the global retention can be in effect
DataStreamLifecycle lifecycle = new DataStreamLifecycle();
Template template = new Template(settings, mappings, aliases, lifecycle);
Template template = new Template(settings, mappings, aliases, lifecycle, dataStreamOptions);
ComposableIndexTemplate.builder()
.indexPatterns(List.of(randomAlphaOfLength(4)))
.template(template)

View file

@ -0,0 +1,65 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import static org.elasticsearch.cluster.metadata.DataStreamFailureStore.Template.merge;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class DataStreamFailureStoreTemplateTests extends AbstractXContentSerializingTestCase<DataStreamFailureStore.Template> {
@Override
protected Writeable.Reader<DataStreamFailureStore.Template> instanceReader() {
return DataStreamFailureStore.Template::read;
}
@Override
protected DataStreamFailureStore.Template createTestInstance() {
return randomFailureStoreTemplate();
}
@Override
protected DataStreamFailureStore.Template mutateInstance(DataStreamFailureStore.Template instance) {
return new DataStreamFailureStore.Template(instance.enabled().map(v -> v == false));
}
@Override
protected DataStreamFailureStore.Template doParseInstance(XContentParser parser) throws IOException {
return DataStreamFailureStore.Template.fromXContent(parser);
}
static DataStreamFailureStore.Template randomFailureStoreTemplate() {
return new DataStreamFailureStore.Template(ResettableValue.create(randomBoolean()));
}
public void testInvalidEmptyConfiguration() {
Exception exception = expectThrows(
IllegalArgumentException.class,
() -> new DataStreamFailureStore.Template(randomBoolean() ? ResettableValue.undefined() : ResettableValue.reset())
);
assertThat(exception.getMessage(), containsString("at least one non-null configuration value"));
}
public void testMerging() {
DataStreamFailureStore.Template template = randomFailureStoreTemplate();
DataStreamFailureStore.Template result = merge(template, template);
assertThat(result, equalTo(template));
DataStreamFailureStore.Template negatedTemplate = mutateInstance(template);
result = merge(template, negatedTemplate);
assertThat(result, equalTo(negatedTemplate));
}
}

View file

@ -30,7 +30,7 @@ public class DataStreamFailureStoreTests extends AbstractXContentSerializingTest
}
@Override
protected DataStreamFailureStore mutateInstance(DataStreamFailureStore instance) throws IOException {
protected DataStreamFailureStore mutateInstance(DataStreamFailureStore instance) {
return new DataStreamFailureStore(instance.enabled() == false);
}

View file

@ -0,0 +1,116 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import static org.hamcrest.Matchers.equalTo;
public class DataStreamOptionsTemplateTests extends AbstractXContentSerializingTestCase<DataStreamOptions.Template> {
public static final DataStreamOptions.Template RESET = new DataStreamOptions.Template(ResettableValue.reset());
@Override
protected Writeable.Reader<DataStreamOptions.Template> instanceReader() {
return DataStreamOptions.Template::read;
}
@Override
protected DataStreamOptions.Template createTestInstance() {
return randomDataStreamOptions();
}
public static DataStreamOptions.Template randomDataStreamOptions() {
return switch (randomIntBetween(0, 3)) {
case 0 -> DataStreamOptions.Template.EMPTY;
case 1 -> createTemplateWithFailureStoreConfig(true);
case 2 -> createTemplateWithFailureStoreConfig(false);
case 3 -> RESET;
default -> throw new IllegalArgumentException("Illegal randomisation branch");
};
}
@Override
protected DataStreamOptions.Template mutateInstance(DataStreamOptions.Template instance) {
ResettableValue<DataStreamFailureStore.Template> failureStore = instance.failureStore();
if (failureStore.isDefined() == false) {
if (randomBoolean()) {
return createTemplateWithFailureStoreConfig(randomBoolean());
} else {
return new DataStreamOptions.Template(ResettableValue.reset());
}
}
if (failureStore.shouldReset()) {
if (randomBoolean()) {
return createTemplateWithFailureStoreConfig(randomBoolean());
} else {
return DataStreamOptions.Template.EMPTY;
}
}
return new DataStreamOptions.Template(
instance.failureStore().map(x -> new DataStreamFailureStore.Template(x.enabled().map(e -> e == false)))
);
}
@Override
protected DataStreamOptions.Template doParseInstance(XContentParser parser) throws IOException {
return DataStreamOptions.Template.fromXContent(parser);
}
private static DataStreamOptions.Template createTemplateWithFailureStoreConfig(boolean enabled) {
return new DataStreamOptions.Template(ResettableValue.create(new DataStreamFailureStore.Template(ResettableValue.create(enabled))));
}
public void testBuilder() {
// No updates
{
DataStreamOptions.Template.Builder builder = DataStreamOptions.Template.builder(null);
assertThat(builder.build(), equalTo(DataStreamOptions.Template.EMPTY));
builder = DataStreamOptions.Template.builder(new DataStreamOptions.Template(ResettableValue.undefined()));
assertThat(builder.build(), equalTo(DataStreamOptions.Template.EMPTY));
builder = DataStreamOptions.Template.builder(RESET);
assertThat(builder.build(), equalTo(RESET));
DataStreamOptions.Template initial = new DataStreamOptions.Template(
ResettableValue.create(DataStreamFailureStoreTemplateTests.randomFailureStoreTemplate())
);
builder = DataStreamOptions.Template.builder(initial);
assertThat(builder.build(), equalTo(initial));
}
// Merge
{
DataStreamOptions.Template initial = randomDataStreamOptions();
DataStreamOptions.Template.Builder builder = DataStreamOptions.Template.builder(initial);
builder.updateFailureStore(ResettableValue.undefined());
assertThat(builder.build(), equalTo(initial));
}
// Override
{
DataStreamOptions.Template.Builder builder = DataStreamOptions.Template.builder(randomDataStreamOptions());
builder.updateFailureStore(ResettableValue.reset());
assertThat(builder.build(), equalTo(DataStreamOptions.Template.EMPTY));
builder = DataStreamOptions.Template.builder(randomDataStreamOptions());
DataStreamOptions.Template update = new DataStreamOptions.Template(
ResettableValue.create(DataStreamFailureStoreTemplateTests.randomFailureStoreTemplate())
);
builder.updateFailureStore(update.failureStore());
assertThat(builder.build(), equalTo(update));
}
}
}

View file

@ -34,11 +34,18 @@ public class DataStreamTemplateTests extends AbstractXContentSerializingTestCase
@Override
protected DataStreamTemplate mutateInstance(DataStreamTemplate instance) {
return null;// TODO implement https://github.com/elastic/elasticsearch/issues/25929
var hidden = instance.isHidden();
var allowCustomRouting = instance.isAllowCustomRouting();
switch (randomIntBetween(0, 1)) {
case 0 -> hidden = hidden == false;
case 1 -> allowCustomRouting = allowCustomRouting == false;
default -> throw new IllegalArgumentException("Illegal randomisation branch");
}
return new DataStreamTemplate(hidden, allowCustomRouting);
}
public static DataStreamTemplate randomInstance() {
return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean(), randomBoolean(), randomBoolean());
return new ComposableIndexTemplate.DataStreamTemplate(randomBoolean(), randomBoolean());
}
}

View file

@ -136,7 +136,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStr
: DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build();
case 10 -> failureIndices = randomValueOtherThan(failureIndices, DataStreamTestHelper::randomIndexInstances);
case 11 -> dataStreamOptions = dataStreamOptions.isEmpty() ? new DataStreamOptions(new DataStreamFailureStore(randomBoolean()))
: randomBoolean() ? (randomBoolean() ? null : DataStreamOptions.EMPTY)
: randomBoolean() ? DataStreamOptions.EMPTY
: new DataStreamOptions(new DataStreamFailureStore(dataStreamOptions.failureStore().enabled() == false));
case 12 -> {
rolloverOnWrite = rolloverOnWrite == false;

View file

@ -275,8 +275,10 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
public void testCreateDataStreamWithFailureStoreInitialized() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStreamName + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
final var projectId = randomProjectIdOrDefault();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -317,8 +319,10 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
public void testCreateDataStreamWithFailureStoreUninitialized() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStreamName + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
final var projectId = randomProjectIdOrDefault();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -356,8 +360,10 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
.put(MetadataCreateDataStreamService.FAILURE_STORE_REFRESH_INTERVAL_SETTING_NAME, timeValue)
.build();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template = new ComposableIndexTemplate.Builder().indexPatterns(List.of(dataStreamName + "*"))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate(false, false, true))
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(Template.builder().dataStreamOptions(DataStreamTestHelper.createDataStreamOptionsTemplate(true)))
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
final var projectId = randomProjectIdOrDefault();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))

View file

@ -1633,7 +1633,7 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
state = addComponentTemplate(service, state, ctDisabledLifecycle, DataStreamLifecycle.newBuilder().enabled(false).build());
String ctNoLifecycle = "ct_no_lifecycle";
state = addComponentTemplate(service, state, ctNoLifecycle, null);
state = addComponentTemplate(service, state, ctNoLifecycle, (DataStreamLifecycle) null);
// Component A: -
// Component B: "lifecycle": {"enabled": true}
@ -1727,13 +1727,128 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
assertLifecycleResolution(service, state, List.of(ct30d, ctDisabledLifecycle), lifecycle45d, lifecycle45d);
}
public void testResolveFailureStore() throws Exception {
final MetadataIndexTemplateService service = getMetadataIndexTemplateService();
ClusterState state = ClusterState.EMPTY_STATE;
String ctNoFailureStoreConfig = "no_failure_store";
state = addComponentTemplate(service, state, ctNoFailureStoreConfig, (DataStreamOptions.Template) null);
String ctFailureStoreEnabled = "ct_failure_store_enabled";
state = addComponentTemplate(service, state, ctFailureStoreEnabled, DataStreamTestHelper.createDataStreamOptionsTemplate(true));
String ctFailureStoreDisabled = "ct_failure_store_disabled";
state = addComponentTemplate(service, state, ctFailureStoreDisabled, DataStreamTestHelper.createDataStreamOptionsTemplate(false));
String ctFailureStoreNullified = "ct_null_failure_store";
DataStreamOptions.Template nullifiedFailureStore = new DataStreamOptions.Template(ResettableValue.reset());
state = addComponentTemplate(service, state, ctFailureStoreNullified, nullifiedFailureStore);
// Component A: -
// Composable Z: -
// Result: -
assertDataStreamOptionsResolution(service, state, List.of(), null, null);
// Component A: "data_stream_options": { "failure_store": { "enabled": true}}
// Composable Z: -
// Result: "data_stream_options": { "failure_store": { "enabled": true}}
assertDataStreamOptionsResolution(service, state, List.of(ctFailureStoreEnabled), null, DataStreamOptions.FAILURE_STORE_ENABLED);
// Component A: "data_stream_options": { "failure_store": { "enabled": false}}
// Composable Z: "data_stream_options": {}
// Result: "data_stream_options": { "failure_store": { "enabled": false}}
assertDataStreamOptionsResolution(
service,
state,
List.of(ctFailureStoreDisabled),
DataStreamOptions.Template.EMPTY,
DataStreamOptions.FAILURE_STORE_DISABLED
);
// Component A: "data_stream_options": { "failure_store": { "enabled": true}}
// Composable Z: "data_stream_options": { "failure_store": { "enabled": false}}
// Result: "data_stream_options": { "failure_store": { "enabled": false}}
assertDataStreamOptionsResolution(
service,
state,
List.of(ctFailureStoreEnabled),
DataStreamTestHelper.createDataStreamOptionsTemplate(false),
DataStreamOptions.FAILURE_STORE_DISABLED
);
// Component A: "data_stream_options": { "failure_store": null}
// Composable Z: "data_stream_options": { "failure_store": { "enabled": false}}
// Result: "data_stream_options": { "failure_store": { "enabled": false}}
assertDataStreamOptionsResolution(
service,
state,
List.of(ctFailureStoreNullified),
DataStreamTestHelper.createDataStreamOptionsTemplate(false),
DataStreamOptions.FAILURE_STORE_DISABLED
);
// Component A: "data_stream_options": { "failure_store": null}
// Composable Z: -
// Result: "data_stream_options": {}
assertDataStreamOptionsResolution(service, state, List.of(ctFailureStoreNullified), null, DataStreamOptions.EMPTY);
// Component A: "data_stream_options": { "failure_store": { "enabled": true}}
// Composable Z: "data_stream_options": { "failure_store": null}
// Result: "data_stream_options": {}
assertDataStreamOptionsResolution(service, state, List.of(ctFailureStoreEnabled), nullifiedFailureStore, DataStreamOptions.EMPTY);
}
public void testInvalidNonDataStreamTemplateWithDataStreamOptions() throws Exception {
MetadataIndexTemplateService metadataIndexTemplateService = getMetadataIndexTemplateService();
Template template = Template.builder().dataStreamOptions(DataStreamOptionsTemplateTests.randomDataStreamOptions()).build();
ComponentTemplate componentTemplate = new ComponentTemplate(template, 1L, new HashMap<>());
ComposableIndexTemplate globalIndexTemplate = ComposableIndexTemplate.builder()
.indexPatterns(List.of("my-index"))
.componentTemplates(List.of("ct-with-data-stream-options"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
.metadata(Metadata.builder().componentTemplates(Map.of("ct-with-data-stream-options", componentTemplate)))
.build();
Exception exception = expectThrows(
Exception.class,
() -> metadataIndexTemplateService.validateIndexTemplateV2("name", globalIndexTemplate, clusterState)
);
assertThat(
exception.getMessage(),
containsString("specifies data stream options that can only be used in combination with a data stream")
);
}
private ClusterState addComponentTemplate(
MetadataIndexTemplateService service,
ClusterState state,
String name,
DataStreamLifecycle lifecycle
) throws Exception {
ComponentTemplate ct = new ComponentTemplate(Template.builder().lifecycle(lifecycle).build(), null, null);
return addComponentTemplate(service, state, name, null, lifecycle);
}
private ClusterState addComponentTemplate(
MetadataIndexTemplateService service,
ClusterState state,
String name,
DataStreamOptions.Template dataStreamOptions
) throws Exception {
return addComponentTemplate(service, state, name, dataStreamOptions, null);
}
private ClusterState addComponentTemplate(
MetadataIndexTemplateService service,
ClusterState state,
String name,
DataStreamOptions.Template dataStreamOptions,
DataStreamLifecycle lifecycle
) throws Exception {
ComponentTemplate ct = new ComponentTemplate(
Template.builder().dataStreamOptions(dataStreamOptions).lifecycle(lifecycle).build(),
null,
null
);
return service.addComponentTemplate(state.projectState(Metadata.DEFAULT_PROJECT_ID), true, name, ct);
}
@ -1758,6 +1873,28 @@ public class MetadataIndexTemplateServiceTests extends ESSingleNodeTestCase {
assertThat(resolvedLifecycle, equalTo(expected));
}
private void assertDataStreamOptionsResolution(
MetadataIndexTemplateService service,
ClusterState state,
List<String> composeOf,
DataStreamOptions.Template dataStreamOptionsZ,
DataStreamOptions expected
) throws Exception {
ComposableIndexTemplate it = ComposableIndexTemplate.builder()
.indexPatterns(List.of(randomAlphaOfLength(10) + "*"))
.template(Template.builder().dataStreamOptions(dataStreamOptionsZ))
.componentTemplates(composeOf)
.priority(0L)
.version(1L)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build();
state = service.addIndexTemplateV2(state, true, "my-template", it);
DataStreamOptions resolvedDataStreamOptions = MetadataIndexTemplateService.resolveDataStreamOptions(state.metadata(), "my-template")
.mapAndGet(DataStreamOptions.Template::toDataStreamOptions);
assertThat(resolvedDataStreamOptions, resolvedDataStreamOptions == null ? nullValue() : equalTo(expected));
}
public void testAddInvalidTemplate() throws Exception {
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(Collections.singletonList("a"))

View file

@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class ResettableValueTests extends ESTestCase {
public void testCreateConstructor() {
ResettableValue<Boolean> value = ResettableValue.create(true);
assertThat(value.get(), equalTo(true));
assertThat(value.isDefined(), equalTo(true));
assertThat(ResettableValue.create(null), equalTo(ResettableValue.undefined()));
}
public void testMerge() {
// Initial state: undefined
{
ResettableValue<Integer> initial = ResettableValue.undefined();
assertThat(ResettableValue.merge(initial, ResettableValue.reset(), Integer::sum), equalTo(ResettableValue.undefined()));
assertThat(ResettableValue.merge(initial, ResettableValue.undefined(), Integer::sum), equalTo(ResettableValue.undefined()));
ResettableValue<Integer> update = ResettableValue.create(randomInt());
assertThat(ResettableValue.merge(initial, update, Integer::sum), equalTo(update));
}
// Initial state: reset
{
ResettableValue<Integer> initial = ResettableValue.reset();
assertThat(ResettableValue.merge(initial, ResettableValue.reset(), Integer::sum), equalTo(ResettableValue.undefined()));
assertThat(ResettableValue.merge(initial, ResettableValue.undefined(), Integer::sum), equalTo(initial));
ResettableValue<Integer> update = ResettableValue.create(randomInt());
assertThat(ResettableValue.merge(ResettableValue.undefined(), update, Integer::sum), equalTo(update));
}
// Initial state: value
{
ResettableValue<Integer> initial = ResettableValue.create(randomIntBetween(1, 200));
assertThat(ResettableValue.merge(initial, ResettableValue.reset(), Integer::sum), equalTo(ResettableValue.undefined()));
assertThat(ResettableValue.merge(initial, ResettableValue.undefined(), Integer::sum), equalTo(initial));
ResettableValue<Integer> update = ResettableValue.create(randomIntBetween(1, 200));
assertThat(ResettableValue.merge(initial, update, Integer::sum).get(), equalTo(initial.get() + update.get()));
}
}
public void testMap() {
Function<Integer, Integer> increment = x -> x + 1;
assertThat(ResettableValue.<Integer>undefined().map(increment), equalTo(ResettableValue.undefined()));
assertThat(ResettableValue.<Integer>reset().map(increment), equalTo(ResettableValue.reset()));
int value = randomIntBetween(0, Integer.MAX_VALUE - 1);
assertThat(ResettableValue.create(value).map(increment), equalTo(ResettableValue.create(value + 1)));
}
public void testMapAndGet() {
Function<Integer, Integer> increment = x -> x + 1;
assertThat(ResettableValue.<Integer>undefined().mapAndGet(increment), nullValue());
assertThat(ResettableValue.<Integer>reset().mapAndGet(increment), nullValue());
int value = randomIntBetween(0, Integer.MAX_VALUE - 1);
assertThat(ResettableValue.create(value).mapAndGet(increment), equalTo(value + 1));
}
public void testSerialisation() throws IOException {
ResettableValue<Boolean> value = ResettableValue.create(randomBoolean());
assertThat(writeAndRead(value), equalTo(value));
assertThat(writeAndRead(ResettableValue.undefined()), equalTo(ResettableValue.undefined()));
assertThat(writeAndRead(ResettableValue.reset()), equalTo(ResettableValue.reset()));
}
private ResettableValue<Boolean> writeAndRead(ResettableValue<Boolean> value) throws IOException {
try (BytesStreamOutput output = new BytesStreamOutput()) {
ResettableValue.write(output, value, StreamOutput::writeBoolean);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), new NamedWriteableRegistry(List.of()))) {
return ResettableValue.read(in, StreamInput::readBoolean);
}
}
}
}

View file

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch;
import java.util.List;
/**
* Provides access to all known transport versions.
*/
public class KnownTransportVersions {
/**
* A sorted list of all known transport versions
*/
public static final List<TransportVersion> ALL_VERSIONS = List.copyOf(TransportVersions.getAllVersions());
}

View file

@ -467,13 +467,13 @@ public final class DataStreamTestHelper {
"template_1",
ComposableIndexTemplate.builder()
.indexPatterns(List.of("*"))
.dataStreamTemplate(
new ComposableIndexTemplate.DataStreamTemplate(
false,
false,
DataStream.isFailureStoreFeatureFlagEnabled() && storeFailures
)
.template(
Template.builder()
.dataStreamOptions(
DataStream.isFailureStoreFeatureFlagEnabled() && storeFailures ? createDataStreamOptionsTemplate(true) : null
)
)
.dataStreamTemplate(new ComposableIndexTemplate.DataStreamTemplate())
.build()
);
@ -757,4 +757,12 @@ public final class DataStreamTestHelper {
return indicesService;
}
public static DataStreamOptions.Template createDataStreamOptionsTemplate(Boolean failureStore) {
if (failureStore == null) {
return DataStreamOptions.Template.EMPTY;
}
return new DataStreamOptions.Template(
ResettableValue.create(new DataStreamFailureStore.Template(ResettableValue.create(failureStore)))
);
}
}

View file

@ -15,14 +15,13 @@ import org.elasticsearch.TransportVersions;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.KnownTransportVersions.ALL_VERSIONS;
public final class BWCVersions {
private BWCVersions() {}
public static List<TransportVersion> getAllBWCVersions() {
int minCompatVersion = Collections.binarySearch(ALL_VERSIONS, TransportVersions.MINIMUM_COMPATIBLE);
return ALL_VERSIONS.subList(minCompatVersion, ALL_VERSIONS.size());
List<TransportVersion> allVersions = TransportVersion.getAllVersions();
int minCompatVersion = Collections.binarySearch(allVersions, TransportVersions.MINIMUM_COMPATIBLE);
return allVersions.subList(minCompatVersion, allVersions.size());
}
public static final List<TransportVersion> DEFAULT_BWC_VERSIONS = getAllBWCVersions();

View file

@ -19,32 +19,30 @@ import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.KnownTransportVersions.ALL_VERSIONS;
public class TransportVersionUtils {
/** Returns all released versions */
public static List<TransportVersion> allReleasedVersions() {
return ALL_VERSIONS;
return TransportVersion.getAllVersions();
}
/** Returns the oldest known {@link TransportVersion} */
public static TransportVersion getFirstVersion() {
return ALL_VERSIONS.get(0);
return allReleasedVersions().getFirst();
}
/** Returns a random {@link TransportVersion} from all available versions. */
public static TransportVersion randomVersion() {
return ESTestCase.randomFrom(ALL_VERSIONS);
return ESTestCase.randomFrom(allReleasedVersions());
}
/** Returns a random {@link TransportVersion} from all available versions without the ignore set */
public static TransportVersion randomVersion(Set<TransportVersion> ignore) {
return ESTestCase.randomFrom(ALL_VERSIONS.stream().filter(v -> ignore.contains(v) == false).collect(Collectors.toList()));
return ESTestCase.randomFrom(allReleasedVersions().stream().filter(v -> ignore.contains(v) == false).collect(Collectors.toList()));
}
/** Returns a random {@link TransportVersion} from all available versions. */
public static TransportVersion randomVersion(Random random) {
return ALL_VERSIONS.get(random.nextInt(ALL_VERSIONS.size()));
return allReleasedVersions().get(random.nextInt(allReleasedVersions().size()));
}
/** Returns a random {@link TransportVersion} between <code>minVersion</code> and <code>maxVersion</code> (inclusive). */
@ -58,12 +56,13 @@ public class TransportVersionUtils {
}
int minVersionIndex = 0;
List<TransportVersion> allReleasedVersions = allReleasedVersions();
if (minVersion != null) {
minVersionIndex = Collections.binarySearch(ALL_VERSIONS, minVersion);
minVersionIndex = Collections.binarySearch(allReleasedVersions, minVersion);
}
int maxVersionIndex = ALL_VERSIONS.size() - 1;
int maxVersionIndex = allReleasedVersions.size() - 1;
if (maxVersion != null) {
maxVersionIndex = Collections.binarySearch(ALL_VERSIONS, maxVersion);
maxVersionIndex = Collections.binarySearch(allReleasedVersions, maxVersion);
}
if (minVersionIndex < 0) {
throw new IllegalArgumentException("minVersion [" + minVersion + "] does not exist.");
@ -72,7 +71,7 @@ public class TransportVersionUtils {
} else {
// minVersionIndex is inclusive so need to add 1 to this index
int range = maxVersionIndex + 1 - minVersionIndex;
return ALL_VERSIONS.get(minVersionIndex + random.nextInt(range));
return allReleasedVersions.get(minVersionIndex + random.nextInt(range));
}
}
@ -83,7 +82,7 @@ public class TransportVersionUtils {
}
public static TransportVersion getPreviousVersion(TransportVersion version) {
int place = Collections.binarySearch(ALL_VERSIONS, version);
int place = Collections.binarySearch(allReleasedVersions(), version);
if (place < 0) {
// version does not exist - need the item before the index this version should be inserted
place = -(place + 1);
@ -92,7 +91,7 @@ public class TransportVersionUtils {
if (place < 1) {
throw new IllegalArgumentException("couldn't find any released versions before [" + version + "]");
}
return ALL_VERSIONS.get(place - 1);
return allReleasedVersions().get(place - 1);
}
public static TransportVersion getNextVersion(TransportVersion version) {
@ -100,7 +99,8 @@ public class TransportVersionUtils {
}
public static TransportVersion getNextVersion(TransportVersion version, boolean createIfNecessary) {
int place = Collections.binarySearch(ALL_VERSIONS, version);
List<TransportVersion> allReleasedVersions = allReleasedVersions();
int place = Collections.binarySearch(allReleasedVersions, version);
if (place < 0) {
// version does not exist - need the item at the index this version should be inserted
place = -(place + 1);
@ -109,7 +109,7 @@ public class TransportVersionUtils {
place++;
}
if (place < 0 || place >= ALL_VERSIONS.size()) {
if (place < 0 || place >= allReleasedVersions.size()) {
if (createIfNecessary) {
// create a new transport version one greater than specified
return new TransportVersion(version.id() + 1);
@ -117,7 +117,7 @@ public class TransportVersionUtils {
throw new IllegalArgumentException("couldn't find any released versions after [" + version + "]");
}
}
return ALL_VERSIONS.get(place);
return allReleasedVersions.get(place);
}
/** Returns a random {@code TransportVersion} that is compatible with {@link TransportVersion#current()} */

View file

@ -109,7 +109,19 @@ public class DataStreamRestIT extends ESRestTestCase {
private void putFailureStoreTemplate() {
try {
Request request = new Request("PUT", "/_index_template/fs-template");
request.setJsonEntity("{\"index_patterns\": [\"fs*\"], \"data_stream\": {\"failure_store\": true}}");
request.setJsonEntity("""
{
"index_patterns": ["fs*"],
"data_stream": {},
"template": {
"data_stream_options": {
"failure_store": {
"enabled": true
}
}
}
}
""");
assertAcknowledged(client().performRequest(request));
} catch (Exception e) {
fail("failed to insert index template with failure store enabled - got: " + e);

View file

@ -16,14 +16,14 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.KnownTransportVersions.ALL_VERSIONS;
import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractBWCSerializationTestCase<T extends Writeable & ToXContent> extends AbstractXContentSerializingTestCase<T> {
private static List<TransportVersion> getAllBWCVersions() {
int minCompatVersion = Collections.binarySearch(ALL_VERSIONS, TransportVersions.MINIMUM_COMPATIBLE);
return ALL_VERSIONS.subList(minCompatVersion, ALL_VERSIONS.size());
List<TransportVersion> allVersions = TransportVersion.getAllVersions();
int minCompatVersion = Collections.binarySearch(allVersions, TransportVersions.MINIMUM_COMPATIBLE);
return allVersions.subList(minCompatVersion, allVersions.size());
}
private static final List<TransportVersion> DEFAULT_BWC_VERSIONS = getAllBWCVersions();

View file

@ -15,14 +15,14 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.KnownTransportVersions.ALL_VERSIONS;
import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractBWCWireSerializingTestCase<T extends Writeable> extends AbstractWireSerializingTestCase<T> {
private static List<TransportVersion> getAllBWCVersions() {
int minCompatVersion = Collections.binarySearch(ALL_VERSIONS, TransportVersions.MINIMUM_COMPATIBLE);
return ALL_VERSIONS.subList(minCompatVersion, ALL_VERSIONS.size());
List<TransportVersion> allVersions = TransportVersion.getAllVersions();
int minCompatVersion = Collections.binarySearch(allVersions, TransportVersions.MINIMUM_COMPATIBLE);
return allVersions.subList(minCompatVersion, allVersions.size());
}
private static final List<TransportVersion> DEFAULT_BWC_VERSIONS = getAllBWCVersions();

View file

@ -39,9 +39,9 @@ import static org.elasticsearch.xpack.esql.core.util.DateUtils.UTC_DATE_TIME_FOR
import static org.elasticsearch.xpack.esql.core.util.NumericUtils.unsignedLongAsNumber;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.CARTESIAN;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
public final class CsvAssert {
@ -197,11 +197,7 @@ public final class CsvAssert {
for (int row = 0; row < expectedValues.size(); row++) {
try {
if (row >= actualValues.size()) {
if (dataFailures.isEmpty()) {
fail("Expected more data but no more entries found after [" + row + "]");
} else {
dataFailure(dataFailures, "Expected more data but no more entries found after [" + row + "]\n");
}
dataFailure("Expected more data but no more entries found after [" + row + "]", dataFailures, expected, actualValues);
}
if (logger != null) {
@ -250,13 +246,17 @@ public final class CsvAssert {
dataFailures.add(new DataFailure(row, column, transformedExpected, transformedActual));
}
if (dataFailures.size() > 10) {
dataFailure(dataFailures);
dataFailure("", dataFailures, expected, actualValues);
}
}
var delta = actualRow.size() - expectedRow.size();
if (delta > 0) {
fail("Plan has extra columns, returned [" + actualRow.size() + "], expected [" + expectedRow.size() + "]");
if (actualRow.size() != expectedRow.size()) {
dataFailure(
"Plan has extra columns, returned [" + actualRow.size() + "], expected [" + expectedRow.size() + "]",
dataFailures,
expected,
actualValues
);
}
} catch (AssertionError ae) {
if (logger != null && row + 1 < actualValues.size()) {
@ -267,21 +267,59 @@ public final class CsvAssert {
}
}
if (dataFailures.isEmpty() == false) {
dataFailure(dataFailures);
dataFailure("", dataFailures, expected, actualValues);
}
if (expectedValues.size() < actualValues.size()) {
fail(
"Elasticsearch still has data after [" + expectedValues.size() + "] entries:\n" + row(actualValues, expectedValues.size())
);
dataFailure("Elasticsearch still has data after [" + expectedValues.size() + "] entries", dataFailures, expected, actualValues);
}
}
private static void dataFailure(List<DataFailure> dataFailures) {
dataFailure(dataFailures, "");
private static void dataFailure(
String description,
List<DataFailure> dataFailures,
ExpectedResults expectedValues,
List<List<Object>> actualValues
) {
var expected = pipeTable("Expected:", expectedValues.columnNames(), expectedValues.values(), 25);
var actual = pipeTable("Actual:", expectedValues.columnNames(), actualValues, 25);
fail(description + System.lineSeparator() + describeFailures(dataFailures) + actual + expected);
}
private static void dataFailure(List<DataFailure> dataFailures, String prefixError) {
fail(prefixError + "Data mismatch:\n" + dataFailures.stream().map(f -> {
private static String pipeTable(String description, List<String> headers, List<List<Object>> values, int maxRows) {
int[] width = new int[headers.size()];
for (int i = 0; i < width.length; i++) {
width[i] = headers.get(i).length();
for (List<Object> row : values) {
width[i] = Math.max(width[i], String.valueOf(row.get(i)).length());
}
}
var result = new StringBuilder().append(System.lineSeparator()).append(description).append(System.lineSeparator());
for (int c = 0; c < width.length; c++) {
appendValue(result, headers.get(c), width[c]);
}
result.append('|').append(System.lineSeparator());
for (int r = 0; r < Math.min(maxRows, values.size()); r++) {
for (int c = 0; c < width.length; c++) {
appendValue(result, values.get(r).get(c), width[c]);
}
result.append('|').append(System.lineSeparator());
}
if (values.size() > maxRows) {
result.append("...").append(System.lineSeparator());
}
return result.toString();
}
private static void appendValue(StringBuilder result, Object value, int width) {
result.append('|').append(value);
for (int i = 0; i < width - String.valueOf(value).length(); i++) {
result.append(' ');
}
}
private static String describeFailures(List<DataFailure> dataFailures) {
return "Data mismatch:" + System.lineSeparator() + dataFailures.stream().map(f -> {
Description description = new StringDescription();
ListMatcher expected;
if (f.expected instanceof List<?> e) {
@ -299,7 +337,7 @@ public final class CsvAssert {
expected.describeMismatch(actualList, description);
String prefix = "row " + f.row + " column " + f.column + ":";
return prefix + description.toString().replace("\n", "\n" + prefix);
}).collect(Collectors.joining("\n")));
}).collect(Collectors.joining(System.lineSeparator()));
}
private static Comparator<List<Object>> resultRowComparator(List<Type> types) {

View file

@ -382,6 +382,18 @@ public class Verifier {
);
}
})));
agg.aggregates().forEach(a -> a.forEachDown(FilteredExpression.class, fe -> fe.filter().forEachDown(Attribute.class, attribute -> {
var categorize = categorizeByAttribute.get(attribute);
if (categorize != null) {
failures.add(
fail(
attribute,
"cannot reference CATEGORIZE grouping function [{}] within an aggregation filter",
attribute.sourceText()
)
);
}
})));
}
private static void checkRateAggregates(Expression expr, int nestedLevel, Set<Failure> failures) {
@ -421,7 +433,8 @@ public class Verifier {
Expression filter = fe.filter();
failures.add(fail(filter, "WHERE clause allowed only for aggregate functions, none found in [{}]", fe.sourceText()));
}
Expression f = fe.filter(); // check the filter has to be a boolean term, similar as checkFilterConditionType
Expression f = fe.filter();
// check the filter has to be a boolean term, similar as checkFilterConditionType
if (f.dataType() != NULL && f.dataType() != BOOLEAN) {
failures.add(fail(f, "Condition expression needs to be boolean, found [{}]", f.dataType()));
}
@ -432,9 +445,10 @@ public class Verifier {
fail(af, "cannot use aggregate function [{}] in aggregate WHERE clause [{}]", af.sourceText(), fe.sourceText())
);
}
// check the bucketing function against the group
// check the grouping function against the group
else if (c instanceof GroupingFunction gf) {
if (Expressions.anyMatch(groups, ex -> ex instanceof Alias a && a.child().semanticEquals(gf)) == false) {
if (c instanceof Categorize
|| Expressions.anyMatch(groups, ex -> ex instanceof Alias a && a.child().semanticEquals(gf)) == false) {
failures.add(fail(gf, "can only use grouping function [{}] as part of the BY clause", gf.sourceText()));
}
}

View file

@ -1968,6 +1968,26 @@ public class VerifierTests extends ESTestCase {
);
}
public void testCategorizeWithFilteredAggregations() {
assumeTrue("requires Categorize capability", EsqlCapabilities.Cap.CATEGORIZE_V5.isEnabled());
query("FROM test | STATS COUNT(*) WHERE first_name == \"John\" BY CATEGORIZE(last_name)");
query("FROM test | STATS COUNT(*) WHERE last_name == \"Doe\" BY CATEGORIZE(last_name)");
assertEquals(
"1:34: can only use grouping function [CATEGORIZE(first_name)] as part of the BY clause",
error("FROM test | STATS COUNT(*) WHERE CATEGORIZE(first_name) == \"John\" BY CATEGORIZE(last_name)")
);
assertEquals(
"1:34: can only use grouping function [CATEGORIZE(last_name)] as part of the BY clause",
error("FROM test | STATS COUNT(*) WHERE CATEGORIZE(last_name) == \"Doe\" BY CATEGORIZE(last_name)")
);
assertEquals(
"1:34: cannot reference CATEGORIZE grouping function [category] within an aggregation filter",
error("FROM test | STATS COUNT(*) WHERE category == \"Doe\" BY category = CATEGORIZE(last_name)")
);
}
public void testSortByAggregate() {
assertEquals("1:18: Aggregate functions are not allowed in SORT [COUNT]", error("ROW a = 1 | SORT count(*)"));
assertEquals("1:28: Aggregate functions are not allowed in SORT [COUNT]", error("ROW a = 1 | SORT to_string(count(*))"));

View file

@ -104,7 +104,6 @@ public abstract class AbstractScalarFunctionTestCase extends AbstractFunctionTes
assertTypeResolutionFailure(expression);
return;
}
assumeTrue("Expected type must be representable to build an evaluator", DataType.isRepresentable(testCase.expectedType()));
logger.info(
"Test Values: " + testCase.getData().stream().map(TestCaseSupplier.TypedData::toString).collect(Collectors.joining(","))
);
@ -209,7 +208,6 @@ public abstract class AbstractScalarFunctionTestCase extends AbstractFunctionTes
return;
}
assumeTrue("Can't build evaluator", testCase.canBuildEvaluator());
assumeTrue("Expected type must be representable to build an evaluator", DataType.isRepresentable(testCase.expectedType()));
int positions = between(1, 1024);
List<TestCaseSupplier.TypedData> data = testCase.getData();
Page onePositionPage = row(testCase.getDataValues());
@ -275,7 +273,6 @@ public abstract class AbstractScalarFunctionTestCase extends AbstractFunctionTes
return;
}
assumeTrue("Can't build evaluator", testCase.canBuildEvaluator());
assumeTrue("Expected type must be representable to build an evaluator", DataType.isRepresentable(testCase.expectedType()));
int count = 10_000;
int threads = 5;
var evalSupplier = evaluator(expression);
@ -338,14 +335,13 @@ public abstract class AbstractScalarFunctionTestCase extends AbstractFunctionTes
assertThat(factory.toString(), testCase.evaluatorToString());
}
public final void testFold() {
public void testFold() {
Expression expression = buildLiteralExpression(testCase);
if (testCase.getExpectedTypeError() != null) {
assertTypeResolutionFailure(expression);
return;
}
assertFalse("expected resolved", expression.typeResolved().unresolved());
assumeTrue("Can't build evaluator", testCase.canBuildEvaluator());
Expression nullOptimized = new FoldNull().rule(expression);
assertThat(nullOptimized.dataType(), equalTo(testCase.expectedType()));
assertTrue(nullOptimized.foldable());

View file

@ -278,6 +278,9 @@ public record TestCaseSupplier(String name, List<DataType> types, Supplier<TestC
for (String warning : warnings.apply(lhsTyped, rhsTyped)) {
testCase = testCase.withWarning(warning);
}
if (DataType.isRepresentable(expectedType) == false) {
testCase = testCase.withoutEvaluator();
}
return testCase;
});
}
@ -1438,7 +1441,7 @@ public record TestCaseSupplier(String name, List<DataType> types, Supplier<TestC
foldingExceptionClass,
foldingExceptionMessage,
extra,
data.stream().allMatch(d -> d.forceLiteral || DataType.isRepresentable(d.type))
true
);
}

View file

@ -58,4 +58,9 @@ public class CategorizeTests extends AbstractScalarFunctionTestCase {
protected Expression build(Source source, List<Expression> args) {
return new Categorize(source, args.get(0));
}
@Override
public void testFold() {
// Cannot be folded
}
}

View file

@ -50,7 +50,7 @@ public class ToDatePeriodTests extends AbstractScalarFunctionTestCase {
matchesPattern("LiteralsEvaluator.*"),
DATE_PERIOD,
equalTo(field)
);
).withoutEvaluator();
}));
for (EsqlDataTypeConverter.INTERVALS interval : DATE_PERIODS) {
@ -67,7 +67,7 @@ public class ToDatePeriodTests extends AbstractScalarFunctionTestCase {
matchesPattern("LiteralsEvaluator.*"),
DATE_PERIOD,
equalTo(result)
);
).withoutEvaluator();
}));
}
}

View file

@ -126,7 +126,7 @@ public class ToDoubleTests extends AbstractScalarFunctionTestCase {
);
TestCaseSupplier.unary(
suppliers,
evaluatorName.apply("Integer"),
evaluatorName.apply("Int"),
List.of(new TestCaseSupplier.TypedDataSupplier("counter", () -> randomInt(1000), DataType.COUNTER_INTEGER)),
DataType.DOUBLE,
l -> ((Integer) l).doubleValue(),

View file

@ -230,7 +230,7 @@ public class ToLongTests extends AbstractScalarFunctionTestCase {
);
TestCaseSupplier.unary(
suppliers,
evaluatorName.apply("Integer"),
evaluatorName.apply("Int"),
List.of(new TestCaseSupplier.TypedDataSupplier("counter", ESTestCase::randomInt, DataType.COUNTER_INTEGER)),
DataType.LONG,
l -> ((Integer) l).longValue(),

View file

@ -50,7 +50,7 @@ public class ToTimeDurationTests extends AbstractScalarFunctionTestCase {
matchesPattern("LiteralsEvaluator.*"),
TIME_DURATION,
equalTo(field)
);
).withoutEvaluator();
}));
for (EsqlDataTypeConverter.INTERVALS interval : TIME_DURATIONS) {
@ -66,7 +66,7 @@ public class ToTimeDurationTests extends AbstractScalarFunctionTestCase {
matchesPattern("LiteralsEvaluator.*"),
TIME_DURATION,
equalTo(result)
);
).withoutEvaluator();
}));
}
}

View file

@ -136,10 +136,10 @@ public class DateTruncTests extends AbstractScalarFunctionTestCase {
+ pad(randomIntBetween(0, 59));
return new TestCaseSupplier.TestCase(
List.of(
new TestCaseSupplier.TypedData(Duration.ofSeconds(1), DataType.TIME_DURATION, "interval"),
new TestCaseSupplier.TypedData(Duration.ofSeconds(1), DataType.TIME_DURATION, "interval").forceLiteral(),
new TestCaseSupplier.TypedData(toMillis(dateFragment + ".38Z"), DataType.DATETIME, "date")
),
"DateTruncDatetimeEvaluator[date=Attribute[channel=1], interval=Attribute[channel=0]]",
Matchers.startsWith("DateTruncDatetimeEvaluator[fieldVal=Attribute[channel=0], rounding=Rounding["),
DataType.DATETIME,
equalTo(toMillis(dateFragment + ".00Z"))
);

View file

@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.xpack.esql.VerificationException;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
@ -18,6 +17,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTestCase;
import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
import org.hamcrest.Matchers;
import java.time.Duration;
import java.time.Period;
@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import static org.elasticsearch.compute.data.BlockUtils.toJavaObject;
import static org.elasticsearch.xpack.esql.EsqlTestUtils.randomLiteral;
import static org.hamcrest.Matchers.equalTo;
@ -97,19 +96,19 @@ public class NegTests extends AbstractScalarFunctionTestCase {
suppliers.addAll(List.of(new TestCaseSupplier("Duration", List.of(DataType.TIME_DURATION), () -> {
Duration arg = (Duration) randomLiteral(DataType.TIME_DURATION).value();
return new TestCaseSupplier.TestCase(
List.of(new TestCaseSupplier.TypedData(arg, DataType.TIME_DURATION, "arg")),
"No evaluator since this expression is only folded",
List.of(new TestCaseSupplier.TypedData(arg, DataType.TIME_DURATION, "arg").forceLiteral()),
Matchers.startsWith("LiteralsEvaluator[lit="),
DataType.TIME_DURATION,
equalTo(arg.negated())
);
).withoutEvaluator();
}), new TestCaseSupplier("Period", List.of(DataType.DATE_PERIOD), () -> {
Period arg = (Period) randomLiteral(DataType.DATE_PERIOD).value();
return new TestCaseSupplier.TestCase(
List.of(new TestCaseSupplier.TypedData(arg, DataType.DATE_PERIOD, "arg")),
"No evaluator since this expression is only folded",
List.of(new TestCaseSupplier.TypedData(arg, DataType.DATE_PERIOD, "arg").forceLiteral()),
Matchers.startsWith("LiteralsEvaluator[lit="),
DataType.DATE_PERIOD,
equalTo(arg.negated())
);
).withoutEvaluator();
})));
return parameterSuppliersFromTypedDataWithDefaultChecks(false, suppliers, (v, p) -> "numeric, date_period or time_duration");
}
@ -126,25 +125,25 @@ public class NegTests extends AbstractScalarFunctionTestCase {
if (testCaseType == DataType.DATE_PERIOD) {
Period maxPeriod = Period.of(Integer.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
Period negatedMaxPeriod = Period.of(-Integer.MAX_VALUE, -Integer.MAX_VALUE, -Integer.MAX_VALUE);
assertEquals(negatedMaxPeriod, process(maxPeriod));
assertEquals(negatedMaxPeriod, foldTemporalAmount(maxPeriod));
Period minPeriod = Period.of(Integer.MIN_VALUE, Integer.MIN_VALUE, Integer.MIN_VALUE);
VerificationException e = expectThrows(
VerificationException.class,
"Expected exception when negating minimal date period.",
() -> process(minPeriod)
() -> foldTemporalAmount(minPeriod)
);
assertEquals(e.getMessage(), "arithmetic exception in expression []: [integer overflow]");
} else if (testCaseType == DataType.TIME_DURATION) {
Duration maxDuration = Duration.ofSeconds(Long.MAX_VALUE, 0);
Duration negatedMaxDuration = Duration.ofSeconds(-Long.MAX_VALUE, 0);
assertEquals(negatedMaxDuration, process(maxDuration));
assertEquals(negatedMaxDuration, foldTemporalAmount(maxDuration));
Duration minDuration = Duration.ofSeconds(Long.MIN_VALUE, 0);
VerificationException e = expectThrows(
VerificationException.class,
"Expected exception when negating minimal time duration.",
() -> process(minDuration)
() -> foldTemporalAmount(minDuration)
);
assertEquals(
e.getMessage(),
@ -153,16 +152,9 @@ public class NegTests extends AbstractScalarFunctionTestCase {
}
}
private Object process(Object val) {
if (testCase.canBuildEvaluator()) {
Neg neg = new Neg(Source.EMPTY, field("val", typeOf(val)));
try (Block block = evaluator(neg).get(driverContext()).eval(row(List.of(val)))) {
return toJavaObject(block, 0);
}
} else { // just fold if type is not representable
Neg neg = new Neg(Source.EMPTY, new Literal(Source.EMPTY, val, typeOf(val)));
return neg.fold();
}
private Object foldTemporalAmount(Object val) {
Neg neg = new Neg(Source.EMPTY, new Literal(Source.EMPTY, val, typeOf(val)));
return neg.fold();
}
private static DataType typeOf(Object val) {

View file

@ -169,12 +169,12 @@ public class SubTests extends AbstractScalarFunctionTestCase {
return new TestCaseSupplier.TestCase(
List.of(
new TestCaseSupplier.TypedData(lhs, DataType.DATE_PERIOD, "lhs"),
new TestCaseSupplier.TypedData(rhs, DataType.DATE_PERIOD, "rhs")
new TestCaseSupplier.TypedData(rhs, DataType.DATE_PERIOD, "rhs").forceLiteral()
),
"Only folding possible, so there's no evaluator",
DataType.DATE_PERIOD,
equalTo(lhs.minus(rhs))
);
).withoutEvaluator();
}));
suppliers.add(new TestCaseSupplier("Datetime - Duration", List.of(DataType.DATETIME, DataType.TIME_DURATION), () -> {
long lhs = (Long) randomLiteral(DataType.DATETIME).value();
@ -196,12 +196,12 @@ public class SubTests extends AbstractScalarFunctionTestCase {
return new TestCaseSupplier.TestCase(
List.of(
new TestCaseSupplier.TypedData(lhs, DataType.TIME_DURATION, "lhs"),
new TestCaseSupplier.TypedData(rhs, DataType.TIME_DURATION, "rhs")
new TestCaseSupplier.TypedData(rhs, DataType.TIME_DURATION, "rhs").forceLiteral()
),
"Only folding possible, so there's no evaluator",
DataType.TIME_DURATION,
equalTo(lhs.minus(rhs))
);
).withoutEvaluator();
}));
// exact math arithmetic exceptions
@ -250,7 +250,12 @@ public class SubTests extends AbstractScalarFunctionTestCase {
return original.getData().get(nullPosition == 0 ? 1 : 0).type();
}
return original.expectedType();
}, (nullPosition, nullData, original) -> nullData.isForceLiteral() ? equalTo("LiteralsEvaluator[lit=null]") : original);
}, (nullPosition, nullData, original) -> {
if (DataType.isTemporalAmount(nullData.type())) {
return equalTo("LiteralsEvaluator[lit=null]");
}
return original;
});
suppliers.add(new TestCaseSupplier("MV", List.of(DataType.INTEGER, DataType.INTEGER), () -> {
// Ensure we don't have an overflow

View file

@ -38,7 +38,7 @@ final class SyntheticSourceLicenseService {
"xpack.mapping.synthetic_source_fallback_to_stored_source",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
Setting.Property.OperatorDynamic
);
static final LicensedFeature.Momentary SYNTHETIC_SOURCE_FEATURE = LicensedFeature.momentary(

View file

@ -33,7 +33,10 @@ Test failure store with logsdb:
- method: PUT
path: /_bulk
capabilities: [ 'failure_store_status' ]
reason: "Support for 'logsdb' index mode & failure status capability required"
- method: POST
path: /_index_template/{template}
capabilities: [ 'failure_store_in_template' ]
reason: "Support for 'logsdb' index mode & failure status config in templates"
- do:
allowed_warnings:
@ -42,14 +45,16 @@ Test failure store with logsdb:
name: my-template
body:
index_patterns: ["my-logs-fs*"]
data_stream:
failure_store: true
data_stream: {}
template:
settings:
index:
mode: logsdb
number_of_replicas: 1
number_of_shards: 2
data_stream_options:
failure_store:
enabled: true
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [my-logs-db*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"