Support synthetic source for aggregate_metric_double when ignore_malformed is used (#108746)

This PR adds synthetic source support for aggregate_metric_double field when ignore_malformed is used.

This PR introduces a pattern that will be reused in ignore_malformed support in synthetic source for other (complex object-like) fields. The pattern is to create a "shadow" XContentBuilder that replicates all successfully parsed fields and values. In case of malformed data, everything remaining in the parser (inside the field) is copied over to the builder. As a result we get the successfully parsed pieces, the malformed piece, and any skipped pieces, which together form a full representation of the user input and can go into synthetic source.
This commit is contained in:
Oleksandr Kolomiiets 2024-05-23 09:50:50 -07:00 committed by GitHub
parent 6806763cc8
commit a7c3bd580f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 235 additions and 27 deletions

View file

@ -0,0 +1,5 @@
pr: 108746
summary: Support synthetic source for `aggregate_metric_double` when ignore_malf…
area: Mapping
type: feature
issues: []

View file

@ -748,4 +748,24 @@ public class XContentHelper {
throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e); throw new ElasticsearchGenerationException("Failed to generate [" + source + "]", e);
} }
} }
/**
 * Drains all data remaining in the provided parser into the provided builder.
 * The parser is closed when this method returns, even if copying fails partway
 * (the original implementation only closed it on the success path, leaking the
 * parser when {@code copyCurrentStructure} or {@code nextToken} threw).
 *
 * @param parser source parser to drain; must not already be closed
 * @param destination builder that receives every remaining structure from the parser
 * @throws IOException if reading from the parser or writing to the builder fails
 * @throws IllegalStateException if the parser is already closed
 */
public static void drainAndClose(XContentParser parser, XContentBuilder destination) throws IOException {
    if (parser.isClosed()) {
        throw new IllegalStateException("Can't drain a parser that is closed");
    }
    // try-with-resources on the effectively-final parser guarantees close()
    // runs on both the success and the exception path.
    try (parser) {
        XContentParser.Token token;
        do {
            destination.copyCurrentStructure(parser);
            token = parser.nextToken();
        } while (token != null);
    }
}
} }

View file

@ -30,7 +30,7 @@ import java.util.Arrays;
/** /**
* Helper class for processing field data of any type, as provided by the {@link XContentParser}. * Helper class for processing field data of any type, as provided by the {@link XContentParser}.
*/ */
final class XContentDataHelper { public final class XContentDataHelper {
/** /**
* Build a {@link StoredField} for the value on which the parser is * Build a {@link StoredField} for the value on which the parser is
* currently positioned. * currently positioned.
@ -57,7 +57,7 @@ final class XContentDataHelper {
* Build a {@link BytesRef} wrapping a byte array containing an encoded form * Build a {@link BytesRef} wrapping a byte array containing an encoded form
* of the passed XContentBuilder contents. * of the passed XContentBuilder contents.
*/ */
static BytesRef encodeXContentBuilder(XContentBuilder builder) throws IOException { public static BytesRef encodeXContentBuilder(XContentBuilder builder) throws IOException {
return new BytesRef(TypeUtils.encode(builder)); return new BytesRef(TypeUtils.encode(builder));
} }

View file

@ -8,6 +8,7 @@
package org.elasticsearch.common.xcontent.support; package org.elasticsearch.common.xcontent.support;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.compress.CompressedXContent;
@ -420,4 +421,25 @@ public class XContentHelperTests extends ESTestCase {
assertThat(names, equalTo(Set.of("a", "c"))); assertThat(names, equalTo(Set.of("a", "c")));
} }
/**
 * Draining a freshly created parser must copy the complete structure
 * (objects, arrays, nested values) into the destination builder and
 * leave the parser closed.
 */
public void testDrainAndClose() throws IOException {
    String input = """
        { "a": "b", "c": "d", "e": {"f": "g"}, "h": ["i", "j", {"k": "l"}]}""";
    var source = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, input);
    var destination = XContentBuilder.builder(XContentType.JSON.xContent());

