mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Adding support for additional mapping to simulate ingest API (#114742)
This commit is contained in:
parent
79de53ae7b
commit
2ff6bb0543
12 changed files with 675 additions and 131 deletions
5
docs/changelog/114742.yaml
Normal file
5
docs/changelog/114742.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 114742
|
||||
summary: Adding support for additional mapping to simulate ingest API
|
||||
area: Ingest Node
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -108,6 +108,14 @@ POST /_ingest/_simulate
|
|||
"index_patterns": ["my-index-*"],
|
||||
"composed_of": ["component_template_1", "component_template_2"]
|
||||
}
|
||||
},
|
||||
"mapping_addition": { <4>
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"foo": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
----
|
||||
|
@ -117,6 +125,7 @@ POST /_ingest/_simulate
|
|||
These templates can be used to change the pipeline(s) used, or to modify the mapping that will be used to validate the result.
|
||||
<3> This replaces the existing `my-index-template` index template with the contents given here for the duration of this request.
|
||||
These templates can be used to change the pipeline(s) used, or to modify the mapping that will be used to validate the result.
|
||||
<4> This mapping is merged into the index's final mapping just before validation. It is used only for the duration of this request.
|
||||
|
||||
[[simulate-ingest-api-request]]
|
||||
==== {api-request-title}
|
||||
|
@ -246,6 +255,10 @@ include::{es-ref-dir}/indices/put-index-template.asciidoc[tag=request-body]
|
|||
|
||||
====
|
||||
|
||||
`mapping_addition`::
|
||||
(Optional, <<mapping,mapping object>>)
|
||||
Definition of a mapping that will be merged into the index's mapping for validation during the course of this request.
|
||||
|
||||
[[simulate-ingest-api-example]]
|
||||
==== {api-examples-title}
|
||||
|
||||
|
|
|
@ -1216,3 +1216,358 @@ setup:
|
|||
- match: { docs.0.doc._source.foo: "FOO" }
|
||||
- match: { docs.0.doc.executed_pipelines: ["foo-pipeline-2"] }
|
||||
- not_exists: docs.0.doc.error
|
||||
|
||||
---
|
||||
"Test ingest simulate with mapping addition for data streams":
|
||||
# In this test, we make sure that when the index template is a data stream template, simulate ingest works the same whether the data
|
||||
# stream has been created or not -- either way, we expect it to use the template rather than the data stream / index mappings and settings.
|
||||
|
||||
- skip:
|
||||
features:
|
||||
- headers
|
||||
- allowed_warnings
|
||||
|
||||
- requires:
|
||||
cluster_features: ["simulate.mapping.addition"]
|
||||
reason: "ingest simulate mapping addition added in 8.16"
|
||||
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
ingest.put_pipeline:
|
||||
id: "foo-pipeline"
|
||||
body: >
|
||||
{
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "foo",
|
||||
"value": true
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
cluster.put_component_template:
|
||||
name: mappings_template
|
||||
body:
|
||||
template:
|
||||
mappings:
|
||||
dynamic: strict
|
||||
properties:
|
||||
foo:
|
||||
type: boolean
|
||||
|
||||
- do:
|
||||
cluster.put_component_template:
|
||||
name: settings_template
|
||||
body:
|
||||
template:
|
||||
settings:
|
||||
index:
|
||||
default_pipeline: "foo-pipeline"
|
||||
|
||||
- do:
|
||||
allowed_warnings:
|
||||
- "index template [test-composable-1] has index patterns [foo*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [test-composable-1] will take precedence during new index creation"
|
||||
indices.put_index_template:
|
||||
name: test-composable-1
|
||||
body:
|
||||
index_patterns:
|
||||
- foo*
|
||||
composed_of:
|
||||
- mappings_template
|
||||
- settings_template
|
||||
|
||||
- do:
|
||||
allowed_warnings:
|
||||
- "index template [my-template-1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template-1] will take precedence during new index creation"
|
||||
indices.put_index_template:
|
||||
name: my-template-1
|
||||
body:
|
||||
index_patterns: [simple-data-stream1]
|
||||
composed_of:
|
||||
- mappings_template
|
||||
- settings_template
|
||||
data_stream: {}
|
||||
|
||||
# Here we replace my-template-1 with a substitute version that uses the settings_template_2 and mappings_template_2 templates defined in
|
||||
# this request, and foo-pipeline-2 defined in this request.
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
simulate.ingest:
|
||||
index: simple-data-stream1
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_id": "asdf",
|
||||
"_source": {
|
||||
"@timestamp": 1234,
|
||||
"foo": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"pipeline_substitutions": {
|
||||
"foo-pipeline-2": {
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "foo",
|
||||
"value": "FOO"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"component_template_substitutions": {
|
||||
"settings_template_2": {
|
||||
"template": {
|
||||
"settings": {
|
||||
"index": {
|
||||
"default_pipeline": "foo-pipeline-2"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"mappings_template_2": {
|
||||
"template": {
|
||||
"mappings": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"foo": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"index_template_substitutions": {
|
||||
"my-template-1": {
|
||||
"index_patterns": ["simple-data-stream1"],
|
||||
"composed_of": ["settings_template_2", "mappings_template_2"],
|
||||
"data_stream": {}
|
||||
}
|
||||
},
|
||||
"mapping_addition": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"foo": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._index: "simple-data-stream1" }
|
||||
- match: { docs.0.doc._source.foo: "FOO" }
|
||||
- match: { docs.0.doc.executed_pipelines: ["foo-pipeline-2"] }
|
||||
- not_exists: docs.0.doc.error
|
||||
|
||||
- do:
|
||||
indices.create_data_stream:
|
||||
name: simple-data-stream1
|
||||
- is_true: acknowledged
|
||||
|
||||
- do:
|
||||
cluster.health:
|
||||
wait_for_status: yellow
|
||||
|
||||
# Now that we have created a data stream, run the exact same simulate ingeset request to make sure we still get the same result, and that
|
||||
# the substitutions and additions from the simulate ingest request are used instead of information from the data stream or its backing
|
||||
# index.
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
simulate.ingest:
|
||||
index: simple-data-stream1
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_id": "asdf",
|
||||
"_source": {
|
||||
"@timestamp": 1234,
|
||||
"foo": false
|
||||
}
|
||||
}
|
||||
],
|
||||
"pipeline_substitutions": {
|
||||
"foo-pipeline-2": {
|
||||
"processors": [
|
||||
{
|
||||
"set": {
|
||||
"field": "foo",
|
||||
"value": "FOO"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"component_template_substitutions": {
|
||||
"settings_template_2": {
|
||||
"template": {
|
||||
"settings": {
|
||||
"index": {
|
||||
"default_pipeline": "foo-pipeline-2"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"mappings_template_2": {
|
||||
"template": {
|
||||
"mappings": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"foo": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"index_template_substitutions": {
|
||||
"my-template-1": {
|
||||
"index_patterns": ["simple-data-stream1"],
|
||||
"composed_of": ["settings_template_2", "mappings_template_2"],
|
||||
"data_stream": {}
|
||||
}
|
||||
},
|
||||
"mapping_addition": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"foo": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._index: "simple-data-stream1" }
|
||||
- match: { docs.0.doc._source.foo: "FOO" }
|
||||
- match: { docs.0.doc.executed_pipelines: ["foo-pipeline-2"] }
|
||||
- not_exists: docs.0.doc.error
|
||||
|
||||
---
|
||||
"Test mapping addition works with legacy templates":
|
||||
# In this test, we make sure that when the index template is a data stream template, simulate ingest works the same whether the data
|
||||
# stream has been created or not -- either way, we expect it to use the template rather than the data stream / index mappings and settings.
|
||||
|
||||
- skip:
|
||||
features:
|
||||
- headers
|
||||
- allowed_warnings
|
||||
|
||||
- requires:
|
||||
cluster_features: ["simulate.mapping.addition"]
|
||||
reason: "ingest simulate mapping addition added in 8.16"
|
||||
|
||||
- do:
|
||||
indices.put_template:
|
||||
name: my-legacy-template
|
||||
body:
|
||||
index_patterns: foo-*
|
||||
settings:
|
||||
number_of_replicas: 0
|
||||
mappings:
|
||||
dynamic: strict
|
||||
properties:
|
||||
foo:
|
||||
type: integer
|
||||
bar:
|
||||
type: boolean
|
||||
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
simulate.ingest:
|
||||
index: foo-1
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_id": "asdf",
|
||||
"_source": {
|
||||
"foo": 3,
|
||||
"bar": "not a boolean"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._index: "foo-1" }
|
||||
- match: { docs.0.doc._source.foo: 3 }
|
||||
- match: { docs.0.doc._source.bar: "not a boolean" }
|
||||
- match: { docs.0.doc.error.type: "document_parsing_exception" }
|
||||
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
simulate.ingest:
|
||||
index: foo-1
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_id": "asdf",
|
||||
"_source": {
|
||||
"foo": 3,
|
||||
"bar": "not a boolean"
|
||||
}
|
||||
}
|
||||
],
|
||||
"mapping_addition": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"bar": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._index: "foo-1" }
|
||||
- match: { docs.0.doc._source.foo: 3 }
|
||||
- match: { docs.0.doc._source.bar: "not a boolean" }
|
||||
- not_exists: docs.0.doc.error
|
||||
|
||||
- do:
|
||||
indices.create:
|
||||
index: foo-1
|
||||
- match: { acknowledged: true }
|
||||
|
||||
- do:
|
||||
headers:
|
||||
Content-Type: application/json
|
||||
simulate.ingest:
|
||||
index: foo-1
|
||||
body: >
|
||||
{
|
||||
"docs": [
|
||||
{
|
||||
"_id": "asdf",
|
||||
"_source": {
|
||||
"foo": 3,
|
||||
"bar": "not a boolean"
|
||||
}
|
||||
}
|
||||
],
|
||||
"mapping_addition": {
|
||||
"dynamic": "strict",
|
||||
"properties": {
|
||||
"bar": {
|
||||
"type": "keyword"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
- length: { docs: 1 }
|
||||
- match: { docs.0.doc._index: "foo-1" }
|
||||
- match: { docs.0.doc._source.foo: 3 }
|
||||
- match: { docs.0.doc._source.bar: "not a boolean" }
|
||||
- not_exists: docs.0.doc.error
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -59,7 +60,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
}
|
||||
""";
|
||||
indicesAdmin().create(new CreateIndexRequest(indexName).mapping(mapping)).actionGet();
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -131,10 +132,10 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
|
||||
String indexName = "my-index-1";
|
||||
// First, run before the index is created:
|
||||
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName, indexTemplateName);
|
||||
assertMappingsUpdatedFromSubstitutions(indexName, indexTemplateName);
|
||||
// Now, create the index and make sure the component template substitutions work the same:
|
||||
indicesAdmin().create(new CreateIndexRequest(indexName)).actionGet();
|
||||
assertMappingsUpdatedFromComponentTemplateSubstitutions(indexName, indexTemplateName);
|
||||
assertMappingsUpdatedFromSubstitutions(indexName, indexTemplateName);
|
||||
// Now make sure nothing was actually changed:
|
||||
indicesAdmin().refresh(new RefreshRequest(indexName)).actionGet();
|
||||
SearchResponse searchResponse = client().search(new SearchRequest(indexName)).actionGet();
|
||||
|
@ -146,7 +147,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
assertThat(fields.size(), equalTo(1));
|
||||
}
|
||||
|
||||
private void assertMappingsUpdatedFromComponentTemplateSubstitutions(String indexName, String indexTemplateName) {
|
||||
private void assertMappingsUpdatedFromSubstitutions(String indexName, String indexTemplateName) {
|
||||
IndexRequest indexRequest1 = new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -159,7 +160,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
""", XContentType.JSON).id(randomUUID());
|
||||
{
|
||||
// First we use the original component template, and expect a failure in the second document:
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(indexRequest1);
|
||||
bulkRequest.add(indexRequest2);
|
||||
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
|
||||
|
@ -192,6 +193,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
)
|
||||
)
|
||||
),
|
||||
Map.of(),
|
||||
Map.of()
|
||||
);
|
||||
bulkRequest.add(indexRequest1);
|
||||
|
@ -226,7 +228,34 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
)
|
||||
)
|
||||
),
|
||||
Map.of(indexTemplateName, Map.of("index_patterns", List.of(indexName), "composed_of", List.of("test-component-template-2")))
|
||||
Map.of(
|
||||
indexTemplateName,
|
||||
Map.of("index_patterns", List.of(indexName), "composed_of", List.of("test-component-template-2"))
|
||||
),
|
||||
Map.of()
|
||||
);
|
||||
bulkRequest.add(indexRequest1);
|
||||
bulkRequest.add(indexRequest2);
|
||||
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
|
||||
assertThat(response.getItems().length, equalTo(2));
|
||||
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
|
||||
assertNull(((SimulateIndexResponse) response.getItems()[0].getResponse()).getException());
|
||||
assertThat(response.getItems()[1].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
|
||||
assertNull(((SimulateIndexResponse) response.getItems()[1].getResponse()).getException());
|
||||
}
|
||||
|
||||
{
|
||||
/*
|
||||
* Now we mapping_addition that defines both fields, so we expect no exception:
|
||||
*/
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(
|
||||
Map.of(),
|
||||
Map.of(),
|
||||
Map.of(),
|
||||
Map.of(
|
||||
"_doc",
|
||||
Map.of("dynamic", "strict", "properties", Map.of("foo1", Map.of("type", "text"), "foo3", Map.of("type", "text")))
|
||||
)
|
||||
);
|
||||
bulkRequest.add(indexRequest1);
|
||||
bulkRequest.add(indexRequest2);
|
||||
|
@ -245,7 +274,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
* mapping-less "random-index-template" created by the parent class), so we expect no mapping validation failure.
|
||||
*/
|
||||
String indexName = randomAlphaOfLength(20).toLowerCase(Locale.ROOT);
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -292,7 +321,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
request.indexTemplate(composableIndexTemplate);
|
||||
|
||||
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -324,7 +353,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
indicesAdmin().putTemplate(
|
||||
new PutIndexTemplateRequest("test-template").patterns(List.of("my-index-*")).mapping("foo1", "type=integer")
|
||||
).actionGet();
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -378,7 +407,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet();
|
||||
{
|
||||
// First, try with no @timestamp to make sure we're picking up data-stream-specific templates
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"foo1": "baz"
|
||||
|
@ -389,7 +418,8 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
"foo3": "baz"
|
||||
}
|
||||
""", XContentType.JSON).id(randomUUID()));
|
||||
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest).actionGet();
|
||||
BulkResponse response = client().execute(new ActionType<BulkResponse>(SimulateBulkAction.NAME), bulkRequest)
|
||||
.actionGet(5, TimeUnit.SECONDS);
|
||||
assertThat(response.getItems().length, equalTo(2));
|
||||
assertThat(response.getItems()[0].getResponse().getResult(), equalTo(DocWriteResponse.Result.CREATED));
|
||||
assertThat(
|
||||
|
@ -404,7 +434,7 @@ public class TransportSimulateBulkActionIT extends ESIntegTestCase {
|
|||
}
|
||||
{
|
||||
// Now with @timestamp
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
bulkRequest.add(new IndexRequest(indexName).source("""
|
||||
{
|
||||
"@timestamp": "2024-08-27",
|
||||
|
|
|
@ -178,6 +178,7 @@ public class TransportVersions {
|
|||
public static final TransportVersion REVERT_REMOVE_MIN_COMPATIBLE_SHARD_NODE = def(8_774_00_0);
|
||||
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_PARENT_SIMPLIFIED = def(8_775_00_0);
|
||||
public static final TransportVersion INFERENCE_DONT_PERSIST_ON_READ = def(8_776_00_0);
|
||||
public static final TransportVersion SIMULATE_MAPPING_ADDITION = def(8_777_00_0);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
|
|
|
@ -16,6 +16,7 @@ import java.util.Set;
|
|||
|
||||
import static org.elasticsearch.action.bulk.TransportSimulateBulkAction.SIMULATE_COMPONENT_TEMPLATE_SUBSTITUTIONS;
|
||||
import static org.elasticsearch.action.bulk.TransportSimulateBulkAction.SIMULATE_INDEX_TEMPLATE_SUBSTITUTIONS;
|
||||
import static org.elasticsearch.action.bulk.TransportSimulateBulkAction.SIMULATE_MAPPING_ADDITION;
|
||||
import static org.elasticsearch.action.bulk.TransportSimulateBulkAction.SIMULATE_MAPPING_VALIDATION;
|
||||
import static org.elasticsearch.action.bulk.TransportSimulateBulkAction.SIMULATE_MAPPING_VALIDATION_TEMPLATES;
|
||||
|
||||
|
@ -25,7 +26,8 @@ public class BulkFeatures implements FeatureSpecification {
|
|||
SIMULATE_MAPPING_VALIDATION,
|
||||
SIMULATE_MAPPING_VALIDATION_TEMPLATES,
|
||||
SIMULATE_COMPONENT_TEMPLATE_SUBSTITUTIONS,
|
||||
SIMULATE_INDEX_TEMPLATE_SUBSTITUTIONS
|
||||
SIMULATE_INDEX_TEMPLATE_SUBSTITUTIONS,
|
||||
SIMULATE_MAPPING_ADDITION
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,12 +15,12 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.xcontent.XContentParserConfiguration;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This extends BulkRequest with support for providing substitute pipeline definitions, component template definitions, and index template
|
||||
|
@ -73,7 +73,8 @@ import java.util.Map;
|
|||
* }
|
||||
* }
|
||||
* }
|
||||
* },
|
||||
* }
|
||||
* },
|
||||
* "index_template_substitutions": {
|
||||
* "my-index-template-1": {
|
||||
* "template": {
|
||||
|
@ -84,6 +85,13 @@ import java.util.Map;
|
|||
* ]
|
||||
* }
|
||||
* }
|
||||
* },
|
||||
* "mapping_addition": {
|
||||
* "dynamic": "strict",
|
||||
* "properties": {
|
||||
* "foo": {
|
||||
* "type": "keyword"
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* The pipelineSubstitutions Map held by this class is intended to be the result of XContentHelper.convertToMap(). The top-level keys
|
||||
|
@ -94,6 +102,7 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
private final Map<String, Map<String, Object>> pipelineSubstitutions;
|
||||
private final Map<String, Map<String, Object>> componentTemplateSubstitutions;
|
||||
private final Map<String, Map<String, Object>> indexTemplateSubstitutions;
|
||||
private final Map<String, Object> mappingAddition;
|
||||
|
||||
/**
|
||||
* @param pipelineSubstitutions The pipeline definitions that are to be used in place of any pre-existing pipeline definitions with
|
||||
|
@ -103,16 +112,23 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
* component template definitions with the same name.
|
||||
* @param indexTemplateSubstitutions The index template definitions that are to be used in place of any pre-existing
|
||||
* index template definitions with the same name.
|
||||
* @param mappingAddition A mapping that will be merged into the final index's mapping for mapping validation
|
||||
*/
|
||||
public SimulateBulkRequest(
|
||||
@Nullable Map<String, Map<String, Object>> pipelineSubstitutions,
|
||||
@Nullable Map<String, Map<String, Object>> componentTemplateSubstitutions,
|
||||
@Nullable Map<String, Map<String, Object>> indexTemplateSubstitutions
|
||||
Map<String, Map<String, Object>> pipelineSubstitutions,
|
||||
Map<String, Map<String, Object>> componentTemplateSubstitutions,
|
||||
Map<String, Map<String, Object>> indexTemplateSubstitutions,
|
||||
Map<String, Object> mappingAddition
|
||||
) {
|
||||
super();
|
||||
Objects.requireNonNull(pipelineSubstitutions);
|
||||
Objects.requireNonNull(componentTemplateSubstitutions);
|
||||
Objects.requireNonNull(indexTemplateSubstitutions);
|
||||
Objects.requireNonNull(mappingAddition);
|
||||
this.pipelineSubstitutions = pipelineSubstitutions;
|
||||
this.componentTemplateSubstitutions = componentTemplateSubstitutions;
|
||||
this.indexTemplateSubstitutions = indexTemplateSubstitutions;
|
||||
this.mappingAddition = mappingAddition;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -129,6 +145,11 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
} else {
|
||||
indexTemplateSubstitutions = Map.of();
|
||||
}
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_MAPPING_ADDITION)) {
|
||||
this.mappingAddition = (Map<String, Object>) in.readGenericValue();
|
||||
} else {
|
||||
mappingAddition = Map.of();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -141,6 +162,9 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_INDEX_TEMPLATES_SUBSTITUTIONS)) {
|
||||
out.writeGenericValue(indexTemplateSubstitutions);
|
||||
}
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersions.SIMULATE_MAPPING_ADDITION)) {
|
||||
out.writeGenericValue(mappingAddition);
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Object>> getPipelineSubstitutions() {
|
||||
|
@ -153,41 +177,39 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() throws IOException {
|
||||
if (componentTemplateSubstitutions == null) {
|
||||
return Map.of();
|
||||
}
|
||||
Map<String, ComponentTemplate> result = new HashMap<>(componentTemplateSubstitutions.size());
|
||||
for (Map.Entry<String, Map<String, Object>> rawEntry : componentTemplateSubstitutions.entrySet()) {
|
||||
result.put(rawEntry.getKey(), convertRawTemplateToComponentTemplate(rawEntry.getValue()));
|
||||
}
|
||||
return result;
|
||||
public Map<String, ComponentTemplate> getComponentTemplateSubstitutions() {
|
||||
return componentTemplateSubstitutions.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> convertRawTemplateToComponentTemplate(entry.getValue())));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, ComposableIndexTemplate> getIndexTemplateSubstitutions() throws IOException {
|
||||
if (indexTemplateSubstitutions == null) {
|
||||
return Map.of();
|
||||
}
|
||||
Map<String, ComposableIndexTemplate> result = new HashMap<>(indexTemplateSubstitutions.size());
|
||||
for (Map.Entry<String, Map<String, Object>> rawEntry : indexTemplateSubstitutions.entrySet()) {
|
||||
result.put(rawEntry.getKey(), convertRawTemplateToIndexTemplate(rawEntry.getValue()));
|
||||
}
|
||||
return result;
|
||||
public Map<String, ComposableIndexTemplate> getIndexTemplateSubstitutions() {
|
||||
return indexTemplateSubstitutions.entrySet()
|
||||
.stream()
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, entry -> convertRawTemplateToIndexTemplate(entry.getValue())));
|
||||
}
|
||||
|
||||
private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) throws IOException {
|
||||
public Map<String, Object> getMappingAddition() {
|
||||
return mappingAddition;
|
||||
}
|
||||
|
||||
private static ComponentTemplate convertRawTemplateToComponentTemplate(Map<String, Object> rawTemplate) {
|
||||
ComponentTemplate componentTemplate;
|
||||
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) {
|
||||
componentTemplate = ComponentTemplate.parse(parser);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return componentTemplate;
|
||||
}
|
||||
|
||||
private static ComposableIndexTemplate convertRawTemplateToIndexTemplate(Map<String, Object> rawTemplate) throws IOException {
|
||||
private static ComposableIndexTemplate convertRawTemplateToIndexTemplate(Map<String, Object> rawTemplate) {
|
||||
ComposableIndexTemplate indexTemplate;
|
||||
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawTemplate)) {
|
||||
indexTemplate = ComposableIndexTemplate.parse(parser);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return indexTemplate;
|
||||
}
|
||||
|
@ -197,7 +219,8 @@ public class SimulateBulkRequest extends BulkRequest {
|
|||
BulkRequest bulkRequest = new SimulateBulkRequest(
|
||||
pipelineSubstitutions,
|
||||
componentTemplateSubstitutions,
|
||||
indexTemplateSubstitutions
|
||||
indexTemplateSubstitutions,
|
||||
mappingAddition
|
||||
);
|
||||
bulkRequest.setRefreshPolicy(getRefreshPolicy());
|
||||
bulkRequest.waitForActiveShards(waitForActiveShards());
|
||||
|
|
|
@ -26,10 +26,13 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
|||
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
|
||||
import org.elasticsearch.cluster.metadata.Template;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.compress.CompressedXContent;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.core.Nullable;
|
||||
import org.elasticsearch.features.NodeFeature;
|
||||
import org.elasticsearch.index.IndexSettingProvider;
|
||||
import org.elasticsearch.index.IndexSettingProviders;
|
||||
|
@ -37,6 +40,7 @@ import org.elasticsearch.index.IndexVersion;
|
|||
import org.elasticsearch.index.IndexingPressure;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.seqno.SequenceNumbers;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -50,6 +54,10 @@ import org.elasticsearch.tasks.Task;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.xcontent.XContentFactory;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
import org.elasticsearch.xcontent.XContentParserConfiguration;
|
||||
import org.elasticsearch.xcontent.XContentType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -75,6 +83,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
"simulate.component.template.substitutions"
|
||||
);
|
||||
public static final NodeFeature SIMULATE_INDEX_TEMPLATE_SUBSTITUTIONS = new NodeFeature("simulate.index.template.substitutions");
|
||||
public static final NodeFeature SIMULATE_MAPPING_ADDITION = new NodeFeature("simulate.mapping.addition");
|
||||
private final IndicesService indicesService;
|
||||
private final NamedXContentRegistry xContentRegistry;
|
||||
private final Set<IndexSettingProvider> indexSettingProviders;
|
||||
|
@ -122,11 +131,17 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
|
||||
Map<String, ComponentTemplate> componentTemplateSubstitutions = bulkRequest.getComponentTemplateSubstitutions();
|
||||
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = bulkRequest.getIndexTemplateSubstitutions();
|
||||
Map<String, Object> mappingAddition = ((SimulateBulkRequest) bulkRequest).getMappingAddition();
|
||||
for (int i = 0; i < bulkRequest.requests.size(); i++) {
|
||||
DocWriteRequest<?> docRequest = bulkRequest.requests.get(i);
|
||||
assert docRequest instanceof IndexRequest : "TransportSimulateBulkAction should only ever be called with IndexRequests";
|
||||
IndexRequest request = (IndexRequest) docRequest;
|
||||
Exception mappingValidationException = validateMappings(componentTemplateSubstitutions, indexTemplateSubstitutions, request);
|
||||
Exception mappingValidationException = validateMappings(
|
||||
componentTemplateSubstitutions,
|
||||
indexTemplateSubstitutions,
|
||||
mappingAddition,
|
||||
request
|
||||
);
|
||||
responses.set(
|
||||
i,
|
||||
BulkItemResponse.success(
|
||||
|
@ -159,6 +174,7 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
private Exception validateMappings(
|
||||
Map<String, ComponentTemplate> componentTemplateSubstitutions,
|
||||
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions,
|
||||
Map<String, Object> mappingAddition,
|
||||
IndexRequest request
|
||||
) {
|
||||
final SourceToParse sourceToParse = new SourceToParse(
|
||||
|
@ -174,7 +190,10 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
Exception mappingValidationException = null;
|
||||
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(request.index());
|
||||
try {
|
||||
if (indexAbstraction != null && componentTemplateSubstitutions.isEmpty() && indexTemplateSubstitutions.isEmpty()) {
|
||||
if (indexAbstraction != null
|
||||
&& componentTemplateSubstitutions.isEmpty()
|
||||
&& indexTemplateSubstitutions.isEmpty()
|
||||
&& mappingAddition.isEmpty()) {
|
||||
/*
|
||||
* In this case the index exists and we don't have any component template overrides. So we can just use withTempIndexService
|
||||
* to do the mapping validation, using all the existing logic for validation.
|
||||
|
@ -250,36 +269,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
indexSettingProviders
|
||||
);
|
||||
CompressedXContent mappings = template.mappings();
|
||||
if (mappings != null) {
|
||||
MappingMetadata mappingMetadata = new MappingMetadata(mappings);
|
||||
Settings dummySettings = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
||||
.build();
|
||||
final IndexMetadata imd = IndexMetadata.builder(request.index())
|
||||
.settings(dummySettings)
|
||||
.putMapping(mappingMetadata)
|
||||
.build();
|
||||
indicesService.withTempIndexService(imd, indexService -> {
|
||||
indexService.mapperService().updateMapping(null, imd);
|
||||
return IndexShard.prepareIndex(
|
||||
indexService.mapperService(),
|
||||
sourceToParse,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
-1,
|
||||
-1,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
Long.MIN_VALUE,
|
||||
false,
|
||||
request.ifSeqNo(),
|
||||
request.ifPrimaryTerm(),
|
||||
0
|
||||
);
|
||||
});
|
||||
}
|
||||
CompressedXContent mergedMappings = mergeMappings(mappings, mappingAddition);
|
||||
validateUpdatedMappings(mappings, mergedMappings, request, sourceToParse);
|
||||
} else {
|
||||
List<IndexTemplateMetadata> matchingTemplates = findV1Templates(simulatedState.metadata(), request.index(), false);
|
||||
final Map<String, Object> mappingsMap = MetadataCreateIndexService.parseV1Mappings(
|
||||
|
@ -287,40 +278,8 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
matchingTemplates.stream().map(IndexTemplateMetadata::getMappings).collect(toList()),
|
||||
xContentRegistry
|
||||
);
|
||||
final CompressedXContent combinedMappings;
|
||||
if (mappingsMap.isEmpty()) {
|
||||
combinedMappings = null;
|
||||
} else {
|
||||
combinedMappings = new CompressedXContent(mappingsMap);
|
||||
}
|
||||
Settings dummySettings = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
||||
.build();
|
||||
MappingMetadata mappingMetadata = combinedMappings == null ? null : new MappingMetadata(combinedMappings);
|
||||
final IndexMetadata imd = IndexMetadata.builder(request.index())
|
||||
.putMapping(mappingMetadata)
|
||||
.settings(dummySettings)
|
||||
.build();
|
||||
indicesService.withTempIndexService(imd, indexService -> {
|
||||
indexService.mapperService().updateMapping(null, imd);
|
||||
return IndexShard.prepareIndex(
|
||||
indexService.mapperService(),
|
||||
sourceToParse,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
-1,
|
||||
-1,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
Long.MIN_VALUE,
|
||||
false,
|
||||
request.ifSeqNo(),
|
||||
request.ifPrimaryTerm(),
|
||||
0
|
||||
);
|
||||
});
|
||||
final CompressedXContent combinedMappings = mergeMappings(new CompressedXContent(mappingsMap), mappingAddition);
|
||||
validateUpdatedMappings(null, combinedMappings, request, sourceToParse);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -329,6 +288,66 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
return mappingValidationException;
|
||||
}
|
||||
|
||||
/*
|
||||
* Validates that when updatedMappings are applied
|
||||
*/
|
||||
private void validateUpdatedMappings(
|
||||
@Nullable CompressedXContent originalMappings,
|
||||
@Nullable CompressedXContent updatedMappings,
|
||||
IndexRequest request,
|
||||
SourceToParse sourceToParse
|
||||
) throws IOException {
|
||||
if (updatedMappings == null) {
|
||||
return; // no validation to do
|
||||
}
|
||||
Settings dummySettings = Settings.builder()
|
||||
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID())
|
||||
.build();
|
||||
IndexMetadata.Builder originalIndexMetadataBuilder = IndexMetadata.builder(request.index()).settings(dummySettings);
|
||||
if (originalMappings != null) {
|
||||
originalIndexMetadataBuilder.putMapping(new MappingMetadata(originalMappings));
|
||||
}
|
||||
final IndexMetadata originalIndexMetadata = originalIndexMetadataBuilder.build();
|
||||
final IndexMetadata updatedIndexMetadata = IndexMetadata.builder(request.index())
|
||||
.settings(dummySettings)
|
||||
.putMapping(new MappingMetadata(updatedMappings))
|
||||
.build();
|
||||
indicesService.withTempIndexService(originalIndexMetadata, indexService -> {
|
||||
indexService.mapperService().merge(updatedIndexMetadata, MapperService.MergeReason.MAPPING_UPDATE);
|
||||
return IndexShard.prepareIndex(
|
||||
indexService.mapperService(),
|
||||
sourceToParse,
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
-1,
|
||||
-1,
|
||||
VersionType.INTERNAL,
|
||||
Engine.Operation.Origin.PRIMARY,
|
||||
Long.MIN_VALUE,
|
||||
false,
|
||||
request.ifSeqNo(),
|
||||
request.ifPrimaryTerm(),
|
||||
0
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private static CompressedXContent mergeMappings(@Nullable CompressedXContent originalMapping, Map<String, Object> mappingAddition)
|
||||
throws IOException {
|
||||
Map<String, Object> combinedMappingMap = new HashMap<>();
|
||||
if (originalMapping != null) {
|
||||
combinedMappingMap.putAll(XContentHelper.convertToMap(originalMapping.uncompressed(), true, XContentType.JSON).v2());
|
||||
}
|
||||
XContentHelper.update(combinedMappingMap, mappingAddition, true);
|
||||
if (combinedMappingMap.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
return convertMappingMapToXContent(combinedMappingMap);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This overrides TransportSimulateBulkAction's getIngestService to allow us to provide an IngestService that handles pipeline
|
||||
* substitutions defined in the request.
|
||||
|
@ -344,4 +363,25 @@ public class TransportSimulateBulkAction extends TransportAbstractBulkAction {
|
|||
// A simulate bulk request should not change any persistent state in the system, so we never write to the failure store
|
||||
return null;
|
||||
}
|
||||
|
||||
private static CompressedXContent convertMappingMapToXContent(Map<String, Object> rawAdditionalMapping) throws IOException {
|
||||
CompressedXContent compressedXContent;
|
||||
if (rawAdditionalMapping == null || rawAdditionalMapping.isEmpty()) {
|
||||
compressedXContent = null;
|
||||
} else {
|
||||
try (var parser = XContentHelper.mapToXContentParser(XContentParserConfiguration.EMPTY, rawAdditionalMapping)) {
|
||||
compressedXContent = mappingFromXContent(parser);
|
||||
}
|
||||
}
|
||||
return compressedXContent;
|
||||
}
|
||||
|
||||
private static CompressedXContent mappingFromXContent(XContentParser parser) throws IOException {
|
||||
XContentParser.Token token = parser.nextToken();
|
||||
if (token == XContentParser.Token.START_OBJECT) {
|
||||
return new CompressedXContent(Strings.toString(XContentFactory.jsonBuilder().map(parser.mapOrdered())));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unexpected token: " + token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -74,10 +74,21 @@ public class RestSimulateIngestAction extends BaseRestHandler {
|
|||
String defaultPipeline = request.param("pipeline");
|
||||
Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
|
||||
Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
|
||||
Map<String, Map<String, Object>> pipelineSubstitutions = (Map<String, Map<String, Object>>) sourceMap.remove(
|
||||
"pipeline_substitutions"
|
||||
);
|
||||
Map<String, Map<String, Object>> componentTemplateSubstitutions = (Map<String, Map<String, Object>>) sourceMap.remove(
|
||||
"component_template_substitutions"
|
||||
);
|
||||
Map<String, Map<String, Object>> indexTemplateSubstitutions = (Map<String, Map<String, Object>>) sourceMap.remove(
|
||||
"index_template_substitutions"
|
||||
);
|
||||
Object mappingAddition = sourceMap.remove("mapping_addition");
|
||||
SimulateBulkRequest bulkRequest = new SimulateBulkRequest(
|
||||
(Map<String, Map<String, Object>>) sourceMap.remove("pipeline_substitutions"),
|
||||
(Map<String, Map<String, Object>>) sourceMap.remove("component_template_substitutions"),
|
||||
(Map<String, Map<String, Object>>) sourceMap.remove("index_template_substitutions")
|
||||
pipelineSubstitutions == null ? Map.of() : pipelineSubstitutions,
|
||||
componentTemplateSubstitutions == null ? Map.of() : componentTemplateSubstitutions,
|
||||
indexTemplateSubstitutions == null ? Map.of() : indexTemplateSubstitutions,
|
||||
mappingAddition == null ? Map.of() : Map.of("_doc", mappingAddition)
|
||||
);
|
||||
BytesReference transformedData = convertToBulkRequestXContentBytes(sourceMap);
|
||||
bulkRequest.add(
|
||||
|
|
|
@ -22,32 +22,74 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static java.util.Map.entry;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class SimulateBulkRequestTests extends ESTestCase {
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
testSerialization(getTestPipelineSubstitutions(), getTestComponentTemplateSubstitutions(), getTestIndexTemplateSubstitutions());
|
||||
testSerialization(getTestPipelineSubstitutions(), null, null);
|
||||
testSerialization(getTestPipelineSubstitutions(), getTestComponentTemplateSubstitutions(), null);
|
||||
testSerialization(getTestPipelineSubstitutions(), null, getTestIndexTemplateSubstitutions());
|
||||
testSerialization(null, getTestComponentTemplateSubstitutions(), getTestIndexTemplateSubstitutions());
|
||||
testSerialization(null, getTestComponentTemplateSubstitutions(), null);
|
||||
testSerialization(null, null, getTestIndexTemplateSubstitutions());
|
||||
testSerialization(null, null, null);
|
||||
testSerialization(Map.of(), Map.of(), Map.of());
|
||||
testSerialization(
|
||||
getMapOrEmpty(getTestPipelineSubstitutions()),
|
||||
getMapOrEmpty(getTestComponentTemplateSubstitutions()),
|
||||
getMapOrEmpty(getTestIndexTemplateSubstitutions()),
|
||||
getMapOrEmpty(getTestMappingAddition())
|
||||
);
|
||||
}
|
||||
|
||||
private <K, V> Map<K, V> getMapOrEmpty(Map<K, V> map) {
|
||||
if (randomBoolean()) {
|
||||
return map;
|
||||
} else {
|
||||
return Map.of();
|
||||
}
|
||||
}
|
||||
|
||||
public void testNullsNotAllowed() {
|
||||
assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new SimulateBulkRequest(
|
||||
null,
|
||||
getTestPipelineSubstitutions(),
|
||||
getTestComponentTemplateSubstitutions(),
|
||||
getTestMappingAddition()
|
||||
)
|
||||
);
|
||||
assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new SimulateBulkRequest(
|
||||
getTestPipelineSubstitutions(),
|
||||
null,
|
||||
getTestComponentTemplateSubstitutions(),
|
||||
getTestMappingAddition()
|
||||
)
|
||||
);
|
||||
assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new SimulateBulkRequest(getTestPipelineSubstitutions(), getTestPipelineSubstitutions(), null, getTestMappingAddition())
|
||||
);
|
||||
assertThrows(
|
||||
NullPointerException.class,
|
||||
() -> new SimulateBulkRequest(
|
||||
getTestPipelineSubstitutions(),
|
||||
getTestPipelineSubstitutions(),
|
||||
getTestComponentTemplateSubstitutions(),
|
||||
null
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private void testSerialization(
|
||||
Map<String, Map<String, Object>> pipelineSubstitutions,
|
||||
Map<String, Map<String, Object>> componentTemplateSubstitutions,
|
||||
Map<String, Map<String, Object>> indexTemplateSubstitutions
|
||||
Map<String, Map<String, Object>> indexTemplateSubstitutions,
|
||||
Map<String, Object> mappingAddition
|
||||
) throws IOException {
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(
|
||||
pipelineSubstitutions,
|
||||
componentTemplateSubstitutions,
|
||||
indexTemplateSubstitutions
|
||||
indexTemplateSubstitutions,
|
||||
mappingAddition
|
||||
);
|
||||
/*
|
||||
* Note: SimulateBulkRequest does not implement equals or hashCode, so we can't test serialization in the usual way for a
|
||||
|
@ -59,7 +101,7 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void testGetComponentTemplateSubstitutions() throws IOException {
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
assertThat(simulateBulkRequest.getComponentTemplateSubstitutions(), equalTo(Map.of()));
|
||||
String substituteComponentTemplatesString = """
|
||||
{
|
||||
|
@ -93,7 +135,7 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
XContentType.JSON
|
||||
).v2();
|
||||
Map<String, Map<String, Object>> substituteComponentTemplates = (Map<String, Map<String, Object>>) tempMap;
|
||||
simulateBulkRequest = new SimulateBulkRequest(Map.of(), substituteComponentTemplates, Map.of());
|
||||
simulateBulkRequest = new SimulateBulkRequest(Map.of(), substituteComponentTemplates, Map.of(), Map.of());
|
||||
Map<String, ComponentTemplate> componentTemplateSubstitutions = simulateBulkRequest.getComponentTemplateSubstitutions();
|
||||
assertThat(componentTemplateSubstitutions.size(), equalTo(2));
|
||||
assertThat(
|
||||
|
@ -118,7 +160,7 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testGetIndexTemplateSubstitutions() throws IOException {
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of());
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
assertThat(simulateBulkRequest.getIndexTemplateSubstitutions(), equalTo(Map.of()));
|
||||
String substituteIndexTemplatesString = """
|
||||
{
|
||||
|
@ -154,7 +196,7 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
randomBoolean(),
|
||||
XContentType.JSON
|
||||
).v2();
|
||||
simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), substituteIndexTemplates);
|
||||
simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), substituteIndexTemplates, Map.of());
|
||||
Map<String, ComposableIndexTemplate> indexTemplateSubstitutions = simulateBulkRequest.getIndexTemplateSubstitutions();
|
||||
assertThat(indexTemplateSubstitutions.size(), equalTo(2));
|
||||
assertThat(
|
||||
|
@ -179,7 +221,8 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(
|
||||
getTestPipelineSubstitutions(),
|
||||
getTestComponentTemplateSubstitutions(),
|
||||
getTestIndexTemplateSubstitutions()
|
||||
getTestIndexTemplateSubstitutions(),
|
||||
getTestMappingAddition()
|
||||
);
|
||||
simulateBulkRequest.setRefreshPolicy(randomFrom(WriteRequest.RefreshPolicy.values()));
|
||||
simulateBulkRequest.waitForActiveShards(randomIntBetween(1, 10));
|
||||
|
@ -204,7 +247,6 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
assertThat(shallowCopy.routing(), equalTo(simulateBulkRequest.routing()));
|
||||
assertThat(shallowCopy.requireAlias(), equalTo(simulateBulkRequest.requireAlias()));
|
||||
assertThat(shallowCopy.requireDataStream(), equalTo(simulateBulkRequest.requireDataStream()));
|
||||
|
||||
}
|
||||
|
||||
private static Map<String, Map<String, Object>> getTestPipelineSubstitutions() {
|
||||
|
@ -248,4 +290,22 @@ public class SimulateBulkRequestTests extends ESTestCase {
|
|||
Map.of("template", Map.of("index_patterns", List.of("foo*", "bar*"), "mappings", Map.of(), "settings", Map.of()))
|
||||
);
|
||||
}
|
||||
|
||||
private static Map<String, Object> getTestMappingAddition() {
|
||||
return Map.ofEntries(
|
||||
entry(
|
||||
"_doc",
|
||||
Map.ofEntries(
|
||||
entry("dynamic", "strict"),
|
||||
entry(
|
||||
"properties",
|
||||
Map.ofEntries(
|
||||
entry("foo", Map.ofEntries(entry("type", "keyword"))),
|
||||
entry("bar", Map.ofEntries(entry("type", "boolean")))
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,7 +135,7 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
|
|||
|
||||
public void testIndexData() throws IOException {
|
||||
Task task = mock(Task.class); // unused
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(null, null, null);
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
int bulkItemCount = randomIntBetween(0, 200);
|
||||
for (int i = 0; i < bulkItemCount; i++) {
|
||||
Map<String, ?> source = Map.of(randomAlphaOfLength(10), randomAlphaOfLength(5));
|
||||
|
@ -218,7 +218,11 @@ public class TransportSimulateBulkActionTests extends ESTestCase {
|
|||
* (7) An indexing request to a nonexistent index that matches no templates
|
||||
*/
|
||||
Task task = mock(Task.class); // unused
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(null, null, null);
|
||||
/*
|
||||
* Here we only add a mapping_addition because if there is no mapping at all TransportSimulateBulkAction skips mapping validation
|
||||
* altogether, and we need it to run for this test to pass.
|
||||
*/
|
||||
BulkRequest bulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of("_doc", Map.of("dynamic", "strict")));
|
||||
int bulkItemCount = randomIntBetween(0, 200);
|
||||
Map<String, IndexMetadata> indicesMap = new HashMap<>();
|
||||
Map<String, IndexTemplateMetadata> v1Templates = new HashMap<>();
|
||||
|
|
|
@ -65,7 +65,7 @@ public class SimulateIngestServiceTests extends ESTestCase {
|
|||
ingestService.innerUpdatePipelines(ingestMetadata);
|
||||
{
|
||||
// First we make sure that if there are no substitutions that we get our original pipeline back:
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(null, null, null);
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(Map.of(), Map.of(), Map.of(), Map.of());
|
||||
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
|
||||
Pipeline pipeline = simulateIngestService.getPipeline("pipeline1");
|
||||
assertThat(pipeline.getProcessors(), contains(transformedMatch(Processor::getType, equalTo("processor1"))));
|
||||
|
@ -83,7 +83,7 @@ public class SimulateIngestServiceTests extends ESTestCase {
|
|||
);
|
||||
pipelineSubstitutions.put("pipeline2", newHashMap("processors", List.of(newHashMap("processor3", Collections.emptyMap()))));
|
||||
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, null, null);
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, Map.of(), Map.of(), Map.of());
|
||||
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
|
||||
Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1");
|
||||
assertThat(
|
||||
|
@ -103,7 +103,7 @@ public class SimulateIngestServiceTests extends ESTestCase {
|
|||
*/
|
||||
Map<String, Map<String, Object>> pipelineSubstitutions = new HashMap<>();
|
||||
pipelineSubstitutions.put("pipeline2", newHashMap("processors", List.of(newHashMap("processor3", Collections.emptyMap()))));
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, null, null);
|
||||
SimulateBulkRequest simulateBulkRequest = new SimulateBulkRequest(pipelineSubstitutions, Map.of(), Map.of(), Map.of());
|
||||
SimulateIngestService simulateIngestService = new SimulateIngestService(ingestService, simulateBulkRequest);
|
||||
Pipeline pipeline1 = simulateIngestService.getPipeline("pipeline1");
|
||||
assertThat(pipeline1.getProcessors(), contains(transformedMatch(Processor::getType, equalTo("processor1"))));
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue