From 3e6100f441dd9f5890eda635f5502f87ffd7009f Mon Sep 17 00:00:00 2001 From: Felix Barnsteiner Date: Fri, 6 Jun 2025 17:50:14 +0200 Subject: [PATCH] Index into TSDB --- .../xpack/core/ClientHelper.java | 1 + x-pack/plugin/metricsdb/build.gradle | 5 +- .../xpack/metrics/MetricsDBIndexingIT.java | 41 +++++++--- .../MetricsDBIndexTemplateRegistry.java | 48 +++++++++++ .../xpack/metrics/MetricsDBPlugin.java | 25 ++++++ .../metrics/MetricsDBTransportAction.java | 22 +++-- .../index-templates/metricsdb@template.yaml | 81 +++++++++++++++++++ .../src/main/resources/resources.yaml | 7 ++ .../security/authz/AuthorizationUtils.java | 2 + 9 files changed, 214 insertions(+), 18 deletions(-) create mode 100644 x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBIndexTemplateRegistry.java create mode 100644 x-pack/plugin/metricsdb/src/main/resources/index-templates/metricsdb@template.yaml create mode 100644 x-pack/plugin/metricsdb/src/main/resources/resources.yaml diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java index 3577b1d834f8..2cb6b48876e5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ClientHelper.java @@ -195,6 +195,7 @@ public final class ClientHelper { public static final String INFERENCE_ORIGIN = "inference"; public static final String APM_ORIGIN = "apm"; public static final String OTEL_ORIGIN = "otel"; + public static final String METRICSDB_ORIGIN = "metricsdb"; public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream"; public static final String ESQL_ORIGIN = "esql"; diff --git a/x-pack/plugin/metricsdb/build.gradle b/x-pack/plugin/metricsdb/build.gradle index 355e5c3658f6..2b20f635b1bf 100644 --- a/x-pack/plugin/metricsdb/build.gradle +++ b/x-pack/plugin/metricsdb/build.gradle @@ -11,11 +11,11 @@ apply plugin: 'elasticsearch.internal-cluster-test' esplugin { name = 'metricsdb' description = 'A plugin for metrics related functionality' - classname ='org.elasticsearch.xpack.metrics.MetricsPlugin' + classname ='org.elasticsearch.xpack.metrics.MetricsDBPlugin' extendedPlugins = ['x-pack-core'] } base { - archivesName = 'x-pack-metrics' + archivesName = 'x-pack-metricsdb' } dependencies { @@ -23,6 +23,7 @@ dependencies { testImplementation project(':modules:data-streams') testImplementation project(':x-pack:plugin:esql') testImplementation project(':x-pack:plugin:esql-core') + testImplementation project(':x-pack:plugin:mapper-version') def otelVersion = "1.48.0" implementation "io.opentelemetry:opentelemetry-api:$otelVersion" diff --git a/x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java b/x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java index a7f59fdba8d6..2a28513a5b5b 100644 --- a/x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java +++ b/x-pack/plugin/metricsdb/src/internalClusterTest/java/org/elasticsearch/xpack/metrics/MetricsDBIndexingIT.java @@ -11,6 +11,7 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; +import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -22,6 +23,7 @@ import io.opentelemetry.sdk.resources.Resource; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.TimeValue; @@ -33,6 +35,7 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; +import org.elasticsearch.xpack.versionfield.VersionFieldPlugin; import org.junit.Test; import java.net.InetSocketAddress; @@ -42,6 +45,8 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; @@ -54,7 +59,13 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, MetricsDBPlugin.class, EsqlPlugin.class); + return List.of( + DataStreamsPlugin.class, + InternalSettingsPlugin.class, + MetricsDBPlugin.class, + EsqlPlugin.class, + VersionFieldPlugin.class + ); } @Override @@ -83,6 +94,14 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase { .build() ) .build(); + assertBusy(() -> { + GetComposableIndexTemplateAction.Request getReq = new GetComposableIndexTemplateAction.Request( + TEST_REQUEST_TIMEOUT, + "metricsdb@template" + ); + var templates = client().execute(GetComposableIndexTemplateAction.INSTANCE, getReq).actionGet().indexTemplates(); + assertThat(templates, not(anEmptyMap())); + }); } private int getHttpPort() { @@ -111,16 +130,16 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase { .setUnit("By") .buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty())); - var result = meterProvider.forceFlush().join(1, TimeUnit.SECONDS); + var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS); assertThat(result.isSuccess(), is(true)); admin().indices().prepareRefresh().execute().actionGet(); - String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices(); + String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices(); assertThat(indices, not(emptyArray())); try (EsqlQueryResponse resp = query(""" - FROM metrics* - | STATS avg(value_double) WHERE metric_name == "jvm.memory.total" + FROM metricsdb + | STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total" """)) { double avgJvmMemoryTotal = (double) resp.column(0).next(); assertThat(avgJvmMemoryTotal, greaterThan(0.0)); @@ -138,8 +157,8 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase { ImmutableGaugeData.create( List.of( ImmutableDoublePointData.create( - System.currentTimeMillis(), - System.currentTimeMillis(), + Clock.getDefault().now(), + Clock.getDefault().now(), Attributes.empty(), Runtime.getRuntime().totalMemory() ) @@ -147,16 +166,16 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase { ) ); - var result = exporter.export(List.of(jvmMemoryMetricData)).join(1, TimeUnit.SECONDS); + var result = exporter.export(List.of(jvmMemoryMetricData)).join(10, TimeUnit.SECONDS); assertThat(result.isSuccess(), is(true)); admin().indices().prepareRefresh().execute().actionGet(); - String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices(); + String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices(); assertThat(indices, not(emptyArray())); try (EsqlQueryResponse resp = query(""" - FROM metrics* - | STATS avg(value_double) WHERE metric_name == "jvm.memory.total" + FROM metricsdb + | STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total" """)) { double avgJvmMemoryTotal = (double) resp.column(0).next(); assertThat(avgJvmMemoryTotal, greaterThan(0.0)); diff --git a/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBIndexTemplateRegistry.java b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBIndexTemplateRegistry.java new file mode 100644 index 000000000000..ba595400a1de --- /dev/null +++ b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBIndexTemplateRegistry.java @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.metrics; + +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xcontent.NamedXContentRegistry; +import org.elasticsearch.xpack.core.ClientHelper; +import org.elasticsearch.xpack.core.template.YamlTemplateRegistry; + +public class MetricsDBIndexTemplateRegistry extends YamlTemplateRegistry { + + public static final String OTEL_TEMPLATE_VERSION_VARIABLE = "xpack.metricsdb.template.version"; + + public MetricsDBIndexTemplateRegistry( + Settings nodeSettings, + ClusterService clusterService, + ThreadPool threadPool, + Client client, + NamedXContentRegistry xContentRegistry, + ProjectResolver projectResolver + ) { + super(nodeSettings, clusterService, threadPool, client, xContentRegistry, projectResolver); + } + + @Override + protected String getOrigin() { + return ClientHelper.METRICSDB_ORIGIN; + } + + @Override + public String getName() { + return "MetricsDB"; + } + + @Override + protected String getVersionProperty() { + return OTEL_TEMPLATE_VERSION_VARIABLE; + } +} diff --git a/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBPlugin.java b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBPlugin.java index 0db45770d97b..73ea12027092 100644 --- a/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBPlugin.java +++ b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBPlugin.java @@ -7,8 +7,10 @@ package org.elasticsearch.xpack.metrics; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; @@ -26,6 +28,9 @@ import java.util.function.Predicate; import java.util.function.Supplier; public class MetricsDBPlugin extends Plugin implements ActionPlugin { + + final SetOnce registry = new SetOnce<>(); + @Override public Collection getRestHandlers( Settings settings, @@ -41,6 +46,26 @@ public class MetricsDBPlugin extends Plugin implements ActionPlugin { return List.of(new MetricsDBRestAction()); } + @Override + public Collection createComponents(PluginServices services) { + Settings settings = services.environment().settings(); + ClusterService clusterService = services.clusterService(); + registry.set( + new MetricsDBIndexTemplateRegistry( + settings, + clusterService, + services.threadPool(), + services.client(), + services.xContentRegistry(), + services.projectResolver() + ) + ); + MetricsDBIndexTemplateRegistry registryInstance = registry.get(); + registryInstance.setEnabled(true); + registryInstance.initialize(); + return List.of(); + } + @Override public Collection getActions() { return List.of(new ActionHandler(MetricsDBTransportAction.TYPE, MetricsDBTransportAction.class)); diff --git a/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBTransportAction.java b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBTransportAction.java index 02b70a40ae92..5082e8a7d1ba 100644 --- a/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBTransportAction.java +++ b/x-pack/plugin/metricsdb/src/main/java/org/elasticsearch/xpack/metrics/MetricsDBTransportAction.java @@ -40,6 +40,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.core.Nullable; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -80,9 +81,10 @@ public class MetricsDBTransportAction extends HandledTransportAction< public void onResponse(BulkResponse bulkItemResponses) { MessageLite response; if (bulkItemResponses.hasFailures()) { + long failures = Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count(); response = ExportMetricsServiceResponse.newBuilder() .getPartialSuccessBuilder() - .setRejectedDataPoints(Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count()) + .setRejectedDataPoints(failures) .setErrorMessage(bulkItemResponses.buildFailureMessage()) .build(); } else { @@ -168,7 +170,13 @@ public class MetricsDBTransportAction extends HandledTransportAction< metric, dp ); - bulkRequestBuilder.add(client.prepareIndex("metricsdb").setCreate(true).setSource(xContentBuilder)); + bulkRequestBuilder.add( + client.prepareIndex("metricsdb") + .setCreate(true) + .setRequireDataStream(true) + .setPipeline(IngestService.NOOP_PIPELINE_NAME) + .setSource(xContentBuilder) + ); } } } @@ -197,7 +205,6 @@ public class MetricsDBTransportAction extends HandledTransportAction< builder.startObject("attributes"); buildAttributes(builder, resourceAttributes); builder.endObject(); - builder.field("metric_name", metric.getName()); builder.field("unit", metric.getUnit()); builder.field("type"); if (metric.getDataCase() == Metric.DataCase.SUM) { @@ -217,11 +224,16 @@ public class MetricsDBTransportAction extends HandledTransportAction< metric.getExponentialHistogram().getAggregationTemporality().toString() ); } + builder.startObject("metric"); + builder.field("name", metric.getName()); + builder.startObject("value"); switch (dp.getValueCase()) { - case AS_DOUBLE -> builder.field("value_double", dp.getAsDouble()); - case AS_INT -> builder.field("value_long", dp.getAsInt()); + case AS_DOUBLE -> builder.field("double", dp.getAsDouble()); + case AS_INT -> builder.field("long", dp.getAsInt()); } builder.endObject(); + builder.endObject(); + builder.endObject(); } private void buildAttributes(XContentBuilder builder, List resourceAttributes) throws IOException { diff --git a/x-pack/plugin/metricsdb/src/main/resources/index-templates/metricsdb@template.yaml b/x-pack/plugin/metricsdb/src/main/resources/index-templates/metricsdb@template.yaml new file mode 100644 index 000000000000..8c46605ecaa1 --- /dev/null +++ b/x-pack/plugin/metricsdb/src/main/resources/index-templates/metricsdb@template.yaml @@ -0,0 +1,81 @@ +--- +version: ${xpack.metricsdb.template.version} +index_patterns: ["metricsdb"] +priority: 120 +data_stream: {} +allow_auto_create: true +_meta: + description: default OpenTelemetry metrics template installed by x-pack + managed: true +template: + settings: + index.mode: time_series + index.mapping.ignore_malformed: true + mappings: + date_detection: false + dynamic: false + properties: + "@timestamp": + type: date + attributes: + type: passthrough + dynamic: true + priority: 20 + time_series_dimension: true + dropped_attributes_count: + type: long + scope: + properties: + name: + type: keyword + ignore_above: 1024 + time_series_dimension: true + version: + type: version + schema_url: + type: keyword + ignore_above: 1024 + dropped_attributes_count: + type: long + attributes: + type: passthrough + dynamic: true + priority: 30 + time_series_dimension: true + resource: + properties: + schema_url: + type: keyword + ignore_above: 1024 + dropped_attributes_count: + type: long + attributes: + type: passthrough + dynamic: true + priority: 40 + time_series_dimension: true + start_timestamp: + type: date + metric: + properties: + name: + type: keyword + time_series_dimension: true + value: + properties: + long: + type: long + double: + type: double + unit: + type: keyword + time_series_dimension: true + ignore_above: 1024 + temporality: + type: keyword + time_series_dimension: true + ignore_above: 1024 + type: + type: keyword + time_series_dimension: true + ignore_above: 1024 diff --git a/x-pack/plugin/metricsdb/src/main/resources/resources.yaml b/x-pack/plugin/metricsdb/src/main/resources/resources.yaml new file mode 100644 index 000000000000..0f72e17cc44b --- /dev/null +++ b/x-pack/plugin/metricsdb/src/main/resources/resources.yaml @@ -0,0 +1,7 @@ +# "version" holds the version of the templates and ingest pipelines installed. +# This must be increased whenever an existing template is +# changed, in order for it to be updated on Elasticsearch upgrade. +version: 1 + +index-templates: + - metricsdb@template diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java index 481c59aa25b6..fd8f74855eb5 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationUtils.java @@ -38,6 +38,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INFERENCE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN; +import static org.elasticsearch.xpack.core.ClientHelper.METRICSDB_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN; @@ -158,6 +159,7 @@ public final class AuthorizationUtils { case PROFILING_ORIGIN: case APM_ORIGIN: case OTEL_ORIGIN: + case METRICSDB_ORIGIN: case STACK_ORIGIN: case SEARCHABLE_SNAPSHOTS_ORIGIN: case LOGSTASH_MANAGEMENT_ORIGIN: