Refactor data stream lifecycle to use the template paradigm (#124593)

This commit is contained in:
Mary Gouseti 2025-03-18 13:24:06 +02:00 committed by GitHub
parent 9f76a7f32d
commit ce04da7dea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
46 changed files with 1185 additions and 793 deletions

View file

@ -64,14 +64,12 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
ensureGreen();
final String dataStreamName = "metrics-foo";
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder()
.downsampling(
new DataStreamLifecycle.Downsampling(
List.of(
new DataStreamLifecycle.Downsampling.Round(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
)
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
)
)
)

View file

@ -12,7 +12,6 @@ import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.datastreams.lifecycle.PutDataStreamLifecycleAction;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
@ -55,12 +54,16 @@ public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
public void testDownsampling() throws Exception {
String dataStreamName = "metrics-foo";
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder()
.downsampling(
new Downsampling(
List.of(
new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))),
new Downsampling.Round(TimeValue.timeValueSeconds(10), new DownsampleConfig(new DateHistogramInterval("10m")))
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
),
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueSeconds(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
)
)
@ -124,14 +127,18 @@ public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
public void testDownsamplingOnlyExecutesTheLastMatchingRound() throws Exception {
String dataStreamName = "metrics-bar";
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder()
.downsampling(
new Downsampling(
List.of(
new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))),
// data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at
// least 2 seconds since rollover. only the 10 seconds round should be executed.
new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10m")))
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
),
// data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at
// least 2 seconds since rollover. only the 10 seconds round should be executed.
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
)
)
@ -188,14 +195,18 @@ public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
// we expect the earlier round to be ignored
String dataStreamName = "metrics-baz";
DataStreamLifecycle lifecycle = DataStreamLifecycle.newBuilder()
DataStreamLifecycle.Template lifecycle = DataStreamLifecycle.Template.builder()
.downsampling(
new Downsampling(
List.of(
new Downsampling.Round(TimeValue.timeValueMillis(0), new DownsampleConfig(new DateHistogramInterval("5m"))),
// data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at
// least 2 seconds since rollover. only the 10 seconds round should be executed.
new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("10m")))
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(0),
new DownsampleConfig(new DateHistogramInterval("5m"))
),
// data stream lifecycle runs every 1 second, so by the time we forcemerge the backing index it would've been at
// least 2 seconds since rollover. only the 10 seconds round should be executed.
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(10),
new DownsampleConfig(new DateHistogramInterval("10m"))
)
)
)
@ -248,10 +259,13 @@ public class DataStreamLifecycleDownsampleIT extends ESIntegTestCase {
// update the lifecycle so that it only has one round, for the same `after` parameter as before, but a different interval
// the different interval should yield a different downsample index name so we expect the data stream lifecycle to get the previous
// `10s` interval downsample index, downsample it to `30s` and replace it in the data stream instead of the `10s` one.
DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.newBuilder()
DataStreamLifecycle updatedLifecycle = DataStreamLifecycle.builder()
.downsampling(
new Downsampling(
List.of(new Downsampling.Round(TimeValue.timeValueMillis(10), new DownsampleConfig(new DateHistogramInterval("20m"))))
List.of(
new DataStreamLifecycle.DownsamplingRound(
TimeValue.timeValueMillis(10),
new DownsampleConfig(new DateHistogramInterval("20m"))
)
)
)
.build();

View file

@ -70,7 +70,7 @@ public class DataStreamLifecycleDriver {
String dataStreamName,
@Nullable String startTime,
@Nullable String endTime,
DataStreamLifecycle lifecycle,
DataStreamLifecycle.Template lifecycle,
int docCount,
String firstDocTimestamp
) throws IOException {
@ -94,7 +94,7 @@ public class DataStreamLifecycleDriver {
String pattern,
@Nullable String startTime,
@Nullable String endTime,
DataStreamLifecycle lifecycle
DataStreamLifecycle.Template lifecycle
) throws IOException {
Settings.Builder settings = indexSettings(1, 0).put(IndexSettings.MODE.getKey(), IndexMode.TIME_SERIES)
.putList(IndexMetadata.INDEX_ROUTING_PATH.getKey(), List.of(FIELD_DIMENSION_1));
@ -138,8 +138,8 @@ public class DataStreamLifecycleDriver {
List<String> patterns,
@Nullable Settings settings,
@Nullable Map<String, Object> metadata,
@Nullable DataStreamLifecycle lifecycle
) throws IOException {
@Nullable DataStreamLifecycle.Template lifecycle
) {
TransportPutComposableIndexTemplateAction.Request request = new TransportPutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
ComposableIndexTemplate.builder()