mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 17:34:17 -04:00
Today we rely on registering the channel after registering the task to be cancelled, to ensure that the task is cancelled even if the channel is closed concurrently. However, the client may already have processed a cancellable request on the channel, in which case the channel is already registered and this mechanism doesn't work. With this change, once the registered tasks have been drained in order to cancel them, no further task is registered: a task that arrives afterwards is cancelled immediately instead. Closes #88201
This commit is contained in:
parent
b5ad5d9dcd
commit
e930e9fd4e
4 changed files with 94 additions and 40 deletions
6
docs/changelog/126686.yaml
Normal file
6
docs/changelog/126686.yaml
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
pr: 126686
|
||||||
|
summary: Fix race condition in `RestCancellableNodeClient`
|
||||||
|
area: Task Management
|
||||||
|
type: bug
|
||||||
|
issues:
|
||||||
|
- 88201
|
|
@ -12,23 +12,12 @@ package org.elasticsearch.http;
|
||||||
import org.apache.http.client.methods.HttpGet;
|
import org.apache.http.client.methods.HttpGet;
|
||||||
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
|
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsAction;
|
||||||
import org.elasticsearch.client.Request;
|
import org.elasticsearch.client.Request;
|
||||||
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
|
|
||||||
|
|
||||||
public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
|
public class IndicesSegmentsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
|
||||||
@TestIssueLogging(
|
|
||||||
issueUrl = "https://github.com/elastic/elasticsearch/issues/88201",
|
|
||||||
value = "org.elasticsearch.http.BlockedSearcherRestCancellationTestCase:DEBUG"
|
|
||||||
+ ",org.elasticsearch.transport.TransportService:TRACE"
|
|
||||||
)
|
|
||||||
public void testIndicesSegmentsRestCancellation() throws Exception {
|
public void testIndicesSegmentsRestCancellation() throws Exception {
|
||||||
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME);
|
runTest(new Request(HttpGet.METHOD_NAME, "/_segments"), IndicesSegmentsAction.NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@TestIssueLogging(
|
|
||||||
issueUrl = "https://github.com/elastic/elasticsearch/issues/88201",
|
|
||||||
value = "org.elasticsearch.http.BlockedSearcherRestCancellationTestCase:DEBUG"
|
|
||||||
+ ",org.elasticsearch.transport.TransportService:TRACE"
|
|
||||||
)
|
|
||||||
public void testCatSegmentsRestCancellation() throws Exception {
|
public void testCatSegmentsRestCancellation() throws Exception {
|
||||||
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME);
|
runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"), IndicesSegmentsAction.NAME);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,14 @@ import org.elasticsearch.client.internal.Client;
|
||||||
import org.elasticsearch.client.internal.FilterClient;
|
import org.elasticsearch.client.internal.FilterClient;
|
||||||
import org.elasticsearch.client.internal.OriginSettingClient;
|
import org.elasticsearch.client.internal.OriginSettingClient;
|
||||||
import org.elasticsearch.client.internal.node.NodeClient;
|
import org.elasticsearch.client.internal.node.NodeClient;
|
||||||
|
import org.elasticsearch.core.Nullable;
|
||||||
import org.elasticsearch.http.HttpChannel;
|
import org.elasticsearch.http.HttpChannel;
|
||||||
import org.elasticsearch.tasks.CancellableTask;
|
import org.elasticsearch.tasks.CancellableTask;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
import org.elasticsearch.tasks.TaskId;
|
import org.elasticsearch.tasks.TaskId;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.Collection;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -112,12 +112,14 @@ public class RestCancellableNodeClient extends FilterClient {
|
||||||
|
|
||||||
private class CloseListener implements ActionListener<Void> {
|
private class CloseListener implements ActionListener<Void> {
|
||||||
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
|
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
|
||||||
private final Set<TaskId> tasks = new HashSet<>();
|
|
||||||
|
@Nullable // if already drained
|
||||||
|
private Set<TaskId> tasks = new HashSet<>();
|
||||||
|
|
||||||
CloseListener() {}
|
CloseListener() {}
|
||||||
|
|
||||||
synchronized int getNumTasks() {
|
synchronized int getNumTasks() {
|
||||||
return tasks.size();
|
return tasks == null ? 0 : tasks.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
void maybeRegisterChannel(HttpChannel httpChannel) {
|
void maybeRegisterChannel(HttpChannel httpChannel) {
|
||||||
|
@ -130,16 +132,23 @@ public class RestCancellableNodeClient extends FilterClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void registerTask(TaskHolder taskHolder, TaskId taskId) {
|
void registerTask(TaskHolder taskHolder, TaskId taskId) {
|
||||||
|
synchronized (this) {
|
||||||
taskHolder.taskId = taskId;
|
taskHolder.taskId = taskId;
|
||||||
|
if (tasks != null) {
|
||||||
if (taskHolder.completed == false) {
|
if (taskHolder.completed == false) {
|
||||||
this.tasks.add(taskId);
|
tasks.add(taskId);
|
||||||
}
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// else tasks == null so the channel is already closed
|
||||||
|
cancelTask(taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void unregisterTask(TaskHolder taskHolder) {
|
synchronized void unregisterTask(TaskHolder taskHolder) {
|
||||||
if (taskHolder.taskId != null) {
|
if (taskHolder.taskId != null && tasks != null) {
|
||||||
this.tasks.remove(taskHolder.taskId);
|
tasks.remove(taskHolder.taskId);
|
||||||
}
|
}
|
||||||
taskHolder.completed = true;
|
taskHolder.completed = true;
|
||||||
}
|
}
|
||||||
|
@ -149,18 +158,20 @@ public class RestCancellableNodeClient extends FilterClient {
|
||||||
final HttpChannel httpChannel = channel.get();
|
final HttpChannel httpChannel = channel.get();
|
||||||
assert httpChannel != null : "channel not registered";
|
assert httpChannel != null : "channel not registered";
|
||||||
// when the channel gets closed it won't be reused: we can remove it from the map and forget about it.
|
// when the channel gets closed it won't be reused: we can remove it from the map and forget about it.
|
||||||
CloseListener closeListener = httpChannels.remove(httpChannel);
|
final CloseListener closeListener = httpChannels.remove(httpChannel);
|
||||||
assert closeListener != null : "channel not found in the map of tracked channels";
|
assert closeListener != null : "channel not found in the map of tracked channels: " + httpChannel;
|
||||||
final List<TaskId> toCancel;
|
assert closeListener == CloseListener.this : "channel had a different CloseListener registered: " + httpChannel;
|
||||||
synchronized (this) {
|
for (final var taskId : drainTasks()) {
|
||||||
toCancel = new ArrayList<>(tasks);
|
|
||||||
tasks.clear();
|
|
||||||
}
|
|
||||||
for (TaskId taskId : toCancel) {
|
|
||||||
cancelTask(taskId);
|
cancelTask(taskId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized Collection<TaskId> drainTasks() {
|
||||||
|
final var drained = tasks;
|
||||||
|
tasks = null;
|
||||||
|
return drained;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
onResponse(null);
|
onResponse(null);
|
||||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.action.search.TransportSearchAction;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.client.internal.node.NodeClient;
|
import org.elasticsearch.client.internal.node.NodeClient;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
import org.elasticsearch.http.HttpChannel;
|
import org.elasticsearch.http.HttpChannel;
|
||||||
import org.elasticsearch.http.HttpResponse;
|
import org.elasticsearch.http.HttpResponse;
|
||||||
import org.elasticsearch.tasks.Task;
|
import org.elasticsearch.tasks.Task;
|
||||||
|
@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import java.util.function.LongSupplier;
|
||||||
|
|
||||||
public class RestCancellableNodeClientTests extends ESTestCase {
|
public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -148,8 +150,42 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
assertEquals(totalSearches, testClient.cancelledTasks.size());
|
assertEquals(totalSearches, testClient.cancelledTasks.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testConcurrentExecuteAndClose() throws Exception {
|
||||||
|
final var testClient = new TestClient(Settings.EMPTY, threadPool, true);
|
||||||
|
int initialHttpChannels = RestCancellableNodeClient.getNumChannels();
|
||||||
|
int numTasks = randomIntBetween(1, 30);
|
||||||
|
TestHttpChannel channel = new TestHttpChannel();
|
||||||
|
final var startLatch = new CountDownLatch(1);
|
||||||
|
final var doneLatch = new CountDownLatch(numTasks + 1);
|
||||||
|
final var expectedTasks = Sets.<TaskId>newHashSetWithExpectedSize(numTasks);
|
||||||
|
for (int j = 0; j < numTasks; j++) {
|
||||||
|
RestCancellableNodeClient client = new RestCancellableNodeClient(testClient, channel);
|
||||||
|
threadPool.generic().execute(() -> {
|
||||||
|
client.execute(TransportSearchAction.TYPE, new SearchRequest(), ActionListener.running(ESTestCase::fail));
|
||||||
|
startLatch.countDown();
|
||||||
|
doneLatch.countDown();
|
||||||
|
});
|
||||||
|
expectedTasks.add(new TaskId(testClient.getLocalNodeId(), j));
|
||||||
|
}
|
||||||
|
threadPool.generic().execute(() -> {
|
||||||
|
try {
|
||||||
|
safeAwait(startLatch);
|
||||||
|
channel.awaitClose();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new AssertionError(e);
|
||||||
|
} finally {
|
||||||
|
doneLatch.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
safeAwait(doneLatch);
|
||||||
|
assertEquals(initialHttpChannels, RestCancellableNodeClient.getNumChannels());
|
||||||
|
assertEquals(expectedTasks, testClient.cancelledTasks);
|
||||||
|
}
|
||||||
|
|
||||||
private static class TestClient extends NodeClient {
|
private static class TestClient extends NodeClient {
|
||||||
private final AtomicLong counter = new AtomicLong(0);
|
private final LongSupplier searchTaskIdGenerator = new AtomicLong(0)::getAndIncrement;
|
||||||
|
private final LongSupplier cancelTaskIdGenerator = new AtomicLong(1000)::getAndIncrement;
|
||||||
private final Set<TaskId> cancelledTasks = new CopyOnWriteArraySet<>();
|
private final Set<TaskId> cancelledTasks = new CopyOnWriteArraySet<>();
|
||||||
private final AtomicInteger searchRequests = new AtomicInteger(0);
|
private final AtomicInteger searchRequests = new AtomicInteger(0);
|
||||||
private final boolean timeout;
|
private final boolean timeout;
|
||||||
|
@ -167,9 +203,17 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
) {
|
) {
|
||||||
switch (action.name()) {
|
switch (action.name()) {
|
||||||
case TransportCancelTasksAction.NAME -> {
|
case TransportCancelTasksAction.NAME -> {
|
||||||
CancelTasksRequest cancelTasksRequest = (CancelTasksRequest) request;
|
assertTrue(
|
||||||
assertTrue("tried to cancel the same task more than once", cancelledTasks.add(cancelTasksRequest.getTargetTaskId()));
|
"tried to cancel the same task more than once",
|
||||||
Task task = request.createTask(counter.getAndIncrement(), "cancel_task", action.name(), null, Collections.emptyMap());
|
cancelledTasks.add(asInstanceOf(CancelTasksRequest.class, request).getTargetTaskId())
|
||||||
|
);
|
||||||
|
Task task = request.createTask(
|
||||||
|
cancelTaskIdGenerator.getAsLong(),
|
||||||
|
"cancel_task",
|
||||||
|
action.name(),
|
||||||
|
null,
|
||||||
|
Collections.emptyMap()
|
||||||
|
);
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
listener.onResponse(null);
|
listener.onResponse(null);
|
||||||
} else {
|
} else {
|
||||||
|
@ -180,7 +224,13 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
case TransportSearchAction.NAME -> {
|
case TransportSearchAction.NAME -> {
|
||||||
searchRequests.incrementAndGet();
|
searchRequests.incrementAndGet();
|
||||||
Task searchTask = request.createTask(counter.getAndIncrement(), "search", action.name(), null, Collections.emptyMap());
|
Task searchTask = request.createTask(
|
||||||
|
searchTaskIdGenerator.getAsLong(),
|
||||||
|
"search",
|
||||||
|
action.name(),
|
||||||
|
null,
|
||||||
|
Collections.emptyMap()
|
||||||
|
);
|
||||||
if (timeout == false) {
|
if (timeout == false) {
|
||||||
if (rarely()) {
|
if (rarely()) {
|
||||||
// make sure that search is sometimes also called from the same thread before the task is returned
|
// make sure that search is sometimes also called from the same thread before the task is returned
|
||||||
|
@ -191,7 +241,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
return searchTask;
|
return searchTask;
|
||||||
}
|
}
|
||||||
default -> throw new UnsupportedOperationException();
|
default -> throw new AssertionError("unexpected action " + action.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -222,10 +272,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
if (open.compareAndSet(true, false) == false) {
|
assertTrue("HttpChannel is already closed", open.compareAndSet(true, false));
|
||||||
assert false : "HttpChannel is already closed";
|
|
||||||
return; // nothing to do
|
|
||||||
}
|
|
||||||
ActionListener<Void> listener = closeListener.get();
|
ActionListener<Void> listener = closeListener.get();
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
boolean failure = randomBoolean();
|
boolean failure = randomBoolean();
|
||||||
|
@ -241,6 +288,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void awaitClose() throws InterruptedException {
|
private void awaitClose() throws InterruptedException {
|
||||||
|
assertNotNull("must set closeListener before calling awaitClose", closeListener.get());
|
||||||
close();
|
close();
|
||||||
closeLatch.await();
|
closeLatch.await();
|
||||||
}
|
}
|
||||||
|
@ -257,7 +305,7 @@ public class RestCancellableNodeClientTests extends ESTestCase {
|
||||||
listener.onResponse(null);
|
listener.onResponse(null);
|
||||||
} else {
|
} else {
|
||||||
if (closeListener.compareAndSet(null, listener) == false) {
|
if (closeListener.compareAndSet(null, listener) == false) {
|
||||||
throw new IllegalStateException("close listener already set, only one is allowed!");
|
throw new AssertionError("close listener already set, only one is allowed!");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue