mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 07:37:19 -04:00
Fix ILMDownsampleDisruptionIT.testILMDownsampleRollingRestart
(#119196)
This removes a redundant thread creation when triggering a rolling restart as the method is already async and drops the check for cluster health as that might hit a node that's being shut down (the master node in particular).
This commit is contained in:
parent
be0d4d9e45
commit
2153cac4dd
2 changed files with 6 additions and 93 deletions
|
@ -102,9 +102,6 @@ tests:
|
||||||
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
|
- class: org.elasticsearch.xpack.shutdown.NodeShutdownIT
|
||||||
method: testAllocationPreventedForRemoval
|
method: testAllocationPreventedForRemoval
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/116363
|
issue: https://github.com/elastic/elasticsearch/issues/116363
|
||||||
- class: org.elasticsearch.xpack.downsample.ILMDownsampleDisruptionIT
|
|
||||||
method: testILMDownsampleRollingRestart
|
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/114233
|
|
||||||
- class: org.elasticsearch.reservedstate.service.RepositoriesFileSettingsIT
|
- class: org.elasticsearch.reservedstate.service.RepositoriesFileSettingsIT
|
||||||
method: testSettingsApplied
|
method: testSettingsApplied
|
||||||
issue: https://github.com/elastic/elasticsearch/issues/116694
|
issue: https://github.com/elastic/elasticsearch/issues/116694
|
||||||
|
|
|
@ -10,8 +10,6 @@ package org.elasticsearch.xpack.downsample;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.elasticsearch.action.DocWriteRequest;
|
import org.elasticsearch.action.DocWriteRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
|
|
||||||
import org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction;
|
|
||||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
|
||||||
|
@ -57,9 +55,7 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
|
||||||
|
|
||||||
import static org.elasticsearch.core.Strings.format;
|
import static org.elasticsearch.core.Strings.format;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
|
@ -144,7 +140,7 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
|
||||||
|
|
||||||
public void testILMDownsampleRollingRestart() throws Exception {
|
public void testILMDownsampleRollingRestart() throws Exception {
|
||||||
final InternalTestCluster cluster = internalCluster();
|
final InternalTestCluster cluster = internalCluster();
|
||||||
final List<String> masterNodes = cluster.startMasterOnlyNodes(1);
|
cluster.startMasterOnlyNodes(1);
|
||||||
cluster.startDataOnlyNodes(3);
|
cluster.startDataOnlyNodes(3);
|
||||||
ensureStableCluster(cluster.size());
|
ensureStableCluster(cluster.size());
|
||||||
ensureGreen();
|
ensureGreen();
|
||||||
|
@ -169,46 +165,16 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
|
||||||
.endObject();
|
.endObject();
|
||||||
};
|
};
|
||||||
int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT);
|
int indexedDocs = bulkIndex(sourceIndex, sourceSupplier, DOC_COUNT);
|
||||||
final CountDownLatch disruptionStart = new CountDownLatch(1);
|
|
||||||
final CountDownLatch disruptionEnd = new CountDownLatch(1);
|
|
||||||
|
|
||||||
new Thread(new Disruptor(cluster, sourceIndex, new DisruptionListener() {
|
cluster.rollingRestart(new InternalTestCluster.RestartCallback());
|
||||||
@Override
|
|
||||||
public void disruptionStart() {
|
|
||||||
disruptionStart.countDown();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void disruptionEnd() {
|
|
||||||
disruptionEnd.countDown();
|
|
||||||
}
|
|
||||||
}, masterNodes.get(0), (ignored) -> {
|
|
||||||
try {
|
|
||||||
cluster.rollingRestart(new InternalTestCluster.RestartCallback() {
|
|
||||||
@Override
|
|
||||||
public boolean validateClusterForming() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
})).start();
|
|
||||||
|
|
||||||
final String targetIndex = "downsample-1h-" + sourceIndex;
|
final String targetIndex = "downsample-1h-" + sourceIndex;
|
||||||
startDownsampleTaskViaIlm(sourceIndex, targetIndex, disruptionStart, disruptionEnd);
|
startDownsampleTaskViaIlm(sourceIndex, targetIndex);
|
||||||
waitUntil(() -> getClusterPendingTasks(cluster.client()).pendingTasks().isEmpty(), 60, TimeUnit.SECONDS);
|
assertBusy(() -> assertTargetIndex(cluster, targetIndex, indexedDocs));
|
||||||
ensureStableCluster(cluster.numDataAndMasterNodes());
|
ensureGreen(targetIndex);
|
||||||
assertTargetIndex(cluster, targetIndex, indexedDocs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startDownsampleTaskViaIlm(
|
private void startDownsampleTaskViaIlm(String sourceIndex, String targetIndex) throws Exception {
|
||||||
String sourceIndex,
|
|
||||||
String targetIndex,
|
|
||||||
CountDownLatch disruptionStart,
|
|
||||||
CountDownLatch disruptionEnd
|
|
||||||
) throws Exception {
|
|
||||||
disruptionStart.await();
|
|
||||||
var request = new UpdateSettingsRequest(sourceIndex).settings(
|
var request = new UpdateSettingsRequest(sourceIndex).settings(
|
||||||
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME)
|
Settings.builder().put(LifecycleSettings.LIFECYCLE_NAME, POLICY_NAME)
|
||||||
);
|
);
|
||||||
|
@ -231,7 +197,6 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
|
||||||
var getSettingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(targetIndex)).actionGet();
|
var getSettingsResponse = client().admin().indices().getSettings(new GetSettingsRequest().indices(targetIndex)).actionGet();
|
||||||
assertThat(getSettingsResponse.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()), equalTo("success"));
|
assertThat(getSettingsResponse.getSetting(targetIndex, IndexMetadata.INDEX_DOWNSAMPLE_STATUS.getKey()), equalTo("success"));
|
||||||
}, 60, TimeUnit.SECONDS);
|
}, 60, TimeUnit.SECONDS);
|
||||||
disruptionEnd.await();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertTargetIndex(final InternalTestCluster cluster, final String targetIndex, int indexedDocs) {
|
private void assertTargetIndex(final InternalTestCluster cluster, final String targetIndex, int indexedDocs) {
|
||||||
|
@ -294,53 +259,4 @@ public class ILMDownsampleDisruptionIT extends ESIntegTestCase {
|
||||||
public interface SourceSupplier {
|
public interface SourceSupplier {
|
||||||
XContentBuilder get() throws IOException;
|
XContentBuilder get() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface DisruptionListener {
|
|
||||||
void disruptionStart();
|
|
||||||
|
|
||||||
void disruptionEnd();
|
|
||||||
}
|
|
||||||
|
|
||||||
private class Disruptor implements Runnable {
|
|
||||||
final InternalTestCluster cluster;
|
|
||||||
private final String sourceIndex;
|
|
||||||
private final DisruptionListener listener;
|
|
||||||
private final String clientNode;
|
|
||||||
private final Consumer<String> disruption;
|
|
||||||
|
|
||||||
private Disruptor(
|
|
||||||
final InternalTestCluster cluster,
|
|
||||||
final String sourceIndex,
|
|
||||||
final DisruptionListener listener,
|
|
||||||
final String clientNode,
|
|
||||||
final Consumer<String> disruption
|
|
||||||
) {
|
|
||||||
this.cluster = cluster;
|
|
||||||
this.sourceIndex = sourceIndex;
|
|
||||||
this.listener = listener;
|
|
||||||
this.clientNode = clientNode;
|
|
||||||
this.disruption = disruption;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
listener.disruptionStart();
|
|
||||||
try {
|
|
||||||
final String candidateNode = safeExecute(
|
|
||||||
cluster.client(clientNode),
|
|
||||||
TransportClusterSearchShardsAction.TYPE,
|
|
||||||
new ClusterSearchShardsRequest(TEST_REQUEST_TIMEOUT, sourceIndex)
|
|
||||||
).getNodes()[0].getName();
|
|
||||||
logger.info("Candidate node [" + candidateNode + "]");
|
|
||||||
disruption.accept(candidateNode);
|
|
||||||
ensureGreen(sourceIndex);
|
|
||||||
ensureStableCluster(cluster.numDataAndMasterNodes(), clientNode);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("Ignoring Error while injecting disruption [" + e.getMessage() + "]");
|
|
||||||
} finally {
|
|
||||||
listener.disruptionEnd();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue