diff --git a/muted-tests.yml b/muted-tests.yml index 2063ded32821..4f0d1d1deae9 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -242,9 +242,6 @@ tests: - class: org.elasticsearch.action.admin.cluster.node.tasks.CancellableTasksIT method: testChildrenTasksCancelledOnTimeout issue: https://github.com/elastic/elasticsearch/issues/123568 -- class: org.elasticsearch.xpack.downsample.DataStreamLifecycleDownsampleDisruptionIT - method: testDataStreamLifecycleDownsampleRollingRestart - issue: https://github.com/elastic/elasticsearch/issues/123769 - class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests method: testCreateAndRestorePartialSearchableSnapshot issue: https://github.com/elastic/elasticsearch/issues/123773 diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 9d779156e363..c5a49f77e4fd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -2477,9 +2477,19 @@ public abstract class ESTestCase extends LuceneTestCase { * @return The value with which the {@code listener} was completed. */ public static T safeAwait(SubscribableListener listener) { + return safeAwait(listener, SAFE_AWAIT_TIMEOUT.getMillis(), TimeUnit.MILLISECONDS); + } + + /** + * Wait for the successful completion of the given {@link SubscribableListener}, respecting the provided timeout, + * preserving the thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure. + * + * @return The value with which the {@code listener} was completed. + */ + public static T safeAwait(SubscribableListener listener, long timeout, TimeUnit unit) { final var future = new TestPlainActionFuture(); listener.addListener(future); - return safeGet(future); + return safeGet(future, timeout, unit); } /** @@ -2509,8 +2519,18 @@ public abstract class ESTestCase extends LuceneTestCase { * @return The value with which the {@code future} was completed. */ public static T safeGet(Future future) { + return safeGet(future, SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS); + } + + /** + * Wait for the successful completion of the given {@link Future}, respecting the provided timeout, preserving the + * thread's interrupt status flag and converting all exceptions into an {@link AssertionError} to trigger a test failure. + * + * @return The value with which the {@code future} was completed. + */ + public static T safeGet(Future future, long timeout, TimeUnit unit) { try { - return future.get(SAFE_AWAIT_TIMEOUT.millis(), TimeUnit.MILLISECONDS); + return future.get(timeout, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new AssertionError("safeGet: interrupted waiting for SubscribableListener", e); diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java index f45a602af544..a96a7f20a815 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DataStreamLifecycleDownsampleDisruptionIT.java @@ -11,21 +11,19 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.admin.indices.rollover.RolloverAction; import org.elasticsearch.action.admin.indices.rollover.RolloverRequest; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; -import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.downsample.DownsampleConfig; -import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.datastreams.DataStreamsPlugin; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; @@ -33,15 +31,14 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS; import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.getBackingIndices; import static org.elasticsearch.xpack.downsample.DataStreamLifecycleDriver.putTSDBIndexTemplate; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 4) public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { private static final Logger logger = LogManager.getLogger(DataStreamLifecycleDownsampleDisruptionIT.class); - public static final int DOC_COUNT = 50_000; + public static final int DOC_COUNT = 25_000; @Override protected Collection> nodePlugins() { @@ -55,7 +52,6 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { return settings.build(); } - @TestLogging(value = "org.elasticsearch.datastreams.lifecycle:TRACE", reason = "debugging") public void testDataStreamLifecycleDownsampleRollingRestart() throws Exception { final InternalTestCluster cluster = internalCluster(); cluster.startMasterOnlyNodes(1); @@ -88,38 +84,38 @@ public class DataStreamLifecycleDownsampleDisruptionIT extends ESIntegTestCase { // testing so DSL doesn't have to wait for the end_time to lapse) putTSDBIndexTemplate(client(), dataStreamName, null, null, lifecycle); client().execute(RolloverAction.INSTANCE, new RolloverRequest(dataStreamName, null)).actionGet(); + String sourceIndex = getBackingIndices(client(), dataStreamName).get(0); + final String targetIndex = "downsample-5m-" + sourceIndex; - // DSL runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts - // downsampling. This sleep here tries to get as close as possible to having disruption during the downsample execution. - long sleepTime = randomLongBetween(3000, 4500); - logger.info("-> giving data stream lifecycle [{}] millis to make some progress before starting the disruption", sleepTime); - Thread.sleep(sleepTime); - List backingIndices = getBackingIndices(client(), dataStreamName); - // first generation index - String sourceIndex = backingIndices.get(0); + /** + * DLM runs every second and it has to tail forcemerge the index (2 seconds) and mark it as read-only (2s) before it starts + * downsampling. We try to detect if the downsampling has started by checking the downsample status in the target index. + */ + logger.info("-> Waiting for the data stream lifecycle to start the downsampling operation before starting the disruption."); + ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.STARTED, TimeValue.timeValueSeconds(8)); - internalCluster().rollingRestart(new InternalTestCluster.RestartCallback() { - }); + logger.info("-> Starting the disruption."); + internalCluster().rollingRestart(new InternalTestCluster.RestartCallback()); - // if the source index has already been downsampled and moved into the data stream just use its name directly - final String targetIndex = sourceIndex.startsWith("downsample-5m-") ? sourceIndex : "downsample-5m-" + sourceIndex; - assertBusy(() -> { - try { - GetSettingsResponse getSettingsResponse = cluster.client() - .admin() - .indices() - .getSettings( - new GetSettingsRequest(TEST_REQUEST_TIMEOUT).indices(targetIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) - ) - .actionGet(); - Settings indexSettings = getSettingsResponse.getIndexToSettings().get(targetIndex); - assertThat(indexSettings, is(notNullValue())); - assertThat(IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(indexSettings), is(IndexMetadata.DownsampleTaskStatus.SUCCESS)); - assertEquals("5m", IndexMetadata.INDEX_DOWNSAMPLE_INTERVAL.get(indexSettings)); - } catch (Exception e) { - throw new AssertionError(e); - } - }, 120, TimeUnit.SECONDS); + ensureDownsamplingStatus(targetIndex, IndexMetadata.DownsampleTaskStatus.SUCCESS, TimeValue.timeValueSeconds(120)); ensureGreen(targetIndex); + logger.info("-> Relocation has finished"); + } + + private void ensureDownsamplingStatus(String downsampledIndex, IndexMetadata.DownsampleTaskStatus expectedStatus, TimeValue timeout) { + final var clusterService = internalCluster().getCurrentMasterNodeInstance(ClusterService.class); + final var listener = ClusterServiceUtils.addTemporaryStateListener(clusterService, clusterState -> { + final var indexMetadata = clusterState.metadata().getProject().index(downsampledIndex); + if (indexMetadata == null) { + return false; + } + var downsamplingStatus = INDEX_DOWNSAMPLE_STATUS.get(indexMetadata.getSettings()); + if (expectedStatus == downsamplingStatus) { + logger.info("-> Downsampling status for index [{}] is [{}]", downsampledIndex, downsamplingStatus); + return true; + } + return false; + }); + safeAwait(listener, timeout.millis(), TimeUnit.MILLISECONDS); } }