diff --git a/docs/changelog/106036.yaml b/docs/changelog/106036.yaml new file mode 100644 index 000000000000..7b129c6c0a7a --- /dev/null +++ b/docs/changelog/106036.yaml @@ -0,0 +1,5 @@ +pr: 106036 +summary: Add status for enrich operator +area: ES|QL +type: enhancement +issues: [] diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index ec3971a48a64..29dec8087578 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -137,6 +137,7 @@ public class TransportVersions { public static final TransportVersion ESQL_TIMINGS = def(8_597_00_0); public static final TransportVersion DATA_STREAM_AUTO_SHARDING_EVENT = def(8_598_00_0); public static final TransportVersion ADD_FAILURE_STORE_INDICES_OPTIONS = def(8_599_00_0); + public static final TransportVersion ESQL_ENRICH_OPERATOR_STATUS = def(8_600_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java index bcab6a39496f..061cefc86bed 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AsyncOperator.java @@ -8,16 +8,27 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.SubscribableListener; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.xcontent.XContentBuilder; +import java.io.IOException; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; /** * {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}. @@ -33,6 +44,7 @@ public abstract class AsyncOperator implements Operator { private final DriverContext driverContext; private final int maxOutstandingRequests; + private final LongAdder totalTimeInNanos = new LongAdder(); private boolean finished = false; private volatile boolean closed = false; @@ -81,7 +93,11 @@ public abstract class AsyncOperator implements Operator { onFailure(e); onSeqNoCompleted(seqNo); }); - performAsync(input, ActionListener.runAfter(listener, driverContext::removeAsyncAction)); + final long startNanos = System.nanoTime(); + performAsync(input, ActionListener.runAfter(listener, () -> { + driverContext.removeAsyncAction(); + totalTimeInNanos.add(System.nanoTime() - startNanos); + })); success = true; } finally { if (success == false) { @@ -224,4 +240,107 @@ public abstract class AsyncOperator implements Operator { return blockedFuture; } } + + @Override + public final Operator.Status status() { + return status( + Math.max(0L, checkpoint.getMaxSeqNo()), + Math.max(0L, checkpoint.getProcessedCheckpoint()), + TimeValue.timeValueNanos(totalTimeInNanos.sum()).millis() + ); + } + + protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) { + return new Status(receivedPages, completedPages, totalTimeInMillis); + } + + public static class Status implements Operator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "async_operator", + Status::new + ); + + final long receivedPages; + final long completedPages; + final long totalTimeInMillis; + + protected Status(long receivedPages, long completedPages, long totalTimeInMillis) { + this.receivedPages = receivedPages; + this.completedPages = completedPages; + this.totalTimeInMillis = totalTimeInMillis; + } + + protected Status(StreamInput in) throws IOException { + this.receivedPages = in.readVLong(); + this.completedPages = in.readVLong(); + this.totalTimeInMillis = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(receivedPages); + out.writeVLong(completedPages); + out.writeVLong(totalTimeInMillis); + } + + public long receivedPages() { + return receivedPages; + } + + public long completedPages() { + return completedPages; + } + + public long totalTimeInMillis() { + return totalTimeInMillis; + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder); + return builder.endObject(); + } + + protected final XContentBuilder innerToXContent(XContentBuilder builder) throws IOException { + builder.field("received_pages", receivedPages); + builder.field("completed_pages", completedPages); + builder.field("total_time_in_millis", totalTimeInMillis); + if (totalTimeInMillis >= 0) { + builder.field("total_time", TimeValue.timeValueMillis(totalTimeInMillis)); + } + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return receivedPages == status.receivedPages + && completedPages == status.completedPages + && totalTimeInMillis == status.totalTimeInMillis; + } + + @Override + public int hashCode() { + return Objects.hash(receivedPages, completedPages, totalTimeInMillis); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.ESQL_ENRICH_OPERATOR_STATUS; + } + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java index 5f6e1ed12e20..00c377154086 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java @@ -102,7 +102,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject { return iterations; } - List operators() { + public List operators() { return operators; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java new file mode 100644 index 000000000000..ab2dcc5e6c44 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorStatusTests.java @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class AsyncOperatorStatusTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return AsyncOperator.Status::new; + } + + @Override + protected AsyncOperator.Status createTestInstance() { + return new AsyncOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomLongBetween(1, TimeValue.timeValueHours(1).millis()) + ); + } + + @Override + protected AsyncOperator.Status mutateInstance(AsyncOperator.Status in) throws IOException { + int field = randomIntBetween(0, 2); + return switch (field) { + case 0 -> new AsyncOperator.Status( + randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong), + in.completedPages(), + in.totalTimeInMillis() + ); + case 1 -> new AsyncOperator.Status( + in.receivedPages(), + randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong), + in.totalTimeInMillis() + ); + case 2 -> new AsyncOperator.Status( + in.receivedPages(), + in.completedPages(), + randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong) + ); + default -> throw new AssertionError("unknown "); + }; + } + + public void testToXContent() { + var status = new AsyncOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis()); + String json = Strings.toString(status, true, true); + assertThat(json, equalTo(""" + { + "received_pages" : 100, + "completed_pages" : 50, + "total_time_in_millis" : 10000, + "total_time" : "10s" + }""")); + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java index 09f20d7ca4ff..2b59e6dd1957 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClustersEnrichIT.java @@ -460,6 +460,9 @@ public class CrossClustersEnrichIT extends AbstractMultiClustersTestCase { EsqlQueryRequest request = new EsqlQueryRequest(); request.query(query); request.pragmas(AbstractEsqlIntegTestCase.randomPragmas()); + if (randomBoolean()) { + request.profile(true); + } return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS); } diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java index a589e1cc468a..3bb6bb35b521 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EnrichIT.java @@ -18,6 +18,8 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.operator.DriverProfile; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.exchange.ExchangeService; import org.elasticsearch.core.TimeValue; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; @@ -58,12 +60,15 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static java.util.Collections.emptyList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.transport.AbstractSimpleTransportTestCase.IGNORE_DESERIALIZATION_ERRORS_SETTING; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.not; public class EnrichIT extends AbstractEsqlIntegTestCase { @@ -121,6 +126,9 @@ public class EnrichIT extends AbstractEsqlIntegTestCase { } else { client = client(); } + if (request.profile() == false && randomBoolean()) { + request.profile(true); + } if (randomBoolean()) { setRequestCircuitBreakerLimit(ByteSizeValue.ofBytes(between(256, 4096))); try { @@ -318,6 +326,27 @@ public class EnrichIT extends AbstractEsqlIntegTestCase { } } + public void testProfile() { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.pragmas(randomPragmas()); + request.query("from listens* | sort timestamp DESC | limit 1 | " + enrichSongCommand() + " | KEEP timestamp, artist"); + request.profile(true); + try (var resp = run(request)) { + Iterator row = resp.values().next(); + assertThat(row.next(), equalTo(7L)); + assertThat(row.next(), equalTo("Linkin Park")); + EsqlQueryResponse.Profile profile = resp.profile(); + assertNotNull(profile); + List drivers = profile.drivers(); + assertThat(drivers.size(), greaterThanOrEqualTo(2)); + List enrichOperators = drivers.stream() + .flatMap(d -> d.operators().stream()) + .filter(status -> status.operator().startsWith("EnrichOperator")) + .toList(); + assertThat(enrichOperators, not(emptyList())); + } + } + /** * Some enrich queries that could fail without the PushDownEnrich rule. */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java index 844cfde28607..2d433f073206 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/EnrichLookupOperator.java @@ -8,15 +8,21 @@ package org.elasticsearch.xpack.esql.enrich; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.DriverContext; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xpack.ql.expression.NamedExpression; +import java.io.IOException; import java.util.List; +import java.util.Objects; public final class EnrichLookupOperator extends AsyncOperator { private final EnrichLookupService enrichLookupService; @@ -27,6 +33,7 @@ public final class EnrichLookupOperator extends AsyncOperator { private final String matchType; private final String matchField; private final List enrichFields; + private long totalTerms = 0L; public record Factory( String sessionId, @@ -95,6 +102,7 @@ public final class EnrichLookupOperator extends AsyncOperator { @Override protected void performAsync(Page inputPage, ActionListener listener) { final Block inputBlock = inputPage.getBlock(inputChannel); + totalTerms += inputBlock.getTotalValueCount(); enrichLookupService.lookupAsync( sessionId, parentTask, @@ -107,9 +115,83 @@ public final class EnrichLookupOperator extends AsyncOperator { ); } + @Override + public String toString() { + return "EnrichOperator[index=" + + enrichIndex + + " match_field=" + + matchField + + " enrich_fields=" + + enrichFields + + " inputChannel=" + + inputChannel + + "]"; + } + @Override protected void doClose() { // TODO: Maybe create a sub-task as the parent task of all the lookup tasks // then cancel it when this operator terminates early (e.g., have enough result). } + + @Override + protected Operator.Status status(long receivedPages, long completedPages, long totalTimeInMillis) { + return new EnrichLookupOperator.Status(receivedPages, completedPages, totalTimeInMillis, totalTerms); + } + + public static class Status extends AsyncOperator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "enrich", + Status::new + ); + + final long totalTerms; + + Status(long receivedPages, long completedPages, long totalTimeInMillis, long totalTerms) { + super(receivedPages, completedPages, totalTimeInMillis); + this.totalTerms = totalTerms; + } + + Status(StreamInput in) throws IOException { + super(in); + this.totalTerms = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(totalTerms); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + innerToXContent(builder); + builder.field("total_terms", totalTerms); + return builder.endObject(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass() || super.equals(o) == false) { + return false; + } + Status status = (Status) o; + return totalTerms == status.totalTerms; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), totalTerms); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index d1bcac4e399e..61f0393c8094 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -26,6 +26,7 @@ import org.elasticsearch.compute.lucene.LuceneOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.AggregationOperator; +import org.elasticsearch.compute.operator.AsyncOperator; import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.HashAggregationOperator; import org.elasticsearch.compute.operator.LimitOperator; @@ -52,6 +53,7 @@ import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction; import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction; import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction; import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction; +import org.elasticsearch.xpack.esql.enrich.EnrichLookupOperator; import org.elasticsearch.xpack.esql.execution.PlanExecutor; import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery; import org.elasticsearch.xpack.esql.session.EsqlIndexResolver; @@ -176,7 +178,9 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { TopNOperatorStatus.ENTRY, MvExpandOperator.Status.ENTRY, ValuesSourceReaderOperator.Status.ENTRY, - SingleValueQuery.ENTRY + SingleValueQuery.ENTRY, + AsyncOperator.Status.ENTRY, + EnrichLookupOperator.Status.ENTRY ).stream(), Block.getNamedWriteables().stream() ).toList(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java new file mode 100644 index 000000000000..4fc67f85cc06 --- /dev/null +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/enrich/EnrichOperatorStatusTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.enrich; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; + +public class EnrichOperatorStatusTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return EnrichLookupOperator.Status::new; + } + + @Override + protected EnrichLookupOperator.Status createTestInstance() { + return new EnrichLookupOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomLongBetween(1, TimeValue.timeValueHours(1).millis()) + ); + } + + @Override + protected EnrichLookupOperator.Status mutateInstance(EnrichLookupOperator.Status in) throws IOException { + int field = randomIntBetween(0, 3); + return switch (field) { + case 0 -> new EnrichLookupOperator.Status( + randomValueOtherThan(in.receivedPages(), ESTestCase::randomNonNegativeLong), + in.completedPages(), + in.totalTerms, + in.totalTimeInMillis() + ); + case 1 -> new EnrichLookupOperator.Status( + in.receivedPages(), + randomValueOtherThan(in.completedPages(), ESTestCase::randomNonNegativeLong), + in.totalTerms, + in.totalTimeInMillis() + ); + case 2 -> new EnrichLookupOperator.Status( + in.receivedPages(), + in.completedPages(), + randomValueOtherThan(in.totalTerms, ESTestCase::randomNonNegativeLong), + in.totalTimeInMillis() + ); + case 3 -> new EnrichLookupOperator.Status( + in.receivedPages(), + in.completedPages(), + in.totalTerms, + randomValueOtherThan(in.totalTimeInMillis(), ESTestCase::randomNonNegativeLong) + ); + default -> throw new AssertionError("unknown "); + }; + } + + public void testToXContent() { + var status = new EnrichLookupOperator.Status(100, 50, TimeValue.timeValueSeconds(10).millis(), 120); + String json = Strings.toString(status, true, true); + assertThat(json, equalTo(""" + { + "received_pages" : 100, + "completed_pages" : 50, + "total_time_in_millis" : 10000, + "total_time" : "10s", + "total_terms" : 120 + }""")); + } +}