diff --git a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java index 8803ad4af734..250fe5a3a79f 100644 --- a/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java +++ b/qa/smoke-test-http/src/javaRestTest/java/org/elasticsearch/http/BlockedSearcherRestCancellationTestCase.java @@ -30,6 +30,7 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.EnginePlugin; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.tasks.Task; import java.util.ArrayList; import java.util.Collection; @@ -76,6 +77,10 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build()); ensureGreen("test"); + assert request.getOptions().containsHeader(Task.X_OPAQUE_ID_HTTP_HEADER) == false; + final var opaqueId = getTestClass().getSimpleName() + "-" + getTestName() + "-" + randomUUID(); + request.setOptions(request.getOptions().toBuilder().addHeader(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId)); + final List searcherBlocks = new ArrayList<>(); for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) { for (final IndexService indexService : indicesService) { @@ -96,7 +101,8 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT } final PlainActionFuture future = new PlainActionFuture<>(); - logger.info("--> sending request"); + logger.info("--> sending request, opaque id={}", opaqueId); + final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future)); awaitTaskWithPrefix(actionPrefix); @@ -108,7 +114,7 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT cancellable.cancel(); expectThrows(CancellationException.class, future::actionGet); - assertAllCancellableTasksAreCancelled(actionPrefix); + assertAllCancellableTasksAreCancelled(actionPrefix, opaqueId); } finally { Releasables.close(releasables); } diff --git a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java index 8a0aa2033a30..8a385617bee8 100644 --- a/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/elasticsearch/tasks/CancellableTask.java @@ -109,6 +109,11 @@ public class CancellableTask extends Task { return true; } + @Override + public String toString() { + return "CancellableTask{" + super.toString() + ", reason='" + reason + '\'' + ", isCancelled=" + isCancelled + '}'; + } + private TaskCancelledException getTaskCancelledException() { assert Thread.holdsLock(this); assert isCancelled; diff --git a/server/src/main/java/org/elasticsearch/tasks/Task.java b/server/src/main/java/org/elasticsearch/tasks/Task.java index 83ee08574df4..46eb59c3a8cd 100644 --- a/server/src/main/java/org/elasticsearch/tasks/Task.java +++ b/server/src/main/java/org/elasticsearch/tasks/Task.java @@ -225,6 +225,8 @@ public class Task implements Traceable { + parentTask + ", startTime=" + startTime + + ", headers=" + + headers + ", startTimeNanos=" + startTimeNanos + '}'; diff --git a/test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java index b4ecc36fc5b9..d0862c91537c 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/TaskAssertions.java @@ -10,16 +10,19 @@ package org.elasticsearch.test; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.core.Nullable; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskInfo; -import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.transport.TransportService; +import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertTrue; import static junit.framework.TestCase.fail; import static org.elasticsearch.test.ESIntegTestCase.client; @@ -59,30 +62,28 @@ public class TaskAssertions { }); } - public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception { + public static void assertAllCancellableTasksAreCancelled(String actionPrefix, @Nullable String opaqueId) throws Exception { logger.info("--> checking that all tasks with prefix {} are marked as cancelled", actionPrefix); assertBusy(() -> { - boolean foundTask = false; + var tasks = new ArrayList(); for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { - final TaskManager taskManager = transportService.getTaskManager(); + var taskManager = transportService.getTaskManager(); assertTrue(taskManager.assertCancellableTaskConsistency()); - for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) { - if (cancellableTask.getAction().startsWith(actionPrefix)) { - logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask); - foundTask = true; - assertTrue( - "task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled", - cancellableTask.isCancelled() - ); - logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask); - } - } + taskManager.getCancellableTasks().values().stream().filter(t -> t.getAction().startsWith(actionPrefix)).forEach(tasks::add); } - assertTrue("found no cancellable tasks", foundTask); + assertFalse("no tasks found for action: " + actionPrefix, tasks.isEmpty()); + assertTrue( + tasks.toString(), + tasks.stream().allMatch(t -> t.isCancelled() && Objects.equals(t.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER), opaqueId)) + ); }, 30, TimeUnit.SECONDS); } + public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception { + assertAllCancellableTasksAreCancelled(actionPrefix, null); + } + public static void assertAllTasksHaveFinished(String actionPrefix) throws Exception { logger.info("--> checking that all tasks with prefix {} have finished", actionPrefix); assertBusy(() -> {