From e3e474c337b41f7ada6f0b5d16f1686653502be4 Mon Sep 17 00:00:00 2001 From: Jordan Powers Date: Wed, 5 Feb 2025 15:32:14 -0800 Subject: [PATCH] Copy metrics and default_metric properties when downsampling aggregate_metric_double (#121727) Fixes #119696 and #96076 --- docs/changelog/121727.yaml | 7 ++ .../datastreams/DataStreamFeatures.java | 6 +- .../downsample/80_downsample_aggregate.yml | 79 +++++++++++++++++++ .../downsample/TransportDownsampleAction.java | 43 ++++++++-- .../TransportDownsampleActionTests.java | 25 ++++++ 5 files changed, 154 insertions(+), 6 deletions(-) create mode 100644 docs/changelog/121727.yaml create mode 100644 x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/80_downsample_aggregate.yml diff --git a/docs/changelog/121727.yaml b/docs/changelog/121727.yaml new file mode 100644 index 000000000000..80c0a5eae433 --- /dev/null +++ b/docs/changelog/121727.yaml @@ -0,0 +1,7 @@ +pr: 121727 +summary: Copy metrics and `default_metric` properties when downsampling `aggregate_metric_double` +area: Downsampling +type: bug +issues: + - 119696 + - 96076 diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java index 8026ec641d04..506c107b382a 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamFeatures.java @@ -21,6 +21,10 @@ public class DataStreamFeatures implements FeatureSpecification { public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix"); + public static final NodeFeature DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX = new NodeFeature( + "data_stream.downsample.default_aggregate_metric_fix" + ); + @Override public Set getFeatures() { return Set.of(); @@ -28,6 +32,6 @@ public class DataStreamFeatures implements FeatureSpecification { @Override public Set getTestFeatures() { - return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX); + return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX); } } diff --git a/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/80_downsample_aggregate.yml b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/80_downsample_aggregate.yml new file mode 100644 index 000000000000..991aa3858d8b --- /dev/null +++ b/x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/80_downsample_aggregate.yml @@ -0,0 +1,79 @@ +"downsample aggregate field": + - requires: + cluster_features: ["data_stream.downsample.default_aggregate_metric_fix"] + reason: "#119696 fixed" + + - do: + indices.create: + index: test + body: + settings: + number_of_shards: 1 + index: + mode: time_series + routing_path: [sensor_id] + time_series: + start_time: 2021-04-28T00:00:00Z + end_time: 2021-04-29T00:00:00Z + mappings: + properties: + "@timestamp": + type: date + sensor_id: + type: keyword + time_series_dimension: true + temperature: + type: aggregate_metric_double + metrics: [min, sum, value_count] + default_metric: sum + time_series_metric: gauge + - do: + bulk: + refresh: true + index: test + body: + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:00:00Z", "sensor_id": "1", "temperature": {"min": 24.7, "sum": 50.2, "value_count": 2}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T18:30:00Z", "sensor_id": "1", "temperature": {"min": 24.2, "sum": 73.8, "value_count": 3}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:00:00Z", "sensor_id": "1", "temperature": {"min": 25.1, "sum": 51.0, "value_count": 2}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T19:30:00Z", "sensor_id": "1", "temperature": {"min": 24.8, "sum": 24.8, "value_count": 1}}' + - '{"index": {}}' + - '{"@timestamp": "2021-04-28T20:00:00Z", "sensor_id": "1", "temperature": {"min": 24.6, "sum": 49.1, "value_count": 2}}' + + - do: + indices.put_settings: + index: test + body: + index.blocks.write: true + + - do: + indices.downsample: + index: test + target_index: test-downsample + body: > + { + "fixed_interval": "1h" + } + - is_true: acknowledged + + - do: + search: + index: test-downsample + body: + size: 0 + + - match: + hits.total.value: 3 + + - do: + indices.get_mapping: + index: test-downsample + - match: + test-downsample.mappings.properties.temperature: + type: aggregate_metric_double + metrics: [min, sum, value_count] + default_metric: sum + time_series_metric: gauge diff --git a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java index ccaed615afcb..2c08dcd9017f 100644 --- a/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java +++ b/x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java @@ -91,6 +91,7 @@ import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -739,6 +740,39 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc .endObject(); } + // public for testing + public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List supportedMetrics) {} + + // public for testing + public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics( + final TimeSeriesParams.MetricType metricType, + final Map fieldProperties + ) { + boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE); + List supportedAggs = List.of(metricType.supportedAggs()); + + if (sourceIsAggregate) { + @SuppressWarnings("unchecked") + List currentAggs = (List) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.METRICS); + supportedAggs = supportedAggs.stream().filter(currentAggs::contains).toList(); + } + + assert supportedAggs.size() > 0; + + String defaultMetric = "max"; + if (supportedAggs.contains(defaultMetric) == false) { + defaultMetric = supportedAggs.get(0); + } + if (sourceIsAggregate) { + defaultMetric = Objects.requireNonNullElse( + (String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC), + defaultMetric + ); + } + + return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs); + } + private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map fieldProperties) throws IOException { final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString( @@ -752,12 +786,11 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc builder.field(fieldProperty, fieldProperties.get(fieldProperty)); } } else { - final String[] supportedAggsArray = metricType.supportedAggs(); - // We choose max as the default metric - final String defaultMetric = List.of(supportedAggsArray).contains("max") ? "max" : supportedAggsArray[0]; + var supported = getSupportedMetrics(metricType, fieldProperties); + builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE) - .array(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedAggsArray) - .field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, defaultMetric) + .stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics) + .field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric) .field(TIME_SERIES_METRIC_PARAM, metricType); } builder.endObject(); diff --git a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java index fb699fd7c341..1b2cc32e12a6 100644 --- a/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java +++ b/x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/TransportDownsampleActionTests.java @@ -13,12 +13,16 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; +import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ilm.LifecycleSettings; import java.util.List; +import java.util.Map; import java.util.UUID; +import static org.hamcrest.Matchers.is; + public class TransportDownsampleActionTests extends ESTestCase { public void testCopyIndexMetadata() { // GIVEN @@ -107,4 +111,25 @@ public class TransportDownsampleActionTests extends ESTestCase { settings.get(IndexMetadata.SETTING_CREATION_DATE) ); } + + public void testGetSupportedMetrics() { + TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE; + Map fieldProperties = Map.of( + "type", + "aggregate_metric_double", + "metrics", + List.of("max", "sum"), + "default_metric", + "sum" + ); + + var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties); + assertThat(supported.defaultMetric(), is("sum")); + assertThat(supported.supportedMetrics(), is(List.of("max", "sum"))); + + fieldProperties = Map.of("type", "integer"); + supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties); + assertThat(supported.defaultMetric(), is("max")); + assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs()))); + } }