    XContentHelper.drainAndClose(source, destination);

    // The builder emits compact JSON, so compare against the input minus spaces.
    String expected = input.replace(" ", "");
    assertEquals(expected, Strings.toString(destination));
    assertTrue(source.isClosed());
}
/**
 * Draining a parser that has already been closed must fail fast
 * with an {@link IllegalStateException}.
 */
public void testDrainAndCloseAlreadyClosed() throws IOException {
    var closedParser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, "{}");
    closedParser.close();

    var destination = XContentBuilder.builder(XContentType.JSON.xContent());
    assertThrows(IllegalStateException.class, () -> XContentHelper.drainAndClose(closedParser, destination));
}
} }

View file

@ -1115,7 +1115,7 @@ public abstract class MapperTestCase extends MapperServiceTestCase {
assertSyntheticSource(syntheticSourceSupport(ignoreMalformed).example(5)); assertSyntheticSource(syntheticSourceSupport(ignoreMalformed).example(5));
} }
public final void testSyntheticSourceIgnoreMalformedExamples() throws IOException { public void testSyntheticSourceIgnoreMalformedExamples() throws IOException {
assumeTrue("type doesn't support ignore_malformed", supportsIgnoreMalformed()); assumeTrue("type doesn't support ignore_malformed", supportsIgnoreMalformed());
CheckedConsumer<XContentBuilder, IOException> mapping = syntheticSourceSupport(true).example(1).mapping(); CheckedConsumer<XContentBuilder, IOException> mapping = syntheticSourceSupport(true).example(1).mapping();
for (ExampleMalformedValue v : exampleMalformedValues()) { for (ExampleMalformedValue v : exampleMalformedValues()) {

View file

@ -19,6 +19,7 @@ import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.time.DateMathParser; import org.elasticsearch.common.time.DateMathParser;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.fielddata.FieldDataContext; import org.elasticsearch.index.fielddata.FieldDataContext;
@ -29,6 +30,7 @@ import org.elasticsearch.index.fielddata.SortedBinaryDocValues;
import org.elasticsearch.index.fielddata.SortedNumericDoubleValues; import org.elasticsearch.index.fielddata.SortedNumericDoubleValues;
import org.elasticsearch.index.mapper.DocumentParserContext; import org.elasticsearch.index.mapper.DocumentParserContext;
import org.elasticsearch.index.mapper.FieldMapper; import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.Mapper;
import org.elasticsearch.index.mapper.MapperBuilderContext; import org.elasticsearch.index.mapper.MapperBuilderContext;
@ -41,6 +43,7 @@ import org.elasticsearch.index.mapper.TextSearchInfo;
import org.elasticsearch.index.mapper.TimeSeriesParams; import org.elasticsearch.index.mapper.TimeSeriesParams;
import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType; import org.elasticsearch.index.mapper.TimeSeriesParams.MetricType;
import org.elasticsearch.index.mapper.ValueFetcher; import org.elasticsearch.index.mapper.ValueFetcher;
import org.elasticsearch.index.mapper.XContentDataHelper;
import org.elasticsearch.index.query.QueryRewriteContext; import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.SearchExecutionContext; import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.script.ScriptCompiler; import org.elasticsearch.script.ScriptCompiler;
@ -587,6 +590,12 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper {
XContentParser.Token token; XContentParser.Token token;
XContentSubParser subParser = null; XContentSubParser subParser = null;
EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class); EnumMap<Metric, Number> metricsParsed = new EnumMap<>(Metric.class);
// Preserves the content of the field in order to be able to construct synthetic source
// if field value is malformed.
XContentBuilder malformedContentForSyntheticSource = context.mappingLookup().isSourceSynthetic() && ignoreMalformed
? XContentBuilder.builder(context.parser().contentType().xContent())
: null;
try { try {
token = context.parser().currentToken(); token = context.parser().currentToken();
if (token == XContentParser.Token.VALUE_NULL) { if (token == XContentParser.Token.VALUE_NULL) {
@ -596,6 +605,9 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper {
ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser()); ensureExpectedToken(XContentParser.Token.START_OBJECT, token, context.parser());
subParser = new XContentSubParser(context.parser()); subParser = new XContentSubParser(context.parser());
token = subParser.nextToken(); token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.startObject();
}
while (token != XContentParser.Token.END_OBJECT) { while (token != XContentParser.Token.END_OBJECT) {
// should be an object sub-field with name a metric name // should be an object sub-field with name a metric name
ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser); ensureExpectedToken(XContentParser.Token.FIELD_NAME, token, subParser);
@ -609,13 +621,20 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper {
} }
token = subParser.nextToken(); token = subParser.nextToken();
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.field(fieldName);
}
// Make sure that the value is a number. Probably this will change when // Make sure that the value is a number. Probably this will change when
// new aggregate metric types are added (histogram, cardinality etc) // new aggregate metric types are added (histogram, cardinality etc)
ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser); ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, subParser);
NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(metric); NumberFieldMapper delegateFieldMapper = metricFieldMappers.get(metric);
// Delegate parsing the field to a numeric field mapper // Delegate parsing the field to a numeric field mapper
try { try {
metricsParsed.put(metric, delegateFieldMapper.value(context.parser())); Number metricValue = delegateFieldMapper.value(context.parser());
metricsParsed.put(metric, metricValue);
if (malformedContentForSyntheticSource != null) {
malformedContentForSyntheticSource.value(metricValue);
}
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e); throw new IllegalArgumentException("failed to parse [" + metric.name() + "] sub field: " + e.getMessage(), e);
} }
@ -658,10 +677,26 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper {
} }
} catch (Exception e) { } catch (Exception e) {
if (ignoreMalformed) { if (ignoreMalformed) {
if (subParser != null) { if (malformedContentForSyntheticSource != null) {
// close the subParser so we advance to the end of the object if (subParser != null) {
// Remaining data in parser needs to be stored as is in order to provide it in synthetic source.
XContentHelper.drainAndClose(subParser, malformedContentForSyntheticSource);
} else {
// We don't use DrainingXContentParser since we don't want to go beyond current field
malformedContentForSyntheticSource.copyCurrentStructure(context.parser());
}
;
var nameValue = IgnoredSourceFieldMapper.NameValue.fromContext(
context,
name(),
XContentDataHelper.encodeXContentBuilder(malformedContentForSyntheticSource)
);
context.addIgnoredField(nameValue);
} else if (subParser != null) {
// close the subParser, so we advance to the end of the object
subParser.close(); subParser.close();
} }
context.addIgnoredField(name()); context.addIgnoredField(name());
context.path().remove(); context.path().remove();
return; return;
@ -689,11 +724,7 @@ public class AggregateDoubleMetricFieldMapper extends FieldMapper {
@Override @Override
public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() { public SourceLoader.SyntheticFieldLoader syntheticFieldLoader() {
if (ignoreMalformed) { // Note that malformed values are handled via `IgnoredSourceFieldMapper` infrastructure
throw new IllegalArgumentException(
"field [" + name() + "] of type [" + typeName() + "] doesn't support synthetic source because it ignores malformed numbers"
);
}
return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics); return new AggregateMetricSyntheticFieldLoader(name(), simpleName(), metrics);
} }

View file

@ -18,6 +18,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MapperTestCase; import org.elasticsearch.index.mapper.MapperTestCase;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory; import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin; import org.elasticsearch.xpack.aggregatemetric.AggregateMetricMapperPlugin;
@ -33,11 +34,12 @@ import java.util.Iterator;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.Supplier;
import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Names.IGNORE_MALFORMED;
import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Names.METRICS; import static org.elasticsearch.xpack.aggregatemetric.mapper.AggregateDoubleMetricFieldMapper.Names.METRICS;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.hamcrest.core.IsInstanceOf.instanceOf;
@ -467,8 +469,12 @@ public class AggregateDoubleMetricFieldMapperTests extends MapperTestCase {
@Override @Override
protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed) { protected SyntheticSourceSupport syntheticSourceSupport(boolean ignoreMalformed) {
assumeFalse("synthetic _source support for aggregate_double_metric doesn't support ignore_malformed", ignoreMalformed); return new AggregateDoubleMetricSyntheticSourceSupport(ignoreMalformed);
return new AggregateDoubleMetricSyntheticSourceSupport(); }
@Override
public void testSyntheticSourceIgnoreMalformedExamples() {
assumeTrue("Scenarios are covered in scope of syntheticSourceSupport", false);
} }
@Override @Override
@ -478,16 +484,94 @@ public class AggregateDoubleMetricFieldMapperTests extends MapperTestCase {
protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport { protected final class AggregateDoubleMetricSyntheticSourceSupport implements SyntheticSourceSupport {
private final EnumSet<Metric> storedMetrics = EnumSet.copyOf(randomNonEmptySubsetOf(Arrays.asList(Metric.values()))); private final boolean malformedExample;
private final EnumSet<Metric> storedMetrics;
/**
 * @param malformedExample when true, {@code example(int)} may produce malformed field
 *                         values; the generated mapping then sets ignore_malformed=true
 *                         so parsing falls back to ignored-source handling
 */
public AggregateDoubleMetricSyntheticSourceSupport(boolean malformedExample) {
this.malformedExample = malformedExample;
// Pick a random non-empty subset of metrics to declare in the mapping.
this.storedMetrics = EnumSet.copyOf(randomNonEmptySubsetOf(Arrays.asList(Metric.values())));
}
@Override @Override
public SyntheticSourceExample example(int maxVals) { public SyntheticSourceExample example(int maxVals) {
// aggregate_metric_double field does not support arrays // aggregate_metric_double field does not support arrays
Map<String, Object> value = randomAggregateMetric(); Object value = randomAggregateMetric();
return new SyntheticSourceExample(value, value, this::mapping); return new SyntheticSourceExample(value, value, this::mapping);
} }
private Map<String, Object> randomAggregateMetric() { private Object randomAggregateMetric() {
if (malformedExample && randomBoolean()) {
return malformedValue();
}
return validMetrics();
}
/**
 * Produces a randomly chosen malformed value for an aggregate_metric_double field:
 * a scalar instead of a metrics object, an empty object, unmapped/missing metrics,
 * non-numeric or object-valued metrics, extra sibling data, or value_count violations
 * (negative, or a non-whole double).
 */
private Object malformedValue() {
    List<Supplier<Object>> choices = List.of(
        // plain scalars where an object of metrics is expected
        () -> randomAlphaOfLength(3),
        ESTestCase::randomInt,
        ESTestCase::randomLong,
        ESTestCase::randomFloat,
        ESTestCase::randomDouble,
        ESTestCase::randomBoolean,
        // no metrics
        Map::of,
        // unmapped metric
        () -> {
            var metrics = validMetrics();
            metrics.put("hello", "world");
            return metrics;
        },
        // missing metric
        () -> {
            var metrics = validMetrics();
            metrics.remove(storedMetrics.stream().findFirst().get().name());
            return metrics;
        },
        // invalid metric value
        () -> {
            var metrics = validMetrics();
            metrics.put(storedMetrics.stream().findFirst().get().name(), "boom");
            return metrics;
        },
        // metric is an object
        () -> {
            var metrics = validMetrics();
            metrics.put(storedMetrics.stream().findFirst().get().name(), Map.of("hello", "world"));
            return metrics;
        },
        // invalid metric value with additional data
        () -> {
            var metrics = validMetrics();
            metrics.put(storedMetrics.stream().findFirst().get().name(), "boom");
            metrics.put("hello", "world");
            metrics.put("object", Map.of("hello", "world"));
            metrics.put("list", List.of("hello", "world"));
            return metrics;
        },
        // negative value count
        () -> {
            var metrics = validMetrics();
            // BUGFIX: storedMetrics is an EnumSet<Metric>, so the previous
            // contains(Metric.value_count.name()) compared against a String and was
            // always false — this malformed case could never actually be generated.
            if (storedMetrics.contains(Metric.value_count)) {
                metrics.put(Metric.value_count.name(), -100);
            }
            return metrics;
        },
        // value count with decimal digits (whole numbers formatted as doubles are permitted, but non-whole numbers are not)
        () -> {
            var metrics = validMetrics();
            // Same String-vs-enum fix as above.
            if (storedMetrics.contains(Metric.value_count)) {
                metrics.put(Metric.value_count.name(), 10.5);
            }
            return metrics;
        }
    );

    return randomFrom(choices).get();
}
private Map<String, Object> validMetrics() {
Map<String, Object> value = new LinkedHashMap<>(storedMetrics.size()); Map<String, Object> value = new LinkedHashMap<>(storedMetrics.size());
for (Metric m : storedMetrics) { for (Metric m : storedMetrics) {
if (Metric.value_count == m) { if (Metric.value_count == m) {
@ -506,19 +590,14 @@ public class AggregateDoubleMetricFieldMapperTests extends MapperTestCase {
private void mapping(XContentBuilder b) throws IOException { private void mapping(XContentBuilder b) throws IOException {
String[] metrics = storedMetrics.stream().map(Metric::toString).toArray(String[]::new); String[] metrics = storedMetrics.stream().map(Metric::toString).toArray(String[]::new);
b.field("type", CONTENT_TYPE).array(METRICS_FIELD, metrics).field(DEFAULT_METRIC, metrics[0]); b.field("type", CONTENT_TYPE).array(METRICS_FIELD, metrics).field(DEFAULT_METRIC, metrics[0]);
if (malformedExample) {
b.field(IGNORE_MALFORMED, true);
}
} }
@Override @Override
public List<SyntheticSourceInvalidExample> invalidExample() throws IOException { public List<SyntheticSourceInvalidExample> invalidExample() throws IOException {
return List.of( return List.of();
new SyntheticSourceInvalidExample(
matchesPattern("field \\[field] of type \\[.+] doesn't support synthetic source because it ignores malformed numbers"),
b -> {
mapping(b);
b.field("ignore_malformed", true);
}
)
);
} }
} }

View file

@ -1,4 +1,4 @@
constant_keyword: aggregate_metric_double:
- requires: - requires:
cluster_features: ["gte_v8.5.0"] cluster_features: ["gte_v8.5.0"]
reason: synthetic source support added in 8.5.0 reason: synthetic source support added in 8.5.0
@ -51,3 +51,54 @@ constant_keyword:
min: 18.2 min: 18.2
max: 100.0 max: 100.0
value_count: 50 value_count: 50
---
aggregate_metric_double with ignore_malformed:
- requires:
cluster_features: ["mapper.track_ignored_source"]
reason: requires tracking ignored source
- do:
indices.create:
index: test
body:
mappings:
_source:
mode: synthetic
properties:
metric:
type: aggregate_metric_double
metrics: [min, max, value_count]
default_metric: max
ignore_malformed: true
- do:
index:
index: test
id: "1"
refresh: true
body:
metric:
min: 18.2
max: 100
field: "field"
sub:
array: [1, 2, 3]
field: "field"
value_count: 50
- do:
search:
index: test
- match:
hits.hits.0._source:
metric:
min: 18.2
max: 100
field: "field"
sub:
array: [1, 2, 3]
field: "field"
value_count: 50