mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-06-28 09:28:55 -04:00
Add status for time-series source operator (#127932)
This change provides more detailed status information for the TimeSeriesSourceOperator. ```json { "processed_slices" : 1, "processed_queries" : [ "*:*" ], "processed_shards" : [ "test" ], "process_nanos" : 13315249, "process_time" : "13.3ms", "slice_index" : 0, "total_slices" : 1, "pages_emitted" : 1, "slice_min" : 0, "slice_max" : 0, "current" : 0, "rows_emitted" : 6, "partitioning_strategies" : { "test" : "SHARD" }, "tsid_loaded" : 1, "values_loaded" : 18 } ```
This commit is contained in:
parent
9bd7a3101d
commit
a3700ff598
6 changed files with 428 additions and 25 deletions
|
@ -252,6 +252,7 @@ public class TransportVersions {
|
|||
public static final TransportVersion FIELD_CAPS_ADD_CLUSTER_ALIAS = def(9_073_0_00);
|
||||
public static final TransportVersion INFERENCE_ADD_TIMEOUT_PUT_ENDPOINT = def(9_074_0_00);
|
||||
public static final TransportVersion ESQL_FIELD_ATTRIBUTE_DROP_TYPE = def(9_075_0_00);
|
||||
public static final TransportVersion ESQL_TIME_SERIES_SOURCE_STATUS = def(9_076_0_00);
|
||||
|
||||
/*
|
||||
* STOP! READ THIS FIRST! No, really,
|
||||
|
|
|
@ -57,12 +57,12 @@ public abstract class LuceneOperator extends SourceOperator {
|
|||
/**
|
||||
* Count of the number of slices processed.
|
||||
*/
|
||||
private int processedSlices;
|
||||
int processedSlices;
|
||||
final int maxPageSize;
|
||||
private final LuceneSliceQueue sliceQueue;
|
||||
|
||||
private final Set<Query> processedQueries = new HashSet<>();
|
||||
private final Set<String> processedShards = new HashSet<>();
|
||||
final Set<Query> processedQueries = new HashSet<>();
|
||||
final Set<String> processedShards = new HashSet<>();
|
||||
|
||||
private LuceneSlice currentSlice;
|
||||
private int sliceIndex;
|
||||
|
@ -75,7 +75,7 @@ public abstract class LuceneOperator extends SourceOperator {
|
|||
/**
|
||||
* Count of rows this operator has emitted.
|
||||
*/
|
||||
private long rowsEmitted;
|
||||
long rowsEmitted;
|
||||
|
||||
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
|
||||
this.blockFactory = blockFactory;
|
||||
|
@ -277,7 +277,7 @@ public abstract class LuceneOperator extends SourceOperator {
|
|||
private final long rowsEmitted;
|
||||
private final Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies;
|
||||
|
||||
private Status(LuceneOperator operator) {
|
||||
protected Status(LuceneOperator operator) {
|
||||
processedSlices = operator.processedSlices;
|
||||
processedQueries = operator.processedQueries.stream().map(Query::toString).collect(Collectors.toCollection(TreeSet::new));
|
||||
processNanos = operator.processingNanos;
|
||||
|
@ -447,6 +447,11 @@ public abstract class LuceneOperator extends SourceOperator {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
toXContentFields(builder, params);
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("processed_slices", processedSlices);
|
||||
builder.field("processed_queries", processedQueries);
|
||||
builder.field("processed_shards", processedShards);
|
||||
|
@ -462,7 +467,6 @@ public abstract class LuceneOperator extends SourceOperator {
|
|||
builder.field("current", current);
|
||||
builder.field("rows_emitted", rowsEmitted);
|
||||
builder.field("partitioning_strategies", new TreeMap<>(this.partitioningStrategies));
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -17,6 +17,11 @@ import org.apache.lucene.search.Scorer;
|
|||
import org.apache.lucene.search.Weight;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.PriorityQueue;
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.TransportVersions;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.compute.data.Block;
|
||||
import org.elasticsearch.compute.data.BlockFactory;
|
||||
import org.elasticsearch.compute.data.BytesRefVector;
|
||||
|
@ -25,7 +30,7 @@ import org.elasticsearch.compute.data.IntVector;
|
|||
import org.elasticsearch.compute.data.LongVector;
|
||||
import org.elasticsearch.compute.data.OrdinalBytesRefVector;
|
||||
import org.elasticsearch.compute.data.Page;
|
||||
import org.elasticsearch.compute.operator.SourceOperator;
|
||||
import org.elasticsearch.compute.operator.Operator;
|
||||
import org.elasticsearch.core.Releasable;
|
||||
import org.elasticsearch.core.Releasables;
|
||||
import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
|
||||
|
@ -33,13 +38,17 @@ import org.elasticsearch.index.mapper.BlockLoader;
|
|||
import org.elasticsearch.index.mapper.BlockLoaderStoredFieldsFromLeafLoader;
|
||||
import org.elasticsearch.index.mapper.SourceLoader;
|
||||
import org.elasticsearch.search.fetch.StoredFieldsSpec;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public final class TimeSeriesSourceOperator extends SourceOperator {
|
||||
public final class TimeSeriesSourceOperator extends LuceneOperator {
|
||||
|
||||
private final boolean emitDocIds;
|
||||
private final int maxPageSize;
|
||||
|
@ -55,6 +64,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
private final List<ValuesSourceReaderOperator.FieldInfo> fieldsToExtracts;
|
||||
private ShardLevelFieldsReader fieldsReader;
|
||||
private DocIdCollector docCollector;
|
||||
private long tsidsLoaded;
|
||||
|
||||
TimeSeriesSourceOperator(
|
||||
BlockFactory blockFactory,
|
||||
|
@ -64,6 +74,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
int maxPageSize,
|
||||
int limit
|
||||
) {
|
||||
super(blockFactory, maxPageSize, sliceQueue);
|
||||
this.maxPageSize = maxPageSize;
|
||||
this.blockFactory = blockFactory;
|
||||
this.fieldsToExtracts = fieldsToExtract;
|
||||
|
@ -85,7 +96,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public Page getOutput() {
|
||||
public Page getCheckedOutput() throws IOException {
|
||||
if (isFinished()) {
|
||||
return null;
|
||||
}
|
||||
|
@ -97,6 +108,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
|
||||
Page page = null;
|
||||
Block[] blocks = new Block[(emitDocIds ? 3 : 2) + fieldsToExtracts.size()];
|
||||
long startInNanos = System.nanoTime();
|
||||
try {
|
||||
if (iterator == null) {
|
||||
var slice = sliceQueue.nextSlice();
|
||||
|
@ -130,6 +142,8 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
currentPagePos = 0;
|
||||
}
|
||||
if (iterator.completed()) {
|
||||
processedShards.add(iterator.luceneSlice.shardContext().shardIdentifier());
|
||||
processedSlices++;
|
||||
Releasables.close(docCollector, fieldsReader);
|
||||
iterator = null;
|
||||
}
|
||||
|
@ -139,6 +153,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
if (page == null) {
|
||||
Releasables.closeExpectNoException(blocks);
|
||||
}
|
||||
processingNanos += System.nanoTime() - startInNanos;
|
||||
}
|
||||
return page;
|
||||
}
|
||||
|
@ -162,6 +177,7 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
}
|
||||
};
|
||||
Weight weight = luceneSlice.weight();
|
||||
processedQueries.add(weight.getQuery());
|
||||
int maxSegmentOrd = 0;
|
||||
for (var leafReaderContext : luceneSlice.leaves()) {
|
||||
LeafIterator leafIterator = new LeafIterator(weight, leafReaderContext.leafReaderContext());
|
||||
|
@ -237,6 +253,9 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
break;
|
||||
}
|
||||
}
|
||||
if (oneTsidQueue.size() > 0) {
|
||||
++tsidsLoaded;
|
||||
}
|
||||
}
|
||||
return oneTsidQueue;
|
||||
}
|
||||
|
@ -306,11 +325,6 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "[" + "maxPageSize=" + maxPageSize + ", remainingDocs=" + remainingDocs + "]";
|
||||
}
|
||||
|
||||
static class BlockLoaderFactory extends ValuesSourceReaderOperator.DelegatingBlockLoaderFactory {
|
||||
BlockLoaderFactory(BlockFactory factory) {
|
||||
super(factory);
|
||||
|
@ -571,4 +585,118 @@ public final class TimeSeriesSourceOperator extends SourceOperator {
|
|||
Releasables.close(docsBuilder, segmentsBuilder);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void describe(StringBuilder sb) {
|
||||
sb.append("[" + "maxPageSize=").append(maxPageSize).append(", remainingDocs=").append(remainingDocs).append("]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Operator.Status status() {
|
||||
final long valuesLoaded = rowsEmitted * (1 + fieldsToExtracts.size()); // @timestamp and other fields
|
||||
return new Status(this, tsidsLoaded, valuesLoaded);
|
||||
}
|
||||
|
||||
public static class Status extends LuceneOperator.Status {
|
||||
public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
||||
Operator.Status.class,
|
||||
"time_series_source",
|
||||
Status::new
|
||||
);
|
||||
|
||||
private final long tsidLoaded;
|
||||
private final long valuesLoaded;
|
||||
|
||||
Status(TimeSeriesSourceOperator operator, long tsidLoaded, long valuesLoaded) {
|
||||
super(operator);
|
||||
this.tsidLoaded = tsidLoaded;
|
||||
this.valuesLoaded = valuesLoaded;
|
||||
}
|
||||
|
||||
Status(
|
||||
int processedSlices,
|
||||
Set<String> processedQueries,
|
||||
Set<String> processedShards,
|
||||
long processNanos,
|
||||
int sliceIndex,
|
||||
int totalSlices,
|
||||
int pagesEmitted,
|
||||
int sliceMin,
|
||||
int sliceMax,
|
||||
int current,
|
||||
long rowsEmitted,
|
||||
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies,
|
||||
long tsidLoaded,
|
||||
long valuesLoaded
|
||||
) {
|
||||
super(
|
||||
processedSlices,
|
||||
processedQueries,
|
||||
processedShards,
|
||||
processNanos,
|
||||
sliceIndex,
|
||||
totalSlices,
|
||||
pagesEmitted,
|
||||
sliceMin,
|
||||
sliceMax,
|
||||
current,
|
||||
rowsEmitted,
|
||||
partitioningStrategies
|
||||
);
|
||||
this.tsidLoaded = tsidLoaded;
|
||||
this.valuesLoaded = valuesLoaded;
|
||||
}
|
||||
|
||||
Status(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
this.tsidLoaded = in.readVLong();
|
||||
this.valuesLoaded = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(tsidLoaded);
|
||||
out.writeVLong(valuesLoaded);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void toXContentFields(XContentBuilder builder, Params params) throws IOException {
|
||||
super.toXContentFields(builder, params);
|
||||
builder.field("tsid_loaded", tsidLoaded);
|
||||
builder.field("values_loaded", valuesLoaded);
|
||||
}
|
||||
|
||||
public long tsidLoaded() {
|
||||
return tsidLoaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return ENTRY.name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportsVersion(TransportVersion version) {
|
||||
return version.onOrAfter(TransportVersions.ESQL_TIME_SERIES_SOURCE_STATUS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long valuesLoaded() {
|
||||
return valuesLoaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
if (super.equals(o) == false) return false;
|
||||
Status status = (Status) o;
|
||||
return tsidLoaded == status.tsidLoaded && valuesLoaded == status.valuesLoaded;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(super.hashCode(), tsidLoaded, valuesLoaded);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,187 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.compute.lucene;
|
||||
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.test.AbstractWireSerializingTestCase;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TimeSeriesSourceOperatorStatusTests extends AbstractWireSerializingTestCase<TimeSeriesSourceOperator.Status> {
|
||||
public static TimeSeriesSourceOperator.Status simple() {
|
||||
return new TimeSeriesSourceOperator.Status(
|
||||
2,
|
||||
Set.of("*:*"),
|
||||
new TreeSet<>(List.of("a:0", "a:1")),
|
||||
1002,
|
||||
0,
|
||||
1,
|
||||
5,
|
||||
123,
|
||||
99990,
|
||||
8000,
|
||||
222,
|
||||
Map.of("b:0", LuceneSliceQueue.PartitioningStrategy.SHARD, "a:1", LuceneSliceQueue.PartitioningStrategy.DOC),
|
||||
250,
|
||||
28000
|
||||
);
|
||||
}
|
||||
|
||||
public static String simpleToJson() {
|
||||
return """
|
||||
{
|
||||
"processed_slices" : 2,
|
||||
"processed_queries" : [
|
||||
"*:*"
|
||||
],
|
||||
"processed_shards" : [
|
||||
"a:0",
|
||||
"a:1"
|
||||
],
|
||||
"process_nanos" : 1002,
|
||||
"process_time" : "1micros",
|
||||
"slice_index" : 0,
|
||||
"total_slices" : 1,
|
||||
"pages_emitted" : 5,
|
||||
"slice_min" : 123,
|
||||
"slice_max" : 99990,
|
||||
"current" : 8000,
|
||||
"rows_emitted" : 222,
|
||||
"partitioning_strategies" : {
|
||||
"a:1" : "DOC",
|
||||
"b:0" : "SHARD"
|
||||
},
|
||||
"tsid_loaded" : 250,
|
||||
"values_loaded" : 28000
|
||||
}""";
|
||||
}
|
||||
|
||||
public void testToXContent() {
|
||||
assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Writeable.Reader<TimeSeriesSourceOperator.Status> instanceReader() {
|
||||
return TimeSeriesSourceOperator.Status::new;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeSeriesSourceOperator.Status createTestInstance() {
|
||||
return new TimeSeriesSourceOperator.Status(
|
||||
randomNonNegativeInt(),
|
||||
randomProcessedQueries(),
|
||||
randomProcessedShards(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeInt(),
|
||||
randomNonNegativeLong(),
|
||||
randomPartitioningStrategies(),
|
||||
randomNonNegativeLong(),
|
||||
randomNonNegativeLong()
|
||||
);
|
||||
}
|
||||
|
||||
private static Set<String> randomProcessedQueries() {
|
||||
int size = between(0, 10);
|
||||
Set<String> set = new TreeSet<>();
|
||||
while (set.size() < size) {
|
||||
set.add(randomAlphaOfLength(5));
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
private static Set<String> randomProcessedShards() {
|
||||
int size = between(0, 10);
|
||||
Set<String> set = new TreeSet<>();
|
||||
while (set.size() < size) {
|
||||
set.add(randomAlphaOfLength(3) + ":" + between(0, 10));
|
||||
}
|
||||
return set;
|
||||
}
|
||||
|
||||
private static Map<String, LuceneSliceQueue.PartitioningStrategy> randomPartitioningStrategies() {
|
||||
int size = between(0, 10);
|
||||
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = new HashMap<>();
|
||||
while (partitioningStrategies.size() < size) {
|
||||
partitioningStrategies.put(
|
||||
randomAlphaOfLength(3) + ":" + between(0, 10),
|
||||
randomFrom(LuceneSliceQueue.PartitioningStrategy.values())
|
||||
);
|
||||
}
|
||||
return partitioningStrategies;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TimeSeriesSourceOperator.Status mutateInstance(TimeSeriesSourceOperator.Status instance) {
|
||||
int processedSlices = instance.processedSlices();
|
||||
Set<String> processedQueries = instance.processedQueries();
|
||||
Set<String> processedShards = instance.processedShards();
|
||||
long processNanos = instance.processNanos();
|
||||
int sliceIndex = instance.sliceIndex();
|
||||
int totalSlices = instance.totalSlices();
|
||||
int pagesEmitted = instance.pagesEmitted();
|
||||
int sliceMin = instance.sliceMin();
|
||||
int sliceMax = instance.sliceMax();
|
||||
int current = instance.current();
|
||||
long rowsEmitted = instance.rowsEmitted();
|
||||
long tsidLoaded = instance.tsidLoaded();
|
||||
long valuesLoaded = instance.valuesLoaded();
|
||||
Map<String, LuceneSliceQueue.PartitioningStrategy> partitioningStrategies = instance.partitioningStrategies();
|
||||
switch (between(0, 13)) {
|
||||
case 0 -> processedSlices = randomValueOtherThan(processedSlices, ESTestCase::randomNonNegativeInt);
|
||||
case 1 -> processedQueries = randomValueOtherThan(
|
||||
processedQueries,
|
||||
TimeSeriesSourceOperatorStatusTests::randomProcessedQueries
|
||||
);
|
||||
case 2 -> processedShards = randomValueOtherThan(processedShards, TimeSeriesSourceOperatorStatusTests::randomProcessedShards);
|
||||
case 3 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong);
|
||||
case 4 -> sliceIndex = randomValueOtherThan(sliceIndex, ESTestCase::randomNonNegativeInt);
|
||||
case 5 -> totalSlices = randomValueOtherThan(totalSlices, ESTestCase::randomNonNegativeInt);
|
||||
case 6 -> pagesEmitted = randomValueOtherThan(pagesEmitted, ESTestCase::randomNonNegativeInt);
|
||||
case 7 -> sliceMin = randomValueOtherThan(sliceMin, ESTestCase::randomNonNegativeInt);
|
||||
case 8 -> sliceMax = randomValueOtherThan(sliceMax, ESTestCase::randomNonNegativeInt);
|
||||
case 9 -> current = randomValueOtherThan(current, ESTestCase::randomNonNegativeInt);
|
||||
case 10 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
|
||||
case 11 -> partitioningStrategies = randomValueOtherThan(
|
||||
partitioningStrategies,
|
||||
TimeSeriesSourceOperatorStatusTests::randomPartitioningStrategies
|
||||
);
|
||||
case 12 -> tsidLoaded = randomValueOtherThan(tsidLoaded, ESTestCase::randomNonNegativeLong);
|
||||
case 13 -> valuesLoaded = randomValueOtherThan(valuesLoaded, ESTestCase::randomNonNegativeLong);
|
||||
default -> throw new UnsupportedOperationException();
|
||||
}
|
||||
return new TimeSeriesSourceOperator.Status(
|
||||
processedSlices,
|
||||
processedQueries,
|
||||
processedShards,
|
||||
processNanos,
|
||||
sliceIndex,
|
||||
totalSlices,
|
||||
pagesEmitted,
|
||||
sliceMin,
|
||||
sliceMax,
|
||||
current,
|
||||
rowsEmitted,
|
||||
partitioningStrategies,
|
||||
tsidLoaded,
|
||||
valuesLoaded
|
||||
);
|
||||
}
|
||||
}
|
|
@ -28,12 +28,15 @@ import org.apache.lucene.store.Directory;
|
|||
import org.apache.lucene.tests.index.RandomIndexWriter;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.Randomness;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.compute.data.BytesRefVector;
|
||||
import org.elasticsearch.compute.data.DocVector;
|
||||
import org.elasticsearch.compute.data.ElementType;
|
||||
import org.elasticsearch.compute.data.LongVector;
|
||||
import org.elasticsearch.compute.data.Page;
|
||||
import org.elasticsearch.compute.operator.Driver;
|
||||
import org.elasticsearch.compute.operator.DriverContext;
|
||||
import org.elasticsearch.compute.operator.DriverStatus;
|
||||
import org.elasticsearch.compute.operator.Operator;
|
||||
import org.elasticsearch.compute.test.AnyOperatorTestCase;
|
||||
import org.elasticsearch.compute.test.OperatorTestCase;
|
||||
|
@ -41,6 +44,7 @@ import org.elasticsearch.compute.test.TestDriverFactory;
|
|||
import org.elasticsearch.compute.test.TestResultPageSinkOperator;
|
||||
import org.elasticsearch.core.CheckedFunction;
|
||||
import org.elasticsearch.core.IOUtils;
|
||||
import org.elasticsearch.core.TimeValue;
|
||||
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
|
||||
import org.elasticsearch.index.mapper.DateFieldMapper;
|
||||
import org.elasticsearch.index.mapper.KeywordFieldMapper;
|
||||
|
@ -108,6 +112,66 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testStatus() {
|
||||
int numTimeSeries = between(1, 5);
|
||||
int numSamplesPerTS = between(1, 10);
|
||||
long timestampStart = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-01-01T00:00:00Z");
|
||||
int maxPageSize = 128;
|
||||
DriverContext driverContext = driverContext();
|
||||
Driver driver = createDriver(
|
||||
driverContext,
|
||||
1024,
|
||||
maxPageSize,
|
||||
true,
|
||||
numTimeSeries,
|
||||
numSamplesPerTS,
|
||||
timestampStart,
|
||||
Page::releaseBlocks
|
||||
);
|
||||
OperatorTestCase.runDriver(driver);
|
||||
DriverStatus driverStatus = driver.status();
|
||||
var status = (TimeSeriesSourceOperator.Status) driverStatus.completedOperators().get(0).status();
|
||||
assertThat(status.tsidLoaded(), equalTo((long) numTimeSeries));
|
||||
assertThat(status.rowsEmitted(), equalTo((long) numTimeSeries * numSamplesPerTS));
|
||||
assertThat(status.documentsFound(), equalTo((long) numTimeSeries * numSamplesPerTS));
|
||||
assertThat(status.valuesLoaded(), equalTo((long) numTimeSeries * numSamplesPerTS * 3));
|
||||
|
||||
String expected = String.format(
|
||||
Locale.ROOT,
|
||||
"""
|
||||
{
|
||||
"processed_slices" : 1,
|
||||
"processed_queries" : [
|
||||
"*:*"
|
||||
],
|
||||
"processed_shards" : [
|
||||
"test"
|
||||
],
|
||||
"process_nanos" : %d,
|
||||
"process_time" : "%s",
|
||||
"slice_index" : 0,
|
||||
"total_slices" : 1,
|
||||
"pages_emitted" : 1,
|
||||
"slice_min" : 0,
|
||||
"slice_max" : 0,
|
||||
"current" : 0,
|
||||
"rows_emitted" : %s,
|
||||
"partitioning_strategies" : {
|
||||
"test" : "SHARD"
|
||||
},
|
||||
"tsid_loaded" : %d,
|
||||
"values_loaded" : %d
|
||||
}
|
||||
""",
|
||||
status.processNanos(),
|
||||
TimeValue.timeValueNanos(status.processNanos()),
|
||||
status.rowsEmitted(),
|
||||
status.tsidLoaded(),
|
||||
status.valuesLoaded()
|
||||
);
|
||||
assertThat(Strings.toString(status, true, true).trim(), equalTo(expected.trim()));
|
||||
}
|
||||
|
||||
public void testLimit() {
|
||||
int numTimeSeries = 3;
|
||||
int numSamplesPerTS = 10;
|
||||
|
@ -279,11 +343,33 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
|
|||
|
||||
@Override
|
||||
protected Matcher<String> expectedToStringOfSimple() {
|
||||
return equalTo("TimeSeriesSourceOperator[maxPageSize=1, remainingDocs=1]");
|
||||
return equalTo("TimeSeriesSourceOperator[shards = [test], maxPageSize = 1[maxPageSize=1, remainingDocs=1]]");
|
||||
}
|
||||
|
||||
List<Page> runDriver(int limit, int maxPageSize, boolean forceMerge, int numTimeSeries, int numSamplesPerTS, long timestampStart) {
|
||||
var ctx = driverContext();
|
||||
List<Page> results = new ArrayList<>();
|
||||
OperatorTestCase.runDriver(
|
||||
createDriver(ctx, limit, maxPageSize, forceMerge, numTimeSeries, numSamplesPerTS, timestampStart, results::add)
|
||||
);
|
||||
OperatorTestCase.assertDriverContext(ctx);
|
||||
for (Page result : results) {
|
||||
assertThat(result.getPositionCount(), lessThanOrEqualTo(maxPageSize));
|
||||
assertThat(result.getPositionCount(), lessThanOrEqualTo(limit));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
Driver createDriver(
|
||||
DriverContext driverContext,
|
||||
int limit,
|
||||
int maxPageSize,
|
||||
boolean forceMerge,
|
||||
int numTimeSeries,
|
||||
int numSamplesPerTS,
|
||||
long timestampStart,
|
||||
Consumer<Page> consumer
|
||||
) {
|
||||
var voltageField = new NumberFieldMapper.NumberFieldType("voltage", NumberFieldMapper.NumberType.LONG);
|
||||
var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname");
|
||||
var timeSeriesFactory = createTimeSeriesSourceOperator(
|
||||
|
@ -307,17 +393,12 @@ public class TimeSeriesSourceOperatorTests extends AnyOperatorTestCase {
|
|||
return numTimeSeries * numSamplesPerTS;
|
||||
}
|
||||
);
|
||||
|
||||
List<Page> results = new ArrayList<>();
|
||||
OperatorTestCase.runDriver(
|
||||
TestDriverFactory.create(ctx, timeSeriesFactory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add))
|
||||
return TestDriverFactory.create(
|
||||
driverContext,
|
||||
timeSeriesFactory.get(driverContext),
|
||||
List.of(),
|
||||
new TestResultPageSinkOperator(consumer)
|
||||
);
|
||||
OperatorTestCase.assertDriverContext(ctx);
|
||||
for (Page result : results) {
|
||||
assertThat(result.getPositionCount(), lessThanOrEqualTo(maxPageSize));
|
||||
assertThat(result.getPositionCount(), lessThanOrEqualTo(limit));
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
public record ExtractField(MappedFieldType ft, ElementType elementType) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.compute.data.BlockFactory;
|
|||
import org.elasticsearch.compute.data.BlockFactoryProvider;
|
||||
import org.elasticsearch.compute.lucene.DataPartitioning;
|
||||
import org.elasticsearch.compute.lucene.LuceneOperator;
|
||||
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
|
||||
import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
|
||||
import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
|
||||
import org.elasticsearch.compute.operator.AbstractPageMappingToIteratorOperator;
|
||||
|
@ -304,6 +305,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
|
|||
entries.add(HashAggregationOperator.Status.ENTRY);
|
||||
entries.add(LimitOperator.Status.ENTRY);
|
||||
entries.add(LuceneOperator.Status.ENTRY);
|
||||
entries.add(TimeSeriesSourceOperator.Status.ENTRY);
|
||||
entries.add(TopNOperatorStatus.ENTRY);
|
||||
entries.add(MvExpandOperator.Status.ENTRY);
|
||||
entries.add(ValuesSourceReaderOperator.Status.ENTRY);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue