From b7ab8f8bb7466223b2bc5b872f56eaff033c7e2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Wed, 15 Jan 2025 16:30:41 +0100 Subject: [PATCH] ESQL: Add row counts to profile results (#120134) Closes https://github.com/elastic/elasticsearch/issues/119969 - Rename "pages_in/out" to "pages_received/emitted", to standardize the name along most operators - **There are still "pages_processed" operators**, maybe it would make sense to also rename those? - Add "pages_received/emitted" to TopN operator, as it was missing that - Added "rows_received/emitted" to most operators - Added a test to ensure all operators with status provide those metrics --- benchmarks/build.gradle | 3 +- .../org/elasticsearch/TransportVersions.java | 1 + .../compute/lucene/LuceneCountOperator.java | 1 - .../compute/lucene/LuceneMinMaxOperator.java | 1 - .../compute/lucene/LuceneOperator.java | 35 ++++++- .../compute/lucene/LuceneSourceOperator.java | 1 - .../lucene/LuceneTopNSourceOperator.java | 1 - .../lucene/ValuesSourceReaderOperator.java | 8 +- .../operator/AbstractPageMappingOperator.java | 52 ++++++++-- ...AbstractPageMappingToIteratorOperator.java | 63 ++++++++++-- .../compute/operator/AggregationOperator.java | 59 +++++++++++- .../operator/HashAggregationOperator.java | 65 ++++++++++++- .../compute/operator/LimitOperator.java | 63 +++++++++++- .../compute/operator/MvExpandOperator.java | 95 ++++++++++++++----- .../exchange/ExchangeSinkOperator.java | 45 ++++++--- .../exchange/ExchangeSourceOperator.java | 27 +++++- .../compute/operator/topn/TopNOperator.java | 35 ++++++- .../operator/topn/TopNOperatorStatus.java | 65 ++++++++++++- .../LuceneSourceOperatorStatusTests.java | 27 +++++- ...ValuesSourceReaderOperatorStatusTests.java | 22 ++++- ...bstractPageMappingOperatorStatusTests.java | 21 +++- ...eMappingToIteratorOperatorStatusTests.java | 22 ++++- .../AggregationOperatorStatusTests.java | 22 ++++- .../compute/operator/AnyOperatorTestCase.java | 42 ++++++++ .../HashAggregationOperatorStatusTests.java | 22 ++++- .../compute/operator/LimitStatusTests.java | 31 ++++-- .../operator/MvExpandOperatorStatusTests.java | 46 +++++---- .../operator/MvExpandOperatorTests.java | 8 +- .../compute/operator/OperatorTestCase.java | 3 +- .../ExchangeSinkOperatorStatusTests.java | 17 +++- .../ExchangeSourceOperatorStatusTests.java | 29 +++--- .../topn/TopNOperatorStatusTests.java | 54 +++++++++-- .../xpack/esql/qa/single_node/RestEsqlIT.java | 23 ++++- .../xpack/esql/action/EsqlActionTaskIT.java | 2 +- .../action/EsqlQueryResponseProfileTests.java | 7 +- .../esql/action/EsqlQueryResponseTests.java | 6 +- .../esql/action/NamedWriteablesTests.java | 13 ++- 37 files changed, 846 insertions(+), 191 deletions(-) diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index 25cfae6c9803..632bae64389a 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -1,4 +1,5 @@ import org.elasticsearch.gradle.internal.test.TestUtil +import org.elasticsearch.gradle.OS /* * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one @@ -77,7 +78,7 @@ tasks.register("copyPainless", Copy) { } tasks.named("run").configure { - executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '') args << "-Dplugins.dir=${buildDir}/plugins" << "-Dtests.index=${buildDir}/index" dependsOn "copyExpression", "copyPainless", configurations.nativeLib systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../libs/native/libraries/build/platform/").toString()) diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f0f3d27c6e86..957da69aa599 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -156,6 +156,7 @@ public class TransportVersions { public static final TransportVersion REPLACE_FAILURE_STORE_OPTIONS_WITH_SELECTOR_SYNTAX = def(8_821_00_0); public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0); public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0); + public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java index 9c35b5a44d5d..34c27a5c1fdf 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java @@ -138,7 +138,6 @@ public class LuceneCountOperator extends LuceneOperator { Page page = null; // emit only one page if (remainingDocs <= 0 && pagesEmitted == 0) { - pagesEmitted++; LongBlock count = null; BooleanBlock seen = null; try { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java index c41c31345df4..e9f540c654a2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java @@ -151,7 +151,6 @@ final class LuceneMinMaxOperator extends LuceneOperator { Page page = null; // emit only one page if (remainingDocs <= 0 && pagesEmitted == 0) { - pagesEmitted++; Block result = null; BooleanBlock seen = null; try { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index bbc3ace3716b..2f72c309b5f2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -68,6 +68,10 @@ public abstract class LuceneOperator extends SourceOperator { long processingNanos; int pagesEmitted; boolean doneCollecting; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) { this.blockFactory = blockFactory; @@ -115,7 +119,12 @@ public abstract class LuceneOperator extends SourceOperator { @Override public final Page getOutput() { try { - return getCheckedOutput(); + Page page = getCheckedOutput(); + if (page != null) { + pagesEmitted++; + rowsEmitted += page.getPositionCount(); + } + return page; } catch (IOException ioe) { throw new UncheckedIOException(ioe); } @@ -252,6 +261,7 @@ public abstract class LuceneOperator extends SourceOperator { private final int sliceMin; private final int sliceMax; private final int current; + private final long rowsEmitted; private Status(LuceneOperator operator) { processedSlices = operator.processedSlices; @@ -276,6 +286,7 @@ public abstract class LuceneOperator extends SourceOperator { current = scorer.position; } pagesEmitted = operator.pagesEmitted; + rowsEmitted = operator.rowsEmitted; } Status( @@ -288,7 +299,8 @@ public abstract class LuceneOperator extends SourceOperator { int pagesEmitted, int sliceMin, int sliceMax, - int current + int current, + long rowsEmitted ) { this.processedSlices = processedSlices; this.processedQueries = processedQueries; @@ -300,6 +312,7 @@ public abstract class LuceneOperator extends SourceOperator { this.sliceMin = sliceMin; this.sliceMax = sliceMax; this.current = current; + this.rowsEmitted = rowsEmitted; } Status(StreamInput in) throws IOException { @@ -318,6 +331,11 @@ public abstract class LuceneOperator extends SourceOperator { sliceMin = in.readVInt(); sliceMax = in.readVInt(); current = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsEmitted = in.readVLong(); + } else { + rowsEmitted = 0; + } } @Override @@ -336,6 +354,9 @@ public abstract class LuceneOperator extends SourceOperator { out.writeVInt(sliceMin); out.writeVInt(sliceMax); out.writeVInt(current); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsEmitted); + } } @Override @@ -383,6 +404,10 @@ public abstract class LuceneOperator extends SourceOperator { return current; } + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -399,6 +424,7 @@ public abstract class LuceneOperator extends SourceOperator { builder.field("slice_min", sliceMin); builder.field("slice_max", sliceMax); builder.field("current", current); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -416,12 +442,13 @@ public abstract class LuceneOperator extends SourceOperator { && pagesEmitted == status.pagesEmitted && sliceMin == status.sliceMin && sliceMax == status.sliceMax - && current == status.current; + && current == status.current + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current); + return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java index 4afabcadf60c..3d34067e1a83 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java @@ -175,7 +175,6 @@ public class LuceneSourceOperator extends LuceneOperator { } Page page = null; if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) { - pagesEmitted++; IntBlock shard = null; IntBlock leaf = null; IntVector docs = null; diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java index 8da62963ffb6..d25cb3a870da 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java @@ -240,7 +240,6 @@ public final class LuceneTopNSourceOperator extends LuceneOperator { Releasables.closeExpectNoException(shard, segments, docs, docBlock, scores); } } - pagesEmitted++; return page; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java index 74affb10eaf2..8fbb94658747 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java @@ -546,8 +546,8 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { } @Override - protected Status status(long processNanos, int pagesProcessed) { - return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed); + protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted); } public static class Status extends AbstractPageMappingOperator.Status { @@ -559,8 +559,8 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator { private final Map readersBuilt; - Status(Map readersBuilt, long processNanos, int pagesProcessed) { - super(processNanos, pagesProcessed); + Status(Map readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + super(processNanos, pagesProcessed, rowsReceived, rowsEmitted); this.readersBuilt = readersBuilt; } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java index 05913b7dd5f6..09d04d36f831 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java @@ -37,6 +37,14 @@ public abstract class AbstractPageMappingOperator implements Operator { * Count of pages that have been processed by this operator. */ private int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; protected abstract Page process(Page page); @@ -52,6 +60,7 @@ public abstract class AbstractPageMappingOperator implements Operator { public final void addInput(Page page) { assert prev == null : "has pending input page"; prev = page; + rowsReceived += page.getPositionCount(); } @Override @@ -75,6 +84,9 @@ public abstract class AbstractPageMappingOperator implements Operator { long start = System.nanoTime(); Page p = process(prev); pagesProcessed++; + if (p != null) { + rowsEmitted += p.getPositionCount(); + } processNanos += System.nanoTime() - start; prev = null; return p; @@ -82,11 +94,11 @@ public abstract class AbstractPageMappingOperator implements Operator { @Override public final Status status() { - return status(processNanos, pagesProcessed); + return status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); } - protected Status status(long processNanos, int pagesProcessed) { - return new Status(processNanos, pagesProcessed); + protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); } @Override @@ -105,15 +117,26 @@ public abstract class AbstractPageMappingOperator implements Operator { private final long processNanos; private final int pagesProcessed; + private final long rowsReceived; + private final long rowsEmitted; - public Status(long processNanos, int pagesProcessed) { + public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { this.processNanos = processNanos; this.pagesProcessed = pagesProcessed; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0; pagesProcessed = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override @@ -122,6 +145,10 @@ public abstract class AbstractPageMappingOperator implements Operator { out.writeVLong(processNanos); } out.writeVInt(pagesProcessed); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -133,6 +160,14 @@ public abstract class AbstractPageMappingOperator implements Operator { return pagesProcessed; } + public long rowsReceived() { + return rowsReceived; + } + + public long rowsEmitted() { + return rowsEmitted; + } + public long processNanos() { return processNanos; } @@ -153,7 +188,7 @@ public abstract class AbstractPageMappingOperator implements Operator { if (builder.humanReadable()) { builder.field("process_time", TimeValue.timeValueNanos(processNanos)); } - return builder.field("pages_processed", pagesProcessed); + return builder.field("pages_processed", pagesProcessed).field("rows_received", rowsReceived).field("rows_emitted", rowsEmitted); } @Override @@ -161,12 +196,15 @@ public abstract class AbstractPageMappingOperator implements Operator { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return processNanos == status.processNanos && pagesProcessed == status.pagesProcessed; + return processNanos == status.processNanos + && pagesProcessed == status.pagesProcessed + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(processNanos, pagesProcessed); + return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java index 32492af157fe..6a165fdfa055 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperator.java @@ -49,6 +49,16 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator */ private int pagesEmitted; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; + /** * Build and Iterator of results for a new page. */ @@ -82,6 +92,7 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator } next = new RuntimeTrackingIterator(receive(page)); pagesReceived++; + rowsReceived += page.getPositionCount(); } @Override @@ -101,16 +112,23 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator } Page ret = next.next(); pagesEmitted++; + rowsEmitted += ret.getPositionCount(); return ret; } @Override public final AbstractPageMappingToIteratorOperator.Status status() { - return status(processNanos, pagesReceived, pagesEmitted); + return status(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } - protected AbstractPageMappingToIteratorOperator.Status status(long processNanos, int pagesReceived, int pagesEmitted) { - return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted); + protected AbstractPageMappingToIteratorOperator.Status status( + long processNanos, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted + ) { + return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override @@ -154,17 +172,28 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator private final long processNanos; private final int pagesReceived; private final int pagesEmitted; + private final long rowsReceived; + private final long rowsEmitted; - public Status(long processNanos, int pagesProcessed, int pagesEmitted) { + public Status(long processNanos, int pagesProcessed, int pagesEmitted, long rowsReceived, long rowsEmitted) { this.processNanos = processNanos; this.pagesReceived = pagesProcessed; this.pagesEmitted = pagesEmitted; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { processNanos = in.readVLong(); pagesReceived = in.readVInt(); pagesEmitted = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override @@ -172,6 +201,10 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator out.writeVLong(processNanos); out.writeVInt(pagesReceived); out.writeVInt(pagesEmitted); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -187,6 +220,14 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator return pagesEmitted; } + public long rowsReceived() { + return rowsReceived; + } + + public long rowsEmitted() { + return rowsEmitted; + } + public long processNanos() { return processNanos; } @@ -207,8 +248,10 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator if (builder.humanReadable()) { builder.field("process_time", TimeValue.timeValueNanos(processNanos)); } - builder.field("pages_received", pagesReceived); - return builder.field("pages_emitted", pagesEmitted); + return builder.field("pages_received", pagesReceived) + .field("pages_emitted", pagesEmitted) + .field("rows_received", rowsReceived) + .field("rows_emitted", rowsEmitted); } @Override @@ -216,12 +259,16 @@ public abstract class AbstractPageMappingToIteratorOperator implements Operator if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; AbstractPageMappingToIteratorOperator.Status status = (AbstractPageMappingToIteratorOperator.Status) o; - return processNanos == status.processNanos && pagesReceived == status.pagesReceived && pagesEmitted == status.pagesEmitted; + return processNanos == status.processNanos + && pagesReceived == status.pagesReceived + && pagesEmitted == status.pagesEmitted + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(processNanos, pagesReceived, pagesEmitted); + return Objects.hash(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java index f57f450c7ee3..ab086a7fbe48 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AggregationOperator.java @@ -58,6 +58,14 @@ public class AggregationOperator implements Operator { * Count of pages this operator has processed. */ private int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; public record AggregationOperatorFactory(List aggregators, AggregatorMode mode) implements OperatorFactory { @@ -106,12 +114,16 @@ public class AggregationOperator implements Operator { page.releaseBlocks(); aggregationNanos += System.nanoTime() - start; pagesProcessed++; + rowsReceived += page.getPositionCount(); } } @Override public Page getOutput() { Page p = output; + if (p != null) { + rowsEmitted += p.getPositionCount(); + } this.output = null; return p; } @@ -181,7 +193,7 @@ public class AggregationOperator implements Operator { @Override public Operator.Status status() { - return new Status(aggregationNanos, aggregationFinishNanos, pagesProcessed); + return new Status(aggregationNanos, aggregationFinishNanos, pagesProcessed, rowsReceived, rowsEmitted); } public static class Status implements Operator.Status { @@ -204,6 +216,14 @@ public class AggregationOperator implements Operator { * Count of pages this operator has processed. */ private final int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private final long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private final long rowsEmitted; /** * Build. @@ -211,10 +231,12 @@ public class AggregationOperator implements Operator { * @param aggregationFinishNanos Nanoseconds this operator has spent running the aggregations. * @param pagesProcessed Count of pages this operator has processed. */ - public Status(long aggregationNanos, long aggregationFinishNanos, int pagesProcessed) { + public Status(long aggregationNanos, long aggregationFinishNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { this.aggregationNanos = aggregationNanos; this.aggregationFinishNanos = aggregationFinishNanos; this.pagesProcessed = pagesProcessed; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { @@ -225,6 +247,13 @@ public class AggregationOperator implements Operator { aggregationFinishNanos = null; } pagesProcessed = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override @@ -234,6 +263,10 @@ public class AggregationOperator implements Operator { out.writeOptionalVLong(aggregationFinishNanos); } out.writeVInt(pagesProcessed); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -262,6 +295,20 @@ public class AggregationOperator implements Operator { return pagesProcessed; } + /** + * Count of rows this operator has received. + */ + public long rowsReceived() { + return rowsReceived; + } + + /** + * Count of rows this operator has emitted. + */ + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -277,6 +324,8 @@ public class AggregationOperator implements Operator { ); } builder.field("pages_processed", pagesProcessed); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -287,13 +336,15 @@ public class AggregationOperator implements Operator { if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; return aggregationNanos == status.aggregationNanos + && Objects.equals(aggregationFinishNanos, status.aggregationFinishNanos) && pagesProcessed == status.pagesProcessed - && Objects.equals(aggregationFinishNanos, status.aggregationFinishNanos); + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(aggregationNanos, aggregationFinishNanos, pagesProcessed); + return Objects.hash(aggregationNanos, aggregationFinishNanos, pagesProcessed, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index ccddfdf5cc74..c47b6cebdadd 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -99,6 +99,14 @@ public class HashAggregationOperator implements Operator { * Count of pages this operator has processed. */ private int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; @SuppressWarnings("this-escape") public HashAggregationOperator( @@ -187,12 +195,16 @@ public class HashAggregationOperator implements Operator { } finally { page.releaseBlocks(); pagesProcessed++; + rowsReceived += page.getPositionCount(); } } @Override public Page getOutput() { Page p = output; + if (p != null) { + rowsEmitted += p.getPositionCount(); + } output = null; return p; } @@ -246,7 +258,7 @@ public class HashAggregationOperator implements Operator { @Override public Operator.Status status() { - return new Status(hashNanos, aggregationNanos, pagesProcessed); + return new Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted); } protected static void checkState(boolean condition, String msg) { @@ -288,23 +300,43 @@ public class HashAggregationOperator implements Operator { * Count of pages this operator has processed. */ private final int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private final long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private final long rowsEmitted; /** * Build. * @param hashNanos Nanoseconds this operator has spent hashing grouping keys. * @param aggregationNanos Nanoseconds this operator has spent running the aggregations. * @param pagesProcessed Count of pages this operator has processed. + * @param rowsReceived Count of rows this operator has received. + * @param rowsEmitted Count of rows this operator has emitted. */ - public Status(long hashNanos, long aggregationNanos, int pagesProcessed) { + public Status(long hashNanos, long aggregationNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { this.hashNanos = hashNanos; this.aggregationNanos = aggregationNanos; this.pagesProcessed = pagesProcessed; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { hashNanos = in.readVLong(); aggregationNanos = in.readVLong(); pagesProcessed = in.readVInt(); + + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override @@ -312,6 +344,11 @@ public class HashAggregationOperator implements Operator { out.writeVLong(hashNanos); out.writeVLong(aggregationNanos); out.writeVInt(pagesProcessed); + + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -340,6 +377,20 @@ public class HashAggregationOperator implements Operator { return pagesProcessed; } + /** + * Count of rows this operator has received. + */ + public long rowsReceived() { + return rowsReceived; + } + + /** + * Count of rows this operator has emitted. + */ + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -352,6 +403,8 @@ public class HashAggregationOperator implements Operator { builder.field("aggregation_time", TimeValue.timeValueNanos(aggregationNanos)); } builder.field("pages_processed", pagesProcessed); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -361,12 +414,16 @@ public class HashAggregationOperator implements Operator { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return hashNanos == status.hashNanos && aggregationNanos == status.aggregationNanos && pagesProcessed == status.pagesProcessed; + return hashNanos == status.hashNanos + && aggregationNanos == status.aggregationNanos + && pagesProcessed == status.pagesProcessed + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(hashNanos, aggregationNanos, pagesProcessed); + return Objects.hash(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java index 34e37031e6f1..b669be9192d0 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/LimitOperator.java @@ -37,6 +37,16 @@ public class LimitOperator implements Operator { */ private int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; + private Page lastInput; private boolean finished; @@ -67,6 +77,7 @@ public class LimitOperator implements Operator { public void addInput(Page page) { assert lastInput == null : "has pending input page"; lastInput = page; + rowsReceived += page.getPositionCount(); } @Override @@ -117,13 +128,14 @@ public class LimitOperator implements Operator { } lastInput = null; pagesProcessed++; + rowsEmitted += result.getPositionCount(); return result; } @Override public Status status() { - return new Status(limit, limitRemaining, pagesProcessed); + return new Status(limit, limitRemaining, pagesProcessed, rowsReceived, rowsEmitted); } @Override @@ -160,16 +172,35 @@ public class LimitOperator implements Operator { */ private final int pagesProcessed; - protected Status(int limit, int limitRemaining, int pagesProcessed) { + /** + * Count of rows this operator has received. + */ + private final long rowsReceived; + + /** + * Count of rows this operator has emitted. + */ + private final long rowsEmitted; + + protected Status(int limit, int limitRemaining, int pagesProcessed, long rowsReceived, long rowsEmitted) { this.limit = limit; this.limitRemaining = limitRemaining; this.pagesProcessed = pagesProcessed; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { limit = in.readVInt(); limitRemaining = in.readVInt(); pagesProcessed = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override @@ -177,6 +208,10 @@ public class LimitOperator implements Operator { out.writeVInt(limit); out.writeVInt(limitRemaining); out.writeVInt(pagesProcessed); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -205,12 +240,28 @@ public class LimitOperator implements Operator { return pagesProcessed; } + /** + * Count of rows this operator has received. + */ + public long rowsReceived() { + return rowsReceived; + } + + /** + * Count of rows this operator has emitted. + */ + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("limit", limit); builder.field("limit_remaining", limitRemaining); builder.field("pages_processed", pagesProcessed); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -219,12 +270,16 @@ public class LimitOperator implements Operator { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return limit == status.limit && limitRemaining == status.limitRemaining && pagesProcessed == status.pagesProcessed; + return limit == status.limit + && limitRemaining == status.limitRemaining + && pagesProcessed == status.pagesProcessed + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(limit, limitRemaining, pagesProcessed); + return Objects.hash(limit, limitRemaining, pagesProcessed, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java index e87329a90705..1659a88a84cd 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/MvExpandOperator.java @@ -69,10 +69,21 @@ public class MvExpandOperator implements Operator { private int nextItemOnExpanded = 0; /** - * Count of pages that have been processed by this operator. + * Count of pages that this operator has received. */ - private int pagesIn; - private int pagesOut; + private int pagesReceived; + /** + * Count of pages this operator has emitted. + */ + private int pagesEmitted; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; public MvExpandOperator(int channel, int pageSize) { this.channel = channel; @@ -82,10 +93,18 @@ public class MvExpandOperator implements Operator { @Override public final Page getOutput() { + Page result = getOutputInternal(); + if (result != null) { + pagesEmitted++; + rowsEmitted += result.getPositionCount(); + } + return result; + } + + private Page getOutputInternal() { if (prev == null) { return null; } - pagesOut++; if (expandedBlock == null) { /* @@ -214,7 +233,8 @@ public class MvExpandOperator implements Operator { assert prev == null : "has pending input page"; prev = page; this.expandingBlock = prev.getBlock(channel); - pagesIn++; + pagesReceived++; + rowsReceived += page.getPositionCount(); } @Override @@ -229,7 +249,7 @@ public class MvExpandOperator implements Operator { @Override public final Status status() { - return new Status(pagesIn, pagesOut, noops); + return new Status(pagesReceived, pagesEmitted, noops, rowsReceived, rowsEmitted); } @Override @@ -248,9 +268,11 @@ public class MvExpandOperator implements Operator { public static final class Status implements Operator.Status { - private final int pagesIn; - private final int pagesOut; + private final int pagesReceived; + private final int pagesEmitted; private final int noops; + private final long rowsReceived; + private final long rowsEmitted; public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Operator.Status.class, @@ -258,31 +280,46 @@ public class MvExpandOperator implements Operator { Status::new ); - Status(int pagesIn, int pagesOut, int noops) { - this.pagesIn = pagesIn; - this.pagesOut = pagesOut; + Status(int pagesReceived, int pagesEmitted, int noops, long rowsReceived, long rowsEmitted) { + this.pagesReceived = pagesReceived; + this.pagesEmitted = pagesEmitted; this.noops = noops; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } Status(StreamInput in) throws IOException { - pagesIn = in.readVInt(); - pagesOut = in.readVInt(); + pagesReceived = in.readVInt(); + pagesEmitted = in.readVInt(); noops = in.readVInt(); + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } else { + rowsReceived = 0; + rowsEmitted = 0; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(pagesIn); - out.writeVInt(pagesOut); + out.writeVInt(pagesReceived); + out.writeVInt(pagesEmitted); out.writeVInt(noops); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("pages_in", pagesIn); - builder.field("pages_out", pagesOut); + builder.field("pages_received", pagesReceived); + builder.field("pages_emitted", pagesEmitted); builder.field("noops", noops); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -304,20 +341,32 @@ public class MvExpandOperator implements Operator { return false; } Status status = (Status) o; - return noops == status.noops && pagesIn == status.pagesIn && pagesOut == status.pagesOut; + return noops == status.noops + && pagesReceived == status.pagesReceived + && pagesEmitted == status.pagesEmitted + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; } - public int pagesIn() { - return pagesIn; + public int pagesReceived() { + return pagesReceived; } - public int pagesOut() { - return pagesOut; + public int pagesEmitted() { + return pagesEmitted; + } + + public long rowsReceived() { + return rowsReceived; + } + + public long rowsEmitted() { + return rowsEmitted; } @Override public int hashCode() { - return Objects.hash(noops, pagesIn, pagesOut); + return Objects.hash(noops, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java index 0230ca2097cb..f87edd1a3e16 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperator.java @@ -33,7 +33,8 @@ public class ExchangeSinkOperator extends SinkOperator { private final ExchangeSink sink; private final Function transformer; - private int pagesAccepted; + private int pagesReceived; + private long rowsReceived; public record ExchangeSinkOperatorFactory(Supplier exchangeSinks, Function transformer) implements @@ -81,7 +82,8 @@ public class ExchangeSinkOperator extends SinkOperator { @Override protected void doAddInput(Page page) { - pagesAccepted++; + pagesReceived++; + rowsReceived += page.getPositionCount(); sink.addPage(transformer.apply(page)); } @@ -97,7 +99,7 @@ public class ExchangeSinkOperator extends SinkOperator { @Override public Status status() { - return new Status(pagesAccepted); + return new Status(pagesReceived, rowsReceived); } public static class Status implements Operator.Status { @@ -107,19 +109,31 @@ public class ExchangeSinkOperator extends SinkOperator { Status::new ); - private final int pagesAccepted; + private final int pagesReceived; + private final long rowsReceived; - Status(int pagesAccepted) { - this.pagesAccepted = pagesAccepted; + Status(int pagesReceived, long rowsReceived) { + this.pagesReceived = pagesReceived; + this.rowsReceived = rowsReceived; } Status(StreamInput in) throws IOException { - pagesAccepted = in.readVInt(); + pagesReceived = in.readVInt(); + + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsReceived = in.readVLong(); + } else { + rowsReceived = 0; + } } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVInt(pagesAccepted); + out.writeVInt(pagesReceived); + + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsReceived); + } } @Override @@ -127,14 +141,19 @@ public class ExchangeSinkOperator extends SinkOperator { return ENTRY.name; } - public int pagesAccepted() { - return pagesAccepted; + public int pagesReceived() { + return pagesReceived; + } + + public long rowsReceived() { + return rowsReceived; } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("pages_accepted", pagesAccepted); + builder.field("pages_received", pagesReceived); + builder.field("rows_received", rowsReceived); return builder.endObject(); } @@ -143,12 +162,12 @@ public class ExchangeSinkOperator extends SinkOperator { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return pagesAccepted == status.pagesAccepted; + return pagesReceived == status.pagesReceived && rowsReceived == status.rowsReceived; } @Override public int hashCode() { - return Objects.hash(pagesAccepted); + return Objects.hash(pagesReceived, rowsReceived); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java index 2d0ce228e81d..3a96f1bb1d36 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperator.java @@ -32,6 +32,7 @@ public class ExchangeSourceOperator extends SourceOperator { private final ExchangeSource source; private IsBlockedResult isBlocked = NOT_BLOCKED; private int pagesEmitted; + private long rowsEmitted; public record ExchangeSourceOperatorFactory(Supplier exchangeSources) implements SourceOperatorFactory { @@ -55,6 +56,7 @@ public class ExchangeSourceOperator extends SourceOperator { final var page = source.pollPage(); if (page != null) { pagesEmitted++; + rowsEmitted += page.getPositionCount(); } return page; } @@ -92,7 +94,7 @@ public class ExchangeSourceOperator extends SourceOperator { @Override public Status status() { - return new Status(source.bufferSize(), pagesEmitted); + return new Status(source.bufferSize(), pagesEmitted, rowsEmitted); } public static class Status implements Operator.Status { @@ -104,21 +106,33 @@ public class ExchangeSourceOperator extends SourceOperator { private final int pagesWaiting; private final int pagesEmitted; + private final long rowsEmitted; - Status(int pagesWaiting, int pagesEmitted) { + Status(int pagesWaiting, int pagesEmitted, long rowsEmitted) { this.pagesWaiting = pagesWaiting; this.pagesEmitted = pagesEmitted; + this.rowsEmitted = rowsEmitted; } Status(StreamInput in) throws IOException { pagesWaiting = in.readVInt(); pagesEmitted = in.readVInt(); + + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + rowsEmitted = in.readVLong(); + } else { + rowsEmitted = 0; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(pagesWaiting); out.writeVInt(pagesEmitted); + + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVLong(rowsEmitted); + } } @Override @@ -134,11 +148,16 @@ public class ExchangeSourceOperator extends SourceOperator { return pagesEmitted; } + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("pages_waiting", pagesWaiting); builder.field("pages_emitted", pagesEmitted); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -147,12 +166,12 @@ public class ExchangeSourceOperator extends SourceOperator { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return pagesWaiting == status.pagesWaiting && pagesEmitted == status.pagesEmitted; + return pagesWaiting == status.pagesWaiting && pagesEmitted == status.pagesEmitted && rowsEmitted == status.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(pagesWaiting, pagesEmitted); + return Objects.hash(pagesWaiting, pagesEmitted, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java index 682a5d6050c2..0489be58fade 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperator.java @@ -278,6 +278,26 @@ public class TopNOperator implements Operator, Accountable { private Iterator output; + /** + * Count of pages that have been received by this operator. + */ + private int pagesReceived; + + /** + * Count of pages that have been emitted by this operator. + */ + private int pagesEmitted; + + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; + public TopNOperator( BlockFactory blockFactory, CircuitBreaker breaker, @@ -368,7 +388,9 @@ public class TopNOperator implements Operator, Accountable { spare = inputQueue.insertWithOverflow(spare); } } finally { - Releasables.close(() -> page.releaseBlocks()); + page.releaseBlocks(); + pagesReceived++; + rowsReceived += page.getPositionCount(); } } @@ -491,10 +513,13 @@ public class TopNOperator implements Operator, Accountable { @Override public Page getOutput() { - if (output != null && output.hasNext()) { - return output.next(); + if (output == null || output.hasNext() == false) { + return null; } - return null; + Page ret = output.next(); + pagesEmitted++; + rowsEmitted += ret.getPositionCount(); + return ret; } @Override @@ -531,7 +556,7 @@ public class TopNOperator implements Operator, Accountable { @Override public Status status() { - return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed()); + return new TopNOperatorStatus(inputQueue.size(), ramBytesUsed(), pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java index 1617a546be2c..ceccdce529ce 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatus.java @@ -27,21 +27,55 @@ public class TopNOperatorStatus implements Operator.Status { ); private final int occupiedRows; private final long ramBytesUsed; + private final int pagesReceived; + private final int pagesEmitted; + private final long rowsReceived; + private final long rowsEmitted; - public TopNOperatorStatus(int occupiedRows, long ramBytesUsed) { + public TopNOperatorStatus( + int occupiedRows, + long ramBytesUsed, + int pagesReceived, + int pagesEmitted, + long rowsReceived, + long rowsEmitted + ) { this.occupiedRows = occupiedRows; this.ramBytesUsed = ramBytesUsed; + this.pagesReceived = pagesReceived; + this.pagesEmitted = pagesEmitted; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; } TopNOperatorStatus(StreamInput in) throws IOException { this.occupiedRows = in.readVInt(); this.ramBytesUsed = in.readVLong(); + + if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + this.pagesReceived = in.readVInt(); + this.pagesEmitted = in.readVInt(); + this.rowsReceived = in.readVLong(); + this.rowsEmitted = in.readVLong(); + } else { + this.pagesReceived = 0; + this.pagesEmitted = 0; + this.rowsReceived = 0; + this.rowsEmitted = 0; + } } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(occupiedRows); out.writeVLong(ramBytesUsed); + + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) { + out.writeVInt(pagesReceived); + out.writeVInt(pagesEmitted); + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } } @Override @@ -57,12 +91,32 @@ public class TopNOperatorStatus implements Operator.Status { return ramBytesUsed; } + public int pagesReceived() { + return pagesReceived; + } + + public int pagesEmitted() { + return pagesEmitted; + } + + public long rowsReceived() { + return rowsReceived; + } + + public long rowsEmitted() { + return rowsEmitted; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); builder.field("occupied_rows", occupiedRows); builder.field("ram_bytes_used", ramBytesUsed); builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed)); + builder.field("pages_received", pagesReceived); + builder.field("pages_emitted", pagesEmitted); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); return builder.endObject(); } @@ -72,12 +126,17 @@ public class TopNOperatorStatus implements Operator.Status { return false; } TopNOperatorStatus that = (TopNOperatorStatus) o; - return occupiedRows == that.occupiedRows && ramBytesUsed == that.ramBytesUsed; + return occupiedRows == that.occupiedRows + && ramBytesUsed == that.ramBytesUsed + && pagesReceived == that.pagesReceived + && pagesEmitted == that.pagesEmitted + && rowsReceived == that.rowsReceived + && rowsEmitted == that.rowsEmitted; } @Override public int hashCode() { - return Objects.hash(occupiedRows, ramBytesUsed); + return Objects.hash(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java index def0710644d2..28aa9e7976c7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorStatusTests.java @@ -20,7 +20,19 @@ import static org.hamcrest.Matchers.equalTo; public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTestCase { public static LuceneSourceOperator.Status simple() { - return new LuceneSourceOperator.Status(2, Set.of("*:*"), new TreeSet<>(List.of("a:0", "a:1")), 1002, 0, 1, 5, 123, 99990, 8000); + return new LuceneSourceOperator.Status( + 2, + Set.of("*:*"), + new TreeSet<>(List.of("a:0", "a:1")), + 1002, + 0, + 1, + 5, + 123, + 99990, + 8000, + 222 + ); } public static String simpleToJson() { @@ -41,7 +53,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest "pages_emitted" : 5, "slice_min" : 123, "slice_max" : 99990, - "current" : 8000 + "current" : 8000, + "rows_emitted" : 222 }"""; } @@ -66,7 +79,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt(), - randomNonNegativeInt() + randomNonNegativeInt(), + randomNonNegativeLong() ); } @@ -100,7 +114,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest int sliceMin = instance.sliceMin(); int sliceMax = instance.sliceMax(); int current = instance.current(); - switch (between(0, 9)) { + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 10)) { case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt); case 1 -> processedQueries = randomValueOtherThan(processedQueries, LuceneSourceOperatorStatusTests::randomProcessedQueries); case 2 -> processedShards = randomValueOtherThan(processedShards, LuceneSourceOperatorStatusTests::randomProcessedShards); @@ -111,6 +126,7 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest case 7 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt); case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt); case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt); + case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } return new LuceneSourceOperator.Status( @@ -123,7 +139,8 @@ public class LuceneSourceOperatorStatusTests extends AbstractWireSerializingTest pagesEmitted, sliceMin, sliceMax, - current + current, + rowsEmitted ); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java index 5887da0bc466..4303137f74bb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorStatusTests.java @@ -20,7 +20,7 @@ import static org.hamcrest.Matchers.equalTo; public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializingTestCase { public static ValuesSourceReaderOperator.Status simple() { - return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123); + return new ValuesSourceReaderOperator.Status(Map.of("ReaderType", 3), 1022323, 123, 111, 222); } public static String simpleToJson() { @@ -31,7 +31,9 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi }, "process_nanos" : 1022323, "process_time" : "1ms", - "pages_processed" : 123 + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 }"""; } @@ -46,7 +48,13 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi @Override public ValuesSourceReaderOperator.Status createTestInstance() { - return new ValuesSourceReaderOperator.Status(randomReadersBuilt(), randomNonNegativeLong(), randomNonNegativeInt()); + return new ValuesSourceReaderOperator.Status( + randomReadersBuilt(), + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } private Map randomReadersBuilt() { @@ -63,12 +71,16 @@ public class ValuesSourceReaderOperatorStatusTests extends AbstractWireSerializi Map readersBuilt = instance.readersBuilt(); long processNanos = instance.processNanos(); int pagesProcessed = instance.pagesProcessed(); - switch (between(0, 2)) { + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { case 0 -> readersBuilt = randomValueOtherThan(readersBuilt, this::randomReadersBuilt); case 1 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed); + return new ValuesSourceReaderOperator.Status(readersBuilt, processNanos, pagesProcessed, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java index 3c04e6e5a9f5..3e8aaf4f6b0b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingOperatorStatusTests.java @@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.equalTo; public class AbstractPageMappingOperatorStatusTests extends AbstractWireSerializingTestCase { public static AbstractPageMappingOperator.Status simple() { - return new AbstractPageMappingOperator.Status(200012, 123); + return new AbstractPageMappingOperator.Status(200012, 123, 111, 222); } public static String simpleToJson() { @@ -24,7 +24,9 @@ public class AbstractPageMappingOperatorStatusTests extends AbstractWireSerializ { "process_nanos" : 200012, "process_time" : "200micros", - "pages_processed" : 123 + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 }"""; } @@ -39,18 +41,27 @@ public class AbstractPageMappingOperatorStatusTests extends AbstractWireSerializ @Override public AbstractPageMappingOperator.Status createTestInstance() { - return new AbstractPageMappingOperator.Status(randomNonNegativeLong(), randomNonNegativeInt()); + return new AbstractPageMappingOperator.Status( + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } @Override protected AbstractPageMappingOperator.Status mutateInstance(AbstractPageMappingOperator.Status instance) { long processNanos = instance.processNanos(); int pagesProcessed = instance.pagesProcessed(); - switch (between(0, 1)) { + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 3)) { case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 2 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 3 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new AbstractPageMappingOperator.Status(processNanos, pagesProcessed); + return new AbstractPageMappingOperator.Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java index 41db82b9b4c8..b131c43ad648 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AbstractPageMappingToIteratorOperatorStatusTests.java @@ -17,7 +17,7 @@ import static org.hamcrest.Matchers.equalTo; public class AbstractPageMappingToIteratorOperatorStatusTests extends AbstractWireSerializingTestCase< AbstractPageMappingToIteratorOperator.Status> { public static AbstractPageMappingToIteratorOperator.Status simple() { - return new AbstractPageMappingToIteratorOperator.Status(200012, 123, 204); + return new AbstractPageMappingToIteratorOperator.Status(200012, 123, 204, 111, 222); } public static String simpleToJson() { @@ -26,7 +26,9 @@ public class AbstractPageMappingToIteratorOperatorStatusTests extends AbstractWi "process_nanos" : 200012, "process_time" : "200micros", "pages_received" : 123, - "pages_emitted" : 204 + "pages_emitted" : 204, + "rows_received" : 111, + "rows_emitted" : 222 }"""; } @@ -41,7 +43,13 @@ public class AbstractPageMappingToIteratorOperatorStatusTests extends AbstractWi @Override public AbstractPageMappingToIteratorOperator.Status createTestInstance() { - return new AbstractPageMappingToIteratorOperator.Status(randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeInt()); + return new AbstractPageMappingToIteratorOperator.Status( + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } @Override @@ -49,12 +57,16 @@ public class AbstractPageMappingToIteratorOperatorStatusTests extends AbstractWi long processNanos = instance.processNanos(); int pagesReceived = instance.pagesReceived(); int pagesEmitted = instance.pagesEmitted(); - switch (between(0, 2)) { + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); case 1 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); case 2 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted); + return new AbstractPageMappingToIteratorOperator.Status(processNanos, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java index f9d806b72cb4..ba6c3ea0c153 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AggregationOperatorStatusTests.java @@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.equalTo; public class AggregationOperatorStatusTests extends AbstractWireSerializingTestCase { public static AggregationOperator.Status simple() { - return new AggregationOperator.Status(200012, 400036, 123); + return new AggregationOperator.Status(200012, 400036, 123, 111, 222); } public static String simpleToJson() { @@ -26,7 +26,9 @@ public class AggregationOperatorStatusTests extends AbstractWireSerializingTestC "aggregation_time" : "200micros", "aggregation_finish_nanos" : 400036, "aggregation_finish_time" : "400micros", - "pages_processed" : 123 + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 }"""; } @@ -41,7 +43,13 @@ public class AggregationOperatorStatusTests extends AbstractWireSerializingTestC @Override public AggregationOperator.Status createTestInstance() { - return new AggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt()); + return new AggregationOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } @Override @@ -49,12 +57,16 @@ public class AggregationOperatorStatusTests extends AbstractWireSerializingTestC long aggregationNanos = instance.aggregationNanos(); long aggregationFinishNanos = instance.aggregationFinishNanos(); int pagesProcessed = instance.pagesProcessed(); - switch (between(0, 2)) { + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { case 0 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong); case 1 -> aggregationFinishNanos = randomValueOtherThan(aggregationFinishNanos, ESTestCase::randomNonNegativeLong); case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new AggregationOperator.Status(aggregationNanos, aggregationFinishNanos, pagesProcessed); + return new AggregationOperator.Status(aggregationNanos, aggregationFinishNanos, pagesProcessed, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java index ba3e7d816e42..3d4c8b8ed226 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AnyOperatorTestCase.java @@ -8,10 +8,20 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; import org.hamcrest.Matcher; +import java.io.IOException; + +import static org.hamcrest.Matchers.both; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.matchesPattern; /** @@ -74,6 +84,38 @@ public abstract class AnyOperatorTestCase extends ComputeTestCase { } } + /** + * Ensures that the Operator.Status of this operator has the standard fields. + */ + public void testOperatorStatus() throws IOException { + DriverContext driverContext = driverContext(); + try (var operator = simple().get(driverContext)) { + Operator.Status status = operator.status(); + + assumeTrue("Operator does not provide a status", status != null); + + var xContent = XContentType.JSON.xContent(); + try (var xContentBuilder = XContentBuilder.builder(xContent)) { + status.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS); + + var bytesReference = BytesReference.bytes(xContentBuilder); + var map = XContentHelper.convertToMap(bytesReference, false, xContentBuilder.contentType()).v2(); + + if (operator instanceof SourceOperator) { + assertThat(map, hasKey("pages_emitted")); + assertThat(map, hasKey("rows_emitted")); + } else if (operator instanceof SinkOperator) { + assertThat(map, hasKey("pages_received")); + assertThat(map, hasKey("rows_received")); + } else { + assertThat(map, either(hasKey("pages_processed")).or(both(hasKey("pages_received")).and(hasKey("pages_emitted")))); + assertThat(map, hasKey("rows_received")); + assertThat(map, hasKey("rows_emitted")); + } + } + } + } + /** * A {@link DriverContext} with a nonBreakingBigArrays. */ diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java index 245ae171c630..93e1a9f8d221 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorStatusTests.java @@ -16,7 +16,7 @@ import static org.hamcrest.Matchers.equalTo; public class HashAggregationOperatorStatusTests extends AbstractWireSerializingTestCase { public static HashAggregationOperator.Status simple() { - return new HashAggregationOperator.Status(500012, 200012, 123); + return new HashAggregationOperator.Status(500012, 200012, 123, 111, 222); } public static String simpleToJson() { @@ -26,7 +26,9 @@ public class HashAggregationOperatorStatusTests extends AbstractWireSerializingT "hash_time" : "500micros", "aggregation_nanos" : 200012, "aggregation_time" : "200micros", - "pages_processed" : 123 + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 }"""; } @@ -41,7 +43,13 @@ public class HashAggregationOperatorStatusTests extends AbstractWireSerializingT @Override public HashAggregationOperator.Status createTestInstance() { - return new HashAggregationOperator.Status(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt()); + return new HashAggregationOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } @Override @@ -49,12 +57,16 @@ public class HashAggregationOperatorStatusTests extends AbstractWireSerializingT long hashNanos = instance.hashNanos(); long aggregationNanos = instance.aggregationNanos(); int pagesProcessed = instance.pagesProcessed(); - switch (between(0, 2)) { + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { case 0 -> hashNanos = randomValueOtherThan(hashNanos, ESTestCase::randomNonNegativeLong); case 1 -> aggregationNanos = randomValueOtherThan(aggregationNanos, ESTestCase::randomNonNegativeLong); case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed); + return new HashAggregationOperator.Status(hashNanos, aggregationNanos, pagesProcessed, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java index fd2b75f6bd81..016c8a85f94c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/LimitStatusTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.compute.operator; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -17,8 +18,8 @@ import static org.hamcrest.Matchers.equalTo; public class LimitStatusTests extends AbstractWireSerializingTestCase { public void testToXContent() { - assertThat(Strings.toString(new LimitOperator.Status(10, 1, 1)), equalTo(""" - {"limit":10,"limit_remaining":1,"pages_processed":1}""")); + assertThat(Strings.toString(new LimitOperator.Status(10, 1, 1, 111, 222)), equalTo(""" + {"limit":10,"limit_remaining":1,"pages_processed":1,"rows_received":111,"rows_emitted":222}""")); } @Override @@ -28,7 +29,13 @@ public class LimitStatusTests extends AbstractWireSerializingTestCase between(0, Integer.MAX_VALUE)); + limit = randomValueOtherThan(limit, ESTestCase::randomNonNegativeInt); break; case 1: - limitRemaining = randomValueOtherThan(limitRemaining, () -> between(0, Integer.MAX_VALUE)); + limitRemaining = randomValueOtherThan(limitRemaining, ESTestCase::randomNonNegativeInt); break; case 2: - pagesProcessed = randomValueOtherThan(pagesProcessed, () -> between(0, Integer.MAX_VALUE)); + pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + break; + case 3: + rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + break; + case 4: + rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); break; default: throw new IllegalArgumentException(); } - return new LimitOperator.Status(limit, limitRemaining, pagesProcessed); + return new LimitOperator.Status(limit, limitRemaining, pagesProcessed, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java index 9527388a0d3c..a421ba360e4a 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorStatusTests.java @@ -16,12 +16,12 @@ import static org.hamcrest.Matchers.equalTo; public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase { public static MvExpandOperator.Status simple() { - return new MvExpandOperator.Status(10, 15, 9); + return new MvExpandOperator.Status(10, 15, 9, 111, 222); } public static String simpleToJson() { return """ - {"pages_in":10,"pages_out":15,"noops":9}"""; + {"pages_received":10,"pages_emitted":15,"noops":9,"rows_received":111,"rows_emitted":222}"""; } public void testToXContent() { @@ -35,32 +35,30 @@ public class MvExpandOperatorStatusTests extends AbstractWireSerializingTestCase @Override public MvExpandOperator.Status createTestInstance() { - return new MvExpandOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeInt()); + return new MvExpandOperator.Status( + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); } @Override protected MvExpandOperator.Status mutateInstance(MvExpandOperator.Status instance) { - switch (between(0, 2)) { - case 0: - return new MvExpandOperator.Status( - randomValueOtherThan(instance.pagesIn(), ESTestCase::randomNonNegativeInt), - instance.pagesOut(), - instance.noops() - ); - case 1: - return new MvExpandOperator.Status( - instance.pagesIn(), - randomValueOtherThan(instance.pagesOut(), ESTestCase::randomNonNegativeInt), - instance.noops() - ); - case 2: - return new MvExpandOperator.Status( - instance.pagesIn(), - instance.pagesOut(), - randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt) - ); - default: - throw new UnsupportedOperationException(); + int pagesReceived = instance.pagesReceived(); + int pagesEmitted = instance.pagesEmitted(); + int noops = instance.noops(); + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 4)) { + case 0 -> pagesReceived = randomValueOtherThan(instance.pagesReceived(), ESTestCase::randomNonNegativeInt); + case 1 -> pagesEmitted = randomValueOtherThan(instance.pagesEmitted(), ESTestCase::randomNonNegativeInt); + case 2 -> noops = randomValueOtherThan(instance.noops(), ESTestCase::randomNonNegativeInt); + case 3 -> rowsReceived = randomValueOtherThan(instance.rowsReceived(), ESTestCase::randomNonNegativeLong); + case 4 -> rowsEmitted = randomValueOtherThan(instance.rowsEmitted(), ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); } + return new MvExpandOperator.Status(pagesReceived, pagesEmitted, noops, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java index 9442fb05761d..b07ff8b0da57 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/MvExpandOperatorTests.java @@ -210,8 +210,8 @@ public class MvExpandOperatorTests extends OperatorTestCase { assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); MvExpandOperator.Status status = op.status(); - assertThat(status.pagesIn(), equalTo(1)); - assertThat(status.pagesOut(), equalTo(1)); + assertThat(status.pagesReceived(), equalTo(1)); + assertThat(status.pagesEmitted(), equalTo(1)); assertThat(status.noops(), equalTo(1)); } @@ -223,8 +223,8 @@ public class MvExpandOperatorTests extends OperatorTestCase { assertThat(result, hasSize(1)); assertThat(valuesAtPositions(result.get(0).getBlock(0), 0, 2), equalTo(List.of(List.of(1), List.of(2)))); MvExpandOperator.Status status = op.status(); - assertThat(status.pagesIn(), equalTo(1)); - assertThat(status.pagesOut(), equalTo(1)); + assertThat(status.pagesReceived(), equalTo(1)); + assertThat(status.pagesEmitted(), equalTo(1)); assertThat(status.noops(), equalTo(0)); result.forEach(Page::releaseBlocks); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index 54db0453530b..28b7095aa1bd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -212,7 +212,8 @@ public abstract class OperatorTestCase extends AnyOperatorTestCase { // Clone the input so that the operator can close it, then, later, we can read it again to build the assertion. List origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance()); - List results = drive(simple().get(context), input.iterator(), context); + var operator = simple().get(context); + List results = drive(operator, input.iterator(), context); assertSimpleOutput(origInput, results); assertThat(context.breaker().getUsed(), equalTo(0L)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperatorStatusTests.java index 369913c7d152..aa2ca2faebbd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSinkOperatorStatusTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.compute.operator.exchange; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -21,13 +22,14 @@ public class ExchangeSinkOperatorStatusTests extends AbstractWireSerializingTest } public static ExchangeSinkOperator.Status simple() { - return new ExchangeSinkOperator.Status(10); + return new ExchangeSinkOperator.Status(10, 111); } public static String simpleToJson() { return """ { - "pages_accepted" : 10 + "pages_received" : 10, + "rows_received" : 111 }"""; } @@ -38,11 +40,18 @@ public class ExchangeSinkOperatorStatusTests extends AbstractWireSerializingTest @Override public ExchangeSinkOperator.Status createTestInstance() { - return new ExchangeSinkOperator.Status(between(0, Integer.MAX_VALUE)); + return new ExchangeSinkOperator.Status(randomNonNegativeInt(), randomNonNegativeLong()); } @Override protected ExchangeSinkOperator.Status mutateInstance(ExchangeSinkOperator.Status instance) throws IOException { - return new ExchangeSinkOperator.Status(randomValueOtherThan(instance.pagesAccepted(), () -> between(0, Integer.MAX_VALUE))); + int pagesReceived = instance.pagesReceived(); + long rowsReceived = instance.rowsReceived(); + switch (between(0, 1)) { + case 0 -> pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); + case 1 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); + } + return new ExchangeSinkOperator.Status(pagesReceived, rowsReceived); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperatorStatusTests.java index 2c5f7eebbaf3..e99ea69af54d 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeSourceOperatorStatusTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.compute.operator.exchange; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import java.io.IOException; @@ -17,8 +18,8 @@ import static org.hamcrest.Matchers.equalTo; public class ExchangeSourceOperatorStatusTests extends AbstractWireSerializingTestCase { public void testToXContent() { - assertThat(Strings.toString(new ExchangeSourceOperator.Status(0, 10)), equalTo(""" - {"pages_waiting":0,"pages_emitted":10}""")); + assertThat(Strings.toString(new ExchangeSourceOperator.Status(0, 10, 111)), equalTo(""" + {"pages_waiting":0,"pages_emitted":10,"rows_emitted":111}""")); } @Override @@ -28,24 +29,20 @@ public class ExchangeSourceOperatorStatusTests extends AbstractWireSerializingTe @Override protected ExchangeSourceOperator.Status createTestInstance() { - return new ExchangeSourceOperator.Status(between(0, Integer.MAX_VALUE), between(0, Integer.MAX_VALUE)); + return new ExchangeSourceOperator.Status(randomNonNegativeInt(), randomNonNegativeInt(), randomNonNegativeLong()); } @Override protected ExchangeSourceOperator.Status mutateInstance(ExchangeSourceOperator.Status instance) throws IOException { - switch (between(0, 1)) { - case 0: - return new ExchangeSourceOperator.Status( - randomValueOtherThan(instance.pagesWaiting(), () -> between(0, Integer.MAX_VALUE)), - instance.pagesEmitted() - ); - case 1: - return new ExchangeSourceOperator.Status( - instance.pagesWaiting(), - randomValueOtherThan(instance.pagesEmitted(), () -> between(0, Integer.MAX_VALUE)) - ); - default: - throw new UnsupportedOperationException(); + int pagesWaiting = instance.pagesWaiting(); + int pagesEmitted = instance.pagesEmitted(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 2)) { + case 0 -> pagesWaiting = randomValueOtherThan(pagesWaiting, ESTestCase::randomNonNegativeInt); + case 1 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + case 2 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); } + return new ExchangeSourceOperator.Status(pagesWaiting, pagesEmitted, rowsEmitted); } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java index f52274b68bdf..5faf5159a546 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorStatusTests.java @@ -10,13 +10,30 @@ package org.elasticsearch.compute.operator.topn; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; import static org.hamcrest.Matchers.equalTo; public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase { + public static TopNOperatorStatus simple() { + return new TopNOperatorStatus(10, 2000, 123, 123, 111, 222); + } + + public static String simpleToJson() { + return """ + { + "occupied_rows" : 10, + "ram_bytes_used" : 2000, + "ram_used" : "1.9kb", + "pages_received" : 123, + "pages_emitted" : 123, + "rows_received" : 111, + "rows_emitted" : 222 + }"""; + } + public void testToXContent() { - assertThat(Strings.toString(new TopNOperatorStatus(10, 2000)), equalTo(""" - {"occupied_rows":10,"ram_bytes_used":2000,"ram_used":"1.9kb"}""")); + assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson())); } @Override @@ -26,23 +43,46 @@ public class TopNOperatorStatusTests extends AbstractWireSerializingTestCase randomNonNegativeInt()); + occupiedRows = randomValueOtherThan(occupiedRows, ESTestCase::randomNonNegativeInt); break; case 1: - ramBytesUsed = randomValueOtherThan(ramBytesUsed, () -> randomNonNegativeLong()); + ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong); + break; + case 2: + pagesReceived = randomValueOtherThan(pagesReceived, ESTestCase::randomNonNegativeInt); + break; + case 3: + pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + break; + case 4: + rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + break; + case 5: + rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); break; default: throw new IllegalArgumentException(); } - return new TopNOperatorStatus(occupiedRows, ramBytesUsed); + return new TopNOperatorStatus(occupiedRows, ramBytesUsed, pagesReceived, pagesEmitted, rowsReceived, rowsEmitted); } } diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 050259bbb5b5..daa986f8a601 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -574,23 +574,35 @@ public class RestEsqlIT extends RestEsqlTestCase { .entry("slice_min", 0) .entry("current", DocIdSetIterator.NO_MORE_DOCS) .entry("pages_emitted", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)) .entry("processing_nanos", greaterThan(0)) .entry("processed_queries", List.of("*:*")); case "ValuesSourceReaderOperator" -> basicProfile().entry("readers_built", matchesMap().extraOk()); case "AggregationOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) + .entry("rows_received", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)) .entry("aggregation_nanos", greaterThan(0)) .entry("aggregation_finish_nanos", greaterThan(0)); - case "ExchangeSinkOperator" -> matchesMap().entry("pages_accepted", greaterThan(0)); - case "ExchangeSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)).entry("pages_waiting", 0); + case "ExchangeSinkOperator" -> matchesMap().entry("pages_received", greaterThan(0)).entry("rows_received", greaterThan(0)); + case "ExchangeSourceOperator" -> matchesMap().entry("pages_waiting", 0) + .entry("pages_emitted", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)); case "ProjectOperator", "EvalOperator" -> basicProfile(); case "LimitOperator" -> matchesMap().entry("pages_processed", greaterThan(0)) .entry("limit", 1000) - .entry("limit_remaining", 999); + .entry("limit_remaining", 999) + .entry("rows_received", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)); case "OutputOperator" -> null; case "TopNOperator" -> matchesMap().entry("occupied_rows", 0) + .entry("pages_received", greaterThan(0)) + .entry("pages_emitted", greaterThan(0)) + .entry("rows_received", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)) .entry("ram_used", instanceOf(String.class)) .entry("ram_bytes_used", greaterThan(0)); case "LuceneTopNSourceOperator" -> matchesMap().entry("pages_emitted", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)) .entry("current", greaterThan(0)) .entry("processed_slices", greaterThan(0)) .entry("processed_shards", List.of("rest-esql-test:0")) @@ -611,7 +623,10 @@ public class RestEsqlIT extends RestEsqlTestCase { } private MapMatcher basicProfile() { - return matchesMap().entry("pages_processed", greaterThan(0)).entry("process_nanos", greaterThan(0)); + return matchesMap().entry("pages_processed", greaterThan(0)) + .entry("process_nanos", greaterThan(0)) + .entry("rows_received", greaterThan(0)) + .entry("rows_emitted", greaterThan(0)); } private void assertException(String query, String... errorMessages) throws IOException { diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 34991c3958f2..8e27cfceb28e 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -160,7 +160,7 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase { } if (o.operator().equals("ExchangeSinkOperator")) { ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status(); - assertThat(oStatus.pagesAccepted(), greaterThanOrEqualTo(0)); + assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(0)); exchangeSinks++; } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index 134981d3c3b0..ebfe1c814707 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -63,7 +63,12 @@ public class EsqlQueryResponseProfileTests extends AbstractWireSerializingTestCa String name = randomAlphaOfLength(4); Operator.Status status = randomBoolean() ? null - : new AbstractPageMappingOperator.Status(randomNonNegativeLong(), between(0, Integer.MAX_VALUE)); + : new AbstractPageMappingOperator.Status( + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); return new DriverStatus.OperatorStatus(name, status); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 4b9ae0d1692e..1d49409dc964 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -683,7 +683,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase