mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
Fix testClusterHealthRestCancellation
(#113680)
This test was failing due to a race between an early cancellation check and the cancel operation. With this commit we wait until the action is definitely blocked before cancelling the task. Closes #100062
This commit is contained in:
parent
4318ebbe8f
commit
e9d0dd9e28
1 changed files with 14 additions and 24 deletions
|
@ -19,25 +19,15 @@ import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
|
|
||||||
|
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
|
import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
|
||||||
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
|
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
|
||||||
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;
|
|
||||||
|
|
||||||
public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {
|
public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100062")
|
|
||||||
@TestIssueLogging(
|
|
||||||
issueUrl = "https://github.com/elastic/elasticsearch/issues/100062",
|
|
||||||
value = "org.elasticsearch.test.TaskAssertions:TRACE"
|
|
||||||
+ ",org.elasticsearch.cluster.service.MasterService:TRACE"
|
|
||||||
+ ",org.elasticsearch.tasks.TaskManager:TRACE"
|
|
||||||
)
|
|
||||||
public void testClusterHealthRestCancellation() throws Exception {
|
public void testClusterHealthRestCancellation() throws Exception {
|
||||||
|
|
||||||
final var barrier = new CyclicBarrier(2);
|
final var barrier = new CyclicBarrier(2);
|
||||||
|
@ -47,18 +37,7 @@ public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
safeAwait(barrier);
|
safeAwait(barrier);
|
||||||
// safeAwait(barrier);
|
safeAwait(barrier);
|
||||||
|
|
||||||
// temporarily lengthen timeout on safeAwait while investigating #100062
|
|
||||||
try {
|
|
||||||
barrier.await(60, TimeUnit.SECONDS);
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
throw new AssertionError("unexpected", e);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new AssertionError("unexpected", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,12 +51,23 @@ public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {
|
||||||
clusterHealthRequest.addParameter("wait_for_events", Priority.LANGUID.toString());
|
clusterHealthRequest.addParameter("wait_for_events", Priority.LANGUID.toString());
|
||||||
|
|
||||||
final PlainActionFuture<Response> future = new PlainActionFuture<>();
|
final PlainActionFuture<Response> future = new PlainActionFuture<>();
|
||||||
logger.info("--> sending cluster state request");
|
logger.info("--> sending cluster health request");
|
||||||
final Cancellable cancellable = getRestClient().performRequestAsync(clusterHealthRequest, wrapAsRestResponseListener(future));
|
final Cancellable cancellable = getRestClient().performRequestAsync(clusterHealthRequest, wrapAsRestResponseListener(future));
|
||||||
|
|
||||||
safeAwait(barrier);
|
safeAwait(barrier);
|
||||||
|
|
||||||
awaitTaskWithPrefixOnMaster(TransportClusterHealthAction.NAME);
|
// wait until the health request is waiting on the (blocked) master service
|
||||||
|
assertBusy(
|
||||||
|
() -> assertTrue(
|
||||||
|
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
|
||||||
|
.getMasterService()
|
||||||
|
.pendingTasks()
|
||||||
|
.stream()
|
||||||
|
.anyMatch(
|
||||||
|
pendingClusterTask -> pendingClusterTask.source().string().equals("cluster_health (wait_for_events [LANGUID])")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
logger.info("--> cancelling cluster health request");
|
logger.info("--> cancelling cluster health request");
|
||||||
cancellable.cancel();
|
cancellable.cancel();
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue