diff --git a/docs/changelog/127472.yaml b/docs/changelog/127472.yaml new file mode 100644 index 000000000000..b91288f82bcd --- /dev/null +++ b/docs/changelog/127472.yaml @@ -0,0 +1,6 @@ +pr: 127472 +summary: Change queries ID to be the same as the async +area: ES|QL +type: feature +issues: + - 127187 diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json index 472e70a8766e..c4f5abcdcb7a 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/esql.list_queries.json @@ -7,10 +7,8 @@ "stability": "experimental", "visibility": "public", "headers": { - "accept": [], - "content_type": [ - "application/json" - ] + "accept": ["application/json"], + "content_type": ["application/json"] }, "url": { "paths": [ diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java index 029e120ce644..de81e9a1a96e 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskAwareRequest.java @@ -9,6 +9,8 @@ package org.elasticsearch.tasks; +import org.elasticsearch.core.Nullable; + import java.util.Map; /** @@ -52,6 +54,23 @@ public interface TaskAwareRequest { return new Task(id, type, action, getDescription(), parentTaskId, headers); } + /** + * Returns the task object that should be used to keep track of the processing of the request, with an extra local node ID. + */ + // TODO remove the above overload, use only this one. + default Task createTask( + // TODO this is only nullable in tests, where the MockNode does not guarantee the localNodeId is set before calling this method. We + // We should fix the tests, and replace this and id with TaskId instead. + @Nullable String localNodeId, + long id, + String type, + String action, + TaskId parentTaskId, + Map headers + ) { + return createTask(id, type, action, parentTaskId, headers); + } + /** * Returns optional description of the request to be displayed by the task manager */ diff --git a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java index d1cf4bd799e4..249bb1d43119 100644 --- a/server/src/main/java/org/elasticsearch/tasks/TaskManager.java +++ b/server/src/main/java/org/elasticsearch/tasks/TaskManager.java @@ -141,7 +141,14 @@ public class TaskManager implements ClusterStateApplier { headers.put(key, httpHeader); } } - Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers); + Task task = request.createTask( + lastDiscoveryNodes.getLocalNodeId(), + taskIdGenerator.incrementAndGet(), + type, + action, + request.getParentTask(), + headers + ); Objects.requireNonNull(task); assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId"; if (logger.isTraceEnabled()) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java index 8316b4cfa605..2910064744d1 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncExecutionId.java @@ -10,6 +10,8 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.TaskId; import java.io.IOException; @@ -20,7 +22,7 @@ import java.util.Objects; /** * A class that contains all information related to a submitted async execution. */ -public final class AsyncExecutionId { +public final class AsyncExecutionId implements Writeable { public static final String ASYNC_EXECUTION_ID_HEADER = "X-Elasticsearch-Async-Id"; public static final String ASYNC_EXECUTION_IS_RUNNING_HEADER = "X-Elasticsearch-Async-Is-Running"; @@ -115,4 +117,13 @@ public final class AsyncExecutionId { } return new AsyncExecutionId(docId, new TaskId(taskId), id); } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(getEncoded()); + } + + public static AsyncExecutionId readFrom(StreamInput input) throws IOException { + return decode(input.readString()); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java new file mode 100644 index 000000000000..dd78aeb49019 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncExecutionIdWireTests.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.async; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.io.IOException; + +public class AsyncExecutionIdWireTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return AsyncExecutionId::readFrom; + } + + @Override + protected AsyncExecutionId createTestInstance() { + return new AsyncExecutionId(randomAlphaOfLength(15), new TaskId(randomAlphaOfLength(10), randomLong())); + } + + @Override + protected AsyncExecutionId mutateInstance(AsyncExecutionId instance) throws IOException { + return new AsyncExecutionId( + instance.getDocId(), + new TaskId(instance.getTaskId().getNodeId(), instance.getTaskId().getId() * 12345) + ); + } +} diff --git a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java index 759b65f7463a..6494a29e8307 100644 --- a/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java +++ b/x-pack/plugin/esql/qa/security/src/javaRestTest/java/org/elasticsearch/xpack/esql/EsqlSecurityIT.java @@ -916,19 +916,23 @@ public class EsqlSecurityIT extends ESRestTestCase { public void testGetQueryAllowed() throws Exception { // This is a bit tricky, since there is no such running query. We just make sure it didn't fail on forbidden privileges. - Request request = new Request("GET", "_query/queries/foo:1234"); - var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); - assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(404))); + setUser(GET_QUERY_REQUEST, "user_with_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST)); + assertThat(resp.getResponse().getStatusLine().getStatusCode(), not(equalTo(403))); } public void testGetQueryForbidden() throws Exception { - Request request = new Request("GET", "_query/queries/foo:1234"); - setUser(request, "user_without_monitor_privileges"); - var resp = expectThrows(ResponseException.class, () -> client().performRequest(request)); + setUser(GET_QUERY_REQUEST, "user_without_monitor_privileges"); + var resp = expectThrows(ResponseException.class, () -> client().performRequest(GET_QUERY_REQUEST)); assertThat(resp.getResponse().getStatusLine().getStatusCode(), equalTo(403)); assertThat(resp.getMessage(), containsString("this action is granted by the cluster privileges [monitor_esql,monitor,manage,all]")); } + private static final Request GET_QUERY_REQUEST = new Request( + "GET", + "_query/queries/FmJKWHpFRi1OU0l5SU1YcnpuWWhoUWcZWDFuYUJBeW1TY0dKM3otWUs2bDJudzo1Mg==" + ); + private void createEnrichPolicy() throws Exception { createIndex("songs", Settings.EMPTY, """ "properties":{"song_id": {"type": "keyword"}, "title": {"type": "keyword"}, "artist": {"type": "keyword"} } diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json index 1da628e0a3e8..f036eec21e9c 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/query_task.json @@ -3,6 +3,9 @@ "id" : 5326, "type" : "transport", "action" : "indices:data/read/esql", + "status" : { + "request_id" : "Ks5ApyqMTtWj5LrKigmCjQ" + }, "description" : "FROM test | STATS MAX(d) by a, b", <1> "start_time" : "2023-07-31T15:46:32.328Z", "start_time_in_millis" : 1690818392328, diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java index 8054b260f006..86277b1c1cd2 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit; public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase { protected static final Semaphore scriptPermits = new Semaphore(0); + // Incremented onWait. Can be used to check if the onWait process has been reached. + protected static final Semaphore scriptWaits = new Semaphore(0); protected int pageSize = -1; @@ -98,6 +100,7 @@ public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTes public static class PausableFieldPlugin extends AbstractPauseFieldPlugin { @Override protected boolean onWait() throws InterruptedException { + scriptWaits.release(); return scriptPermits.tryAcquire(1, TimeUnit.MINUTES); } } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java index 590f7efef803..c7072e217dd5 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlListQueriesActionIT.java @@ -7,24 +7,21 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.test.IntOrLongMatcher; import org.elasticsearch.test.MapMatcher; import org.elasticsearch.xpack.core.async.GetAsyncResultRequest; import org.elasticsearch.xpack.esql.EsqlTestUtils; -import java.util.List; +import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.core.TimeValue.timeValueSeconds; import static org.elasticsearch.xpack.esql.EsqlTestUtils.jsonEntityToMap; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { private static final String QUERY = "from test | stats sum(pause_me)"; @@ -45,31 +42,10 @@ public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { try (var initialResponse = sendAsyncQuery()) { id = initialResponse.asyncExecutionId().get(); + assertRunningQueries(); var getResultsRequest = new GetAsyncResultRequest(id); getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(1)); client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).get().close(); - Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries")); - @SuppressWarnings("unchecked") - var listResult = (Map>) EsqlTestUtils.singleValue( - jsonEntityToMap(listResponse.getEntity()).values() - ); - var taskId = new TaskId(EsqlTestUtils.singleValue(listResult.keySet())); - MapMatcher basicMatcher = MapMatcher.matchesMap() - .entry("node", is(taskId.getNodeId())) - .entry("id", IntOrLongMatcher.matches(taskId.getId())) - .entry("query", is(QUERY)) - .entry("start_time_millis", IntOrLongMatcher.isIntOrLong()) - .entry("running_time_nanos", IntOrLongMatcher.isIntOrLong()); - MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); - - Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + taskId)); - MapMatcher.assertMap( - jsonEntityToMap(getQueryResponse.getEntity()), - basicMatcher.entry("coordinating_node", isA(String.class)) - .entry("data_nodes", allOf(isA(List.class), everyItem(isA(String.class)))) - .entry("documents_found", IntOrLongMatcher.isIntOrLong()) - .entry("values_loaded", IntOrLongMatcher.isIntOrLong()) - ); } finally { if (id != null) { // Finish the query. @@ -82,9 +58,44 @@ public class EsqlListQueriesActionIT extends AbstractPausableIntegTestCase { } } + public void testRunningQueriesSync() throws Exception { + var future = sendSyncQueryAsyncly(); + try { + scriptWaits.acquire(); + assertRunningQueries(); + } finally { + scriptPermits.release(numberOfDocs()); + future.actionGet(timeValueSeconds(60)).close(); + } + } + + private static void assertRunningQueries() throws IOException { + Response listResponse = getRestClient().performRequest(new Request("GET", "/_query/queries")); + @SuppressWarnings("unchecked") + var listResult = (Map>) EsqlTestUtils.singleValue(jsonEntityToMap(listResponse.getEntity()).values()); + String queryId = EsqlTestUtils.singleValue(listResult.keySet()); + MapMatcher basicMatcher = MapMatcher.matchesMap() + .entry("query", is(QUERY)) + .entry("start_time_millis", IntOrLongMatcher.isIntOrLong()) + .entry("running_time_nanos", IntOrLongMatcher.isIntOrLong()); + MapMatcher.assertMap(EsqlTestUtils.singleValue(listResult.values()), basicMatcher); + + Response getQueryResponse = getRestClient().performRequest(new Request("GET", "/_query/queries/" + queryId)); + MapMatcher.assertMap( + jsonEntityToMap(getQueryResponse.getEntity()), + basicMatcher.entry("documents_found", IntOrLongMatcher.isIntOrLong()).entry("values_loaded", IntOrLongMatcher.isIntOrLong()) + ); + } + private EsqlQueryResponse sendAsyncQuery() { scriptPermits.drainPermits(); scriptPermits.release(between(1, 5)); return EsqlQueryRequestBuilder.newAsyncEsqlQueryRequestBuilder(client()).query(QUERY).execute().actionGet(60, TimeUnit.SECONDS); } + + private ActionFuture sendSyncQueryAsyncly() { + scriptPermits.drainPermits(); + scriptPermits.release(between(1, 5)); + return EsqlQueryRequestBuilder.newSyncEsqlQueryRequestBuilder(client()).query(QUERY).execute(); + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java index e68626609d94..96e2ce341b0d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlGetQueryRequest.java @@ -7,33 +7,33 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.LegacyActionRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import java.io.IOException; -public class EsqlGetQueryRequest extends LegacyActionRequest { - private final TaskId id; +public class EsqlGetQueryRequest extends ActionRequest { + private final AsyncExecutionId asyncExecutionId; - public EsqlGetQueryRequest(TaskId id) { - this.id = id; + public EsqlGetQueryRequest(AsyncExecutionId asyncExecutionId) { + this.asyncExecutionId = asyncExecutionId; } - public TaskId id() { - return id; + public AsyncExecutionId id() { + return asyncExecutionId; } public EsqlGetQueryRequest(StreamInput streamInput) throws IOException { super(streamInput); - id = TaskId.readFromStream(streamInput); + asyncExecutionId = AsyncExecutionId.decode(streamInput.readString()); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeWriteable(id); + out.writeWriteable(asyncExecutionId); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java index bd0a7635a465..d3073d30bfaa 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java @@ -11,6 +11,7 @@ import org.elasticsearch.Build; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.CompositeIndicesRequest; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; @@ -20,8 +21,10 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import org.elasticsearch.xpack.esql.Column; import org.elasticsearch.xpack.esql.parser.QueryParams; +import org.elasticsearch.xpack.esql.plugin.EsqlQueryStatus; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; @@ -242,9 +245,32 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E } @Override - public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - // Pass the query as the description - return new CancellableTask(id, type, action, query, parentTaskId, headers); + public Task createTask(String localNodeId, long id, String type, String action, TaskId parentTaskId, Map headers) { + var status = new EsqlQueryStatus(new AsyncExecutionId(UUIDs.randomBase64UUID(), new TaskId(localNodeId, id))); + return new EsqlQueryRequestTask(query, id, type, action, parentTaskId, headers, status); + } + + private static class EsqlQueryRequestTask extends CancellableTask { + private final Status status; + + EsqlQueryRequestTask( + String query, + long id, + String type, + String action, + TaskId parentTaskId, + Map headers, + EsqlQueryStatus status + ) { + // Pass the query as the description + super(id, type, action, query, parentTaskId, headers); + this.status = status; + } + + @Override + public Status getStatus() { + return status; + } } // Setter for tests diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java index 2076ae3b1aa5..ae91b7207f25 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlListQueriesAction.java @@ -15,9 +15,8 @@ import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; import org.elasticsearch.rest.ServerlessScope; import org.elasticsearch.rest.action.RestToXContentListener; -import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; -import java.io.IOException; import java.util.List; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -37,7 +36,7 @@ public class RestEsqlListQueriesAction extends BaseRestHandler { } @Override - protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) { return restChannelConsumer(request, client); } @@ -46,7 +45,7 @@ public class RestEsqlListQueriesAction extends BaseRestHandler { String id = request.param("id"); var action = id != null ? EsqlGetQueryAction.INSTANCE : EsqlListQueriesAction.INSTANCE; - var actionRequest = id != null ? new EsqlGetQueryRequest(new TaskId(id)) : new EsqlListQueriesRequest(); + var actionRequest = id != null ? new EsqlGetQueryRequest(AsyncExecutionId.decode(id)) : new EsqlListQueriesRequest(); return channel -> client.execute(action, actionRequest, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java index b8679bb95c46..b3d4c543f3f8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlGetQueryResponse.java @@ -14,7 +14,6 @@ import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.List; public class EsqlGetQueryResponse extends ActionResponse implements ToXContentObject { // This is rather limited at the moment, as we don't extract information such as CPU and memory usage, owning user, etc. for the task. @@ -24,22 +23,16 @@ public class EsqlGetQueryResponse extends ActionResponse implements ToXContentOb long runningTimeNanos, long documentsFound, long valuesLoaded, - String query, - String coordinatingNode, - List dataNodes + String query ) implements ToXContentObject { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("id", id.getId()); - builder.field("node", id.getNodeId()); builder.field("start_time_millis", startTimeMillis); builder.field("running_time_nanos", runningTimeNanos); builder.field("documents_found", documentsFound); builder.field("values_loaded", valuesLoaded); builder.field("query", query); - builder.field("coordinating_node", coordinatingNode); - builder.field("data_nodes", dataNodes); builder.endObject(); return builder; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java index 383dea8d82d9..a592f7bdb256 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlListQueriesResponse.java @@ -9,10 +9,10 @@ package org.elasticsearch.xpack.esql.plugin; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.tasks.TaskId; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; import java.io.IOException; import java.util.List; @@ -20,12 +20,10 @@ import java.util.List; public class EsqlListQueriesResponse extends ActionResponse implements ToXContentObject { private final List queries; - public record Query(TaskId taskId, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment { + public record Query(AsyncExecutionId id, long startTimeMillis, long runningTimeNanos, String query) implements ToXContentFragment { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(taskId.toString()); - builder.field("id", taskId.getId()); - builder.field("node", taskId.getNodeId()); + builder.startObject(id.getEncoded()); builder.field("start_time_millis", startTimeMillis); builder.field("running_time_nanos", runningTimeNanos); builder.field("query", query); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 6ba7c9747bf0..33178d08b752 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -300,6 +300,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { entries.add(AbstractPageMappingOperator.Status.ENTRY); entries.add(AbstractPageMappingToIteratorOperator.Status.ENTRY); entries.add(AggregationOperator.Status.ENTRY); + entries.add(EsqlQueryStatus.ENTRY); entries.add(ExchangeSinkOperator.Status.ENTRY); entries.add(ExchangeSourceOperator.Status.ENTRY); entries.add(HashAggregationOperator.Status.ENTRY); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java new file mode 100644 index 000000000000..bb45b1bf4194 --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlQueryStatus.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plugin; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xpack.core.async.AsyncExecutionId; + +import java.io.IOException; + +public record EsqlQueryStatus(AsyncExecutionId id) implements Task.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Task.Status.class, + "EsqlDocIdStatus", + EsqlQueryStatus::new + ); + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + private EsqlQueryStatus(StreamInput stream) throws IOException { + this(AsyncExecutionId.readFrom(stream)); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + id.writeTo(out); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder.startObject().field("request_id", id.getEncoded()).endObject(); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java index 038f9d3df0c6..a7ebff711c75 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlGetQueryAction.java @@ -49,7 +49,7 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction() { @Override public void onResponse(GetTaskResponse response) { @@ -64,7 +64,7 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction() { @Override public void onResponse(ListTasksResponse response) { @@ -91,7 +91,6 @@ public class TransportEsqlGetQueryAction extends HandledTransportAction