From 21bb836ce57998c4c4d08f6d0425e66450f5fdcb Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Fri, 27 Jun 2025 10:29:06 -0500 Subject: [PATCH] Adding actions to get and update data stream mappings (#130042) --- modules/data-streams/build.gradle | 4 + ...sportUpdateDataStreamMappingsActionIT.java | 241 ++++++++++++++++++ .../datastreams/DataStreamsPlugin.java | 9 + .../TransportGetDataStreamMappingsAction.java | 90 +++++++ ...ansportUpdateDataStreamMappingsAction.java | 197 ++++++++++++++ .../datastreams/GetDataStreamAction.java | 12 +- .../GetDataStreamMappingsAction.java | 140 ++++++++++ .../UpdateDataStreamMappingsAction.java | 224 ++++++++++++++++ .../cluster/metadata/DataStream.java | 4 +- .../metadata/MetadataDataStreamsService.java | 110 +++++++- .../datastreams/GetDataStreamActionTests.java | 6 + .../GetDataStreamMappingsActionTests.java | 119 +++++++++ ...eDataStreamMappingsActionRequestTests.java | 78 ++++++ ...DataStreamMappingsActionResponseTests.java | 169 ++++++++++++ .../cluster/metadata/DataStreamTests.java | 4 +- .../xpack/security/operator/Constants.java | 2 + 16 files changed, 1404 insertions(+), 5 deletions(-) create mode 100644 modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TransportUpdateDataStreamMappingsActionIT.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java create mode 100644 modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java create mode 100644 server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java create mode 100644 server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java diff --git a/modules/data-streams/build.gradle b/modules/data-streams/build.gradle index fcd435e29a30..b5fcae8115cd 100644 --- a/modules/data-streams/build.gradle +++ b/modules/data-streams/build.gradle @@ -30,6 +30,10 @@ tasks.withType(StandaloneRestIntegTestTask).configureEach { usesDefaultDistribution("to be triaged") } +tasks.named("internalClusterTest").configure { + systemProperty 'es.logs_stream_feature_flag_enabled', 'true' +} + if (buildParams.inFipsJvm){ // These fail in CI but only when run as part of checkPart2 and not individually. // Tracked in : diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TransportUpdateDataStreamMappingsActionIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TransportUpdateDataStreamMappingsActionIT.java new file mode 100644 index 000000000000..97dce981cfbf --- /dev/null +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TransportUpdateDataStreamMappingsActionIT.java @@ -0,0 +1,241 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsAction; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; +import org.elasticsearch.action.admin.indices.template.put.TransportPutComposableIndexTemplateAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.MappingMetadata; +import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class TransportUpdateDataStreamMappingsActionIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return List.of(DataStreamsPlugin.class); + } + + public void testGetAndUpdateMappings() throws IOException { + String dataStreamName = "my-data-stream-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createDataStream(dataStreamName); + + Map originalMappings = Map.of( + "dynamic", + "strict", + "properties", + Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "text")) + ); + Map mappingOverrides = Map.of( + "properties", + Map.of("foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text")) + ); + Map expectedEffectiveMappings = Map.of( + "dynamic", + "strict", + "properties", + Map.of("foo1", Map.of("type", "text"), "foo2", Map.of("type", "keyword"), "foo3", Map.of("type", "text")) + ); + assertExpectedMappings(dataStreamName, Map.of(), originalMappings); + updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, true); + assertExpectedMappings(dataStreamName, Map.of(), originalMappings); + updateMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings, false); + assertExpectedMappings(dataStreamName, mappingOverrides, expectedEffectiveMappings); + + // Now make sure that the backing index still has the original mappings: + Map originalIndexMappings = Map.of( + "dynamic", + "strict", + 
"_data_stream_timestamp", + Map.of("enabled", true), + "properties", + Map.of("@timestamp", Map.of("type", "date"), "foo1", Map.of("type", "text"), "foo2", Map.of("type", "text")) + ); + assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().getFirst().getName(), originalIndexMappings); + + // Do a rollover, and then make sure that the updated mappnigs are on the new write index: + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + Map updatedIndexMappings = Map.of( + "dynamic", + "strict", + "_data_stream_timestamp", + Map.of("enabled", true), + "properties", + Map.of( + "@timestamp", + Map.of("type", "date"), + "foo1", + Map.of("type", "text"), + "foo2", + Map.of("type", "keyword"), + "foo3", + Map.of("type", "text") + ) + ); + assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(1).getName(), updatedIndexMappings); + + // Now undo the mapping overrides, and expect the original mapping to be in effect: + updateMappings(dataStreamName, Map.of(), originalMappings, false); + assertExpectedMappings(dataStreamName, Map.of(), originalMappings); + assertAcked(indicesAdmin().rolloverIndex(new RolloverRequest(dataStreamName, null)).actionGet()); + assertExpectedMappingsOnIndex(getDataStream(dataStreamName).getIndices().get(2).getName(), originalIndexMappings); + } + + private void createDataStream(String dataStreamName) throws IOException { + String mappingString = """ + { + "_doc":{ + "dynamic":"strict", + "properties":{ + "foo1":{ + "type":"text" + }, + "foo2":{ + "type":"text" + } + } + } + } + """; + CompressedXContent mapping = CompressedXContent.fromJSON(mappingString); + Template template = new Template(Settings.EMPTY, mapping, null); + ComposableIndexTemplate.DataStreamTemplate dataStreamTemplate = new ComposableIndexTemplate.DataStreamTemplate(); + ComposableIndexTemplate composableIndexTemplate = ComposableIndexTemplate.builder() + .indexPatterns(List.of("my-data-stream-*")) + .dataStreamTemplate(dataStreamTemplate) + .template(template) + .build(); + TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request("test"); + request.indexTemplate(composableIndexTemplate); + + client().execute(TransportPutComposableIndexTemplateAction.TYPE, request).actionGet(); + + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(dataStreamName).source(""" + { + "@timestamp": "2024-08-27", + "foo1": "baz" + } + """, XContentType.JSON).id(randomUUID())); + bulkRequest.add(new IndexRequest(dataStreamName).source(""" + { + "@timestamp": "2024-08-27", + "foo3": "baz" + } + """, XContentType.JSON).id(randomUUID())); + BulkResponse response = client().execute(new ActionType(TransportBulkAction.NAME), bulkRequest).actionGet(); + assertThat(response.getItems().length, equalTo(2)); + } + + private void assertExpectedMappings( + String dataStreamName, + Map expectedMappingOverrides, + Map expectedEffectiveMappings + ) { + GetDataStreamMappingsAction.Response getMappingsResponse = client().execute( + new ActionType(GetDataStreamMappingsAction.NAME), + new GetDataStreamMappingsAction.Request(TimeValue.THIRTY_SECONDS).indices(dataStreamName) + ).actionGet(); + List responses = getMappingsResponse.getDataStreamMappingsResponses(); + assertThat(responses.size(), equalTo(1)); + GetDataStreamMappingsAction.DataStreamMappingsResponse mappingsResponse = responses.getFirst(); + assertThat(mappingsResponse.dataStreamName(), equalTo(dataStreamName)); + 
assertThat( + XContentHelper.convertToMap(mappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(), + equalTo(expectedMappingOverrides) + ); + assertThat( + XContentHelper.convertToMap(mappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(), + equalTo(expectedEffectiveMappings) + ); + + DataStream dataStream = getDataStream(dataStreamName); + assertThat( + XContentHelper.convertToMap(dataStream.getMappings().uncompressed(), true, XContentType.JSON).v2(), + equalTo(expectedMappingOverrides) + ); + } + + private void assertExpectedMappingsOnIndex(String indexName, Map expectedMappings) { + GetMappingsResponse mappingsResponse = client().execute( + new ActionType(GetMappingsAction.NAME), + new GetMappingsRequest(TimeValue.THIRTY_SECONDS).indices(indexName) + ).actionGet(); + Map mappings = mappingsResponse.mappings(); + assertThat(mappings.size(), equalTo(1)); + assertThat(mappings.values().iterator().next().sourceAsMap(), equalTo(expectedMappings)); + } + + private void updateMappings( + String dataStreamName, + Map mappingOverrides, + Map expectedEffectiveMappings, + boolean dryRun + ) throws IOException { + CompressedXContent mappingOverride = new CompressedXContent(mappingOverrides); + UpdateDataStreamMappingsAction.Response putMappingsResponse = client().execute( + new ActionType(UpdateDataStreamMappingsAction.NAME), + new UpdateDataStreamMappingsAction.Request(mappingOverride, dryRun, TimeValue.THIRTY_SECONDS, TimeValue.THIRTY_SECONDS).indices( + dataStreamName + ) + ).actionGet(); + assertThat(putMappingsResponse.getDataStreamMappingsResponses().size(), equalTo(1)); + UpdateDataStreamMappingsAction.DataStreamMappingsResponse firstPutMappingsResponse = putMappingsResponse + .getDataStreamMappingsResponses() + .getFirst(); + assertThat(firstPutMappingsResponse.dataStreamName(), equalTo(dataStreamName)); + assertThat( + XContentHelper.convertToMap(firstPutMappingsResponse.mappings().uncompressed(), true, XContentType.JSON).v2(), + equalTo(mappingOverrides) + ); + assertThat( + XContentHelper.convertToMap(firstPutMappingsResponse.effectiveMappings().uncompressed(), true, XContentType.JSON).v2(), + equalTo(expectedEffectiveMappings) + ); + } + + private DataStream getDataStream(String dataStreamName) { + GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request( + TEST_REQUEST_TIMEOUT, + new String[] { dataStreamName } + ); + GetDataStreamAction.Response getDataStreamResponse = client().execute(GetDataStreamAction.INSTANCE, getDataStreamRequest) + .actionGet(); + assertThat(getDataStreamResponse.getDataStreams().size(), equalTo(1)); + return getDataStreamResponse.getDataStreams().get(0).getDataStream(); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java index eb33e2a3f158..8bbf76cd005a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java @@ -14,16 +14,19 @@ import org.elasticsearch.action.datastreams.CreateDataStreamAction; import org.elasticsearch.action.datastreams.DataStreamsStatsAction; import org.elasticsearch.action.datastreams.DeleteDataStreamAction; import org.elasticsearch.action.datastreams.GetDataStreamAction; +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; import 
org.elasticsearch.action.datastreams.GetDataStreamSettingsAction; import org.elasticsearch.action.datastreams.MigrateToDataStreamAction; import org.elasticsearch.action.datastreams.ModifyDataStreamsAction; import org.elasticsearch.action.datastreams.PromoteDataStreamAction; import org.elasticsearch.action.datastreams.PutDataStreamOptionsAction; +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; import org.elasticsearch.action.datastreams.UpdateDataStreamSettingsAction; import org.elasticsearch.action.datastreams.lifecycle.ExplainDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.GetDataStreamLifecycleAction; import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction; import org.elasticsearch.client.internal.OriginSettingClient; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -37,11 +40,13 @@ import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.action.TransportCreateDataStreamAction; import org.elasticsearch.datastreams.action.TransportDataStreamsStatsAction; import org.elasticsearch.datastreams.action.TransportDeleteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportGetDataStreamMappingsAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamSettingsAction; import org.elasticsearch.datastreams.action.TransportGetDataStreamsAction; import org.elasticsearch.datastreams.action.TransportMigrateToDataStreamAction; import org.elasticsearch.datastreams.action.TransportModifyDataStreamsAction; import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction; +import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction; import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -246,6 +251,10 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlu actions.add(new ActionHandler(DeleteDataStreamOptionsAction.INSTANCE, TransportDeleteDataStreamOptionsAction.class)); actions.add(new ActionHandler(GetDataStreamSettingsAction.INSTANCE, TransportGetDataStreamSettingsAction.class)); actions.add(new ActionHandler(UpdateDataStreamSettingsAction.INSTANCE, TransportUpdateDataStreamSettingsAction.class)); + if (DataStream.LOGS_STREAM_FEATURE_FLAG) { + actions.add(new ActionHandler(GetDataStreamMappingsAction.INSTANCE, TransportGetDataStreamMappingsAction.class)); + actions.add(new ActionHandler(UpdateDataStreamMappingsAction.INSTANCE, TransportUpdateDataStreamMappingsAction.class)); + } return actions; } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java new file mode 100644 index 000000000000..c6cf18cdcd75 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportGetDataStreamMappingsAction.java @@ -0,0 +1,90 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.datastreams.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.get.GetSettingsAction; +import org.elasticsearch.action.datastreams.GetDataStreamMappingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.local.TransportLocalProjectMetadataAction; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TransportGetDataStreamMappingsAction extends TransportLocalProjectMetadataAction< + GetDataStreamMappingsAction.Request, + GetDataStreamMappingsAction.Response> { + private final IndexNameExpressionResolver indexNameExpressionResolver; + + @Inject + public TransportGetDataStreamMappingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + GetSettingsAction.NAME, + actionFilters, + transportService.getTaskManager(), + clusterService, + threadPool.executor(ThreadPool.Names.MANAGEMENT), + projectResolver + ); + this.indexNameExpressionResolver = indexNameExpressionResolver; + } + + @Override + protected ClusterBlockException checkBlock(GetDataStreamMappingsAction.Request request, ProjectState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected void localClusterStateOperation( + Task task, + GetDataStreamMappingsAction.Request request, + ProjectState project, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + project.metadata(), + IndicesOptions.DEFAULT, + request.indices() + ); + Map dataStreamMap = project.metadata().dataStreams(); + List responseList = new ArrayList<>(dataStreamNames.size()); + for (String dataStreamName : dataStreamNames) { + DataStream dataStream = dataStreamMap.get(dataStreamName); + responseList.add( + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + dataStream.getMappings(), + dataStream.getEffectiveMappings(project.metadata()) + ) + ); + } + listener.onResponse(new GetDataStreamMappingsAction.Response(responseList)); + } +} diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java 
b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java new file mode 100644 index 000000000000..0fc176201ff2 --- /dev/null +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/TransportUpdateDataStreamMappingsAction.java @@ -0,0 +1,197 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.datastreams.action; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.datastreams.UpdateDataStreamMappingsAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CountDownActionListener; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetadataDataStreamsService; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.mapper.Mapping; +import org.elasticsearch.indices.SystemIndices; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class TransportUpdateDataStreamMappingsAction extends TransportMasterNodeAction< + UpdateDataStreamMappingsAction.Request, + UpdateDataStreamMappingsAction.Response> { + private static final Logger logger = LogManager.getLogger(TransportUpdateDataStreamMappingsAction.class); + private final MetadataDataStreamsService metadataDataStreamsService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final SystemIndices systemIndices; + private final ProjectResolver projectResolver; + + @Inject + public TransportUpdateDataStreamMappingsAction( + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + ProjectResolver projectResolver, + MetadataDataStreamsService metadataDataStreamsService, + IndexNameExpressionResolver indexNameExpressionResolver, + SystemIndices systemIndices + ) { + super( + UpdateDataStreamMappingsAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + UpdateDataStreamMappingsAction.Request::new, + UpdateDataStreamMappingsAction.Response::new, + EsExecutors.DIRECT_EXECUTOR_SERVICE + ); + this.projectResolver = projectResolver; + 
this.metadataDataStreamsService = metadataDataStreamsService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.systemIndices = systemIndices; + } + + @Override + protected void masterOperation( + Task task, + UpdateDataStreamMappingsAction.Request request, + ClusterState state, + ActionListener listener + ) throws Exception { + List dataStreamNames = indexNameExpressionResolver.dataStreamNames( + state.projectState(projectResolver.getProjectId()).metadata(), + IndicesOptions.DEFAULT, + request.indices() + ); + List dataStreamMappingsResponse = new ArrayList<>(); + CountDownActionListener countDownListener = new CountDownActionListener( + dataStreamNames.size() + 1, + listener.delegateFailure( + (responseActionListener, unused) -> responseActionListener.onResponse( + new UpdateDataStreamMappingsAction.Response(dataStreamMappingsResponse) + ) + ) + ); + countDownListener.onResponse(null); + for (String dataStreamName : dataStreamNames) { + updateSingleDataStream( + dataStreamName, + request.getMappings(), + request.masterNodeTimeout(), + request.ackTimeout(), + request.isDryRun(), + ActionListener.wrap(dataStreamResponse -> { + dataStreamMappingsResponse.add(dataStreamResponse); + countDownListener.onResponse(null); + }, e -> { + dataStreamMappingsResponse.add( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + Strings.hasText(e.getMessage()) ? e.getMessage() : e.toString(), + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + countDownListener.onResponse(null); + }) + ); + } + } + + private void updateSingleDataStream( + String dataStreamName, + CompressedXContent mappingsOverrides, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + boolean dryRun, + ActionListener listener + ) { + logger.debug("updating mappings for {}", dataStreamName); + if (systemIndices.isSystemDataStream(dataStreamName)) { + listener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + "Cannot update a system data stream", + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + return; + } + metadataDataStreamsService.updateMappings( + projectResolver.getProjectId(), + masterNodeTimeout, + ackTimeout, + dataStreamName, + mappingsOverrides, + dryRun, + listener.delegateFailure((dataStreamMappingsResponseActionListener, dataStream) -> { + if (dataStream != null) { + try { + dataStreamMappingsResponseActionListener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + true, + null, + mappingsOverrides, + dataStream.getEffectiveMappings( + clusterService.state().projectState(projectResolver.getProjectId()).metadata() + ) + ) + ); + } catch (IOException e) { + dataStreamMappingsResponseActionListener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + e.getMessage(), + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + } + } else { + dataStreamMappingsResponseActionListener.onResponse( + new UpdateDataStreamMappingsAction.DataStreamMappingsResponse( + dataStreamName, + false, + "Updating mappings not accepted for unknown reasons", + Mapping.EMPTY.toCompressedXContent(), + Mapping.EMPTY.toCompressedXContent() + ) + ); + } + }) + ); + } + + @Override + protected ClusterBlockException checkBlock(UpdateDataStreamMappingsAction.Request request, ClusterState state) { + return 
state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java index 598dbd8c32b2..b993ea31a89d 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamAction.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -214,6 +215,7 @@ public class GetDataStreamAction extends ActionType uncompressedMappings = XContentHelper.convertToMap( + dataStream.getMappings().uncompressed(), + true, + builder.contentType() + ).v2(); + builder.map(uncompressedMappings); + } builder.startObject(DataStream.FAILURE_STORE_FIELD.getPreferredName()); builder.field(FAILURE_STORE_ENABLED.getPreferredName(), failureStoreEffectivelyEnabled); builder.field(DataStream.ROLLOVER_ON_WRITE_FIELD.getPreferredName(), dataStream.getFailureComponent().isRolloverOnWrite()); diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java new file mode 100644 index 000000000000..c5dc30e75c23 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsAction.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.local.LocalClusterStateRequest; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +public class GetDataStreamMappingsAction extends ActionType { + public static final String NAME = "indices:monitor/data_stream/mappings/get"; + public static final GetDataStreamMappingsAction INSTANCE = new GetDataStreamMappingsAction(); + + public GetDataStreamMappingsAction() { + super(NAME); + } + + public static class Request extends LocalClusterStateRequest implements IndicesRequest.Replaceable { + private String[] dataStreamNames; + + public Request(TimeValue masterNodeTimeout) { + super(masterNodeTimeout); + } + + @Override + public GetDataStreamMappingsAction.Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + GetDataStreamMappingsAction.Request request = (GetDataStreamMappingsAction.Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames); + } + + @Override + public int hashCode() { + return Arrays.hashCode(dataStreamNames); + } + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List dataStreamMappingsResponses; + + public Response(List DataStreamMappingsResponses) { + this.dataStreamMappingsResponses = DataStreamMappingsResponses; + } + + public List getDataStreamMappingsResponses() { + return dataStreamMappingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + TransportAction.localOnly(); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + dataStreamMappingsResponses.stream().map(dataStreamMappingsResponse -> (ToXContent) dataStreamMappingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + } + + public 
record DataStreamMappingsResponse(String dataStreamName, CompressedXContent mappings, CompressedXContent effectiveMappings) + implements + ToXContent { + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + Map uncompressedMappings = XContentHelper.convertToMap(mappings.uncompressed(), true, XContentType.JSON).v2(); + builder.field("mappings"); + builder.map(uncompressedMappings); + Map uncompressedEffectiveMappings = XContentHelper.convertToMap( + effectiveMappings.uncompressed(), + true, + builder.contentType() + ).v2(); + builder.field("effective_mappings"); + builder.map(uncompressedEffectiveMappings); + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java new file mode 100644 index 000000000000..561e59f3eed3 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsAction.java @@ -0,0 +1,224 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ChunkedToXContentObject; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class UpdateDataStreamMappingsAction extends ActionType { + + public static final String NAME = "indices:admin/data_stream/mappings/update"; + public static final UpdateDataStreamMappingsAction INSTANCE = new UpdateDataStreamMappingsAction(); + + public UpdateDataStreamMappingsAction() { + super(NAME); + } + + public static class Request extends AcknowledgedRequest implements IndicesRequest.Replaceable { + private final CompressedXContent mappings; + private final boolean dryRun; + private String[] dataStreamNames = Strings.EMPTY_ARRAY; + + public Request(CompressedXContent mappings, boolean dryRun, TimeValue masterNodeTimeout, TimeValue 
ackTimeout) { + super(masterNodeTimeout, ackTimeout); + this.mappings = mappings; + this.dryRun = dryRun; + } + + @Override + public Request indices(String... dataStreamNames) { + this.dataStreamNames = dataStreamNames; + return this; + } + + public CompressedXContent getMappings() { + return mappings; + } + + @Override + public boolean includeDataStreams() { + return true; + } + + public boolean isDryRun() { + return dryRun; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.dataStreamNames = in.readStringArray(); + this.mappings = CompressedXContent.readCompressedString(in); + this.dryRun = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(dataStreamNames); + mappings.writeTo(out); + out.writeBoolean(dryRun); + } + + @Override + public String[] indices() { + return dataStreamNames; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED; + } + + @Override + public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { + return new CancellableTask(id, type, action, "", parentTaskId, headers); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(dataStreamNames, request.dataStreamNames) + && mappings.equals(request.mappings) + && dryRun == request.dryRun + && Objects.equals(masterNodeTimeout(), request.masterNodeTimeout()) + && Objects.equals(ackTimeout(), request.ackTimeout()); + } + + @Override + public int hashCode() { + return Objects.hash(Arrays.hashCode(dataStreamNames), mappings, dryRun, masterNodeTimeout(), ackTimeout()); + } + + } + + public static class Response extends ActionResponse implements ChunkedToXContentObject { + private final List dataStreamMappingsResponses; + + public Response(List dataStreamMappingsResponses) { + this.dataStreamMappingsResponses = dataStreamMappingsResponses; + } + + public Response(StreamInput in) throws IOException { + this(in.readCollectionAsList(DataStreamMappingsResponse::new)); + } + + public List getDataStreamMappingsResponses() { + return dataStreamMappingsResponses; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeCollection(dataStreamMappingsResponses, (out1, value) -> value.writeTo(out1)); + } + + @Override + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, params1) -> builder.startObject().startArray("data_streams")), + dataStreamMappingsResponses.stream().map(dataStreamMappingsResponse -> (ToXContent) dataStreamMappingsResponse).iterator(), + Iterators.single((builder, params1) -> builder.endArray().endObject()) + ); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(dataStreamMappingsResponses, response.dataStreamMappingsResponses); + } + + @Override + public int hashCode() { + return Objects.hash(dataStreamMappingsResponses); + } + } + + public record DataStreamMappingsResponse( + String dataStreamName, + boolean dataStreamSucceeded, + String dataStreamErrorMessage, + CompressedXContent mappings, + CompressedXContent effectiveMappings + ) implements ToXContent, Writeable { + + public DataStreamMappingsResponse(StreamInput 
in) throws IOException { + this( + in.readString(), + in.readBoolean(), + in.readOptionalString(), + CompressedXContent.readCompressedString(in), + CompressedXContent.readCompressedString(in) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(dataStreamName); + out.writeBoolean(dataStreamSucceeded); + out.writeOptionalString(dataStreamErrorMessage); + mappings.writeTo(out); + effectiveMappings.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("name", dataStreamName); + builder.field("applied_to_data_stream", dataStreamSucceeded); + if (dataStreamErrorMessage != null) { + builder.field("error", dataStreamErrorMessage); + } + Map uncompressedMappings = XContentHelper.convertToMap(mappings.uncompressed(), true, XContentType.JSON).v2(); + if (uncompressedMappings.isEmpty() == false) { + builder.field("mappings"); + builder.map(uncompressedMappings); + } + Map uncompressedEffectiveMappings = XContentHelper.convertToMap( + effectiveMappings.uncompressed(), + true, + XContentType.JSON + ).v2(); + if (uncompressedEffectiveMappings.isEmpty() == false) { + builder.field("effective_mappings"); + builder.map(uncompressedEffectiveMappings); + } + builder.endObject(); + return builder; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 69e15a2b9888..e7adf1446ba4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -126,7 +126,7 @@ public final class DataStream implements SimpleDiffable, ToXContentO @Nullable private final Map metadata; private final Settings settings; - private final CompressedXContent mappings; + private final CompressedXContent mappings; // always stored with json content type private final boolean hidden; private final boolean replicated; private final boolean system; @@ -1591,6 +1591,7 @@ public final class DataStream implements SimpleDiffable, ToXContentO && generation == that.generation && Objects.equals(metadata, that.metadata) && Objects.equals(settings, that.settings) + && Objects.equals(mappings, that.mappings) && hidden == that.hidden && system == that.system && replicated == that.replicated @@ -1609,6 +1610,7 @@ public final class DataStream implements SimpleDiffable, ToXContentO generation, metadata, settings, + mappings, hidden, system, replicated, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java index 60a3f4c4469b..e90a5080515b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataDataStreamsService.java @@ -25,6 +25,7 @@ import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterServiceTaskQueue; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; @@ -60,6 +61,7 @@ public class MetadataDataStreamsService { private 
final MasterServiceTaskQueue setRolloverOnWriteTaskQueue; private final MasterServiceTaskQueue updateOptionsTaskQueue; private final MasterServiceTaskQueue updateSettingsTaskQueue; + private final MasterServiceTaskQueue updateMappingsTaskQueue; public MetadataDataStreamsService( ClusterService clusterService, @@ -164,6 +166,32 @@ public class MetadataDataStreamsService { Priority.NORMAL, updateSettingsExecutor ); + ClusterStateTaskExecutor updateMappingsExecutor = new SimpleBatchedAckListenerTaskExecutor<>() { + + @Override + public Tuple executeTask( + UpdateMappingsTask updateMappingsTask, + ClusterState clusterState + ) throws Exception { + DataStream dataStream = createDataStreamForUpdatedDataStreamMappings( + updateMappingsTask.projectId, + updateMappingsTask.dataStreamName, + updateMappingsTask.mappingsOverrides, + clusterState + ); + ProjectMetadata projectMetadata = clusterState.metadata().getProject(updateMappingsTask.projectId); + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(projectMetadata); + projectMetadataBuilder.removeDataStream(updateMappingsTask.dataStreamName); + projectMetadataBuilder.put(dataStream); + ClusterState updatedClusterState = ClusterState.builder(clusterState).putProjectMetadata(projectMetadataBuilder).build(); + return new Tuple<>(updatedClusterState, updateMappingsTask); + } + }; + this.updateMappingsTaskQueue = clusterService.createTaskQueue( + "update-data-stream-mappings", + Priority.NORMAL, + updateMappingsExecutor + ); } public void modifyDataStream( @@ -471,7 +499,7 @@ public class MetadataDataStreamsService { ComposableIndexTemplate mergedTemplate = template.mergeSettings(mergedSettings); MetadataIndexTemplateService.validateTemplate( mergedTemplate.template().settings(), - mergedTemplate.template().mappings(), + dataStream.getEffectiveMappings(projectMetadata), indicesService ); @@ -479,6 +507,60 @@ public class MetadataDataStreamsService { return dataStream.copy().setSettings(mergedSettings).build(); } + private DataStream createDataStreamForUpdatedDataStreamMappings( + ProjectId projectId, + String dataStreamName, + CompressedXContent mappingsOverrides, + ClusterState clusterState + ) throws Exception { + ProjectMetadata projectMetadata = clusterState.metadata().getProject(projectId); + Map dataStreamMap = projectMetadata.dataStreams(); + DataStream dataStream = dataStreamMap.get(dataStreamName); + + final ComposableIndexTemplate template = lookupTemplateForDataStream(dataStreamName, projectMetadata); + ComposableIndexTemplate mergedTemplate = template.mergeMappings(mappingsOverrides); + MetadataIndexTemplateService.validateTemplate( + dataStream.getEffectiveSettings(projectMetadata), + mergedTemplate.template().mappings(), + indicesService + ); + return dataStream.copy().setMappings(mappingsOverrides).build(); + } + + public void updateMappings( + ProjectId projectId, + TimeValue masterNodeTimeout, + TimeValue ackTimeout, + String dataStreamName, + CompressedXContent mappingsOverrides, + boolean dryRun, + ActionListener listener + ) { + if (dryRun) { + /* + * If this is a dry run, we'll do the settings validation and apply the changes to the data stream locally, but we won't run + * the task that actually updates the cluster state. 
+ */ + try { + DataStream updatedDataStream = createDataStreamForUpdatedDataStreamMappings( + projectId, + dataStreamName, + mappingsOverrides, + clusterService.state() + ); + listener.onResponse(updatedDataStream); + } catch (Exception e) { + listener.onFailure(e); + } + } else { + updateMappingsTaskQueue.submitTask( + "updating mappings on data stream", + new UpdateMappingsTask(projectId, dataStreamName, mappingsOverrides, clusterService, ackTimeout, listener), + masterNodeTimeout + ); + } + } + private static void addBackingIndex( ProjectMetadata project, ProjectMetadata.Builder builder, @@ -749,4 +831,30 @@ public class MetadataDataStreamsService { this.settingsOverrides = settingsOverrides; } } + + static class UpdateMappingsTask extends AckedBatchedClusterStateUpdateTask { + final ProjectId projectId; + private final String dataStreamName; + private final CompressedXContent mappingsOverrides; + + UpdateMappingsTask( + ProjectId projectId, + String dataStreamName, + CompressedXContent mappingsOverrides, + ClusterService clusterService, + TimeValue ackTimeout, + ActionListener listener + ) { + super(ackTimeout, listener.safeMap(response -> { + if (response.isAcknowledged()) { + return clusterService.state().projectState(projectId).metadata().dataStreams().get(dataStreamName); + } else { + throw new ElasticsearchException("Updating mappings not accepted for unknown reasons"); + } + })); + this.projectId = projectId; + this.dataStreamName = dataStreamName; + this.mappingsOverrides = mappingsOverrides; + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java index 9aa1a2f2a2c0..c90669650a94 100644 --- a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamActionTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetention; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.core.TimeValue; @@ -28,6 +29,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings; import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomSettings; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +66,8 @@ public class GetDataStreamActionTests extends ESTestCase { assertThat(lifecycleResult.get("retention_determined_by"), equalTo("max_global_retention")); Map> settingsMap = (Map>) resultMap.get("settings"); assertThat(Settings.builder().loadFromMap(settingsMap).build(), equalTo(dataStreamInfo.getDataStream().getSettings())); + Map mappingsMap = (Map) resultMap.get("mappings"); + assertThat(new CompressedXContent(mappingsMap), equalTo(dataStreamInfo.getDataStream().getMappings())); } } @@ -105,6 +109,7 @@ public class GetDataStreamActionTests extends ESTestCase { List indices = List.of(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10))); DataStreamLifecycle lifecycle = DataStreamLifecycle.createDataLifecycle(true, retention, null); Settings settings = randomSettings(); + CompressedXContent 
mappings = randomMappings(); return DataStream.builder(randomAlphaOfLength(50), indices) .setGeneration(randomLongBetween(1, 1000)) .setMetadata(Map.of()) @@ -113,6 +118,7 @@ public class GetDataStreamActionTests extends ESTestCase { .setReplicated(randomBoolean()) .setLifecycle(lifecycle) .setSettings(settings) + .setMappings(mappings) .build(); } } diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java new file mode 100644 index 000000000000..1a580ddf1a08 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/GetDataStreamMappingsActionTests.java @@ -0,0 +1,119 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.datastreams; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS; +import static org.hamcrest.Matchers.equalTo; + +public class GetDataStreamMappingsActionTests extends ESTestCase { + + public void testResponseToXContentEmpty() throws IOException { + List responseList = new ArrayList<>(); + GetDataStreamMappingsAction.Response response = new GetDataStreamMappingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat(xContentMap, equalTo(Map.of("data_streams", List.of()))); + } + } + + public void testResponseToXContent() throws IOException { + Map dataStream1Mappings = Map.of( + "properties", + Map.of("field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")) + ); + Map dataStream1EffectiveMappings = Map.of( + "properties", + Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "text"), "field3", Map.of("type", "keyword")) + ); + Map dataStream2Mappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "text"), "field5", Map.of("type", "keyword")) + ); + Map dataStream2EffectiveMappings = Map.of( + "properties", + Map.of("field4", Map.of("type", "text"), "field5", Map.of("type", "keyword"), "field6", Map.of("type", "keyword")) + ); + GetDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse1 = + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream1", + new 
CompressedXContent(dataStream1Mappings), + new CompressedXContent(dataStream1EffectiveMappings) + ); + GetDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse2 = + new GetDataStreamMappingsAction.DataStreamMappingsResponse( + "dataStream2", + new CompressedXContent(dataStream2Mappings), + new CompressedXContent(dataStream2EffectiveMappings) + ); + List responseList = List.of( + DataStreamMappingsResponse1, + DataStreamMappingsResponse2 + ); + GetDataStreamMappingsAction.Response response = new GetDataStreamMappingsAction.Response(responseList); + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.humanReadable(true); + response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> { + try { + xcontent.toXContent(builder, EMPTY_PARAMS); + } catch (IOException e) { + fail(e); + } + }); + Map xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2(); + assertThat( + xContentMap, + equalTo( + Map.of( + "data_streams", + List.of( + Map.of( + "name", + "dataStream1", + "mappings", + dataStream1Mappings, + "effective_mappings", + dataStream1EffectiveMappings + ), + Map.of( + "name", + "dataStream2", + "mappings", + dataStream2Mappings, + "effective_mappings", + dataStream2EffectiveMappings + ) + ) + ) + ) + ); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java new file mode 100644 index 000000000000..83305174fc50 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionRequestTests.java @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */
+
+package org.elasticsearch.action.datastreams;
+
+import org.elasticsearch.cluster.metadata.ComponentTemplateTests;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings;
+
+public class UpdateDataStreamMappingsActionRequestTests extends AbstractWireSerializingTestCase<UpdateDataStreamMappingsAction.Request> {
+    @Override
+    protected Writeable.Reader<UpdateDataStreamMappingsAction.Request> instanceReader() {
+        return UpdateDataStreamMappingsAction.Request::new;
+    }
+
+    @Override
+    protected UpdateDataStreamMappingsAction.Request createTestInstance() {
+        UpdateDataStreamMappingsAction.Request request = new UpdateDataStreamMappingsAction.Request(
+            randomMappings(),
+            randomBoolean(),
+            randomTimeValue(),
+            randomTimeValue()
+        );
+        request.indices(randomIndices());
+        return request;
+    }
+
+    @Override
+    protected UpdateDataStreamMappingsAction.Request mutateInstance(UpdateDataStreamMappingsAction.Request instance) throws IOException {
+        String[] indices = instance.indices();
+        CompressedXContent mappings = instance.getMappings();
+        boolean dryRun = instance.isDryRun();
+        TimeValue masterNodeTimeout = instance.masterNodeTimeout();
+        TimeValue ackTimeout = instance.ackTimeout();
+        switch (between(0, 4)) {
+            case 0 -> {
+                indices = randomArrayValueOtherThan(indices, this::randomIndices);
+            }
+            case 1 -> {
+                mappings = randomValueOtherThan(mappings, ComponentTemplateTests::randomMappings);
+            }
+            case 2 -> {
+                dryRun = dryRun == false;
+            }
+            case 3 -> {
+                masterNodeTimeout = randomValueOtherThan(masterNodeTimeout, ESTestCase::randomTimeValue);
+            }
+            case 4 -> {
+                ackTimeout = randomValueOtherThan(ackTimeout, ESTestCase::randomTimeValue);
+            }
+            default -> throw new AssertionError("Should not be here");
+        }
+        return new UpdateDataStreamMappingsAction.Request(mappings, dryRun, masterNodeTimeout, ackTimeout).indices(indices);
+    }
+
+    private String[] randomIndices() {
+        return randomList(10, () -> randomAlphaOfLength(20)).toArray(new String[0]);
+    }
+
+    public static <T> T[] randomArrayValueOtherThan(T[] input, Supplier<T[]> randomSupplier) {
+        return randomValueOtherThanMany(v -> Arrays.equals(input, v), randomSupplier);
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java
new file mode 100644
index 000000000000..e6b1a01bc613
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/action/datastreams/UpdateDataStreamMappingsActionResponseTests.java
@@ -0,0 +1,169 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.action.datastreams;
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.compress.CompressedXContent;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentType;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.cluster.metadata.ComponentTemplateTests.randomMappings;
+import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
+import static org.hamcrest.Matchers.equalTo;
+
+public class UpdateDataStreamMappingsActionResponseTests extends AbstractWireSerializingTestCase<UpdateDataStreamMappingsAction.Response> {
+
+    public void testToXContent() throws IOException {
+        Map<String, Object> dataStream1Mappings = Map.of(
+            "properties",
+            Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword"))
+        );
+        Map<String, Object> dataStream1EffectiveMappings = Map.of(
+            "properties",
+            Map.of("field1", Map.of("type", "keyword"), "field2", Map.of("type", "keyword"), "field3", Map.of("type", "keyword"))
+        );
+        Map<String, Object> dataStream2Mappings = Map.of(
+            "properties",
+            Map.of("field4", Map.of("type", "keyword"), "field5", Map.of("type", "keyword"))
+        );
+        Map<String, Object> dataStream2EffectiveMappings = Map.of(
+            "properties",
+            Map.of("field4", Map.of("type", "keyword"), "field5", Map.of("type", "keyword"), "field6", Map.of("type", "keyword"))
+        );
+        boolean dataStream1Succeeded = randomBoolean();
+        String dataStream1Error = randomBoolean() ? null : randomAlphaOfLength(20);
+        boolean dataStream2Succeeded = randomBoolean();
+        String dataStream2Error = randomBoolean() ? null : randomAlphaOfLength(20);
+        UpdateDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse1 =
+            new UpdateDataStreamMappingsAction.DataStreamMappingsResponse(
+                "dataStream1",
+                dataStream1Succeeded,
+                dataStream1Error,
+                new CompressedXContent(dataStream1Mappings),
+                new CompressedXContent(dataStream1EffectiveMappings)
+            );
+        UpdateDataStreamMappingsAction.DataStreamMappingsResponse DataStreamMappingsResponse2 =
+            new UpdateDataStreamMappingsAction.DataStreamMappingsResponse(
+                "dataStream2",
+                dataStream2Succeeded,
+                dataStream2Error,
+                new CompressedXContent(dataStream2Mappings),
+                new CompressedXContent(dataStream2EffectiveMappings)
+            );
+        List<UpdateDataStreamMappingsAction.DataStreamMappingsResponse> responseList = List.of(
+            DataStreamMappingsResponse1,
+            DataStreamMappingsResponse2
+        );
+        UpdateDataStreamMappingsAction.Response response = new UpdateDataStreamMappingsAction.Response(responseList);
+        try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
+            builder.humanReadable(true);
+            response.toXContentChunked(ToXContent.EMPTY_PARAMS).forEachRemaining(xcontent -> {
+                try {
+                    xcontent.toXContent(builder, EMPTY_PARAMS);
+                } catch (IOException e) {
+                    fail(e);
+                }
+            });
+            Map<String, Object> xContentMap = XContentHelper.convertToMap(BytesReference.bytes(builder), false, builder.contentType()).v2();
+            assertThat(
+                xContentMap,
+                equalTo(
+                    Map.of(
+                        "data_streams",
+                        List.of(
+                            buildExpectedMap(
+                                "dataStream1",
+                                dataStream1Succeeded,
+                                dataStream1Error,
+                                dataStream1Mappings,
+                                dataStream1EffectiveMappings
+                            ),
+                            buildExpectedMap(
+                                "dataStream2",
+                                dataStream2Succeeded,
+                                dataStream2Error,
+                                dataStream2Mappings,
+                                dataStream2EffectiveMappings
+                            )
+                        )
+                    )
+                )
+            );
+        }
+    }
+
+    @Override
+    protected Writeable.Reader<UpdateDataStreamMappingsAction.Response> instanceReader() {
+        return UpdateDataStreamMappingsAction.Response::new;
+    }
+
+    @Override
+    protected UpdateDataStreamMappingsAction.Response createTestInstance() {
+        return new UpdateDataStreamMappingsAction.Response(randomList(10, this::randomDataStreamMappingsResponse));
+    }
+
+    @Override
+    protected UpdateDataStreamMappingsAction.Response mutateInstance(UpdateDataStreamMappingsAction.Response instance) throws IOException {
+        List<UpdateDataStreamMappingsAction.DataStreamMappingsResponse> responseList = instance.getDataStreamMappingsResponses();
+        List<UpdateDataStreamMappingsAction.DataStreamMappingsResponse> mutatedResponseList = new ArrayList<>(responseList);
+        switch (between(0, 1)) {
+            case 0 -> {
+                if (responseList.isEmpty()) {
+                    mutatedResponseList.add(randomDataStreamMappingsResponse());
+                } else {
+                    mutatedResponseList.remove(randomInt(responseList.size() - 1));
+                }
+            }
+            case 1 -> {
+                mutatedResponseList.add(randomDataStreamMappingsResponse());
+            }
+            default -> throw new AssertionError("Should not be here");
+        }
+        return new UpdateDataStreamMappingsAction.Response(mutatedResponseList);
+    }
+
+    private Map<String, Object> buildExpectedMap(
+        String name,
+        boolean succeeded,
+        String error,
+        Map<String, Object> mappings,
+        Map<String, Object> effectiveMappings
+    ) {
+        Map<String, Object> result = new HashMap<>();
+        result.put("name", name);
+        result.put("applied_to_data_stream", succeeded);
+        if (error != null) {
+            result.put("error", error);
+        }
+        result.put("mappings", mappings);
+        result.put("effective_mappings", effectiveMappings);
+        return result;
+    }
+
+    private UpdateDataStreamMappingsAction.DataStreamMappingsResponse randomDataStreamMappingsResponse() {
+        return new UpdateDataStreamMappingsAction.DataStreamMappingsResponse(
+            "dataStream1",
+            randomBoolean(),
+            randomBoolean() ? null : randomAlphaOfLength(20),
+            randomMappings(),
+            randomMappings()
+        );
+    }
+}
diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
index eaf3d63490c3..4af14fdaed0a 100644
--- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
+++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java
@@ -109,7 +109,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStream> {
             case 0 -> name = randomAlphaOfLength(10);
             case 1 -> indices = randomNonEmptyIndexInstances();
@@ -198,7 +198,7 @@ public class DataStreamTests extends AbstractXContentSerializingTestCase<DataStream> {