diff --git a/muted-tests.yml b/muted-tests.yml index 93521577d7ba..ad656c5d121c 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -102,9 +102,6 @@ tests: - class: org.elasticsearch.xpack.shutdown.NodeShutdownIT method: testAllocationPreventedForRemoval issue: https://github.com/elastic/elasticsearch/issues/116363 -- class: org.elasticsearch.xpack.downsample.ILMDownsampleDisruptionIT - method: testILMDownsampleRollingRestart - issue: https://github.com/elastic/elasticsearch/issues/114233 - class: org.elasticsearch.reservedstate.service.RepositoriesFileSettingsIT method: testSettingsApplied issue: https://github.com/elastic/elasticsearch/issues/116694 diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java index 5c2a05da1e8d..5fba98b765a6 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/ILMDownsampleDisruptionIT.java @@ -10,8 +10,6 @@ package org.elasticsearch.xpack.downsample; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.DocWriteRequest; -import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; -import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest; @@ -57,9 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import static org.elasticsearch.core.Strings.format; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -144,7 +140,7 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase { public void testILMDownsampleRollingRestart() throws Exception { final InternalTestCluster cluster = internalCluster(); - final List masterNodes = cluster.startMasterOnlyNodes(1); + cluster.startMasterOnlyNodes(1); cluster.startDataOnlyNodes(3); ensureStableCluster(cluster.size()); ensureGreen(); @@ -169,46 +165,16 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase { .endObject(); }; int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT); - final CountDownLatch disruptionStart = new CountDownLatch(1); - final CountDownLatch disruptionEnd = new CountDownLatch(1); - new Thread(new Disruptor(cluster, sourceIndex, new DisruptionListener() { - @Override - public void disruptionStart() { - disruptionStart.countDown(); - } - - @Override - public void disruptionEnd() { - disruptionEnd.countDown(); - } - }, masterNodes.get(0), (ignored) -> { - try { - cluster.rollingRestart(new InternalTestCluster.RestartCallback() { - @Override - public boolean validateClusterForming() { - return true; - } - }); - } catch (Exception e) { - throw new RuntimeException(e); - } - })).start(); + cluster.rollingRestart(new InternalTestCluster.RestartCallback()); final String targetIndex = "downsample-1h-" + sourceIndex; - startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd); - waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS); - ensureStableCluster(cluster.numDataAndMasterNodes()); - assertTargetIndex(cluster, targetIndex, indexedDocs); + startDownsampleTaskViaIlm(sourceIndex, targetIndex); + assertBusy(() -> assertTargetIndex(cluster, targetIndex, indexedDocs)); + ensureGreen(targetIndex); } - private void startDownsampleTaskViaIlm( - String sourceIndex, - String targetIndex, - CountDownLatch disruptionStart, - CountDownLatch disruptionEnd - ) throws Exception { - disruptionStart.await(); + private void startDownsampleTaskViaIlm(String sourceIndex, String targetIndex) throws Exception { var request = new UpdateSettingsRequest(sourceIndex).settings( Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME) ); @@ -231,7 +197,6 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase { var getSettingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(targetIndex)).actionGet(); assertThat(getSettingsResponse.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()), equalTo("success")); }, 60, TimeUnit.SECONDS); - disruptionEnd.await(); } private void assertTargetIndex(final InternalTestCluster cluster, final String targetIndex, int indexedDocs) { @@ -294,53 +259,4 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase { public interface SourceSupplier { XContentBuilder get() throws IOException; } - - interface DisruptionListener { - void disruptionStart(); - - void disruptionEnd(); - } - - private class Disruptor implements Runnable { - final InternalTestCluster cluster; - private final String sourceIndex; - private final DisruptionListener listener; - private final String clientNode; - private final Consumer disruption; - - private Disruptor( - final InternalTestCluster cluster, - final String sourceIndex, - final DisruptionListener listener, - final String clientNode, - final Consumer disruption - ) { - this.cluster = cluster; - this.sourceIndex = sourceIndex; - this.listener = listener; - this.clientNode = clientNode; - this.disruption = disruption; - } - - @Override - public void run() { - listener.disruptionStart(); - try { - final String candidateNode = safeExecute( - cluster.client(clientNode), - TransportClusterSearchShardsAction.TYPE, - new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, sourceIndex) - ).getNodes()[0].getName(); - logger.info("Candidate node [" + candidateNode + "]"); - disruption.accept(candidateNode); - ensureGreen(sourceIndex); - ensureStableCluster(cluster.numDataAndMasterNodes(), clientNode); - - } catch (Exception e) { - logger.error("Ignoring Error while injecting disruption [" + e.getMessage() + "]"); - } finally { - listener.disruptionEnd(); - } - } - } }