Index into TSDB

This commit is contained in:
Felix Barnsteiner 2025-06-06 17:50:14 +02:00
parent f77032e54e
commit 3e6100f441
No known key found for this signature in database
GPG key ID: B13943FF9047831D
9 changed files with 214 additions and 18 deletions

View file

@ -195,6 +195,7 @@ public final class ClientHelper {
public static final String INFERENCE_ORIGIN = "inference"; public static final String INFERENCE_ORIGIN = "inference";
public static final String APM_ORIGIN = "apm"; public static final String APM_ORIGIN = "apm";
public static final String OTEL_ORIGIN = "otel"; public static final String OTEL_ORIGIN = "otel";
public static final String METRICSDB_ORIGIN = "metricsdb";
public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream"; public static final String REINDEX_DATA_STREAM_ORIGIN = "reindex_data_stream";
public static final String ESQL_ORIGIN = "esql"; public static final String ESQL_ORIGIN = "esql";

View file

@ -11,11 +11,11 @@ apply plugin: 'elasticsearch.internal-cluster-test'
esplugin { esplugin {
name = 'metricsdb' name = 'metricsdb'
description = 'A plugin for metrics related functionality' description = 'A plugin for metrics related functionality'
classname ='org.elasticsearch.xpack.metrics.MetricsPlugin' classname ='org.elasticsearch.xpack.metrics.MetricsDBPlugin'
extendedPlugins = ['x-pack-core'] extendedPlugins = ['x-pack-core']
} }
base { base {
archivesName = 'x-pack-metrics' archivesName = 'x-pack-metricsdb'
} }
dependencies { dependencies {
@ -23,6 +23,7 @@ dependencies {
testImplementation project(':modules:data-streams') testImplementation project(':modules:data-streams')
testImplementation project(':x-pack:plugin:esql') testImplementation project(':x-pack:plugin:esql')
testImplementation project(':x-pack:plugin:esql-core') testImplementation project(':x-pack:plugin:esql-core')
testImplementation project(':x-pack:plugin:mapper-version')
def otelVersion = "1.48.0" def otelVersion = "1.48.0"
implementation "io.opentelemetry:opentelemetry-api:$otelVersion" implementation "io.opentelemetry:opentelemetry-api:$otelVersion"

View file

@ -11,6 +11,7 @@ import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider; import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.MetricData;
@ -22,6 +23,7 @@ import io.opentelemetry.sdk.resources.Resource;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.indices.template.get.GetComposableIndexTemplateAction;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
@ -33,6 +35,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder; import org.elasticsearch.xpack.esql.action.EsqlQueryRequestBuilder;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.versionfield.VersionFieldPlugin;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -42,6 +45,8 @@ import java.util.List;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.emptyArray;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
@ -54,7 +59,13 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> getPlugins() { protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class, InternalSettingsPlugin.class, MetricsDBPlugin.class, EsqlPlugin.class); return List.of(
DataStreamsPlugin.class,
InternalSettingsPlugin.class,
MetricsDBPlugin.class,
EsqlPlugin.class,
VersionFieldPlugin.class
);
} }
@Override @Override
@ -83,6 +94,14 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
.build() .build()
) )
.build(); .build();
assertBusy(() -> {
GetComposableIndexTemplateAction.Request getReq = new GetComposableIndexTemplateAction.Request(
TEST_REQUEST_TIMEOUT,
"metricsdb@template"
);
var templates = client().execute(GetComposableIndexTemplateAction.INSTANCE, getReq).actionGet().indexTemplates();
assertThat(templates, not(anEmptyMap()));
});
} }
private int getHttpPort() { private int getHttpPort() {
@ -111,16 +130,16 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
.setUnit("By") .setUnit("By")
.buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty())); .buildWithCallback(result -> result.record(Runtime.getRuntime().totalMemory(), Attributes.empty()));
var result = meterProvider.forceFlush().join(1, TimeUnit.SECONDS); var result = meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
assertThat(result.isSuccess(), is(true)); assertThat(result.isSuccess(), is(true));
admin().indices().prepareRefresh().execute().actionGet(); admin().indices().prepareRefresh().execute().actionGet();
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices(); String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices();
assertThat(indices, not(emptyArray())); assertThat(indices, not(emptyArray()));
try (EsqlQueryResponse resp = query(""" try (EsqlQueryResponse resp = query("""
FROM metrics* FROM metricsdb
| STATS avg(value_double) WHERE metric_name == "jvm.memory.total" | STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total"
""")) { """)) {
double avgJvmMemoryTotal = (double) resp.column(0).next(); double avgJvmMemoryTotal = (double) resp.column(0).next();
assertThat(avgJvmMemoryTotal, greaterThan(0.0)); assertThat(avgJvmMemoryTotal, greaterThan(0.0));
@ -138,8 +157,8 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
ImmutableGaugeData.create( ImmutableGaugeData.create(
List.of( List.of(
ImmutableDoublePointData.create( ImmutableDoublePointData.create(
System.currentTimeMillis(), Clock.getDefault().now(),
System.currentTimeMillis(), Clock.getDefault().now(),
Attributes.empty(), Attributes.empty(),
Runtime.getRuntime().totalMemory() Runtime.getRuntime().totalMemory()
) )
@ -147,16 +166,16 @@ public class MetricsDBIndexingIT extends ESSingleNodeTestCase {
) )
); );
var result = exporter.export(List.of(jvmMemoryMetricData)).join(1, TimeUnit.SECONDS); var result = exporter.export(List.of(jvmMemoryMetricData)).join(10, TimeUnit.SECONDS);
assertThat(result.isSuccess(), is(true)); assertThat(result.isSuccess(), is(true));
admin().indices().prepareRefresh().execute().actionGet(); admin().indices().prepareRefresh().execute().actionGet();
String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metrics*").get().indices(); String[] indices = admin().indices().prepareGetIndex(TimeValue.timeValueSeconds(1)).setIndices("metricsdb").get().indices();
assertThat(indices, not(emptyArray())); assertThat(indices, not(emptyArray()));
try (EsqlQueryResponse resp = query(""" try (EsqlQueryResponse resp = query("""
FROM metrics* FROM metricsdb
| STATS avg(value_double) WHERE metric_name == "jvm.memory.total" | STATS avg(metric.value.double) WHERE metric.name == "jvm.memory.total"
""")) { """)) {
double avgJvmMemoryTotal = (double) resp.column(0).next(); double avgJvmMemoryTotal = (double) resp.column(0).next();
assertThat(avgJvmMemoryTotal, greaterThan(0.0)); assertThat(avgJvmMemoryTotal, greaterThan(0.0));

View file

@ -0,0 +1,48 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.metrics;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.template.YamlTemplateRegistry;
public class MetricsDBIndexTemplateRegistry extends YamlTemplateRegistry {
public static final String OTEL_TEMPLATE_VERSION_VARIABLE = "xpack.metricsdb.template.version";
public MetricsDBIndexTemplateRegistry(
Settings nodeSettings,
ClusterService clusterService,
ThreadPool threadPool,
Client client,
NamedXContentRegistry xContentRegistry,
ProjectResolver projectResolver
) {
super(nodeSettings, clusterService, threadPool, client, xContentRegistry, projectResolver);
}
@Override
protected String getOrigin() {
return ClientHelper.METRICSDB_ORIGIN;
}
@Override
public String getName() {
return "MetricsDB";
}
@Override
protected String getVersionProperty() {
return OTEL_TEMPLATE_VERSION_VARIABLE;
}
}

View file

@ -7,8 +7,10 @@
package org.elasticsearch.xpack.metrics; package org.elasticsearch.xpack.metrics;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.IndexScopedSettings;
@ -26,6 +28,9 @@ import java.util.function.Predicate;
import java.util.function.Supplier; import java.util.function.Supplier;
public class MetricsDBPlugin extends Plugin implements ActionPlugin { public class MetricsDBPlugin extends Plugin implements ActionPlugin {
final SetOnce<MetricsDBIndexTemplateRegistry> registry = new SetOnce<>();
@Override @Override
public Collection<RestHandler> getRestHandlers( public Collection<RestHandler> getRestHandlers(
Settings settings, Settings settings,
@ -41,6 +46,26 @@ public class MetricsDBPlugin extends Plugin implements ActionPlugin {
return List.of(new MetricsDBRestAction()); return List.of(new MetricsDBRestAction());
} }
@Override
public Collection<?> createComponents(PluginServices services) {
Settings settings = services.environment().settings();
ClusterService clusterService = services.clusterService();
registry.set(
new MetricsDBIndexTemplateRegistry(
settings,
clusterService,
services.threadPool(),
services.client(),
services.xContentRegistry(),
services.projectResolver()
)
);
MetricsDBIndexTemplateRegistry registryInstance = registry.get();
registryInstance.setEnabled(true);
registryInstance.initialize();
return List.of();
}
@Override @Override
public Collection<ActionHandler> getActions() { public Collection<ActionHandler> getActions() {
return List.of(new ActionHandler(MetricsDBTransportAction.TYPE, MetricsDBTransportAction.class)); return List.of(new ActionHandler(MetricsDBTransportAction.TYPE, MetricsDBTransportAction.class));

View file

@ -40,6 +40,7 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Nullable;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -80,9 +81,10 @@ public class MetricsDBTransportAction extends HandledTransportAction<
public void onResponse(BulkResponse bulkItemResponses) { public void onResponse(BulkResponse bulkItemResponses) {
MessageLite response; MessageLite response;
if (bulkItemResponses.hasFailures()) { if (bulkItemResponses.hasFailures()) {
long failures = Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count();
response = ExportMetricsServiceResponse.newBuilder() response = ExportMetricsServiceResponse.newBuilder()
.getPartialSuccessBuilder() .getPartialSuccessBuilder()
.setRejectedDataPoints(Arrays.stream(bulkItemResponses.getItems()).filter(BulkItemResponse::isFailed).count()) .setRejectedDataPoints(failures)
.setErrorMessage(bulkItemResponses.buildFailureMessage()) .setErrorMessage(bulkItemResponses.buildFailureMessage())
.build(); .build();
} else { } else {
@ -168,7 +170,13 @@ public class MetricsDBTransportAction extends HandledTransportAction<
metric, metric,
dp dp
); );
bulkRequestBuilder.add(client.prepareIndex("metricsdb").setCreate(true).setSource(xContentBuilder)); bulkRequestBuilder.add(
client.prepareIndex("metricsdb")
.setCreate(true)
.setRequireDataStream(true)
.setPipeline(IngestService.NOOP_PIPELINE_NAME)
.setSource(xContentBuilder)
);
} }
} }
} }
@ -197,7 +205,6 @@ public class MetricsDBTransportAction extends HandledTransportAction<
builder.startObject("attributes"); builder.startObject("attributes");
buildAttributes(builder, resourceAttributes); buildAttributes(builder, resourceAttributes);
builder.endObject(); builder.endObject();
builder.field("metric_name", metric.getName());
builder.field("unit", metric.getUnit()); builder.field("unit", metric.getUnit());
builder.field("type"); builder.field("type");
if (metric.getDataCase() == Metric.DataCase.SUM) { if (metric.getDataCase() == Metric.DataCase.SUM) {
@ -217,11 +224,16 @@ public class MetricsDBTransportAction extends HandledTransportAction<
metric.getExponentialHistogram().getAggregationTemporality().toString() metric.getExponentialHistogram().getAggregationTemporality().toString()
); );
} }
builder.startObject("metric");
builder.field("name", metric.getName());
builder.startObject("value");
switch (dp.getValueCase()) { switch (dp.getValueCase()) {
case AS_DOUBLE -> builder.field("value_double", dp.getAsDouble()); case AS_DOUBLE -> builder.field("double", dp.getAsDouble());
case AS_INT -> builder.field("value_long", dp.getAsInt()); case AS_INT -> builder.field("long", dp.getAsInt());
} }
builder.endObject(); builder.endObject();
builder.endObject();
builder.endObject();
} }
private void buildAttributes(XContentBuilder builder, List<KeyValue> resourceAttributes) throws IOException { private void buildAttributes(XContentBuilder builder, List<KeyValue> resourceAttributes) throws IOException {

View file

@ -0,0 +1,81 @@
---
version: ${xpack.metricsdb.template.version}
index_patterns: ["metricsdb"]
priority: 120
data_stream: {}
allow_auto_create: true
_meta:
description: default OpenTelemetry metrics template installed by x-pack
managed: true
template:
settings:
index.mode: time_series
index.mapping.ignore_malformed: true
mappings:
date_detection: false
dynamic: false
properties:
"@timestamp":
type: date
attributes:
type: passthrough
dynamic: true
priority: 20
time_series_dimension: true
dropped_attributes_count:
type: long
scope:
properties:
name:
type: keyword
ignore_above: 1024
time_series_dimension: true
version:
type: version
schema_url:
type: keyword
ignore_above: 1024
dropped_attributes_count:
type: long
attributes:
type: passthrough
dynamic: true
priority: 30
time_series_dimension: true
resource:
properties:
schema_url:
type: keyword
ignore_above: 1024
dropped_attributes_count:
type: long
attributes:
type: passthrough
dynamic: true
priority: 40
time_series_dimension: true
start_timestamp:
type: date
metric:
properties:
name:
type: keyword
time_series_dimension: true
value:
properties:
long:
type: long
double:
type: double
unit:
type: keyword
time_series_dimension: true
ignore_above: 1024
temporality:
type: keyword
time_series_dimension: true
ignore_above: 1024
type:
type: keyword
time_series_dimension: true
ignore_above: 1024

View file

@ -0,0 +1,7 @@
# "version" holds the version of the templates and ingest pipelines installed.
# This must be increased whenever an existing template is
# changed, in order for it to be updated on Elasticsearch upgrade.
version: 1
index-templates:
- metricsdb@template

View file

@ -38,6 +38,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.IDP_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INDEX_LIFECYCLE_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.INFERENCE_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.INFERENCE_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.LOGSTASH_MANAGEMENT_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.METRICSDB_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.MONITORING_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.OTEL_ORIGIN;
@ -158,6 +159,7 @@ public final class AuthorizationUtils {
case PROFILING_ORIGIN: case PROFILING_ORIGIN:
case APM_ORIGIN: case APM_ORIGIN:
case OTEL_ORIGIN: case OTEL_ORIGIN:
case METRICSDB_ORIGIN:
case STACK_ORIGIN: case STACK_ORIGIN:
case SEARCHABLE_SNAPSHOTS_ORIGIN: case SEARCHABLE_SNAPSHOTS_ORIGIN:
case LOGSTASH_MANAGEMENT_ORIGIN: case LOGSTASH_MANAGEMENT_ORIGIN: