mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
Copy metrics and default_metric properties when downsampling aggregate_metric_double (#121727)
Fixes #119696 and #96076
This commit is contained in:
parent
56cac1bfe9
commit
e3e474c337
5 changed files with 154 additions and 6 deletions
7
docs/changelog/121727.yaml
Normal file
7
docs/changelog/121727.yaml
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
pr: 121727
|
||||||
|
summary: Copy metrics and `default_metric` properties when downsampling `aggregate_metric_double`
|
||||||
|
area: Downsampling
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 119696
|
||||||
|
- 96076
|
|
@ -21,6 +21,10 @@ public class DataStreamFeatures implements FeatureSpecification {
|
||||||
|
|
||||||
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");
|
public static final NodeFeature DATA_STREAM_FAILURE_STORE_TSDB_FIX = new NodeFeature("data_stream.failure_store.tsdb_fix");
|
||||||
|
|
||||||
|
public static final NodeFeature DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX = new NodeFeature(
|
||||||
|
"data_stream.downsample.default_aggregate_metric_fix"
|
||||||
|
);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<NodeFeature> getFeatures() {
|
public Set<NodeFeature> getFeatures() {
|
||||||
return Set.of();
|
return Set.of();
|
||||||
|
@ -28,6 +32,6 @@ public class DataStreamFeatures implements FeatureSpecification {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<NodeFeature> getTestFeatures() {
|
public Set<NodeFeature> getTestFeatures() {
|
||||||
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX);
|
return Set.of(DATA_STREAM_FAILURE_STORE_TSDB_FIX, DOWNSAMPLE_AGGREGATE_DEFAULT_METRIC_FIX);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
"downsample aggregate field":
|
||||||
|
- requires:
|
||||||
|
cluster_features: ["data_stream.downsample.default_aggregate_metric_fix"]
|
||||||
|
reason: "#119696 fixed"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.create:
|
||||||
|
index: test
|
||||||
|
body:
|
||||||
|
settings:
|
||||||
|
number_of_shards: 1
|
||||||
|
index:
|
||||||
|
mode: time_series
|
||||||
|
routing_path: [sensor_id]
|
||||||
|
time_series:
|
||||||
|
start_time: 2021-04-28T00:00:00Z
|
||||||
|
end_time: 2021-04-29T00:00:00Z
|
||||||
|
mappings:
|
||||||
|
properties:
|
||||||
|
"@timestamp":
|
||||||
|
type: date
|
||||||
|
sensor_id:
|
||||||
|
type: keyword
|
||||||
|
time_series_dimension: true
|
||||||
|
temperature:
|
||||||
|
type: aggregate_metric_double
|
||||||
|
metrics: [min, sum, value_count]
|
||||||
|
default_metric: sum
|
||||||
|
time_series_metric: gauge
|
||||||
|
- do:
|
||||||
|
bulk:
|
||||||
|
refresh: true
|
||||||
|
index: test
|
||||||
|
body:
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-28T18:00:00Z", "sensor_id": "1", "temperature": {"min": 24.7, "sum": 50.2, "value_count": 2}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-28T18:30:00Z", "sensor_id": "1", "temperature": {"min": 24.2, "sum": 73.8, "value_count": 3}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-28T19:00:00Z", "sensor_id": "1", "temperature": {"min": 25.1, "sum": 51.0, "value_count": 2}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-28T19:30:00Z", "sensor_id": "1", "temperature": {"min": 24.8, "sum": 24.8, "value_count": 1}}'
|
||||||
|
- '{"index": {}}'
|
||||||
|
- '{"@timestamp": "2021-04-28T20:00:00Z", "sensor_id": "1", "temperature": {"min": 24.6, "sum": 49.1, "value_count": 2}}'
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.put_settings:
|
||||||
|
index: test
|
||||||
|
body:
|
||||||
|
index.blocks.write: true
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.downsample:
|
||||||
|
index: test
|
||||||
|
target_index: test-downsample
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"fixed_interval": "1h"
|
||||||
|
}
|
||||||
|
- is_true: acknowledged
|
||||||
|
|
||||||
|
- do:
|
||||||
|
search:
|
||||||
|
index: test-downsample
|
||||||
|
body:
|
||||||
|
size: 0
|
||||||
|
|
||||||
|
- match:
|
||||||
|
hits.total.value: 3
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.get_mapping:
|
||||||
|
index: test-downsample
|
||||||
|
- match:
|
||||||
|
test-downsample.mappings.properties.temperature:
|
||||||
|
type: aggregate_metric_double
|
||||||
|
metrics: [min, sum, value_count]
|
||||||
|
default_metric: sum
|
||||||
|
time_series_metric: gauge
|
|
@ -91,6 +91,7 @@ import java.time.format.DateTimeFormatter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -739,6 +740,39 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
||||||
.endObject();
|
.endObject();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// public for testing
|
||||||
|
public record AggregateMetricDoubleFieldSupportedMetrics(String defaultMetric, List<String> supportedMetrics) {}
|
||||||
|
|
||||||
|
// public for testing
|
||||||
|
public static AggregateMetricDoubleFieldSupportedMetrics getSupportedMetrics(
|
||||||
|
final TimeSeriesParams.MetricType metricType,
|
||||||
|
final Map<String, ?> fieldProperties
|
||||||
|
) {
|
||||||
|
boolean sourceIsAggregate = fieldProperties.get("type").equals(AggregateMetricDoubleFieldMapper.CONTENT_TYPE);
|
||||||
|
List<String> supportedAggs = List.of(metricType.supportedAggs());
|
||||||
|
|
||||||
|
if (sourceIsAggregate) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
List<String> currentAggs = (List<String>) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.METRICS);
|
||||||
|
supportedAggs = supportedAggs.stream().filter(currentAggs::contains).toList();
|
||||||
|
}
|
||||||
|
|
||||||
|
assert supportedAggs.size() > 0;
|
||||||
|
|
||||||
|
String defaultMetric = "max";
|
||||||
|
if (supportedAggs.contains(defaultMetric) == false) {
|
||||||
|
defaultMetric = supportedAggs.get(0);
|
||||||
|
}
|
||||||
|
if (sourceIsAggregate) {
|
||||||
|
defaultMetric = Objects.requireNonNullElse(
|
||||||
|
(String) fieldProperties.get(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC),
|
||||||
|
defaultMetric
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new AggregateMetricDoubleFieldSupportedMetrics(defaultMetric, supportedAggs);
|
||||||
|
}
|
||||||
|
|
||||||
private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
|
private static void addMetricFieldMapping(final XContentBuilder builder, final String field, final Map<String, ?> fieldProperties)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
|
final TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.fromString(
|
||||||
|
@ -752,12 +786,11 @@ public class TransportDownsampleAction extends AcknowledgedTransportMasterNodeAc
|
||||||
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
|
builder.field(fieldProperty, fieldProperties.get(fieldProperty));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
final String[] supportedAggsArray = metricType.supportedAggs();
|
var supported = getSupportedMetrics(metricType, fieldProperties);
|
||||||
// We choose max as the default metric
|
|
||||||
final String defaultMetric = List.of(supportedAggsArray).contains("max") ? "max" : supportedAggsArray[0];
|
|
||||||
builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
|
builder.field("type", AggregateMetricDoubleFieldMapper.CONTENT_TYPE)
|
||||||
.array(AggregateMetricDoubleFieldMapper.Names.METRICS, supportedAggsArray)
|
.stringListField(AggregateMetricDoubleFieldMapper.Names.METRICS, supported.supportedMetrics)
|
||||||
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, defaultMetric)
|
.field(AggregateMetricDoubleFieldMapper.Names.DEFAULT_METRIC, supported.defaultMetric)
|
||||||
.field(TIME_SERIES_METRIC_PARAM, metricType);
|
.field(TIME_SERIES_METRIC_PARAM, metricType);
|
||||||
}
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
|
|
|
@ -13,12 +13,16 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.index.IndexSettings;
|
import org.elasticsearch.index.IndexSettings;
|
||||||
import org.elasticsearch.index.IndexVersion;
|
import org.elasticsearch.index.IndexVersion;
|
||||||
|
import org.elasticsearch.index.mapper.TimeSeriesParams;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
import org.elasticsearch.xpack.core.ilm.LifecycleSettings;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
|
||||||
public class TransportDownsampleActionTests extends ESTestCase {
|
public class TransportDownsampleActionTests extends ESTestCase {
|
||||||
public void testCopyIndexMetadata() {
|
public void testCopyIndexMetadata() {
|
||||||
// GIVEN
|
// GIVEN
|
||||||
|
@ -107,4 +111,25 @@ public class TransportDownsampleActionTests extends ESTestCase {
|
||||||
settings.get(IndexMetadata.SETTING_CREATION_DATE)
|
settings.get(IndexMetadata.SETTING_CREATION_DATE)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testGetSupportedMetrics() {
|
||||||
|
TimeSeriesParams.MetricType metricType = TimeSeriesParams.MetricType.GAUGE;
|
||||||
|
Map<String, Object> fieldProperties = Map.of(
|
||||||
|
"type",
|
||||||
|
"aggregate_metric_double",
|
||||||
|
"metrics",
|
||||||
|
List.of("max", "sum"),
|
||||||
|
"default_metric",
|
||||||
|
"sum"
|
||||||
|
);
|
||||||
|
|
||||||
|
var supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
|
||||||
|
assertThat(supported.defaultMetric(), is("sum"));
|
||||||
|
assertThat(supported.supportedMetrics(), is(List.of("max", "sum")));
|
||||||
|
|
||||||
|
fieldProperties = Map.of("type", "integer");
|
||||||
|
supported = TransportDownsampleAction.getSupportedMetrics(metricType, fieldProperties);
|
||||||
|
assertThat(supported.defaultMetric(), is("max"));
|
||||||
|
assertThat(supported.supportedMetrics(), is(List.of(metricType.supportedAggs())));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue