mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 15:47:23 -04:00
Added additional index.look_ahead_time validation (#87847)
Added a requirement that index.look_ahead_time index setting can't be lower than time_series.poll_interval setting. Additional changes: * Fixed a mistake in the docs that referenced indices.lifecycle.poll_interval instead of time_series.poll_interval. * Moved index.look_ahead_time setting to data stream module.
This commit is contained in:
parent
c61e4dab18
commit
6ca2e796a9
10 changed files with 172 additions and 17 deletions
|
@ -33,7 +33,8 @@ preview:[] (<<_static_index_settings,Static>>, <<time-units,time units>>)
|
||||||
Interval used to calculate the `index.time_series.end_time` for a TSDS's write
|
Interval used to calculate the `index.time_series.end_time` for a TSDS's write
|
||||||
index. Defaults to `2h` (2 hours). Accepts `1m` (one minute) to `7d` (seven
|
index. Defaults to `2h` (2 hours). Accepts `1m` (one minute) to `7d` (seven
|
||||||
days). Only indices with an `index.mode` of `time_series` support this setting.
|
days). Only indices with an `index.mode` of `time_series` support this setting.
|
||||||
For more information, refer to <<tsds-look-ahead-time>>.
|
For more information, refer to <<tsds-look-ahead-time>>. Additionally this setting
|
||||||
|
can not be less than `time_series.poll_interval` cluster setting.
|
||||||
|
|
||||||
[[index-routing-path]] `index.routing_path`:: preview:[]
|
[[index-routing-path]] `index.routing_path`:: preview:[]
|
||||||
(<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
|
(<<_static_index_settings,Static>>, string or array of strings) Plain `keyword`
|
||||||
|
|
|
@ -244,12 +244,12 @@ create a new write index for a TSDS, {es} calculates the index's
|
||||||
|
|
||||||
`now + index.look_ahead_time`
|
`now + index.look_ahead_time`
|
||||||
|
|
||||||
At the <<indices-lifecycle-poll-interval,`indices.lifecycle.poll_interval`>>,
|
At the time series poll interval (controlled via `time_series.poll_interval` setting),
|
||||||
{es} checks if the write index has met the rollover criteria in its index
|
{es} checks if the write index has met the rollover criteria in its index
|
||||||
lifecycle policy. If not, {es} refreshes the `now` value and updates the write
|
lifecycle policy. If not, {es} refreshes the `now` value and updates the write
|
||||||
index's `index.time_series.end_time` to:
|
index's `index.time_series.end_time` to:
|
||||||
|
|
||||||
`now + index.look_ahead_time + indices.lifecycle.poll_interval`
|
`now + index.look_ahead_time + time_series.poll_interval`
|
||||||
|
|
||||||
This process continues until the write index rolls over. When the index rolls
|
This process continues until the write index rolls over. When the index rolls
|
||||||
over, {es} sets a final `index.time_series.end_time` value for the index. This
|
over, {es} sets a final `index.time_series.end_time` value for the index. This
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class DataStreamIndexSettingsProvider implements IndexSettingProvider {
|
||||||
if (indexMode != null) {
|
if (indexMode != null) {
|
||||||
if (indexMode == IndexMode.TIME_SERIES) {
|
if (indexMode == IndexMode.TIME_SERIES) {
|
||||||
Settings.Builder builder = Settings.builder();
|
Settings.Builder builder = Settings.builder();
|
||||||
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
|
TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(allSettings);
|
||||||
final Instant start;
|
final Instant start;
|
||||||
final Instant end;
|
final Instant end;
|
||||||
if (dataStream == null || migrating) {
|
if (dataStream == null || migrating) {
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.datastreams;
|
package org.elasticsearch.datastreams;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.action.ActionRequest;
|
import org.elasticsearch.action.ActionRequest;
|
||||||
import org.elasticsearch.action.ActionResponse;
|
import org.elasticsearch.action.ActionResponse;
|
||||||
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
|
import org.elasticsearch.action.datastreams.CreateDataStreamAction;
|
||||||
|
@ -57,6 +58,7 @@ import org.elasticsearch.xcontent.NamedXContentRegistry;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Locale;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class DataStreamsPlugin extends Plugin implements ActionPlugin {
|
public class DataStreamsPlugin extends Plugin implements ActionPlugin {
|
||||||
|
@ -70,13 +72,38 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
|
||||||
Setting.Property.Dynamic
|
Setting.Property.Dynamic
|
||||||
);
|
);
|
||||||
|
|
||||||
|
public static final Setting<TimeValue> LOOK_AHEAD_TIME = Setting.timeSetting(
|
||||||
|
"index.look_ahead_time",
|
||||||
|
TimeValue.timeValueHours(2),
|
||||||
|
TimeValue.timeValueMinutes(1),
|
||||||
|
TimeValue.timeValueDays(7),
|
||||||
|
Setting.Property.IndexScope,
|
||||||
|
Setting.Property.Dynamic
|
||||||
|
);
|
||||||
|
// The dependency of index.look_ahead_time is a cluster setting and currently there is no clean validation approach for this:
|
||||||
|
private final SetOnce<UpdateTimeSeriesRangeService> service = new SetOnce<>();
|
||||||
|
|
||||||
|
static void additionalLookAheadTimeValidation(TimeValue lookAhead, TimeValue timeSeriesPollInterval) {
|
||||||
|
if (lookAhead.compareTo(timeSeriesPollInterval) < 0) {
|
||||||
|
final String message = String.format(
|
||||||
|
Locale.ROOT,
|
||||||
|
"failed to parse value%s for setting [%s], must be lower than setting [%s] which is [%s]",
|
||||||
|
" [" + lookAhead.getStringRep() + "]",
|
||||||
|
LOOK_AHEAD_TIME.getKey(),
|
||||||
|
TIME_SERIES_POLL_INTERVAL.getKey(),
|
||||||
|
timeSeriesPollInterval.getStringRep()
|
||||||
|
);
|
||||||
|
throw new IllegalArgumentException(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<Setting<?>> getSettings() {
|
public List<Setting<?>> getSettings() {
|
||||||
if (IndexSettings.isTimeSeriesModeEnabled() == false) {
|
if (IndexSettings.isTimeSeriesModeEnabled() == false) {
|
||||||
return List.of();
|
return List.of();
|
||||||
}
|
}
|
||||||
|
|
||||||
return List.of(TIME_SERIES_POLL_INTERVAL);
|
return List.of(TIME_SERIES_POLL_INTERVAL, LOOK_AHEAD_TIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -98,6 +125,7 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
|
||||||
}
|
}
|
||||||
|
|
||||||
var service = new UpdateTimeSeriesRangeService(environment.settings(), threadPool, clusterService);
|
var service = new UpdateTimeSeriesRangeService(environment.settings(), threadPool, clusterService);
|
||||||
|
this.service.set(service);
|
||||||
return List.of(service);
|
return List.of(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,6 +151,11 @@ public class DataStreamsPlugin extends Plugin implements ActionPlugin {
|
||||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||||
Supplier<DiscoveryNodes> nodesInCluster
|
Supplier<DiscoveryNodes> nodesInCluster
|
||||||
) {
|
) {
|
||||||
|
indexScopedSettings.addSettingsUpdateConsumer(LOOK_AHEAD_TIME, value -> {
|
||||||
|
TimeValue timeSeriesPollInterval = service.get().pollInterval;
|
||||||
|
additionalLookAheadTimeValidation(value, timeSeriesPollInterval);
|
||||||
|
});
|
||||||
|
|
||||||
var createDsAction = new RestCreateDataStreamAction();
|
var createDsAction = new RestCreateDataStreamAction();
|
||||||
var deleteDsAction = new RestDeleteDataStreamAction();
|
var deleteDsAction = new RestDeleteDataStreamAction();
|
||||||
var getDsAction = new RestGetDataStreamsAction();
|
var getDsAction = new RestGetDataStreamsAction();
|
||||||
|
|
|
@ -107,7 +107,7 @@ public class UpdateTimeSeriesRangeService extends AbstractLifecycleComponent imp
|
||||||
Index head = dataStream.getWriteIndex();
|
Index head = dataStream.getWriteIndex();
|
||||||
IndexMetadata im = current.metadata().getIndexSafe(head);
|
IndexMetadata im = current.metadata().getIndexSafe(head);
|
||||||
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
|
Instant currentEnd = IndexSettings.TIME_SERIES_END_TIME.get(im.getSettings());
|
||||||
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(im.getSettings());
|
TimeValue lookAheadTime = DataStreamsPlugin.LOOK_AHEAD_TIME.get(im.getSettings());
|
||||||
Instant newEnd = now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS);
|
Instant newEnd = now.plus(lookAheadTime.getMillis(), ChronoUnit.MILLIS).plus(pollInterval.getMillis(), ChronoUnit.MILLIS);
|
||||||
if (newEnd.isAfter(currentEnd)) {
|
if (newEnd.isAfter(currentEnd)) {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -0,0 +1,122 @@
|
||||||
|
/*
|
||||||
|
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||||
|
* or more contributor license agreements. Licensed under the Elastic License
|
||||||
|
* 2.0 and the Server Side Public License, v 1; you may not use this file except
|
||||||
|
* in compliance with, at your election, the Elastic License 2.0 or the Server
|
||||||
|
* Side Public License, v 1.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.datastreams;
|
||||||
|
|
||||||
|
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||||
|
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
|
||||||
|
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
import org.junit.After;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
|
||||||
|
public class LookAHeadTimeTests extends ESSingleNodeTestCase {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<Class<? extends Plugin>> getPlugins() {
|
||||||
|
return List.of(DataStreamsPlugin.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
updateClusterSettings(Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), (String) null).build());
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTimeSeriesPollIntervalSetting() {
|
||||||
|
var settings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "9m").build();
|
||||||
|
updateClusterSettings(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTimeSeriesPollIntervalSettingToLow() {
|
||||||
|
var settings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "1s").build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateClusterSettings(settings));
|
||||||
|
assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [time_series.poll_interval], must be >= [1m]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTimeSeriesPollIntervalSettingToHigh() {
|
||||||
|
Settings settings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "11m").build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateClusterSettings(settings));
|
||||||
|
assertThat(e.getMessage(), equalTo("failed to parse value [11m] for setting [time_series.poll_interval], must be <= [10m]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLookAheadTimeSetting() throws IOException {
|
||||||
|
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "10m").build();
|
||||||
|
updateIndexSettings(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLookAheadTimeSettingToLow() {
|
||||||
|
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "1s").build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
|
||||||
|
assertThat(e.getMessage(), equalTo("failed to parse value [1s] for setting [index.look_ahead_time], must be >= [1m]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLookAheadTimeSettingToHigh() {
|
||||||
|
var settings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "8d").build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
|
||||||
|
assertThat(e.getMessage(), equalTo("failed to parse value [8d] for setting [index.look_ahead_time], must be <= [7d]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLookAheadTimeSettingLowerThanTimeSeriesPollIntervalSetting() {
|
||||||
|
{
|
||||||
|
var settings = Settings.builder()
|
||||||
|
.put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "3m")
|
||||||
|
// default time_series.poll_interval is 5m
|
||||||
|
.build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(settings));
|
||||||
|
assertThat(
|
||||||
|
e.getMessage(),
|
||||||
|
equalTo(
|
||||||
|
"failed to parse value [3m] for setting [index.look_ahead_time], must be lower than setting "
|
||||||
|
+ "[time_series.poll_interval] which is [5m]"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "3m").build();
|
||||||
|
updateClusterSettings(clusterSettings);
|
||||||
|
var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "1m").build();
|
||||||
|
var e = expectThrows(IllegalArgumentException.class, () -> updateIndexSettings(indexSettings));
|
||||||
|
assertThat(
|
||||||
|
e.getMessage(),
|
||||||
|
equalTo(
|
||||||
|
"failed to parse value [1m] for setting [index.look_ahead_time], must be lower than setting "
|
||||||
|
+ "[time_series.poll_interval] which is [3m]"
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLookAheadTimeSettingHigherThanTimeSeriesPollIntervalSetting() throws IOException {
|
||||||
|
var clusterSettings = Settings.builder().put(DataStreamsPlugin.TIME_SERIES_POLL_INTERVAL.getKey(), "10m").build();
|
||||||
|
updateClusterSettings(clusterSettings);
|
||||||
|
var indexSettings = Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), "100m").build();
|
||||||
|
updateIndexSettings(indexSettings);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateClusterSettings(Settings settings) {
|
||||||
|
client().admin().cluster().updateSettings(new ClusterUpdateSettingsRequest().persistentSettings(settings)).actionGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateIndexSettings(Settings settings) throws IOException {
|
||||||
|
try {
|
||||||
|
createIndex("test");
|
||||||
|
} catch (ResourceAlreadyExistsException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
client().admin().indices().updateSettings(new UpdateSettingsRequest(settings)).actionGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -107,7 +107,7 @@ public class UpdateTimeSeriesRangeServiceTests extends ESTestCase {
|
||||||
List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
|
List.of(new Tuple<>(start.minus(4, ChronoUnit.HOURS), start), new Tuple<>(start, end))
|
||||||
).getMetadata();
|
).getMetadata();
|
||||||
metadata = Metadata.builder(metadata)
|
metadata = Metadata.builder(metadata)
|
||||||
.updateSettings(Settings.builder().put(IndexSettings.LOOK_AHEAD_TIME.getKey(), lookAHeadTimeMinutes + "m").build())
|
.updateSettings(Settings.builder().put(DataStreamsPlugin.LOOK_AHEAD_TIME.getKey(), lookAHeadTimeMinutes + "m").build())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
var in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
|
var in = ClusterState.builder(ClusterState.EMPTY_STATE).metadata(metadata).build();
|
||||||
|
|
|
@ -188,7 +188,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
||||||
result.add(IndexMetadata.INDEX_ROUTING_PATH);
|
result.add(IndexMetadata.INDEX_ROUTING_PATH);
|
||||||
result.add(IndexSettings.TIME_SERIES_START_TIME);
|
result.add(IndexSettings.TIME_SERIES_START_TIME);
|
||||||
result.add(IndexSettings.TIME_SERIES_END_TIME);
|
result.add(IndexSettings.TIME_SERIES_END_TIME);
|
||||||
result.add(IndexSettings.LOOK_AHEAD_TIME);
|
|
||||||
return Set.copyOf(result);
|
return Set.copyOf(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1915,6 +1915,15 @@ public class Setting<T> implements ToXContentObject {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Setting<TimeValue> timeSetting(
|
||||||
|
String key,
|
||||||
|
TimeValue defaultValue,
|
||||||
|
Validator<TimeValue> validator,
|
||||||
|
Property... properties
|
||||||
|
) {
|
||||||
|
return new Setting<>(key, defaultValue.getStringRep(), (s) -> TimeValue.parseTimeValue(s, key), validator, properties);
|
||||||
|
}
|
||||||
|
|
||||||
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
|
public static Setting<TimeValue> positiveTimeSetting(String key, TimeValue defaultValue, Property... properties) {
|
||||||
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
|
return timeSetting(key, defaultValue, TimeValue.timeValueMillis(0), properties);
|
||||||
}
|
}
|
||||||
|
|
|
@ -538,15 +538,6 @@ public final class IndexSettings {
|
||||||
Property.Final
|
Property.Final
|
||||||
);
|
);
|
||||||
|
|
||||||
public static final Setting<TimeValue> LOOK_AHEAD_TIME = Setting.timeSetting(
|
|
||||||
"index.look_ahead_time",
|
|
||||||
TimeValue.timeValueHours(2),
|
|
||||||
TimeValue.timeValueMinutes(1),
|
|
||||||
TimeValue.timeValueDays(7),
|
|
||||||
Property.IndexScope,
|
|
||||||
Property.Final
|
|
||||||
);
|
|
||||||
|
|
||||||
private final Index index;
|
private final Index index;
|
||||||
private final Version version;
|
private final Version version;
|
||||||
private final Logger logger;
|
private final Logger logger;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue