Effort to fix testDataStreamLifecycleDownsampleRollingRestart #123769 (#125478)

This commit is contained in:
Mary Gouseti 2025-03-28 15:26:09 +02:00 committed by GitHub
parent 67798dd25b
commit 1943844d5a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 55 additions and 42 deletions

View file

@ -11,21 +11,19 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.rollover.RolloverAction;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.downsample.DownsampleConfig;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
@ -33,15 +31,14 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices;
import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4)
public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class);
public static final int DOC_COUNT = 50_000;
public static final int DOC_COUNT = 25_000;
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
@ -55,7 +52,6 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
return settings.build();
}
@TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging")
public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception {
final InternalTestCluster cluster = internalCluster();
cluster.startMasterOnlyNodes(1);
@ -88,38 +84,38 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase {
// testing so DSL doesn't have to wait for the end_time to lapse)
putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle);
client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet();
String sourceIndex = getBackingIndices(client(), dataStreamName).get(0);
final String targetIndex = "downsample-5m-" + sourceIndex;
// DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
// downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution.
long sleepTime = randomLongBetween(3000, 4500);
logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime);
Thread.sleep(sleepTime);
List<String> backingIndices = getBackingIndices(client(), dataStreamName);
// first generation index
String sourceIndex = backingIndices.get(0);
/**
* DLM runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts
* downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index.
*/
logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption.");
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8));
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() {
});
logger.info("-> Starting the disruption.");
internalCluster().rollingRestart(new InternalTestCluster.RestartCallback());
// if the source index has already been downsampled and moved into the data stream just use its name directly
final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex;
assertBusy(() -> {
try {
GetSettingsResponse getSettingsResponse = cluster.client()
.admin()
.indices()
.getSettings(
new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
)
.actionGet();
Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex);
assertThat(indexSettings, is(notNullValue()));
assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS));
assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings));
} catch (Exception e) {
throw new AssertionError(e);
}
}, 120, TimeUnit.SECONDS);
ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120));
ensureGreen(targetIndex);
logger.info("-> Relocation has finished");
}
private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) {
final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class);
final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> {
final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex);
if (indexMetadata == null) {
return false;
}
var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings());
if (expectedStatus == downsamplingStatus) {
logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus);
return true;
}
return false;
});
safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS);
}
}