diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 82e70c2fd69f..680770a41c44 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -252,6 +252,7 @@ public class TransportVersions { public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00); public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00); public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00); + public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java index b5b325e11094..099a81abe5a2 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java @@ -57,12 +57,12 @@ public abstract class LuceneOperator extends SourceOperator { /** * Count of the number of slices processed. */ - private int processedSlices; + int processedSlices; final int maxPageSize; private final LuceneSliceQueue sliceQueue; - private final Set processedQueries = new HashSet<>(); - private final Set processedShards = new HashSet<>(); + final Set processedQueries = new HashSet<>(); + final Set processedShards = new HashSet<>(); private LuceneSlice currentSlice; private int sliceIndex; @@ -75,7 +75,7 @@ public abstract class LuceneOperator extends SourceOperator { /** * Count of rows this operator has emitted. */ - private long rowsEmitted; + long rowsEmitted; protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) { this.blockFactory = blockFactory; @@ -277,7 +277,7 @@ public abstract class LuceneOperator extends SourceOperator { private final long rowsEmitted; private final Map partitioningStrategies; - private Status(LuceneOperator operator) { + protected Status(LuceneOperator operator) { processedSlices = operator.processedSlices; processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new)); processNanos = operator.processingNanos; @@ -447,6 +447,11 @@ public abstract class LuceneOperator extends SourceOperator { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); + toXContentFields(builder, params); + return builder.endObject(); + } + + protected void toXContentFields(XContentBuilder builder, Params params) throws IOException { builder.field("processed_slices", processedSlices); builder.field("processed_queries", processedQueries); builder.field("processed_shards", processedShards); @@ -462,7 +467,6 @@ public abstract class LuceneOperator extends SourceOperator { builder.field("current", current); builder.field("rows_emitted", rowsEmitted); builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies)); - return builder.endObject(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java index 420b3cc8a315..4e00822fa525 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperator.java @@ -17,6 +17,11 @@ import org.apache.lucene.search.Scorer; import org.apache.lucene.search.Weight; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefVector; @@ -25,7 +30,7 @@ import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.OrdinalBytesRefVector; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; import org.elasticsearch.index.fieldvisitor.StoredFieldLoader; @@ -33,13 +38,17 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader; import org.elasticsearch.index.mapper.SourceLoader; import org.elasticsearch.search.fetch.StoredFieldsSpec; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; -public final class TimeSeriesSourceOperator extends SourceOperator { +public final class TimeSeriesSourceOperator extends LuceneOperator { private final boolean emitDocIds; private final int maxPageSize; @@ -55,6 +64,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { private final List fieldsToExtracts; private ShardLevelFieldsReader fieldsReader; private DocIdCollector docCollector; + private long tsidsLoaded; TimeSeriesSourceOperator( BlockFactory blockFactory, @@ -64,6 +74,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { int maxPageSize, int limit ) { + super(blockFactory, maxPageSize, sliceQueue); this.maxPageSize = maxPageSize; this.blockFactory = blockFactory; this.fieldsToExtracts = fieldsToExtract; @@ -85,7 +96,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { } @Override - public Page getOutput() { + public Page getCheckedOutput() throws IOException { if (isFinished()) { return null; } @@ -97,6 +108,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { Page page = null; Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()]; + long startInNanos = System.nanoTime(); try { if (iterator == null) { var slice = sliceQueue.nextSlice(); @@ -130,6 +142,8 @@ public final class TimeSeriesSourceOperator extends SourceOperator { currentPagePos = 0; } if (iterator.completed()) { + processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier()); + processedSlices++; Releasables.close(docCollector, fieldsReader); iterator = null; } @@ -139,6 +153,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { if (page == null) { Releasables.closeExpectNoException(blocks); } + processingNanos += System.nanoTime() - startInNanos; } return page; } @@ -162,6 +177,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator { } }; Weight weight = luceneSlice.weight(); + processedQueries.add(weight.getQuery()); int maxSegmentOrd = 0; for (var leafReaderContext : luceneSlice.leaves()) { LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext()); @@ -237,6 +253,9 @@ public final class TimeSeriesSourceOperator extends SourceOperator { break; } } + if (oneTsidQueue.size() > 0) { + ++tsidsLoaded; + } } return oneTsidQueue; } @@ -306,11 +325,6 @@ public final class TimeSeriesSourceOperator extends SourceOperator { } } - @Override - public String toString() { - return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]"; - } - static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory { BlockLoaderFactory(BlockFactory factory) { super(factory); @@ -571,4 +585,118 @@ public final class TimeSeriesSourceOperator extends SourceOperator { Releasables.close(docsBuilder, segmentsBuilder); } } + + @Override + protected void describe(StringBuilder sb) { + sb.append("[" + "maxPageSize=").append(maxPageSize).append(", remainingDocs=").append(remainingDocs).append("]"); + } + + @Override + public Operator.Status status() { + final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields + return new Status(this, tsidsLoaded, valuesLoaded); + } + + public static class Status extends LuceneOperator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "time_series_source", + Status::new + ); + + private final long tsidLoaded; + private final long valuesLoaded; + + Status(TimeSeriesSourceOperator operator, long tsidLoaded, long valuesLoaded) { + super(operator); + this.tsidLoaded = tsidLoaded; + this.valuesLoaded = valuesLoaded; + } + + Status( + int processedSlices, + Set processedQueries, + Set processedShards, + long processNanos, + int sliceIndex, + int totalSlices, + int pagesEmitted, + int sliceMin, + int sliceMax, + int current, + long rowsEmitted, + Map partitioningStrategies, + long tsidLoaded, + long valuesLoaded + ) { + super( + processedSlices, + processedQueries, + processedShards, + processNanos, + sliceIndex, + totalSlices, + pagesEmitted, + sliceMin, + sliceMax, + current, + rowsEmitted, + partitioningStrategies + ); + this.tsidLoaded = tsidLoaded; + this.valuesLoaded = valuesLoaded; + } + + Status(StreamInput in) throws IOException { + super(in); + this.tsidLoaded = in.readVLong(); + this.valuesLoaded = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(tsidLoaded); + out.writeVLong(valuesLoaded); + } + + @Override + protected void toXContentFields(XContentBuilder builder, Params params) throws IOException { + super.toXContentFields(builder, params); + builder.field("tsid_loaded", tsidLoaded); + builder.field("values_loaded", valuesLoaded); + } + + public long tsidLoaded() { + return tsidLoaded; + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + @Override + public boolean supportsVersion(TransportVersion version) { + return version.onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS); + } + + @Override + public long valuesLoaded() { + return valuesLoaded; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + if (super.equals(o) == false) return false; + Status status = (Status) o; + return tsidLoaded == status.tsidLoaded && valuesLoaded == status.valuesLoaded; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), tsidLoaded, valuesLoaded); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorStatusTests.java new file mode 100644 index 000000000000..21c024e9eda4 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorStatusTests.java @@ -0,0 +1,187 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.lucene; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import static org.hamcrest.Matchers.equalTo; + +public class TimeSeriesSourceOperatorStatusTests extends AbstractWireSerializingTestCase { + public static TimeSeriesSourceOperator.Status simple() { + return new TimeSeriesSourceOperator.Status( + 2, + Set.of("*:*"), + new TreeSet<>(List.of("a:0", "a:1")), + 1002, + 0, + 1, + 5, + 123, + 99990, + 8000, + 222, + Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC), + 250, + 28000 + ); + } + + public static String simpleToJson() { + return """ + { + "processed_slices" : 2, + "processed_queries" : [ + "*:*" + ], + "processed_shards" : [ + "a:0", + "a:1" + ], + "process_nanos" : 1002, + "process_time" : "1micros", + "slice_index" : 0, + "total_slices" : 1, + "pages_emitted" : 5, + "slice_min" : 123, + "slice_max" : 99990, + "current" : 8000, + "rows_emitted" : 222, + "partitioning_strategies" : { + "a:1" : "DOC", + "b:0" : "SHARD" + }, + "tsid_loaded" : 250, + "values_loaded" : 28000 + }"""; + } + + public void testToXContent() { + assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson())); + } + + @Override + protected Writeable.Reader instanceReader() { + return TimeSeriesSourceOperator.Status::new; + } + + @Override + public TimeSeriesSourceOperator.Status createTestInstance() { + return new TimeSeriesSourceOperator.Status( + randomNonNegativeInt(), + randomProcessedQueries(), + randomProcessedShards(), + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomPartitioningStrategies(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } + + private static Set randomProcessedQueries() { + int size = between(0, 10); + Set set = new TreeSet<>(); + while (set.size() < size) { + set.add(randomAlphaOfLength(5)); + } + return set; + } + + private static Set randomProcessedShards() { + int size = between(0, 10); + Set set = new TreeSet<>(); + while (set.size() < size) { + set.add(randomAlphaOfLength(3) + ":" + between(0, 10)); + } + return set; + } + + private static Map randomPartitioningStrategies() { + int size = between(0, 10); + Map partitioningStrategies = new HashMap<>(); + while (partitioningStrategies.size() < size) { + partitioningStrategies.put( + randomAlphaOfLength(3) + ":" + between(0, 10), + randomFrom(LuceneSliceQueue.PartitioningStrategy.values()) + ); + } + return partitioningStrategies; + } + + @Override + protected TimeSeriesSourceOperator.Status mutateInstance(TimeSeriesSourceOperator.Status instance) { + int processedSlices = instance.processedSlices(); + Set processedQueries = instance.processedQueries(); + Set processedShards = instance.processedShards(); + long processNanos = instance.processNanos(); + int sliceIndex = instance.sliceIndex(); + int totalSlices = instance.totalSlices(); + int pagesEmitted = instance.pagesEmitted(); + int sliceMin = instance.sliceMin(); + int sliceMax = instance.sliceMax(); + int current = instance.current(); + long rowsEmitted = instance.rowsEmitted(); + long tsidLoaded = instance.tsidLoaded(); + long valuesLoaded = instance.valuesLoaded(); + Map partitioningStrategies = instance.partitioningStrategies(); + switch (between(0, 13)) { + case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt); + case 1 -> processedQueries = randomValueOtherThan( + processedQueries, + TimeSeriesSourceOperatorStatusTests::randomProcessedQueries + ); + case 2 -> processedShards = randomValueOtherThan(processedShards, TimeSeriesSourceOperatorStatusTests::randomProcessedShards); + case 3 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); + case 4 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt); + case 5 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt); + case 6 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt); + case 7 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt); + case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt); + case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt); + case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + case 11 -> partitioningStrategies = randomValueOtherThan( + partitioningStrategies, + TimeSeriesSourceOperatorStatusTests::randomPartitioningStrategies + ); + case 12 -> tsidLoaded = randomValueOtherThan(tsidLoaded, ESTestCase::randomNonNegativeLong); + case 13 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); + } + return new TimeSeriesSourceOperator.Status( + processedSlices, + processedQueries, + processedShards, + processNanos, + sliceIndex, + totalSlices, + pagesEmitted, + sliceMin, + sliceMax, + current, + rowsEmitted, + partitioningStrategies, + tsidLoaded, + valuesLoaded + ); + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java index 9389c885a895..4ab74eabd746 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSourceOperatorTests.java @@ -28,12 +28,15 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.Randomness; +import org.elasticsearch.common.Strings; import org.elasticsearch.compute.data.BytesRefVector; import org.elasticsearch.compute.data.DocVector; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.Driver; import org.elasticsearch.compute.operator.DriverContext; +import org.elasticsearch.compute.operator.DriverStatus; import org.elasticsearch.compute.operator.Operator; import org.elasticsearch.compute.test.AnyOperatorTestCase; import org.elasticsearch.compute.test.OperatorTestCase; @@ -41,6 +44,7 @@ import org.elasticsearch.compute.test.TestDriverFactory; import org.elasticsearch.compute.test.TestResultPageSinkOperator; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.core.IOUtils; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper; import org.elasticsearch.index.mapper.DateFieldMapper; import org.elasticsearch.index.mapper.KeywordFieldMapper; @@ -108,6 +112,66 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase { } } + public void testStatus() { + int numTimeSeries = between(1, 5); + int numSamplesPerTS = between(1, 10); + long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z"); + int maxPageSize = 128; + DriverContext driverContext = driverContext(); + Driver driver = createDriver( + driverContext, + 1024, + maxPageSize, + true, + numTimeSeries, + numSamplesPerTS, + timestampStart, + Page::releaseBlocks + ); + OperatorTestCase.runDriver(driver); + DriverStatus driverStatus = driver.status(); + var status = (TimeSeriesSourceOperator.Status) driverStatus.completedOperators().get(0).status(); + assertThat(status.tsidLoaded(), equalTo((long) numTimeSeries)); + assertThat(status.rowsEmitted(), equalTo((long) numTimeSeries * numSamplesPerTS)); + assertThat(status.documentsFound(), equalTo((long) numTimeSeries * numSamplesPerTS)); + assertThat(status.valuesLoaded(), equalTo((long) numTimeSeries * numSamplesPerTS * 3)); + + String expected = String.format( + Locale.ROOT, + """ + { + "processed_slices" : 1, + "processed_queries" : [ + "*:*" + ], + "processed_shards" : [ + "test" + ], + "process_nanos" : %d, + "process_time" : "%s", + "slice_index" : 0, + "total_slices" : 1, + "pages_emitted" : 1, + "slice_min" : 0, + "slice_max" : 0, + "current" : 0, + "rows_emitted" : %s, + "partitioning_strategies" : { + "test" : "SHARD" + }, + "tsid_loaded" : %d, + "values_loaded" : %d + } + """, + status.processNanos(), + TimeValue.timeValueNanos(status.processNanos()), + status.rowsEmitted(), + status.tsidLoaded(), + status.valuesLoaded() + ); + assertThat(Strings.toString(status, true, true).trim(), equalTo(expected.trim())); + } + public void testLimit() { int numTimeSeries = 3; int numSamplesPerTS = 10; @@ -279,11 +343,33 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase { @Override protected Matcher expectedToStringOfSimple() { - return equalTo("TimeSeriesSourceOperator[maxPageSize=1, remainingDocs=1]"); + return equalTo("TimeSeriesSourceOperator[shards = [test], maxPageSize = 1[maxPageSize=1, remainingDocs=1]]"); } List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) { var ctx = driverContext(); + List results = new ArrayList<>(); + OperatorTestCase.runDriver( + createDriver(ctx, limit, maxPageSize, forceMerge, numTimeSeries, numSamplesPerTS, timestampStart, results::add) + ); + OperatorTestCase.assertDriverContext(ctx); + for (Page result : results) { + assertThat(result.getPositionCount(), lessThanOrEqualTo(maxPageSize)); + assertThat(result.getPositionCount(), lessThanOrEqualTo(limit)); + } + return results; + } + + Driver createDriver( + DriverContext driverContext, + int limit, + int maxPageSize, + boolean forceMerge, + int numTimeSeries, + int numSamplesPerTS, + long timestampStart, + Consumer consumer + ) { var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG); var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname"); var timeSeriesFactory = createTimeSeriesSourceOperator( @@ -307,17 +393,12 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase { return numTimeSeries * numSamplesPerTS; } ); - - List results = new ArrayList<>(); - OperatorTestCase.runDriver( - TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add)) + return TestDriverFactory.create( + driverContext, + timeSeriesFactory.get(driverContext), + List.of(), + new TestResultPageSinkOperator(consumer) ); - OperatorTestCase.assertDriverContext(ctx); - for (Page result : results) { - assertThat(result.getPositionCount(), lessThanOrEqualTo(maxPageSize)); - assertThat(result.getPositionCount(), lessThanOrEqualTo(limit)); - } - return results; } public record ExtractField(MappedFieldType ft, ElementType elementType) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java index 713e2e08e750..6ba7c9747bf0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java @@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockFactoryProvider; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.compute.lucene.LuceneOperator; +import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator; @@ -304,6 +305,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin { entries.add(HashAggregationOperator.Status.ENTRY); entries.add(LimitOperator.Status.ENTRY); entries.add(LuceneOperator.Status.ENTRY); + entries.add(TimeSeriesSourceOperator.Status.ENTRY); entries.add(TopNOperatorStatus.ENTRY); entries.add(MvExpandOperator.Status.ENTRY); entries.add(ValuesSourceReaderOperator.Status.ENTRY);