ESQL: Prepare for backport of documents_found (#130039)

Prepares the `main` branch for the backport of #125631. Specifically,
this adds the version constant for 8.19 to main and the serialization
code that lets main talk to 8.19.
This commit is contained in:
Nik Everett 2025-06-26 07:51:17 -04:00 committed by GitHub
parent 4c7d922eeb
commit 59133d5e00
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 39 additions and 11 deletions

View file

@ -208,6 +208,7 @@ public class TransportVersions {
public static final TransportVersion SPARSE_VECTOR_FIELD_PRUNING_OPTIONS_8_19 = def(8_841_0_58);
public static final TransportVersion ML_INFERENCE_ELASTIC_DENSE_TEXT_EMBEDDINGS_ADDED_8_19 = def(8_841_0_59);
public static final TransportVersion ML_INFERENCE_COHERE_API_VERSION_8_19 = def(8_841_0_60);
public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
public static final TransportVersion V_9_0_0 = def(9_000_0_09);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);

View file

@ -11,6 +11,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
@ -47,6 +48,7 @@ import java.util.function.IntFunction;
import java.util.function.Supplier;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
/**
* Operator that extracts doc_values from a Lucene index out of pages that have been produced by {@link LuceneSourceOperator}
@ -617,18 +619,23 @@ public class ValuesSourceReaderOperator extends AbstractPageMappingOperator {
Status(StreamInput in) throws IOException {
super(in);
readersBuilt = in.readOrderedMap(StreamInput::readString, StreamInput::readVInt);
valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(readersBuilt, StreamOutput::writeVInt);
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsValuesLoaded(out.getTransportVersion())) {
out.writeVLong(valuesLoaded);
}
}
private static boolean supportsValuesLoaded(TransportVersion version) {
return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
|| version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
}
@Override
public String getWriteableName() {
return ENTRY.name;

View file

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
@ -37,6 +38,7 @@ import java.util.Objects;
import java.util.Optional;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.EsqlQueryResponse
implements
@ -120,8 +122,8 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
}
List<ColumnInfoImpl> columns = in.readCollectionAsList(ColumnInfoImpl::new);
List<Page> pages = in.readCollectionAsList(Page::new);
long documentsFound = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
long valuesLoaded = in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED) ? in.readVLong() : 0;
long documentsFound = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
long valuesLoaded = supportsValuesLoaded(in.getTransportVersion()) ? in.readVLong() : 0;
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
profile = in.readOptionalWriteable(Profile::readFrom);
}
@ -153,7 +155,7 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
}
out.writeCollection(columns);
out.writeCollection(pages);
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsValuesLoaded(out.getTransportVersion())) {
out.writeVLong(documentsFound);
out.writeVLong(valuesLoaded);
}
@ -166,6 +168,11 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
}
}
private static boolean supportsValuesLoaded(TransportVersion version) {
return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
|| version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
}
public List<ColumnInfoImpl> columns() {
return columns;
}

View file

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
@ -20,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
/**
* The compute result of {@link DataNodeRequest} or {@link ClusterComputeRequest}
@ -58,7 +60,7 @@ final class ComputeResponse extends TransportResponse {
}
ComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsCompletionInfo(in.getTransportVersion())) {
completionInfo = DriverCompletionInfo.readFrom(in);
} else if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
if (in.readBoolean()) {
@ -92,7 +94,7 @@ final class ComputeResponse extends TransportResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsCompletionInfo(out.getTransportVersion())) {
completionInfo.writeTo(out);
} else if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeBoolean(true);
@ -111,6 +113,11 @@ final class ComputeResponse extends TransportResponse {
}
}
private static boolean supportsCompletionInfo(TransportVersion version) {
return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
|| version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
}
public DriverCompletionInfo getCompletionInfo() {
return completionInfo;
}

View file

@ -7,6 +7,7 @@
package org.elasticsearch.xpack.esql.plugin;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.compute.operator.DriverCompletionInfo;
@ -19,6 +20,7 @@ import java.util.List;
import java.util.Map;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED;
import static org.elasticsearch.TransportVersions.ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19;
/**
* The compute result of {@link DataNodeRequest}
@ -33,7 +35,7 @@ final class DataNodeComputeResponse extends TransportResponse {
}
DataNodeComputeResponse(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsCompletionInfo(in.getTransportVersion())) {
this.completionInfo = DriverCompletionInfo.readFrom(in);
this.shardLevelFailures = in.readMap(ShardId::new, StreamInput::readException);
return;
@ -49,7 +51,7 @@ final class DataNodeComputeResponse extends TransportResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)) {
if (supportsCompletionInfo(out.getTransportVersion())) {
completionInfo.writeTo(out);
out.writeMap(shardLevelFailures, (o, v) -> v.writeTo(o), StreamOutput::writeException);
return;
@ -65,6 +67,11 @@ final class DataNodeComputeResponse extends TransportResponse {
new ComputeResponse(completionInfo).writeTo(out);
}
private static boolean supportsCompletionInfo(TransportVersion version) {
return version.onOrAfter(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED)
|| version.isPatchFrom(ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19);
}
public DriverCompletionInfo completionInfo() {
return completionInfo;
}

View file

@ -349,7 +349,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
columns,
result.pages(),
result.completionInfo().documentsFound(),
result.completionInfo().documentsFound(),
result.completionInfo().valuesLoaded(),
profile,
request.columnar(),
asyncExecutionId,

View file

@ -312,7 +312,6 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
}
default -> throw new IllegalArgumentException();
}
;
return new EsqlQueryResponse(columns, pages, documentsFound, valuesLoaded, profile, columnar, isAsync, executionInfo);
}