Add block loader from stored field and source for ip field (#126644)

Oleksandr Kolomiiets 2025-04-11 13:37:15 -07:00 committed by GitHub
parent 5c5a87aba4
commit 9d18d5280a
17 changed files with 331 additions and 12 deletions

View file

@ -0,0 +1,5 @@
pr: 126644
summary: Add block loader from stored field and source for ip field
area: Mapping
type: enhancement
issues: []

View file

@ -9,6 +9,7 @@
package org.elasticsearch.index.mapper;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedSetDocValues;
@ -20,6 +21,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.search.fetch.StoredFieldsSpec;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -381,6 +383,46 @@ public abstract class BlockSourceReader implements BlockLoader.RowStrideReader {
}
}
/**
 * Load {@code ip}s from {@code _source}.
 */
public static class IpsBlockLoader extends SourceBlockLoader {
    public IpsBlockLoader(ValueFetcher fetcher, LeafIteratorLookup lookup) {
        super(fetcher, lookup);
    }

    @Override
    public Builder builder(BlockFactory factory, int expectedCount) {
        return factory.bytesRefs(expectedCount);
    }

    @Override
    public RowStrideReader rowStrideReader(LeafReaderContext context, DocIdSetIterator iter) {
        return new Ips(fetcher, iter);
    }

    @Override
    protected String name() {
        return "Ips";
    }
}

private static class Ips extends BlockSourceReader {
    Ips(ValueFetcher fetcher, DocIdSetIterator iter) {
        super(fetcher, iter);
    }

    @Override
    protected void append(BlockLoader.Builder builder, Object v) {
        ((BlockLoader.BytesRefBuilder) builder).appendBytesRef(new BytesRef(InetAddressPoint.encode((InetAddress) v)));
    }

    @Override
    public String toString() {
        return "BlockSourceReader.Ips";
    }
}
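Aside: a minimal, hypothetical sketch (not part of this commit; only InetAddressPoint, InetAddresses and BytesRef are real APIs) of the conversion a raw _source string goes through on this path. Values are re-encoded into the same 16-byte form the doc-values loader returns, so block consumers see identical bytes regardless of where the ip was loaded from.

import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.network.InetAddresses;

import java.net.InetAddress;

class IpEncodingSketch {
    // Hypothetical helper: parse a _source string and encode it the way Ips.append encodes fetched values.
    static BytesRef encode(String rawIp) {
        InetAddress address = InetAddresses.forString(rawIp); // throws on malformed input
        return new BytesRef(InetAddressPoint.encode(address)); // 16 bytes; IPv4 becomes an IPv4-mapped IPv6 address
    }
}

For example, encode("192.168.0.1") holds the same bytes that the ip field's ordinal doc values store for that value.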
/**
* Convert a {@link String} into a utf-8 {@link BytesRef}.
*/

View file

@ -43,6 +43,7 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.lookup.FieldValues;
import org.elasticsearch.search.lookup.SearchLookup;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.net.InetAddress;
@ -51,8 +52,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.BiFunction;
import static org.elasticsearch.index.mapper.FieldArrayContext.getOffsetsFieldName;
@ -213,7 +216,8 @@ public class IpFieldMapper extends FieldMapper {
parseNullValue(),
scriptValues(),
meta.getValue(),
dimension.getValue()
dimension.getValue(),
context.isSourceSynthetic()
),
builderParams(this, context),
context.isSourceSynthetic(),
@ -234,6 +238,7 @@ public class IpFieldMapper extends FieldMapper {
private final InetAddress nullValue;
private final FieldValues<InetAddress> scriptValues;
private final boolean isDimension;
private final boolean isSyntheticSource;
public IpFieldType(
String name,
@ -243,12 +248,14 @@ public class IpFieldMapper extends FieldMapper {
InetAddress nullValue,
FieldValues<InetAddress> scriptValues,
Map<String, String> meta,
boolean isDimension
boolean isDimension,
boolean isSyntheticSource
) {
super(name, indexed, stored, hasDocValues, TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS, meta);
this.nullValue = nullValue;
this.scriptValues = scriptValues;
this.isDimension = isDimension;
this.isSyntheticSource = isSyntheticSource;
}
public IpFieldType(String name) {
@ -260,7 +267,7 @@ public class IpFieldMapper extends FieldMapper {
}
public IpFieldType(String name, boolean isIndexed, boolean hasDocValues) {
this(name, isIndexed, false, hasDocValues, null, null, Collections.emptyMap(), false);
this(name, isIndexed, false, hasDocValues, null, null, Collections.emptyMap(), false, false);
}
@Override
@ -452,10 +459,79 @@ public class IpFieldMapper extends FieldMapper {
@Override
public BlockLoader blockLoader(BlockLoaderContext blContext) {
if (hasDocValues()) {
if (hasDocValues() && (blContext.fieldExtractPreference() != FieldExtractPreference.STORED || isSyntheticSource)) {
return new BlockDocValuesReader.BytesRefsFromOrdsBlockLoader(name());
}
return null;
if (isStored()) {
return new BlockStoredFieldsReader.BytesFromBytesRefsBlockLoader(name());
}
if (isSyntheticSource) {
return blockLoaderFromFallbackSyntheticSource(blContext);
}
// see #indexValue
BlockSourceReader.LeafIteratorLookup lookup = hasDocValues() == false && isIndexed()
? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
: BlockSourceReader.lookupMatchingAll();
return new BlockSourceReader.IpsBlockLoader(sourceValueFetcher(blContext.sourcePaths(name())), lookup);
}
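The hunk above interleaves the removed and added lines of blockLoader(): the previously unconditional doc-values path becomes a precedence over four loaders. A hedged, stripped-down sketch of that precedence (the enum and method below are illustrative only, not Elasticsearch API):

enum IpLoaderKind { DOC_VALUES, STORED_FIELD, FALLBACK_SYNTHETIC_SOURCE, SOURCE }

class IpLoaderChoiceSketch {
    // Mirrors the branching in blockLoader() above; all names here are hypothetical.
    static IpLoaderKind choose(boolean hasDocValues, boolean callerPrefersStored, boolean isSyntheticSource, boolean isStored) {
        if (hasDocValues && (callerPrefersStored == false || isSyntheticSource)) {
            return IpLoaderKind.DOC_VALUES;                  // ordinals from doc values, as before
        }
        if (isStored) {
            return IpLoaderKind.STORED_FIELD;                // new: read the stored field
        }
        if (isSyntheticSource) {
            return IpLoaderKind.FALLBACK_SYNTHETIC_SOURCE;   // new: reconstruct from fallback synthetic source
        }
        return IpLoaderKind.SOURCE;                          // new: row-stride read from _source
    }
}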
private BlockLoader blockLoaderFromFallbackSyntheticSource(BlockLoaderContext blContext) {
    var reader = new FallbackSyntheticSourceBlockLoader.SingleValueReader<InetAddress>(nullValue) {
        @Override
        public void convertValue(Object value, List<InetAddress> accumulator) {
            if (value instanceof InetAddress ia) {
                accumulator.add(ia);
                return;
            }

            try {
                var address = InetAddresses.forString(value.toString());
                accumulator.add(address);
            } catch (Exception e) {
                // Malformed value, skip it
            }
        }

        @Override
        protected void parseNonNullValue(XContentParser parser, List<InetAddress> accumulator) throws IOException {
            // aligned with #parseCreateField()
            String value = parser.text();

            try {
                var address = InetAddresses.forString(value);
                accumulator.add(address);
            } catch (Exception e) {
                // Malformed value, skip it
            }
        }

        @Override
        public void writeToBlock(List<InetAddress> values, BlockLoader.Builder blockBuilder) {
            var bytesRefBuilder = (BlockLoader.BytesRefBuilder) blockBuilder;

            for (var value : values) {
                bytesRefBuilder.appendBytesRef(new BytesRef(InetAddressPoint.encode(value)));
            }
        }
    };

    return new FallbackSyntheticSourceBlockLoader(reader, name()) {
        @Override
        public Builder builder(BlockFactory factory, int expectedCount) {
            return factory.bytesRefs(expectedCount);
        }
    };
}

private SourceValueFetcher sourceValueFetcher(Set<String> sourcePaths) {
    return new SourceValueFetcher(sourcePaths, nullValue) {
        @Override
        public InetAddress parseSourceValue(Object value) {
            return parse(value);
        }
    };
}
@Override

View file

@ -105,6 +105,7 @@ public class IpFieldTypeTests extends FieldTypeTestCase {
null,
null,
Collections.emptyMap(),
false,
false
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> unsearchable.termQuery("::1", MOCK_CONTEXT));
@ -339,6 +340,7 @@ public class IpFieldTypeTests extends FieldTypeTestCase {
null,
null,
Collections.emptyMap(),
false,
false
);
IllegalArgumentException e = expectThrows(

View file

@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.index.mapper.blockloader;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.index.mapper.BlockLoaderTestCase;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.logsdb.datageneration.FieldType;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class IpFieldBlockLoaderTests extends BlockLoaderTestCase {
    public IpFieldBlockLoaderTests(Params params) {
        super(FieldType.IP.toString(), params);
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Object expected(Map<String, Object> fieldMapping, Object value, TestContext testContext) {
        var rawNullValue = (String) fieldMapping.get("null_value");
        BytesRef nullValue = convert(rawNullValue, null);

        if (value == null) {
            return convert(null, nullValue);
        }
        if (value instanceof String s) {
            return convert(s, nullValue);
        }

        boolean hasDocValues = hasDocValues(fieldMapping, true);
        boolean useDocValues = params.preference() == MappedFieldType.FieldExtractPreference.NONE
            || params.preference() == MappedFieldType.FieldExtractPreference.DOC_VALUES
            || params.syntheticSource();
        if (hasDocValues && useDocValues) {
            var resultList = ((List<String>) value).stream()
                .map(v -> convert(v, nullValue))
                .filter(Objects::nonNull)
                .distinct()
                .sorted()
                .toList();
            return maybeFoldList(resultList);
        }

        // field is stored or using source
        var resultList = ((List<String>) value).stream().map(v -> convert(v, nullValue)).filter(Objects::nonNull).toList();
        return maybeFoldList(resultList);
    }

    private static BytesRef convert(Object value, BytesRef nullValue) {
        if (value == null) {
            return nullValue;
        }

        if (value instanceof String s) {
            try {
                var address = InetAddresses.forString(s);
                return new BytesRef(InetAddressPoint.encode(address));
            } catch (Exception ex) {
                // malformed
                return null;
            }
        }

        return null;
    }
}
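A note on the doc-values branch of expected() above: ordinal doc values keep each distinct encoded ip once, ordered by its 16-byte encoding, which is why the expectation is de-duplicated and sorted. A minimal standalone sketch under that assumption (same Lucene/Elasticsearch utilities the test already imports; the class name is illustrative):

import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.network.InetAddresses;

import java.util.List;

class DocValuesOrderSketch {
    // BytesRef's natural ordering is an unsigned byte-wise comparison, matching doc-values order.
    static List<BytesRef> expectedFromDocValues(List<String> rawIps) {
        return rawIps.stream()
            .map(s -> new BytesRef(InetAddressPoint.encode(InetAddresses.forString(s))))
            .distinct()
            .sorted()
            .toList();
    }
}

For example, ["10.0.0.2", "10.0.0.1", "10.0.0.1"] comes back as the encodings of 10.0.0.1 and then 10.0.0.2.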

View file

@ -1328,6 +1328,7 @@ public class TermsAggregatorTests extends AggregatorTestCase {
null,
null,
Collections.emptyMap(),
false,
false
);
testCase(iw -> {

View file

@ -19,6 +19,7 @@ import org.elasticsearch.logsdb.datageneration.fields.leaf.FloatFieldDataGenerat
import org.elasticsearch.logsdb.datageneration.fields.leaf.GeoPointFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.HalfFloatFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.IntegerFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.IpFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.KeywordFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.LongFieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.fields.leaf.ScaledFloatFieldDataGenerator;
@ -44,7 +45,8 @@ public enum FieldType {
BOOLEAN("boolean"),
DATE("date"),
GEO_POINT("geo_point"),
TEXT("text");
TEXT("text"),
IP("ip");
private final String name;
@ -69,6 +71,7 @@ public enum FieldType {
case DATE -> new DateFieldDataGenerator(dataSource);
case GEO_POINT -> new GeoPointFieldDataGenerator(dataSource);
case TEXT -> new TextFieldDataGenerator(dataSource);
case IP -> new IpFieldDataGenerator(dataSource);
};
}
@ -89,6 +92,7 @@ public enum FieldType {
case "date" -> FieldType.DATE;
case "geo_point" -> FieldType.GEO_POINT;
case "text" -> FieldType.TEXT;
case "ip" -> FieldType.IP;
default -> null;
};
}

View file

@ -74,6 +74,10 @@ public interface DataSourceHandler {
return null;
}
default DataSourceResponse.IpGenerator handle(DataSourceRequest.IpGenerator request) {
return null;
}
default DataSourceResponse.NullWrapper handle(DataSourceRequest.NullWrapper request) {
return null;
}

View file

@ -120,6 +120,12 @@ public interface DataSourceRequest<TResponse extends DataSourceResponse> {
}
}
record IpGenerator() implements DataSourceRequest<DataSourceResponse.IpGenerator> {
public DataSourceResponse.IpGenerator accept(DataSourceHandler handler) {
return handler.handle(this);
}
}
record NullWrapper() implements DataSourceRequest<DataSourceResponse.NullWrapper> {
public DataSourceResponse.NullWrapper accept(DataSourceHandler handler) {
return handler.handle(this);

View file

@ -11,6 +11,7 @@ package org.elasticsearch.logsdb.datageneration.datasource;
import org.elasticsearch.geometry.Geometry;
import java.net.InetAddress;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
@ -50,6 +51,8 @@ public interface DataSourceResponse {
record GeoPointGenerator(Supplier<Object> generator) implements DataSourceResponse {}
record IpGenerator(Supplier<InetAddress> generator) implements DataSourceResponse {}
record NullWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}
record ArrayWrapper(Function<Supplier<Object>, Supplier<Object>> wrapper) implements DataSourceResponse {}

View file

@ -9,6 +9,7 @@
package org.elasticsearch.logsdb.datageneration.datasource;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.geo.GeometryTestUtils;
import org.elasticsearch.geometry.utils.WellKnownText;
import org.elasticsearch.index.mapper.Mapper;
@ -49,6 +50,7 @@ public class DefaultMappingParametersHandler implements DataSourceHandler {
case DATE -> dateMapping(map);
case GEO_POINT -> geoPointMapping(map);
case TEXT -> textMapping(request, new HashMap<>());
case IP -> ipMapping(map);
});
}
@ -209,6 +211,20 @@ public class DefaultMappingParametersHandler implements DataSourceHandler {
};
}
private Supplier<Map<String, Object>> ipMapping(Map<String, Object> injected) {
    return () -> {
        if (ESTestCase.randomDouble() <= 0.2) {
            injected.put("null_value", NetworkAddress.format(ESTestCase.randomIp(ESTestCase.randomBoolean())));
        }

        if (ESTestCase.randomBoolean()) {
            injected.put("ignore_malformed", ESTestCase.randomBoolean());
        }

        return injected;
    };
}
private static HashMap<String, Object> commonMappingParameters() {
var map = new HashMap<String, Object>();
map.put("store", ESTestCase.randomBoolean());

View file

@ -78,4 +78,9 @@ public class DefaultPrimitiveTypesHandler implements DataSourceHandler {
public DataSourceResponse.GeoPointGenerator handle(DataSourceRequest.GeoPointGenerator request) {
return new DataSourceResponse.GeoPointGenerator(() -> RandomGeoGenerator.randomPoint(ESTestCase.random()));
}
@Override
public DataSourceResponse.IpGenerator handle(DataSourceRequest.IpGenerator request) {
return new DataSourceResponse.IpGenerator(() -> ESTestCase.randomIp(ESTestCase.randomBoolean()));
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/
package org.elasticsearch.logsdb.datageneration.fields.leaf;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.logsdb.datageneration.FieldDataGenerator;
import org.elasticsearch.logsdb.datageneration.datasource.DataSource;
import org.elasticsearch.logsdb.datageneration.datasource.DataSourceRequest;
import java.net.InetAddress;
import java.util.Map;
import java.util.function.Supplier;
public class IpFieldDataGenerator implements FieldDataGenerator {
    private final Supplier<Object> valueGenerator;
    private final Supplier<Object> valueGeneratorWithMalformed;

    public IpFieldDataGenerator(DataSource dataSource) {
        Supplier<InetAddress> ips = dataSource.get(new DataSourceRequest.IpGenerator()).generator();
        Supplier<String> formattedIps = () -> NetworkAddress.format(ips.get());

        this.valueGenerator = Wrappers.defaults(formattedIps::get, dataSource);

        var strings = dataSource.get(new DataSourceRequest.StringGenerator()).generator();
        this.valueGeneratorWithMalformed = Wrappers.defaultsWithMalformed(formattedIps::get, strings::get, dataSource);
    }

    @Override
    public Object generateValue(Map<String, Object> fieldMapping) {
        if (fieldMapping != null && (Boolean) fieldMapping.getOrDefault("ignore_malformed", false)) {
            return valueGeneratorWithMalformed.get();
        }

        return valueGenerator.get();
    }
}

View file

@ -62,6 +62,7 @@ interface FieldSpecificMatcher {
put("shape", new ExactMatcher("shape", actualMappings, actualSettings, expectedMappings, expectedSettings));
put("geo_point", new GeoPointMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings));
put("text", new TextMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings));
put("ip", new IpMatcher(actualMappings, actualSettings, expectedMappings, expectedSettings));
}
};
}
@ -666,6 +667,30 @@ interface FieldSpecificMatcher {
}
}
class IpMatcher extends GenericMappingAwareMatcher {
    IpMatcher(
        XContentBuilder actualMappings,
        Settings.Builder actualSettings,
        XContentBuilder expectedMappings,
        Settings.Builder expectedSettings
    ) {
        super("ip", actualMappings, actualSettings, expectedMappings, expectedSettings);
    }

    @Override
    Object convert(Object value, Object nullValue) {
        if (value == null) {
            if (nullValue != null) {
                return nullValue;
            }
            return null;
        }

        // We should always be able to convert an IP back to its original string.
        return value;
    }
}
/**
* Generic matcher that supports common matching logic like null values.
*/

View file

@ -124,6 +124,7 @@ tasks.named("yamlRestCompatTestTransform").configure({ task ->
task.skipTest("ml/post_data/Test POST data job api, flush, close and verify DataCounts doc", "Flush API is deprecated")
task.replaceValueInMatch("Size", 49, "Test flamegraph from profiling-events")
task.replaceValueInMatch("Size", 49, "Test flamegraph from test-events")
task.skipTest("esql/90_non_indexed/fetch", "Temporary until backported")
})
tasks.named('yamlRestCompatTest').configure {

View file

@ -995,7 +995,12 @@ public class EsqlCapabilities {
/**
* Support avg_over_time aggregation that gets evaluated per time-series
*/
AVG_OVER_TIME(Build.current().isSnapshot());
AVG_OVER_TIME(Build.current().isSnapshot()),
/**
* Support loading of ip fields if they are not indexed.
*/
LOADING_NON_INDEXED_IP_FIELDS;
private final boolean enabled;

View file

@ -1,8 +1,12 @@
setup:
  - requires:
      cluster_features: ["gte_v8.12.0"]
      reason: "extracting non-indexed fields available in 8.12+"
      test_runner_features: allowed_warnings_regex
      test_runner_features: [capabilities, allowed_warnings_regex]
      capabilities:
        - method: POST
          path: /_query
          parameters: []
          capabilities: [loading_non_indexed_ip_fields]
      reason: "Support for loading non indexed ip"
  - do:
      indices.create:
        index: test
@ -96,7 +100,6 @@ setup:
fetch:
  - do:
      allowed_warnings_regex:
        - "Field \\[ip_noidx\\] cannot be retrieved, it is unsupported or not indexed; returning null"
        - "No limit defined, adding default limit of \\[.*\\]"
      esql.query:
@ -150,10 +153,11 @@ fetch:
- match: { values.0.5: 40 }
- match: { values.0.6: 30 }
- match: { values.0.7: 30 }
- match: { values.0.7: 30 }
- match: { values.0.8: 10 }
- match: { values.0.9: 10 }
- match: { values.0.10: "192.168.0.1" }
- match: { values.0.11: null }
- match: { values.0.11: "192.168.0.1" }
- match: { values.0.12: "foo" }
- match: { values.0.13: "foo" }
- match: { values.0.14: 20 }