Use opaque id in task cancellation assertion (#110680)

Add use of Opaque ID HTTP header in task cancellation assertion. In some
tests, like this #88201 `testCatSegmentsRestCancellation`, we assert
that all tasks related to specific HTTP request are cancelled. But we do
blanket approach in assertion block catching all tasks by action name. I
think narrowing down assertion to specific http request in this case
would be more accurate.

It is still not clear why test mentioned above failing, but after hours
of investigation and injecting random delays, I'm inclining more to
@DaveCTurner's comment about interference from other tests or cluster
activity. I added additional log that will report when we spot task with
different opaque id.
This commit is contained in:
Mikhail Berezovskiy 2024-07-11 21:35:57 -07:00 committed by GitHub
parent 55532c8d6f
commit c1c5fe64b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 32 additions and 18 deletions

View file

@ -30,6 +30,7 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.EnginePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.Task;
import java.util.ArrayList;
import java.util.Collection;
@ -76,6 +77,10 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
ensureGreen("test");
assert request.getOptions().containsHeader(Task.X_OPAQUE_ID_HTTP_HEADER) == false;
final var opaqueId = getTestClass().getSimpleName() + "-" + getTestName() + "-" + randomUUID();
request.setOptions(request.getOptions().toBuilder().addHeader(Task.X_OPAQUE_ID_HTTP_HEADER, opaqueId));
final List<Semaphore> searcherBlocks = new ArrayList<>();
for (final IndicesService indicesService : internalCluster().getInstances(IndicesService.class)) {
for (final IndexService indexService : indicesService) {
@ -96,7 +101,8 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
}
final PlainActionFuture<Response> future = new PlainActionFuture<>();
logger.info("--> sending request");
logger.info("--> sending request, opaque id={}", opaqueId);
final Cancellable cancellable = getRestClient().performRequestAsync(request, wrapAsRestResponseListener(future));
awaitTaskWithPrefix(actionPrefix);
@ -108,7 +114,7 @@ public abstract class BlockedSearcherRestCancellationTestCase extends HttpSmokeT
cancellable.cancel();
expectThrows(CancellationException.class, future::actionGet);
assertAllCancellableTasksAreCancelled(actionPrefix);
assertAllCancellableTasksAreCancelled(actionPrefix, opaqueId);
} finally {
Releasables.close(releasables);
}

View file

@ -109,6 +109,11 @@ public class CancellableTask extends Task {
return true;
}
@Override
public String toString() {
return "CancellableTask{" + super.toString() + ", reason='" + reason + '\'' + ", isCancelled=" + isCancelled + '}';
}
private TaskCancelledException getTaskCancelledException() {
assert Thread.holdsLock(this);
assert isCancelled;

View file

@ -225,6 +225,8 @@ public class Task implements Traceable {
+ parentTask
+ ", startTime="
+ startTime
+ ", headers="
+ headers
+ ", startTimeNanos="
+ startTimeNanos
+ '}';

View file

@ -10,16 +10,19 @@ package org.elasticsearch.test;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static junit.framework.TestCase.assertFalse;
import static junit.framework.TestCase.assertTrue;
import static junit.framework.TestCase.fail;
import static org.elasticsearch.test.ESIntegTestCase.client;
@ -59,30 +62,28 @@ public class TaskAssertions {
});
}
public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception {
public static void assertAllCancellableTasksAreCancelled(String actionPrefix, @Nullable String opaqueId) throws Exception {
logger.info("--> checking that all tasks with prefix {} are marked as cancelled", actionPrefix);
assertBusy(() -> {
boolean foundTask = false;
var tasks = new ArrayList<CancellableTask>();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
final TaskManager taskManager = transportService.getTaskManager();
var taskManager = transportService.getTaskManager();
assertTrue(taskManager.assertCancellableTaskConsistency());
for (CancellableTask cancellableTask : taskManager.getCancellableTasks().values()) {
if (cancellableTask.getAction().startsWith(actionPrefix)) {
logger.trace("--> found task with prefix [{}]: [{}]", actionPrefix, cancellableTask);
foundTask = true;
taskManager.getCancellableTasks().values().stream().filter(t -> t.getAction().startsWith(actionPrefix)).forEach(tasks::add);
}
assertFalse("no tasks found for action: " + actionPrefix, tasks.isEmpty());
assertTrue(
"task " + cancellableTask.getId() + "/" + cancellableTask.getAction() + " not cancelled",
cancellableTask.isCancelled()
tasks.toString(),
tasks.stream().allMatch(t -> t.isCancelled() && Objects.equals(t.getHeader(Task.X_OPAQUE_ID_HTTP_HEADER), opaqueId))
);
logger.trace("--> Task with prefix [{}] is marked as cancelled: [{}]", actionPrefix, cancellableTask);
}
}
}
assertTrue("found no cancellable tasks", foundTask);
}, 30, TimeUnit.SECONDS);
}
public static void assertAllCancellableTasksAreCancelled(String actionPrefix) throws Exception {
assertAllCancellableTasksAreCancelled(actionPrefix, null);
}
public static void assertAllTasksHaveFinished(String actionPrefix) throws Exception {
logger.info("--> checking that all tasks with prefix {} have finished", actionPrefix);
assertBusy(() -> {