diff --git a/docs/changelog/106392.yaml b/docs/changelog/106392.yaml
new file mode 100644
index 000000000000..ff1a0284ee5d
--- /dev/null
+++ b/docs/changelog/106392.yaml
@@ -0,0 +1,6 @@
+pr: 106392
+summary: Resume driver when failing to fetch pages
+area: ES|QL
+type: bug
+issues:
+ - 106262
diff --git a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
index ffa817ed0967..8c87ef597711 100644
--- a/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
+++ b/test/external-modules/esql-heap-attack/src/javaRestTest/java/org/elasticsearch/xpack/esql/heap_attack/HeapAttackIT.java
@@ -10,7 +10,6 @@ package org.elasticsearch.xpack.esql.heap_attack;
 import org.apache.http.HttpHost;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.util.EntityUtils;
-import org.apache.lucene.tests.util.LuceneTestCase;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.Response;
@@ -58,7 +57,6 @@ import static org.hamcrest.Matchers.hasSize;
  * Tests that run ESQL queries that have, in the past, used so much memory they
  * crash Elasticsearch.
  */
-@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/106262")
 public class HeapAttackIT extends ESRestTestCase {

     @ClassRule
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
index efb646daec0e..a8afce1a3b22 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeService.java
@@ -23,6 +23,7 @@ import org.elasticsearch.common.util.concurrent.AbstractAsyncTask;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -112,8 +113,9 @@ public final class ExchangeService extends AbstractLifecycleComponent {

     /**
      * Removes the exchange sink handler associated with the given exchange id.
+     * We will abort the sink handler if the given failure is not null.
      */
-    public void finishSinkHandler(String exchangeId, Exception failure) {
+    public void finishSinkHandler(String exchangeId, @Nullable Exception failure) {
         final ExchangeSinkHandler sinkHandler = sinks.remove(exchangeId);
         if (sinkHandler != null) {
             if (failure != null) {
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java
index 945fdff50d31..ab155d6ee847 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkHandler.java
@@ -184,4 +184,12 @@ public final class ExchangeSinkHandler {
     long lastUpdatedTimeInMillis() {
         return lastUpdatedInMillis.get();
     }
+
+    /**
+     * Returns the number of pages available in the buffer.
+     * This method should be used for testing only.
+     */
+    public int bufferSize() {
+        return buffer.size();
+    }
 }
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java
index 7492fa8c1938..f1698ea401d2 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceHandler.java
@@ -203,6 +203,7 @@ public final class ExchangeSourceHandler {
                 }
                 return first;
             });
+            buffer.waitForReading().onResponse(null); // resume the Driver if it is being blocked on reading
             onSinkComplete();
         }

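The one-line addition to ExchangeSourceHandler above is the heart of the fix: when a remote sink fails, the handler now completes the buffer's pending "wait for reading" future, so a Driver parked on an empty buffer resumes, observes the recorded failure, and finishes instead of waiting for pages that will never arrive. The following is a minimal, self-contained sketch of that pattern; ToyExchangeBuffer and its methods are hypothetical stand-ins for illustration only, not the real ExchangeBuffer or Driver code.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class ToyExchangeBuffer {
    private final Queue<String> pages = new ArrayDeque<>(); // stand-in for real Page objects
    private Exception failure;                              // first failure reported by a remote sink
    private CompletableFuture<Void> notEmpty = new CompletableFuture<>();

    // Returns a future that completes once the reader has something to do:
    // a buffered page or a recorded failure.
    synchronized CompletableFuture<Void> waitForReading() {
        if (pages.isEmpty() == false || failure != null) {
            return CompletableFuture.completedFuture(null);
        }
        if (notEmpty.isDone()) {
            notEmpty = new CompletableFuture<>();
        }
        return notEmpty;
    }

    synchronized void addPage(String page) {
        pages.add(page);
        notEmpty.complete(null);
    }

    // The essence of the fix: record the failure, then complete the pending future so a
    // driver parked in waitForReading() resumes and surfaces the failure instead of hanging.
    synchronized void onSinkFailed(Exception e) {
        if (failure == null) {
            failure = e;
        }
        notEmpty.complete(null);
    }

    // The reader checks for a recorded failure before pulling the next page.
    synchronized String pollPage() throws Exception {
        if (failure != null) {
            throw failure;
        }
        return pages.poll();
    }
}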
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
index 5d022cd25cda..3728eb624aaa 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
@@ -12,35 +12,51 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.compute.operator.exchange.ExchangeSinkHandler;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.junit.Before;

+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;

 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.emptyOrNullString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -309,4 +325,81 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
             )
         );
     }
+
+    /**
+     * Ensure that when some exchange requests fail, we cancel the ESQL request, and complete all
+     * exchange sinks with the failure, despite having outstanding pages in the buffer.
+     */
+    public void testCancelRequestWhenFailingFetchingPages() throws Exception {
+        String coordinator = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY);
+        String dataNode = internalCluster().startDataOnlyNode();
+        // block, then fail exchange requests when we have outstanding pages
+        var transportService = (MockTransportService) internalCluster().getInstance(TransportService.class, dataNode);
+        CountDownLatch fetchingStarted = new CountDownLatch(1);
+        CountDownLatch allowedFetching = new CountDownLatch(1);
+        transportService.addRequestHandlingBehavior(ExchangeService.EXCHANGE_ACTION_NAME, (handler, request, channel, task) -> {
+            AbstractRunnable runnable = new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    channel.sendResponse(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    fetchingStarted.countDown();
+                    assertTrue(allowedFetching.await(1, TimeUnit.MINUTES));
+                    onFailure(new IOException("failed to fetch pages"));
+                }
+            };
+            transportService.getThreadPool().executor(ThreadPool.Names.GENERIC).execute(runnable);
+        });
+        try {
+            scriptPermits.release(numberOfDocs()); // do not block Lucene operators
+            Client client = client(coordinator);
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            client().admin()
+                .indices()
+                .prepareUpdateSettings("test")
+                .setSettings(Settings.builder().put("index.routing.allocation.include._name", dataNode).build())
+                .get();
+            ensureYellowAndNoInitializingShards("test");
+            request.query("FROM test | LIMIT 10");
+            request.pragmas(randomPragmas());
+            PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
+            client.execute(EsqlQueryAction.INSTANCE, request, future);
+            try {
+                List<TaskInfo> foundTasks = new ArrayList<>();
+                assertBusy(() -> {
+                    List<TaskInfo> tasks = client().admin()
+                        .cluster()
+                        .prepareListTasks()
+                        .setActions(EsqlQueryAction.NAME)
+                        .setDetailed(true)
+                        .get()
+                        .getTasks();
+                    assertThat(tasks, hasSize(1));
+                    foundTasks.addAll(tasks);
+                });
+                String sessionId = foundTasks.get(0).taskId().toString();
+                ExchangeService exchangeService = internalCluster().getInstance(ExchangeService.class, dataNode);
+                assertTrue(fetchingStarted.await(1, TimeUnit.MINUTES));
+                ExchangeSinkHandler exchangeSink = exchangeService.getSinkHandler(sessionId);
+                if (randomBoolean()) {
+                    // do not fail exchange requests when we have some pages
+                    assertBusy(() -> assertThat(exchangeSink.bufferSize(), greaterThan(0)));
+                }
+            } finally {
+                allowedFetching.countDown();
+            }
+            Exception failure = expectThrows(Exception.class, () -> future.actionGet().close());
+            assertThat(failure.getMessage(), containsString("failed to fetch pages"));
+        } finally {
+            transportService.clearAllRules();
+        }
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class);
+    }
 }
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
index ba3d8564e133..90cbc018b77d 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
@@ -690,6 +690,7 @@ public class ComputeService {
             dataNodeRequestExecutor.start();
             // run the node-level reduction
             var externalSink = exchangeService.getSinkHandler(externalId);
+            task.addListener(() -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())));
             var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor);
             exchangeSource.addRemoteSink(internalSink::fetchPageAsync, 1);
             ActionListener reductionListener = cancelOnFailure(task, cancelled, refs.acquire());
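The ExchangeService javadoc above documents the nullable-failure contract that the ComputeService change relies on: passing null finishes the sink normally, while a non-null failure (here a TaskCancelledException built from the task's cancellation reason) aborts it so buffered pages are released and outstanding fetch requests fail instead of hanging. Below is a condensed, hypothetical sketch of that contract; ToyExchangeService, ToySink, and onTaskCancelled are illustrative names only, not the actual implementation.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ToyExchangeService {
    private final Map<String, ToySink> sinks = new ConcurrentHashMap<>();

    // failure == null  -> the sink finished normally and is simply removed
    // failure != null  -> the sink is aborted: buffered pages are released and pending
    //                     fetch requests are completed with the failure
    void finishSinkHandler(String exchangeId, Exception failure) {
        ToySink sink = sinks.remove(exchangeId);
        if (sink == null) {
            return; // already finished, or never registered
        }
        if (failure != null) {
            sink.abort(failure);
        } else {
            sink.finish();
        }
    }

    // Mirrors the ComputeService change above: on task cancellation, abort the sink so remote
    // fetchers and the local reduction driver fail fast. (The real code passes a
    // TaskCancelledException carrying the cancellation reason.)
    void onTaskCancelled(String exchangeId, String reason) {
        finishSinkHandler(exchangeId, new RuntimeException("task cancelled: " + reason));
    }

    interface ToySink {
        void abort(Exception failure);

        void finish();
    }
}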