From 936f3385b0b4485b1590ee4047076a6cebb7d1f8 Mon Sep 17 00:00:00 2001 From: Gal Lalouche Date: Thu, 12 Jun 2025 15:37:08 +0300 Subject: [PATCH] ESQL: Change queries ID to be the same as the async (#127472) This PR changes the list and query API for ESQL, such that the ID now follows the same format as async query IDs. This is saved as part of the task status. For async queries, this is easy, but for sync queries, this is slightly more complicated, since when creating them, we don't have access to a node ID. So instead, the status itself is just the doc ID portion of the async execution ID, which is used for salting, since this part needs to be consistent, so that when we list the queries, we can compute the async execution ID correctly. Also, I've removed the individual ID, node, and data node tags, as mentioned in the ticket. In addition, I've changed the accept and content-type to be JSON for lists. Resolves #127187 --- docs/changelog/127472.yaml | 6 ++ .../rest-api-spec/api/esql.list_queries.json | 6 +- .../elasticsearch/tasks/TaskAwareRequest.java | 19 ++++++ .../org/elasticsearch/tasks/TaskManager.java | 9 ++- .../xpack/core/async/AsyncExecutionId.java | 13 +++- .../core/async/AsyncExecutionIdWireTests.java | 34 ++++++++++ .../xpack/esql/EsqlSecurityIT.java | 16 +++-- .../src/main/resources/query_task.json | 3 + .../action/AbstractPausableIntegTestCase.java | 3 + .../esql/action/EsqlListQueriesActionIT.java | 65 +++++++++++-------- .../esql/action/EsqlGetQueryRequest.java | 20 +++--- .../xpack/esql/action/EsqlQueryRequest.java | 32 ++++++++- .../action/RestEsqlListQueriesAction.java | 7 +- .../esql/plugin/EsqlGetQueryResponse.java | 9 +-- .../esql/plugin/EsqlListQueriesResponse.java | 8 +-- .../xpack/esql/plugin/EsqlPlugin.java | 1 + .../xpack/esql/plugin/EsqlQueryStatus.java | 44 +++++++++++++ .../plugin/TransportEsqlGetQueryAction.java | 9 +-- .../TransportEsqlListQueriesAction.java | 2 +- .../esql/plugin/TransportEsqlQueryAction.java | 7 +- .../esql/action/EsqlQueryRequestTests.java | 7 +- .../rest-api-spec/test/esql/200_queries.yml | 6 +- 22 files changed, 244 insertions(+), 82 deletions(-) create mode 100644 docs/changelog/127472.yaml create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java diff --git a/docs/changelog/127472.yaml b/docs/changelog/127472.yaml new file mode 100644 index 000000000000..b91288f82bcd --- /dev/null +++ b/docs/changelog/127472.yaml @@ -0,0 +1,6 @@ +pr: 127472 +summary: Change queries ID to be the same as the async +area: ES|QL +type: feature +issues: + - 127187 diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json index 472e70a8766e..c4f5abcdcb7a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json @@ -7,10 +7,8 @@ "stability": "experimental", "visibility": "public", "headers": { - "accept": [], - "content_type": [ - "application/json" - ] + "accept": ["application/json"], + "content_type": ["application/json"] }, "url": { "paths": [ diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index 029e120ce644..de81e9a1a96e 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -9,6 +9,8 @@ package org.elasticsearch.tasks; +import org.elasticsearch.core.Nullable; + import java.util.Map; /** @@ -52,6 +54,23 @@ public interface TaskAwareRequest { return new Task(id, type, action, getDescription(), parentTaskId, headers); } + /** + * Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID. + */ + // TODO remove the above overload, use only this one. + default Task createTask( + // TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We + // We should fix the tests, and replace this and id with TaskId instead. + @Nullable String localNodeId, + long id, + String type, + String action, + TaskId parentTaskId, + Map headers + ) { + return createTask(id, type, action, parentTaskId, headers); + } + /** * Returns optional description of the request to be displayed by the task manager */ diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index d1cf4bd799e4..249bb1d43119 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -141,7 +141,14 @@ public class TaskManager implements ClusterStateApplier { headers.put(key, httpHeader); } } - Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); + Task task = request.createTask( + lastDiscoveryNodes.getLocalNodeId(), + taskIdGenerator.incrementAndGet(), + type, + action, + request.getParentTask(), + headers + ); Objects.requireNonNull(task); assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; if (logger.isTraceEnabled()) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java index 8316b4cfa605..2910064744d1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java @@ -10,6 +10,8 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -20,7 +22,7 @@ import java.util.Objects; /** * A class that contains all information related to a submitted async execution. */ -public final class AsyncExecutionId { +public final class AsyncExecutionId implements Writeable { public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id"; public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running"; @@ -115,4 +117,13 @@ public final class AsyncExecutionId { } return new AsyncExecutionId(docId, new TaskId(taskId), id); } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(getEncoded()); + } + + public static AsyncExecutionId readFrom(StreamInput input) throws IOException { + return decode(input.readString()); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java new file mode 100644 index 000000000000..dd78aeb49019 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class AsyncExecutionIdWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return AsyncExecutionId::readFrom; + } + + @Override + protected AsyncExecutionId createTestInstance() { + return new AsyncExecutionId(randomAlphaOfLength(15), new TaskId(randomAlphaOfLength(10), randomLong())); + } + + @Override + protected AsyncExecutionId mutateInstance(AsyncExecutionId instance) throws IOException { + return new AsyncExecutionId( + instance.getDocId(), + new TaskId(instance.getTaskId().getNodeId(), instance.getTaskId().getId() * 12345) + ); + } +} diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java index 759b65f7463a..6494a29e8307 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java @@ -916,19 +916,23 @@ public class EsqlSecurityIT extends ESRestTestCase { public void testGetQueryAllowed() throws Exception { // This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges. - Request request = new Request("GET", "_query/queries/foo:1234"); - var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); - assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404))); + setUser(GET_QUERY_REQUEST, "user_with_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST)); + assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(403))); } public void testGetQueryForbidden() throws Exception { - Request request = new Request("GET", "_query/queries/foo:1234"); - setUser(request, "user_without_monitor_privileges"); - var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); + setUser(GET_QUERY_REQUEST, "user_without_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST)); assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403)); assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]")); } + private static final Request GET_QUERY_REQUEST = new Request( + "GET", + "_query/queries/FmJKWHpFRi1OU0l5SU1YcnpuWWhoUWcZWDFuYUJBeW1TY0dKM3otWUs2bDJudzo1Mg==" + ); + private void createEnrichPolicy() throws Exception { createIndex("songs", Settings.EMPTY, """ "properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json index 1da628e0a3e8..f036eec21e9c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json @@ -3,6 +3,9 @@ "id" : 5326, "type" : "transport", "action" : "indices:data/read/esql", + "status" : { + "request_id" : "Ks5ApyqMTtWj5LrKigmCjQ" + }, "description" : "FROM test | STATS MAX(d) by a, b", <1> "start_time" : "2023-07-31T15:46:32.328Z", "start_time_in_millis" : 1690818392328, diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java index 8054b260f006..86277b1c1cd2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit; public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase { protected static final Semaphore scriptPermits = new Semaphore(0); + // Incremented onWait. Can be used to check if the onWait process has been reached. + protected static final Semaphore scriptWaits = new Semaphore(0); protected int pageSize = -1; @@ -98,6 +100,7 @@ public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTes public static class PausableFieldPlugin extends AbstractPauseFieldPlugin { @Override protected boolean onWait() throws InterruptedException { + scriptWaits.release(); return scriptPermits.tryAcquire(1, TimeUnit.MINUTES); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java index 590f7efef803..c7072e217dd5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java @@ -7,24 +7,21 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.IntOrLongMatcher; import org.elasticsearch.test.MapMatcher; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.esql.EsqlTestUtils; -import java.util.List; +import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.core.TimeValue.timeValueSeconds; import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { private static final String QUERY = "from test | stats sum(pause_me)"; @@ -45,31 +42,10 @@ public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { try (var initialResponse = sendAsyncQuery()) { id = initialResponse.asyncExecutionId().get(); + assertRunningQueries(); var getResultsRequest = new GetAsyncResultRequest(id); getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1)); client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); - Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries")); - @SuppressWarnings("unchecked") - var listResult = (Map>) EsqlTestUtils.singleValue( - jsonEntityToMap(listResponse.getEntity()).values() - ); - var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet())); - MapMatcher basicMatcher = MapMatcher.matchesMap() - .entry("node", is(taskId.getNodeId())) - .entry("id", IntOrLongMatcher.matches(taskId.getId())) - .entry("query", is(QUERY)) - .entry("start_time_millis", IntOrLongMatcher.isIntOrLong()) - .entry("running_time_nanos", IntOrLongMatcher.isIntOrLong()); - MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); - - Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId)); - MapMatcher.assertMap( - jsonEntityToMap(getQueryResponse.getEntity()), - basicMatcher.entry("coordinating_node", isA(String.class)) - .entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class)))) - .entry("documents_found", IntOrLongMatcher.isIntOrLong()) - .entry("values_loaded", IntOrLongMatcher.isIntOrLong()) - ); } finally { if (id != null) { // Finish the query. @@ -82,9 +58,44 @@ public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { } } + public void testRunningQueriesSync() throws Exception { + var future = sendSyncQueryAsyncly(); + try { + scriptWaits.acquire(); + assertRunningQueries(); + } finally { + scriptPermits.release(numberOfDocs()); + future.actionGet(timeValueSeconds(60)).close(); + } + } + + private static void assertRunningQueries() throws IOException { + Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries")); + @SuppressWarnings("unchecked") + var listResult = (Map>) EsqlTestUtils.singleValue(jsonEntityToMap(listResponse.getEntity()).values()); + String queryId = EsqlTestUtils.singleValue(listResult.keySet()); + MapMatcher basicMatcher = MapMatcher.matchesMap() + .entry("query", is(QUERY)) + .entry("start_time_millis", IntOrLongMatcher.isIntOrLong()) + .entry("running_time_nanos", IntOrLongMatcher.isIntOrLong()); + MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); + + Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId)); + MapMatcher.assertMap( + jsonEntityToMap(getQueryResponse.getEntity()), + basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong()) + ); + } + private EsqlQueryResponse sendAsyncQuery() { scriptPermits.drainPermits(); scriptPermits.release(between(1, 5)); return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS); } + + private ActionFuture sendSyncQueryAsyncly() { + scriptPermits.drainPermits(); + scriptPermits.release(between(1, 5)); + return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute(); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java index e68626609d94..96e2ce341b0d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java @@ -7,33 +7,33 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.LegacyActionRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import java.io.IOException; -public class EsqlGetQueryRequest extends LegacyActionRequest { - private final TaskId id; +public class EsqlGetQueryRequest extends ActionRequest { + private final AsyncExecutionId asyncExecutionId; - public EsqlGetQueryRequest(TaskId id) { - this.id = id; + public EsqlGetQueryRequest(AsyncExecutionId asyncExecutionId) { + this.asyncExecutionId = asyncExecutionId; } - public TaskId id() { - return id; + public AsyncExecutionId id() { + return asyncExecutionId; } public EsqlGetQueryRequest(StreamInput streamInput) throws IOException { super(streamInput); - id = TaskId.readFromStream(streamInput); + asyncExecutionId = AsyncExecutionId.decode(streamInput.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeWriteable(id); + out.writeWriteable(asyncExecutionId); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index bd0a7635a465..d3073d30bfaa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -11,6 +11,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -20,8 +21,10 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.Column; import org.elasticsearch.xpack.esql.parser.QueryParams; +import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; @@ -242,9 +245,32 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - // Pass the query as the description - return new CancellableTask(id, type, action, query, parentTaskId, headers); + public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { + var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(localNodeId, id))); + return new EsqlQueryRequestTask(query, id, type, action, parentTaskId, headers, status); + } + + private static class EsqlQueryRequestTask extends CancellableTask { + private final Status status; + + EsqlQueryRequestTask( + String query, + long id, + String type, + String action, + TaskId parentTaskId, + Map headers, + EsqlQueryStatus status + ) { + // Pass the query as the description + super(id, type, action, query, parentTaskId, headers); + this.status = status; + } + + @Override + public Status getStatus() { + return status; + } } // Setter for tests diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java index 2076ae3b1aa5..ae91b7207f25 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java @@ -15,9 +15,8 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; -import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -37,7 +36,7 @@ public class RestEsqlListQueriesAction extends BaseRestHandler { } @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { return restChannelConsumer(request, client); } @@ -46,7 +45,7 @@ public class RestEsqlListQueriesAction extends BaseRestHandler { String id = request.param("id"); var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE; - var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest(); + var actionRequest = id != null ? new EsqlGetQueryRequest(AsyncExecutionId.decode(id)) : new EsqlListQueriesRequest(); return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java index b8679bb95c46..b3d4c543f3f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java @@ -14,7 +14,6 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.List; public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject { // This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task. @@ -24,22 +23,16 @@ public class EsqlGetQueryResponse extends ActionResponse implements ToXContentOb long runningTimeNanos, long documentsFound, long valuesLoaded, - String query, - String coordinatingNode, - List dataNodes + String query ) implements ToXContentObject { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("id", id.getId()); - builder.field("node", id.getNodeId()); builder.field("start_time_millis", startTimeMillis); builder.field("running_time_nanos", runningTimeNanos); builder.field("documents_found", documentsFound); builder.field("values_loaded", valuesLoaded); builder.field("query", query); - builder.field("coordinating_node", coordinatingNode); - builder.field("data_nodes", dataNodes); builder.endObject(); return builder; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java index 383dea8d82d9..a592f7bdb256 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java @@ -9,10 +9,10 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import java.io.IOException; import java.util.List; @@ -20,12 +20,10 @@ import java.util.List; public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject { private final List queries; - public record Query(TaskId taskId, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment { + public record Query(AsyncExecutionId id, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(taskId.toString()); - builder.field("id", taskId.getId()); - builder.field("node", taskId.getNodeId()); + builder.startObject(id.getEncoded()); builder.field("start_time_millis", startTimeMillis); builder.field("running_time_nanos", runningTimeNanos); builder.field("query", query); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 6ba7c9747bf0..33178d08b752 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -300,6 +300,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { entries.add(AbstractPageMappingOperator.Status.ENTRY); entries.add(AbstractPageMappingToIteratorOperator.Status.ENTRY); entries.add(AggregationOperator.Status.ENTRY); + entries.add(EsqlQueryStatus.ENTRY); entries.add(ExchangeSinkOperator.Status.ENTRY); entries.add(ExchangeSourceOperator.Status.ENTRY); entries.add(HashAggregationOperator.Status.ENTRY); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java new file mode 100644 index 000000000000..bb45b1bf4194 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; + +import java.io.IOException; + +public record EsqlQueryStatus(AsyncExecutionId id) implements Task.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Task.Status.class, + "EsqlDocIdStatus", + EsqlQueryStatus::new + ); + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + private EsqlQueryStatus(StreamInput stream) throws IOException { + this(AsyncExecutionId.readFrom(stream)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + id.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field("request_id", id.getEncoded()).endObject(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java index 038f9d3df0c6..a7ebff711c75 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java @@ -49,7 +49,7 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction() { @Override public void onResponse(GetTaskResponse response) { @@ -64,7 +64,7 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction() { @Override public void onResponse(ListTasksResponse response) { @@ -91,7 +91,6 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction