Resume driver when failing to fetch pages (#106392)

I investigated a heap attack test failure and found that an ESQL request 
was stuck. This occurred in the following:

1. The ExchangeSource on the coordinator was blocked on reading because 
there were no available pages.

2. Meanwhile, the ExchangeSink on the data node had pages ready for 
fetching.

3. When an exchange request tried to fetch pages, it failed due to a 
CircuitBreakingException. Despite the failure, no cancellation was
triggered because the status of the ExchangeSource on the coordinator
remained unchanged.  To fix this issue, this PR introduces two changes:

Resumes the ExchangeSourceOperator and Driver on the coordinator, 
eventually allowing the coordinator to trigger cancellation of the
request when failing to fetch pages.

Ensures that an exchange sink on the data nodes fails when a data node 
request is cancelled. This callback was inadvertently omitted when
introducing the node-level reduction in Run empty reduction node level
on data nodes #106204.

I plan to spend some time to harden the exchange and compute service.

Closes #106262
This commit is contained in:
Nhat Nguyen 2024-03-18 09:32:31 -07:00 committed by GitHub
parent bead96b2ff
commit d66c7d4bc8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 112 additions and 3 deletions

View file

@ -0,0 +1,6 @@
pr: 106392
summary: Resume driver when failing to fetch pages
area: ES|QL
type: bug
issues:
- 106262

View file

@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.heap_attack;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig; import org.apache.http.client.config.RequestConfig;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.client.Request; import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
@ -58,7 +57,6 @@ import static org.hamcrest.Matchers.hasSize;
* Tests that run ESQL queries that have, in the past, used so much memory they * Tests that run ESQL queries that have, in the past, used so much memory they
* crash Elasticsearch. * crash Elasticsearch.
*/ */
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/106262")
public class HeapAttackIT extends ESRestTestCase { public class HeapAttackIT extends ESRestTestCase {
@ClassRule @ClassRule

View file

@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockStreamInput; import org.elasticsearch.compute.data.BlockStreamInput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.TimeValue;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -112,8 +113,9 @@ public final class ExchangeService extends AbstractLifecycleComponent {
/** /**
* Removes the exchange sink handler associated with the given exchange id. * Removes the exchange sink handler associated with the given exchange id.
* W will abort the sink handler if the given failure is not null.
*/ */
public void finishSinkHandler(String exchangeId, Exception failure) { public void finishSinkHandler(String exchangeId, @Nullable Exception failure) {
final ExchangeSinkHandler sinkHandler = sinks.remove(exchangeId); final ExchangeSinkHandler sinkHandler = sinks.remove(exchangeId);
if (sinkHandler != null) { if (sinkHandler != null) {
if (failure != null) { if (failure != null) {

View file

@ -184,4 +184,12 @@ public final class ExchangeSinkHandler {
long lastUpdatedTimeInMillis() { long lastUpdatedTimeInMillis() {
return lastUpdatedInMillis.get(); return lastUpdatedInMillis.get();
} }
/**
* Returns the number of pages available in the buffer.
* This method should be used for testing only.
*/
public int bufferSize() {
return buffer.size();
}
} }

View file

@ -203,6 +203,7 @@ public final class ExchangeSourceHandler {
} }
return first; return first;
}); });
buffer.waitForReading().onResponse(null); // resume the Driver if it is being blocked on reading
onSinkComplete(); onSinkComplete();
} }

View file

@ -12,35 +12,51 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.DriverStatus;
import org.elasticsearch.compute.operator.DriverTaskRunner; import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator; import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger; import org.elasticsearch.logging.Logger;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo; import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.junit.Before; import org.junit.Before;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.assertMap;
import static org.elasticsearch.test.MapMatcher.matchesMap; import static org.elasticsearch.test.MapMatcher.matchesMap;
import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.emptyOrNullString; import static org.hamcrest.Matchers.emptyOrNullString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -309,4 +325,81 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
) )
); );
} }
/**
* Ensure that when some exchange requests fail, we cancel the ESQL request, and complete all
* exchange sinks with the failure, despite having outstanding pages in the buffer.
*/
public void testCancelRequestWhenFailingFetchingPages() throws Exception {
String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
String dataNode = internalCluster().startDataOnlyNode();
// block, then fail exchange requests when we have outstanding pages
var transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode);
CountDownLatch fetchingStarted = new CountDownLatch(1);
CountDownLatch allowedFetching = new CountDownLatch(1);
transportService.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
AbstractRunnable runnable = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
channel.sendResponse(e);
}
@Override
protected void doRun() throws Exception {
fetchingStarted.countDown();
assertTrue(allowedFetching.await(1, TimeUnit.MINUTES));
onFailure(new IOException("failed to fetch pages"));
}
};
transportService.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(runnable);
});
try {
scriptPermits.release(numberOfDocs()); // do not block Lucene operators
Client client = client(coordinator);
EsqlQueryRequest request = new EsqlQueryRequest();
client().admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put("index.routing.allocation.include._name", dataNode).build())
.get();
ensureYellowAndNoInitializingShards("test");
request.query("FROM test | LIMIT 10");
request.pragmas(randomPragmas());
PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
client.execute(EsqlQueryAction.INSTANCE, request, future);
try {
List<TaskInfo> foundTasks = new ArrayList<>();
assertBusy(() -> {
List<TaskInfo> tasks = client().admin()
.cluster()
.prepareListTasks()
.setActions(EsqlQueryAction.NAME)
.setDetailed(true)
.get()
.getTasks();
assertThat(tasks, hasSize(1));
foundTasks.addAll(tasks);
});
String sessionId = foundTasks.get(0).taskId().toString();
ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
if (randomBoolean()) {
// do not fail exchange requests when we have some pages
assertBusy(() -> assertThat(exchangeSink.bufferSize(), greaterThan(0)));
}
} finally {
allowedFetching.countDown();
}
Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
assertThat(failure.getMessage(), containsString("failed to fetch pages"));
} finally {
transportService.clearAllRules();
}
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
}
} }

View file

@ -690,6 +690,7 @@ public class ComputeService {
dataNodeRequestExecutor.start(); dataNodeRequestExecutor.start();
// run the node-level reduction // run the node-level reduction
var externalSink = exchangeService.getSinkHandler(externalId); var externalSink = exchangeService.getSinkHandler(externalId);
task.addListener(() -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())));
var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor); var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor);
exchangeSource.addRemoteSink(internalSink::fetchPageAsync, 1); exchangeSource.addRemoteSink(internalSink::fetchPageAsync, 1);
ActionListener<Void> reductionListener = cancelOnFailure(task, cancelled, refs.acquire()); ActionListener<Void> reductionListener = cancelOnFailure(task, cancelled, refs.acquire());