Add metrics@custom component template to metrics-*-* index template (#109540)

This lets users customize the metrics data stream mappings, without
having to override a managed component template that may get overridden.

Fixes #109475
This commit is contained in:
Felix Barnsteiner 2024-06-10 19:32:31 +02:00 committed by GitHub
parent a9f31bd2aa
commit 540d2b10a3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 286 additions and 149 deletions

View file

@ -0,0 +1,6 @@
pr: 109540
summary: Add metrics@custom component template to metrics-*-* index template
area: Data streams
type: enhancement
issues:
- 109475

View file

@ -0,0 +1,169 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* This base class provides the boilerplate to simplify the development of integration tests.
* Aside from providing useful helper methods and disabling unnecessary plugins,
* it waits until an {@linkplain #indexTemplateName() index template} is installed, which happens asynchronously in StackTemplateRegistry.
* This avoids race conditions leading to flaky tests by ensuring the template has been installed before executing the tests.
*/
public abstract class AbstractDataStreamIT extends ESRestTestCase {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
// Disable apm-data so the index templates it installs do not impact
// tests such as testIgnoreDynamicBeyondLimit.
.setting("xpack.apm_data.enabled", "false")
.build();
protected RestClient client;
static void waitForIndexTemplate(RestClient client, String indexTemplate) throws Exception {
assertBusy(() -> {
try {
Request request = new Request("GET", "_index_template/" + indexTemplate);
assertOK(client.performRequest(request));
} catch (ResponseException e) {
fail(e.getMessage());
}
});
}
static void createDataStream(RestClient client, String name) throws IOException {
Request request = new Request("PUT", "_data_stream/" + name);
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static String getWriteBackingIndex(RestClient client, String name) throws IOException {
Request request = new Request("GET", "_data_stream/" + name);
List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
List<Map<String, String>> indices = (List<Map<String, String>>) dataStream.get("indices");
return indices.get(0).get("index_name");
}
@SuppressWarnings("unchecked")
static Map<String, Object> getSettings(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings");
return ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings");
}
static void putMapping(RestClient client, String indexName) throws IOException {
Request request = new Request("PUT", "/" + indexName + "/_mapping");
request.setJsonEntity("""
{
"properties": {
"numeric_field": {
"type": "integer"
}
}
}
""");
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static Map<String, Object> getMappingProperties(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_mapping");
Map<String, Object> map = (Map<String, Object>) entityAsMap(client.performRequest(request)).get(indexName);
Map<String, Object> mappings = (Map<String, Object>) map.get("mappings");
return (Map<String, Object>) mappings.get("properties");
}
static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException {
Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
request.setJsonEntity(doc);
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static List<Object> searchDocs(RestClient client, String dataStreamName, String query) throws IOException {
Request request = new Request("GET", "/" + dataStreamName + "/_search");
request.setJsonEntity(query);
Map<String, Object> hits = (Map<String, Object>) entityAsMap(client.performRequest(request)).get("hits");
return (List<Object>) hits.get("hits");
}
@SuppressWarnings("unchecked")
static Object getValueFromPath(Map<String, Object> map, List<String> path) {
Map<String, Object> current = map;
for (int i = 0; i < path.size(); i++) {
Object value = current.get(path.get(i));
if (i == path.size() - 1) {
return value;
}
if (value == null) {
throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map);
}
if (value instanceof Map<?, ?> next) {
current = (Map<String, Object>) next;
} else {
throw new IllegalStateException(
"Failed to reach the end of the path "
+ String.join(".", path)
+ " last reachable field was "
+ path.get(i)
+ " in "
+ map
);
}
}
return current;
}
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected Settings restAdminSettings() {
if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
return super.restAdminSettings();
} else {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}
@Before
public void setup() throws Exception {
client = client();
AbstractDataStreamIT.waitForIndexTemplate(client, indexTemplateName());
}
protected abstract String indexTemplateName();
@After
public void cleanUp() throws IOException {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
}

View file

@ -26,7 +26,7 @@ import static org.elasticsearch.datastreams.LogsDataStreamIT.getValueFromPath;
import static org.elasticsearch.datastreams.LogsDataStreamIT.getWriteBackingIndex; import static org.elasticsearch.datastreams.LogsDataStreamIT.getWriteBackingIndex;
import static org.elasticsearch.datastreams.LogsDataStreamIT.indexDoc; import static org.elasticsearch.datastreams.LogsDataStreamIT.indexDoc;
import static org.elasticsearch.datastreams.LogsDataStreamIT.searchDocs; import static org.elasticsearch.datastreams.LogsDataStreamIT.searchDocs;
import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForLogs; import static org.elasticsearch.datastreams.LogsDataStreamIT.waitForIndexTemplate;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase { public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase {
@ -38,7 +38,7 @@ public class EcsLogsDataStreamIT extends DisabledSecurityDataStreamTestCase {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
client = client(); client = client();
waitForLogs(client); waitForIndexTemplate(client, "logs");
{ {
Request request = new Request("PUT", "/_ingest/pipeline/logs@custom"); Request request = new Request("PUT", "/_ingest/pipeline/logs@custom");

View file

@ -9,20 +9,7 @@
package org.elasticsearch.datastreams; package org.elasticsearch.datastreams;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.FeatureFlag;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -35,46 +22,7 @@ import static org.hamcrest.Matchers.matchesRegex;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
public class LogsDataStreamIT extends ESRestTestCase { public class LogsDataStreamIT extends AbstractDataStreamIT {
@ClassRule
public static ElasticsearchCluster cluster = ElasticsearchCluster.local()
.distribution(DistributionType.DEFAULT)
.feature(FeatureFlag.FAILURE_STORE_ENABLED)
.setting("xpack.security.enabled", "false")
.setting("xpack.watcher.enabled", "false")
// Disable apm-data so the index templates it installs do not impact
// tests such as testIgnoreDynamicBeyondLimit.
.setting("xpack.apm_data.enabled", "false")
.build();
@Override
protected String getTestRestCluster() {
return cluster.getHttpAddresses();
}
@Override
protected Settings restAdminSettings() {
if (super.restAdminSettings().keySet().contains(ThreadContext.PREFIX + ".Authorization")) {
return super.restAdminSettings();
} else {
String token = basicAuthHeaderValue("admin", new SecureString("admin-password".toCharArray()));
return Settings.builder().put(super.restAdminSettings()).put(ThreadContext.PREFIX + ".Authorization", token).build();
}
}
private RestClient client;
@Before
public void setup() throws Exception {
client = client();
waitForLogs(client);
}
@After
public void cleanUp() throws IOException {
adminClient().performRequest(new Request("DELETE", "_data_stream/*"));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void testDefaultLogsSettingAndMapping() throws Exception { public void testDefaultLogsSettingAndMapping() throws Exception {
@ -791,97 +739,8 @@ public class LogsDataStreamIT extends ESRestTestCase {
assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty()); assertThat(ignored.stream().filter(i -> i.startsWith("field") == false).toList(), empty());
} }
static void waitForLogs(RestClient client) throws Exception { @Override
assertBusy(() -> { protected String indexTemplateName() {
try { return "logs";
Request request = new Request("GET", "_index_template/logs");
assertOK(client.performRequest(request));
} catch (ResponseException e) {
fail(e.getMessage());
}
});
}
static void createDataStream(RestClient client, String name) throws IOException {
Request request = new Request("PUT", "_data_stream/" + name);
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static String getWriteBackingIndex(RestClient client, String name) throws IOException {
Request request = new Request("GET", "_data_stream/" + name);
List<Object> dataStreams = (List<Object>) entityAsMap(client.performRequest(request)).get("data_streams");
Map<String, Object> dataStream = (Map<String, Object>) dataStreams.get(0);
List<Map<String, String>> indices = (List<Map<String, String>>) dataStream.get("indices");
return indices.get(0).get("index_name");
}
@SuppressWarnings("unchecked")
static Map<String, Object> getSettings(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_settings?flat_settings");
return ((Map<String, Map<String, Object>>) entityAsMap(client.performRequest(request)).get(indexName)).get("settings");
}
static void putMapping(RestClient client, String indexName) throws IOException {
Request request = new Request("PUT", "/" + indexName + "/_mapping");
request.setJsonEntity("""
{
"properties": {
"numeric_field": {
"type": "integer"
}
}
}
""");
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static Map<String, Object> getMappingProperties(RestClient client, String indexName) throws IOException {
Request request = new Request("GET", "/" + indexName + "/_mapping");
Map<String, Object> map = (Map<String, Object>) entityAsMap(client.performRequest(request)).get(indexName);
Map<String, Object> mappings = (Map<String, Object>) map.get("mappings");
return (Map<String, Object>) mappings.get("properties");
}
static void indexDoc(RestClient client, String dataStreamName, String doc) throws IOException {
Request request = new Request("POST", "/" + dataStreamName + "/_doc?refresh=true");
request.setJsonEntity(doc);
assertOK(client.performRequest(request));
}
@SuppressWarnings("unchecked")
static List<Object> searchDocs(RestClient client, String dataStreamName, String query) throws IOException {
Request request = new Request("GET", "/" + dataStreamName + "/_search");
request.setJsonEntity(query);
Map<String, Object> hits = (Map<String, Object>) entityAsMap(client.performRequest(request)).get("hits");
return (List<Object>) hits.get("hits");
}
@SuppressWarnings("unchecked")
static Object getValueFromPath(Map<String, Object> map, List<String> path) {
Map<String, Object> current = map;
for (int i = 0; i < path.size(); i++) {
Object value = current.get(path.get(i));
if (i == path.size() - 1) {
return value;
}
if (value == null) {
throw new IllegalStateException("Path " + String.join(".", path) + " was not found in " + map);
}
if (value instanceof Map<?, ?> next) {
current = (Map<String, Object>) next;
} else {
throw new IllegalStateException(
"Failed to reach the end of the path "
+ String.join(".", path)
+ " last reachable field was "
+ path.get(i)
+ " in "
+ map
);
}
}
return current;
} }
} }

View file

@ -0,0 +1,101 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.datastreams;
import org.elasticsearch.client.Request;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class MetricsDataStreamIT extends AbstractDataStreamIT {
@SuppressWarnings("unchecked")
public void testCustomMapping() throws Exception {
{
Request request = new Request("POST", "/_component_template/metrics@custom");
request.setJsonEntity("""
{
"template": {
"settings": {
"index": {
"query": {
"default_field": ["custom-message"]
}
}
},
"mappings": {
"properties": {
"numeric_field": {
"type": "integer"
},
"socket": {
"properties": {
"ip": {
"type": "keyword"
}
}
}
}
}
}
}
""");
assertOK(client.performRequest(request));
}
String dataStreamName = "metrics-generic-default";
createDataStream(client, dataStreamName);
String backingIndex = getWriteBackingIndex(client, dataStreamName);
// Verify that the custom settings.index.query.default_field overrides the default query field - "message"
Map<String, Object> settings = getSettings(client, backingIndex);
assertThat(settings.get("index.query.default_field"), is(List.of("custom-message")));
// Verify that the new field from the custom component template is applied
putMapping(client, backingIndex);
Map<String, Object> mappingProperties = getMappingProperties(client, backingIndex);
assertThat(getValueFromPath(mappingProperties, List.of("numeric_field", "type")), equalTo("integer"));
assertThat(getValueFromPath(mappingProperties, List.of("socket", "properties", "ip", "type")), is("keyword"));
// Insert valid doc and verify successful indexing
{
indexDoc(client, dataStreamName, """
{
"@timestamp": "2024-06-10",
"test": "doc-with-ip",
"socket": {
"ip": "127.0.0.1"
}
}
""");
List<Object> results = searchDocs(client, dataStreamName, """
{
"query": {
"term": {
"test": {
"value": "doc-with-ip"
}
}
},
"fields": ["socket.ip"]
}
""");
Map<String, Object> fields = ((Map<String, Map<String, Object>>) results.get(0)).get("_source");
assertThat(fields.get("socket"), is(Map.of("ip", "127.0.0.1")));
}
}
@Override
protected String indexTemplateName() {
return "metrics";
}
}

View file

@ -5,8 +5,10 @@
"composed_of": [ "composed_of": [
"metrics@mappings", "metrics@mappings",
"data-streams@mappings", "data-streams@mappings",
"metrics@settings" "metrics@settings",
"metrics@custom"
], ],
"ignore_missing_component_templates": ["metrics@custom"],
"allow_auto_create": true, "allow_auto_create": true,
"_meta": { "_meta": {
"description": "default metrics template installed by x-pack", "description": "default metrics template installed by x-pack",

View file

@ -47,7 +47,7 @@ public class StackTemplateRegistry extends IndexTemplateRegistry {
// The stack template registry version. This number must be incremented when we make changes // The stack template registry version. This number must be incremented when we make changes
// to built-in templates. // to built-in templates.
public static final int REGISTRY_VERSION = 10; public static final int REGISTRY_VERSION = 11;
public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version"; public static final String TEMPLATE_VERSION_VARIABLE = "xpack.stack.template.version";
public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting( public static final Setting<Boolean> STACK_TEMPLATES_ENABLED = Setting.boolSetting(