Improve resiliency of UpdateTimeSeriesRangeService (#126637)

If updating the `index.time_series.end_time` setting fails for one data stream,
then UpdateTimeSeriesRangeService should continue updating this setting for the other data streams.

The following error was observed in the wild:

```
[2025-04-07T08:50:39,698][WARN ][o.e.d.UpdateTimeSeriesRangeService] [node-01] failed to update tsdb data stream end times
java.lang.IllegalArgumentException: [index.time_series.end_time] requires [index.mode=time_series]
        at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:636) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.index.IndexSettings$1.validate(IndexSettings.java:619) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.common.settings.Setting.get(Setting.java:563) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.common.settings.Setting.get(Setting.java:535) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService.updateTimeSeriesTemporalRange(UpdateTimeSeriesRangeService.java:111) ~[?:?]
        at org.elasticsearch.datastreams.UpdateTimeSeriesRangeService$UpdateTimeSeriesExecutor.execute(UpdateTimeSeriesRangeService.java:210) ~[?:?]
        at org.elasticsearch.cluster.service.MasterService.innerExecuteTasks(MasterService.java:1075) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:1038) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService.executeAndPublishBatch(MasterService.java:245) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.lambda$run$2(MasterService.java:1691) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService$BatchingTaskQueue$Processor.run(MasterService.java:1688) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService$5.lambda$doRun$0(MasterService.java:1283) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.action.ActionListener.run(ActionListener.java:452) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.cluster.service.MasterService$5.doRun(MasterService.java:1262) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023) ~[elasticsearch-8.17.3.jar:?]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27) ~[elasticsearch-8.17.3.jar:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
        at java.lang.Thread.run(Thread.java:1575) ~[?:?]
```

This resulted in a situation in which the `index.time_series.end_time` index setting was not updated for any data stream. That in turn caused data loss: metrics couldn't be indexed, because no suitable backing index could be resolved:

```
the document timestamp [2025-03-26T15:26:10.000Z] is outside of ranges of currently writable indices [[2025-01-31T07:22:43.000Z,2025-02-15T07:24:06.000Z][2025-02-15T07:24:06.000Z,2025-03-02T07:34:07.000Z][2025-03-02T07:34:07.000Z,2025-03-10T12:45:37.000Z][2025-03-10T12:45:37.000Z,2025-03-10T14:30:37.000Z][2025-03-10T14:30:37.000Z,2025-03-25T12:50:40.000Z][2025-03-25T12:50:40.000Z,2025-03-25T14:35:40.000Z
```
This commit is contained in:
Martijn van Groningen 2025-04-11 12:58:10 +02:00 committed by GitHub
parent 6da2f34f70
commit 6012590929
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 141 additions and 19 deletions

View file

@ -0,0 +1,5 @@
pr: 126637
summary: Improve resiliency of `UpdateTimeSeriesRangeService`
area: TSDB
type: bug
issues: []

View file

@ -127,14 +127,14 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
// getWriteIndex() selects the latest added index:
Index head = dataStream.getWriteIndex();
IndexMetadata im = project.getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
try {
try {
IndexMetadata im = project.getIndexSafe(head);
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
TimeValue lookAheadTime = DataStreamsPlugin.getLookAheadTime(im.getSettings());
Instant newEnd = DataStream.getCanonicalTimestampBound(
now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS)
);
if (newEnd.isAfter(currentEnd)) {
Settings settings = Settings.builder()
.put(IndexSettings.TIME_SERIES_END_TIME.getKey(), DEFAULT_DATE_TIME_FORMATTER.format(newEnd))
.build();
@ -151,17 +151,17 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
mBuilder.updateSettings(settings, head.getName());
// Verify that all temporal ranges of each backing index is still valid:
dataStream.validate(mBuilder::get);
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
} catch (Exception e) {
LOGGER.error(
() -> format(
"unable to update [%s] for data stream [%s] and backing index [%s]",
IndexSettings.TIME_SERIES_END_TIME.getKey(),
dataStream.getName(),
head.getName()
),
e
);
}
}
return mBuilder;

View file

@ -8,12 +8,19 @@
*/
package org.elasticsearch.datastreams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
@ -23,15 +30,22 @@ import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.cluster.metadata.DataStream.getDefaultBackingIndexName;
import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.createIndexMetadata;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
@ -42,6 +56,22 @@ import static org.mockito.Mockito.when;
public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
static MockAppender appender;
static Logger testLogger1 = LogManager.getLogger(UpdateTimeSeriesRangeService.class);
// Install a capturing appender on UpdateTimeSeriesRangeService's logger once for
// the whole test class, so individual tests can assert on emitted log messages.
@BeforeClass
public static void classInit() throws IllegalAccessException {
appender = new MockAppender("mock_appender");
appender.start();
Loggers.addAppender(testLogger1, appender);
}
// Detach and stop the capturing appender after all tests have run, restoring the
// logger to its original state.
@AfterClass
public static void classCleanup() {
Loggers.removeAppender(testLogger1, appender);
appender.stop();
}
private ThreadPool threadPool;
private UpdateTimeSeriesRangeService instance;
@ -199,6 +229,70 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(start));
}
// Regression test for the resiliency fix (#126637): when updating
// index.time_series.end_time fails for one ("broken") data stream, the service
// must still update the setting for the remaining data streams and log an error
// for the broken one, instead of aborting the whole batch.
public void testUpdateTimeSeriesTemporalOneBadDataStream() {
String dataStreamName1 = "logs-app1";
String dataStreamName2 = "logs-app2-broken";
String dataStreamName3 = "logs-app3";
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Instant start = now.minus(90, ChronoUnit.MINUTES);
Instant end = start.plus(30, ChronoUnit.MINUTES);
final var projectId = randomProjectIdOrDefault();
ProjectMetadata.Builder mbBuilder = ProjectMetadata.builder(projectId);
// Create three data streams, each with one backing index covering [start, end):
for (String dataStreamName : List.of(dataStreamName1, dataStreamName2, dataStreamName3)) {
DataStreamTestHelper.getClusterStateWithDataStream(mbBuilder, dataStreamName, List.of(new Tuple<>(start, end)));
}
// Break the second data stream: add a generation-2 backing index whose
// index.mode is "logsdb" rather than "time_series", so setting
// index.time_series.end_time on it will fail validation.
Settings settings = Settings.builder().put("index.mode", "logsdb").build();
var im = createIndexMetadata(getDefaultBackingIndexName(dataStreamName2, 2, start.toEpochMilli()), true, settings, 0);
mbBuilder.put(im, true);
var ds2 = mbBuilder.dataStreamMetadata().dataStreams().get(dataStreamName2);
var ds2Indices = new ArrayList<>(ds2.getIndices());
ds2Indices.add(im.getIndex());
// DataStream is immutable, so rebuild ds2 at generation 2 with the broken
// index appended (making it the write index) and swap it into the project:
var copy = new HashMap<>(mbBuilder.dataStreamMetadata().dataStreams());
copy.put(
dataStreamName2,
new DataStream(
ds2.getName(),
ds2Indices,
2,
ds2.getMetadata(),
ds2.isHidden(),
ds2.isReplicated(),
ds2.isSystem(),
ds2.isAllowCustomRouting(),
ds2.getIndexMode(),
ds2.getDataLifecycle(),
ds2.getDataStreamOptions(),
ds2.getFailureIndices(),
ds2.rolloverOnWrite(),
ds2.getAutoShardingEvent()
)
);
mbBuilder.dataStreams(copy, Map.of());
// Rewind "now" so the computed new end_time lands after the current end and
// an update is attempted for every data stream:
now = now.minus(45, ChronoUnit.MINUTES);
ClusterState before = ClusterState.builder(ClusterState.EMPTY_STATE).putProjectMetadata(mbBuilder).build();
ClusterState result = instance.updateTimeSeriesTemporalRange(before, now);
// A new cluster state must have been produced despite the broken data stream:
assertThat(result, not(sameInstance(before)));
final var project = result.getMetadata().getProject(projectId);
// now + 35 minutes — presumably look-ahead time plus poll interval, truncated
// to second precision by getCanonicalTimestampBound; TODO confirm against service defaults.
final var expectedEndTime = now.plus(35, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.SECONDS);
assertThat(getEndTime(project, dataStreamName1, 0), equalTo(expectedEndTime));
assertThat(getEndTime(project, dataStreamName2, 0), equalTo(end)); // failed to update end_time, because broken data stream
assertThat(getEndTime(project, dataStreamName3, 0), equalTo(expectedEndTime));
// The failure for the broken data stream must have been logged:
String message = appender.getLastEventAndReset().getMessage().getFormattedMessage();
assertThat(
message,
equalTo(
"unable to update [index.time_series.end_time] for data stream [logs-app2-broken] and "
+ "backing index ["
+ im.getIndex().getName()
+ "]"
)
);
}
public void testUpdateTimeSeriesTemporalRange_multipleProjects() {
String dataStreamName = "logs-app1";
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
@ -253,4 +347,27 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
return IndexSettings.TIME_SERIES_START_TIME.get(indexSettings);
}
// Minimal Log4j appender that captures the most recently appended log event,
// allowing tests to assert on messages logged by the class under test.
static class MockAppender extends AbstractAppender {
// Last captured event; null until something is logged or after a reset.
public LogEvent lastEvent;
MockAppender(final String name) throws IllegalAccessException {
// Regex filter matching any message, including multi-line ones.
super(name, RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null, false);
}
@Override
public void append(LogEvent event) {
// Snapshot via toImmutable(): the framework may recycle mutable event objects.
lastEvent = event.toImmutable();
}
// Returns the message of the last captured event; NPEs if nothing was logged.
Message lastMessage() {
return lastEvent.getMessage();
}
// Returns the last captured event and clears it, so each test observes only
// events logged since the previous call.
public LogEvent getLastEventAndReset() {
LogEvent toReturn = lastEvent;
lastEvent = null;
return toReturn;
}
}
}

View file

@ -617,7 +617,7 @@ public final class DataStreamTestHelper {
builder.put(dataStreamBuilder.build());
}
private static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
public static IndexMetadata createIndexMetadata(String name, boolean hidden, Settings settings, int replicas) {
Settings.Builder b = Settings.builder()
.put(settings)
.put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())