[ML] Add runtime mappings to data frame analytics source config (#69183)

Users can now specify runtime mappings as part of the source config
of a data frame analytics job. Those runtime mappings become part of
the destination index's mapping, so the fields remain accessible in
the destination index even after the data frame analytics job is
deleted.

Closes #65056
Dimitris Athanasiou 2021-02-19 16:29:19 +02:00 committed by GitHub
parent 623f547c9f
commit 7fb98c0d3c
35 changed files with 665 additions and 119 deletions
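
For orientation, a minimal sketch of the new client-side API based on the builder changes below; the index name, runtime field name, and script body are illustrative, not part of this commit:

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource;

// A runtime field definition is a map with a "type" and, typically, a "script".
Map<String, Object> runtimeField = new HashMap<>();
runtimeField.put("type", "keyword");
runtimeField.put("script", "emit(doc['some_field'].value)"); // assumed Painless source

Map<String, Object> runtimeMappings = new HashMap<>();
runtimeMappings.put("some_field_runtime", runtimeField);

// setRuntimeMappings(...) is the builder method added in this commit.
DataFrameAnalyticsSource source = DataFrameAnalyticsSource.builder()
    .setIndex("my-source-index")
    .setRuntimeMappings(runtimeMappings)
    .build();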


@ -15,11 +15,13 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class DataFrameAnalyticsSource implements ToXContentObject {
@ -45,16 +47,20 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
PARSER.declareObject(Builder::setRuntimeMappings, (p, c) -> p.map(), SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
}
private final String[] index;
private final QueryConfig queryConfig;
private final FetchSourceContext sourceFiltering;
private final Map<String, Object> runtimeMappings;
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) {
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering,
@Nullable Map<String, Object> runtimeMappings) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
this.sourceFiltering = sourceFiltering;
this.runtimeMappings = runtimeMappings;
}
public String[] getIndex() {
@ -69,6 +75,10 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return sourceFiltering;
}
public Map<String, Object> getRuntimeMappings() {
return runtimeMappings;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -79,6 +89,9 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
if (runtimeMappings != null) {
builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings);
}
builder.endObject();
return builder;
}
@ -91,12 +104,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
&& Objects.equals(sourceFiltering, other.sourceFiltering)
&& Objects.equals(runtimeMappings, other.runtimeMappings);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering);
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering, runtimeMappings);
}
@Override
@ -109,6 +123,7 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private String[] index;
private QueryConfig queryConfig;
private FetchSourceContext sourceFiltering;
private Map<String, Object> runtimeMappings;
private Builder() {}
@ -132,8 +147,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return this;
}
public Builder setRuntimeMappings(Map<String, Object> runtimeMappings) {
this.runtimeMappings = runtimeMappings;
return this;
}
public DataFrameAnalyticsSource build() {
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering);
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering, runtimeMappings);
}
}
}


@ -2995,13 +2995,16 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
QueryConfig queryConfig = new QueryConfig(new MatchAllQueryBuilder());
// end::put-data-frame-analytics-query-config
Map<String, Object> runtimeMappings = Collections.emptyMap();
// tag::put-data-frame-analytics-source-config
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1>
.setIndex("put-test-source-index") // <2>
.setQueryConfig(queryConfig) // <3>
.setRuntimeMappings(runtimeMappings) // <4>
.setSourceFiltering(new FetchSourceContext(true,
new String[] { "included_field_1", "included_field_2" },
new String[] { "excluded_field" })) // <4>
new String[] { "excluded_field" })) // <5>
.build();
// end::put-data-frame-analytics-source-config


@ -16,6 +16,8 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import static java.util.Collections.emptyList;
@ -31,11 +33,19 @@ public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase<Data
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}
Map<String, Object> runtimeMappings = null;
if (randomBoolean()) {
runtimeMappings = new HashMap<>();
Map<String, Object> runtimeField = new HashMap<>();
runtimeField.put("type", "keyword");
runtimeField.put("script", "");
runtimeMappings.put(randomAlphaOfLength(10), runtimeField);
}
return DataFrameAnalyticsSource.builder()
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.setSourceFiltering(sourceFiltering)
.setRuntimeMappings(runtimeMappings)
.build();
}


@ -55,7 +55,8 @@ include-tagged::{doc-tests-file}[{api}-source-config]
<1> Constructing a new DataFrameAnalyticsSource
<2> The source index
<3> The query from which to gather the data. If query is not set, a `match_all` query is used by default.
<4> Source filtering to select which fields will exist in the destination index.
<4> Runtime mappings that will be added to the destination index mapping.
<5> Source filtering to select which fields will exist in the destination index.
===== QueryConfig


@ -523,7 +523,7 @@ setting, an error occurs when you try to create {dfanalytics-jobs} that have
`source`::
(object)
The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
Optionally, `query`, `runtime_mappings`, and `_source` may be specified.
+
.Properties of `source`
[%collapsible%open]
@ -543,6 +543,10 @@ options that are supported by {es} can be used, as this object is passed
verbatim to {es}. By default, this property has the following value:
`{"match_all": {}}`.
`runtime_mappings`:::
(Optional, object) Definitions of runtime fields that will become part of the
mapping of the destination index.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select which
fields will be present in the destination. Fields that are excluded cannot be


@ -31,6 +31,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* A search action request builder.
@ -592,4 +593,12 @@ public class SearchRequestBuilder extends ActionRequestBuilder<SearchRequest, Se
this.request.setPreFilterShardSize(preFilterShardSize);
return this;
}
/**
* Set runtime mappings to create runtime fields that exist only as part of this particular search.
*/
public SearchRequestBuilder setRuntimeMappings(Map<String, Object> runtimeMappings) {
sourceBuilder().runtimeMappings(runtimeMappings);
return this;
}
}
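
A hedged usage sketch of the new method; `client` stands for any org.elasticsearch.client.Client instance, and the index, field name, and script are illustrative:

import java.util.Map;
import org.elasticsearch.action.search.SearchResponse;

// Define a runtime field that exists only for this one search.
SearchResponse response = client.prepareSearch("my-index")
    .setRuntimeMappings(Map.of(
        "price_with_tax", // illustrative field name
        Map.of("type", "double", "script", "emit(doc['price'].value * 1.2)")))
    .get();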


@ -36,6 +36,7 @@ import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.MlStrings;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
@ -825,7 +826,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
validateScriptFields();
validateRuntimeMappings();
RuntimeMappingsValidator.validate(runtimeMappings);
setDefaultChunkingConfig();
setDefaultQueryDelay();
@ -846,28 +847,6 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
}
}
/**
* Perform a light check that the structure resembles runtime_mappings.
* The full check cannot happen until search
*/
void validateRuntimeMappings() {
for (Map.Entry<String, Object> entry : runtimeMappings.entrySet()) {
// top level objects are fields
String fieldName = entry.getKey();
if (entry.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> propNode = new HashMap<>(((Map<String, Object>) entry.getValue()));
Object typeNode = propNode.get("type");
if (typeNode == null) {
throw ExceptionsHelper.badRequestException("No type specified for runtime field [" + fieldName + "]");
}
} else {
throw ExceptionsHelper.badRequestException("Expected map for runtime field [" + fieldName + "] " +
"definition but got a " + fieldName.getClass().getSimpleName());
}
}
}
private static void checkNoMoreHistogramAggregations(Collection<AggregationBuilder> aggregations) {
for (AggregationBuilder agg : aggregations) {
if (ExtractorUtils.isHistogram(agg)) {


@ -21,15 +21,19 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import org.elasticsearch.xpack.core.ml.utils.RuntimeMappingsValidator;
import org.elasticsearch.xpack.core.ml.utils.XContentObjectTransformer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -45,7 +49,8 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(
((List<String>) a[0]).toArray(new String[0]),
(QueryProvider) a[1],
(FetchSourceContext) a[2]));
(FetchSourceContext) a[2],
(Map<String, Object>) a[3]));
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY);
@ -53,14 +58,18 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(),
SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD);
return parser;
}
private final String[] index;
private final QueryProvider queryProvider;
private final FetchSourceContext sourceFiltering;
private final Map<String, Object> runtimeMappings;
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) {
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering,
@Nullable Map<String, Object> runtimeMappings) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX);
if (index.length == 0) {
throw new IllegalArgumentException("source.index must specify at least one index");
@ -73,6 +82,8 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
throw new IllegalArgumentException("source._source cannot be disabled");
}
this.sourceFiltering = sourceFiltering;
this.runtimeMappings = runtimeMappings == null ? Collections.emptyMap() : Collections.unmodifiableMap(runtimeMappings);
RuntimeMappingsValidator.validate(this.runtimeMappings);
}
public DataFrameAnalyticsSource(StreamInput in) throws IOException {
@ -83,6 +94,11 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
} else {
sourceFiltering = null;
}
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
runtimeMappings = in.readMap();
} else {
runtimeMappings = Collections.emptyMap();
}
}
public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) {
@ -90,6 +106,7 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
this.queryProvider = new QueryProvider(other.queryProvider);
this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext(
other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes());
this.runtimeMappings = Collections.unmodifiableMap(new HashMap<>(other.runtimeMappings));
}
@Override
@ -99,6 +116,9 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_7_6_0)) {
out.writeOptionalWriteable(sourceFiltering);
}
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeMap(runtimeMappings);
}
}
@Override
@ -109,6 +129,9 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
if (runtimeMappings.isEmpty() == false) {
builder.field(SearchSourceBuilder.RUNTIME_MAPPINGS_FIELD.getPreferredName(), runtimeMappings);
}
builder.endObject();
return builder;
}
@ -121,12 +144,13 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryProvider, other.queryProvider)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
&& Objects.equals(sourceFiltering, other.sourceFiltering)
&& Objects.equals(runtimeMappings, other.runtimeMappings);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering);
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering, runtimeMappings);
}
public String[] getIndex() {
@ -189,6 +213,10 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
return queryProvider.getQuery();
}
public Map<String, Object> getRuntimeMappings() {
return runtimeMappings;
}
public boolean isFieldExcluded(String path) {
if (sourceFiltering == null) {
return false;


@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ml.utils;
import java.util.HashMap;
import java.util.Map;
public final class RuntimeMappingsValidator {
/**
* Perform a light check that the structure resembles runtime_mappings.
* The full check cannot happen until the search is executed.
*/
public static void validate(Map<String, Object> runtimeMappings) {
for (Map.Entry<String, Object> entry : runtimeMappings.entrySet()) {
// top level objects are fields
String fieldName = entry.getKey();
if (entry.getValue() instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> propNode = new HashMap<>(((Map<String, Object>) entry.getValue()));
Object typeNode = propNode.get("type");
if (typeNode == null) {
throw ExceptionsHelper.badRequestException("No type specified for runtime field [{}]", fieldName);
}
} else {
throw ExceptionsHelper.badRequestException(
"Expected map for runtime field [{}] definition but got a {}",
fieldName,
entry.getValue().getClass().getSimpleName()
);
}
}
}
}
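
The validator's contract in two calls, mirroring the unit tests added later in this commit:

import java.util.Collections;

// Passes: every top-level value is a map that carries a "type".
RuntimeMappingsValidator.validate(
    Collections.singletonMap("valid_field", Collections.singletonMap("type", "keyword")));

// Throws a 400 ElasticsearchException: No type specified for runtime field [typeless]
RuntimeMappingsValidator.validate(
    Collections.singletonMap("typeless", Collections.singletonMap("not a type", "42")));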


@ -464,6 +464,10 @@
"query" : {
"type" : "object",
"enabled" : false
},
"runtime_mappings" : {
"type" : "object",
"enabled" : false
}
}
},


@ -81,7 +81,7 @@ public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializing
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, null, new String[] {"excluded"}));
new FetchSourceContext(true, null, new String[] {"excluded"}), null);
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")
@ -99,7 +99,7 @@ public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializing
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, new String[] {"included"}, null));
new FetchSourceContext(true, new String[] {"included"}, null), null);
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")


@ -468,7 +468,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setVersion(Version.CURRENT)
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(regression)
.build();
@ -487,7 +487,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setVersion(Version.V_7_5_0)
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(regression)
.build();
@ -509,7 +509,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
public void testCtor_GivenMaxNumThreadsIsZero() {
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(new Regression("foo"))
.setMaxNumThreads(0)
@ -522,7 +522,7 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
public void testCtor_GivenMaxNumThreadsIsNegative() {
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
.setId("test_config")
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", null))
.setAnalysis(new Regression("foo"))
.setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0))


@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
@ -22,7 +23,9 @@ import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -68,12 +71,24 @@ public class DataFrameAnalyticsSourceTests extends AbstractBWCSerializationTestC
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}
return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering);
Map<String, Object> runtimeMappings = null;
if (randomBoolean()) {
runtimeMappings = new HashMap<>();
Map<String, Object> runtimeField = new HashMap<>();
runtimeField.put("type", "keyword");
runtimeField.put("script", "");
runtimeMappings.put(randomAlphaOfLength(10), runtimeField);
}
return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering, runtimeMappings);
}
public static DataFrameAnalyticsSource mutateForVersion(DataFrameAnalyticsSource instance, Version version) {
if (version.before(Version.V_7_6_0)) {
return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null);
return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), null, null);
}
if (version.before(Version.V_8_0_0)) {
return new DataFrameAnalyticsSource(instance.getIndex(), instance.getQueryProvider(), instance.getSourceFiltering(), null);
}
return instance;
}
@ -85,18 +100,24 @@ public class DataFrameAnalyticsSourceTests extends AbstractBWCSerializationTestC
public void testConstructor_GivenDisabledSource() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource(
new String[] {"index"}, null, new FetchSourceContext(false, null, null)));
new String[] {"index"}, null, new FetchSourceContext(false, null, null), null));
assertThat(e.getMessage(), equalTo("source._source cannot be disabled"));
}
public void testConstructor_GivenInvalidRuntimeMappings() {
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsSource(
new String[] {"index"}, null, null, Collections.singletonMap("typeless", Collections.singletonMap("not a type", "42"))));
assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]"));
}
public void testIsFieldExcluded_GivenNoSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null);
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null, null);
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
public void testIsFieldExcluded_GivenSourceFilteringWithNulls() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null,
new FetchSourceContext(true, null, null));
new FetchSourceContext(true, null, null), null);
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
@ -139,7 +160,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractBWCSerializationTestC
private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List<String> includes, List<String> excludes) {
FetchSourceContext sourceFiltering = new FetchSourceContext(true,
includes.toArray(new String[0]), excludes.toArray(new String[0]));
return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering);
return new DataFrameAnalyticsSource(new String[] { "index" } , null, sourceFiltering, null);
}
@Override


@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.ml.utils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.util.Collections;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
public class RuntimeMappingsValidatorTests extends ESTestCase {
public void testValidate_GivenFieldWhoseValueIsNotMap() {
Map<String, Object> runtimeMappings = Collections.singletonMap("mapless", "not a map");
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Expected map for runtime field [mapless] definition but got a String"));
}
public void testValidate_GivenFieldWithoutType() {
Map<String, Object> fieldMapping = Collections.singletonMap("not a type", "42");
Map<String, Object> runtimeMappings = Collections.singletonMap("typeless", fieldMapping);
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> RuntimeMappingsValidator.validate(runtimeMappings));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("No type specified for runtime field [typeless]"));
}
public void testValidate_GivenValid() {
Map<String, Object> fieldMapping = Collections.singletonMap("type", "keyword");
Map<String, Object> runtimeMappings = Collections.singletonMap("valid_field", fieldMapping);
RuntimeMappingsValidator.validate(runtimeMappings);
}
}


@ -28,15 +28,19 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.action.EvaluateDataFrameAction;
import org.elasticsearch.xpack.core.ml.action.GetDataFrameAnalyticsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetTrainedModelsAction;
import org.elasticsearch.xpack.core.ml.action.NodeAcknowledgedResponse;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigUpdate;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.Accuracy;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.AucRoc;
import org.elasticsearch.xpack.core.ml.dataframe.evaluation.classification.MulticlassConfusionMatrix;
@ -47,7 +51,6 @@ import org.elasticsearch.xpack.core.ml.inference.MlInferenceNamedXContentProvide
import org.elasticsearch.xpack.core.ml.inference.TrainedModelConfig;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.OneHotEncoding;
import org.elasticsearch.xpack.core.ml.inference.preprocessing.PreProcessor;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.junit.After;
import org.junit.Before;
@ -55,6 +58,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -62,12 +66,12 @@ import java.util.Set;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.xpack.core.ml.MlTasks.AWAITING_UPGRADE;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
@ -810,6 +814,85 @@ public class ClassificationIT extends MlNativeDataFrameAnalyticsIntegTestCase {
waitUntilAnalyticsIsStopped(jobId);
}
public void testWithSearchRuntimeMappings() throws Exception {
initialize("classification_with_search_runtime_mappings");
indexData(sourceIndex, 300, 50, KEYWORD_FIELD);
String numericRuntimeField = NUMERICAL_FIELD + "_runtime";
String dependentVariableRuntimeField = KEYWORD_FIELD + "_runtime";
String predictedClassField = dependentVariableRuntimeField + "_prediction";
Map<String, Object> numericRuntimeFieldMapping = new HashMap<>();
numericRuntimeFieldMapping.put("type", "double");
numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FIELD + "'].value)");
Map<String, Object> dependentVariableRuntimeFieldMapping = new HashMap<>();
dependentVariableRuntimeFieldMapping.put("type", "keyword");
dependentVariableRuntimeFieldMapping.put("script",
"if (doc['" + KEYWORD_FIELD + "'].size() > 0) { emit(doc['" + KEYWORD_FIELD + "'].value); }");
Map<String, Object> runtimeFields = new HashMap<>();
runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping);
runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(jobId)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null))
.setAnalysis(new Classification(
dependentVariableRuntimeField,
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
predictedClassField,
null,
null,
null,
null,
null,
null))
.build();
putAnalytics(config);
assertIsStopped(jobId);
assertProgressIsZero(jobId);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
client().admin().indices().refresh(new RefreshRequest(destIndex));
SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get();
for (SearchHit hit : destData.getHits()) {
Map<String, Object> destDoc = hit.getSourceAsMap();
Map<String, Object> resultsObject = getFieldValue(destDoc, "ml");
assertThat(getFieldValue(resultsObject, predictedClassField), is(in(KEYWORD_FIELD_VALUES)));
assertThat(getFieldValue(resultsObject, "is_training"), is(destDoc.containsKey(KEYWORD_FIELD)));
assertTopClasses(resultsObject, 2, dependentVariableRuntimeField, KEYWORD_FIELD_VALUES);
@SuppressWarnings("unchecked")
List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance");
assertThat(importanceArray, hasSize(1));
assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField));
}
assertProgressComplete(jobId);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertExactlyOneInferenceModelPersisted(jobId);
assertMlResultsFieldMappings(destIndex, predictedClassField, "keyword");
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [classification]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
expectedDestIndexAuditMessage(),
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
assertEvaluation(KEYWORD_FIELD, KEYWORD_FIELD_VALUES, "ml." + predictedClassField);
}
private static <T> T getOnlyElement(List<T> list) {
assertThat(list, hasSize(1));
return list.get(0);


@ -110,7 +110,7 @@ public class DataFrameAnalysisCustomFeatureIT extends MlNativeDataFrameAnalytics
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(jobId)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null))
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()), null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(new Regression(NUMERICAL_FIELD,
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(6).build(),


@ -8,6 +8,7 @@ package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -17,25 +18,33 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.xpack.core.ml.action.ExplainDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.BoostedTreeParams;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Classification;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.Regression;
import org.elasticsearch.xpack.core.ml.dataframe.explain.FieldSelection;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTestCase {
public void testExplain_GivenMissingSourceIndex() {
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"missing_index"}, null, null, Collections.emptyMap()))
.setAnalysis(new OutlierDetection.Builder().build())
.buildForExplain();
@ -81,7 +90,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.termQuery("filtered_field", "bingo")),
null))
null,
Collections.emptyMap()))
.setAnalysis(new Classification("categorical"))
.buildForExplain();
@ -98,7 +108,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
.setId("dfa-training-100-" + sourceIndex)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
null))
null,
Collections.emptyMap()))
.setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD,
BoostedTreeParams.builder().build(),
null,
@ -118,7 +129,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
.setId("dfa-training-50-" + sourceIndex)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
null))
null,
Collections.emptyMap()))
.setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD,
BoostedTreeParams.builder().build(),
null,
@ -147,7 +159,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
.setId("dfa-simultaneous-explain-" + sourceIndex)
.setSource(new DataFrameAnalyticsSource(new String[]{sourceIndex},
QueryProvider.fromParsedQuery(QueryBuilders.matchAllQuery()),
null))
null,
Collections.emptyMap()))
.setAnalysis(new Regression(RegressionIT.DEPENDENT_VARIABLE_FIELD,
BoostedTreeParams.builder().build(),
null,
@ -181,6 +194,57 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
}
}
public void testRuntimeFields() {
String sourceIndex = "test-explain-runtime-fields";
String mapping = "{\n" +
" \"properties\": {\n" +
" \"mapped_field\": {\n" +
" \"type\": \"double\"\n" +
" }\n" +
" },\n" +
" \"runtime\": {\n" +
" \"mapped_runtime_field\": {\n" +
" \"type\": \"double\"\n," +
" \"script\": \"emit(doc['mapped_field'].value + 10.0)\"\n" +
" }\n" +
" }\n" +
" }";
client().admin().indices().prepareCreate(sourceIndex)
.setMapping(mapping)
.get();
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk()
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10; i++) {
Object[] source = new Object[] {"mapped_field", i};
IndexRequest indexRequest = new IndexRequest(sourceIndex).source(source).opType(DocWriteRequest.OpType.CREATE);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
Map<String, Object> configRuntimeField = new HashMap<>();
configRuntimeField.put("type", "double");
configRuntimeField.put("script", "emit(doc['mapped_field'].value + 20.0)");
Map<String, Object> configRuntimeFields = Collections.singletonMap("config_runtime_field", configRuntimeField);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(sourceIndex + "-job")
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, configRuntimeFields))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
ExplainDataFrameAnalyticsAction.Response explainResponse = explainDataFrame(config);
List<FieldSelection> fieldSelection = explainResponse.getFieldSelection();
assertThat(fieldSelection.size(), equalTo(3));
assertThat(fieldSelection.stream().map(FieldSelection::getName).collect(Collectors.toList()),
contains("config_runtime_field", "mapped_field", "mapped_runtime_field"));
assertThat(fieldSelection.stream().map(FieldSelection::isIncluded).allMatch(isIncluded -> isIncluded), is(true));
}
@Override
boolean supportsInference() {
return false;


@ -200,7 +200,8 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
QueryBuilder queryBuilder) throws Exception {
return new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null))
.setSource(new DataFrameAnalyticsSource(
new String[] { sourceIndex }, QueryProvider.fromParsedQuery(queryBuilder), null, Collections.emptyMap()))
.setDest(new DataFrameAnalyticsDest(destIndex, resultsField))
.setAnalysis(analysis)
.build();


@ -42,6 +42,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -50,6 +51,8 @@ import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
@ -572,7 +575,7 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(jobId)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, Collections.emptyMap()))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(regression)
.setAnalyzedFields(new FetchSourceContext(true, null, new String[] {"field_1"}))
@ -693,6 +696,83 @@ public class RegressionIT extends MlNativeDataFrameAnalyticsIntegTestCase {
}
}
public void testWithSearchRuntimeMappings() throws Exception {
initialize("regression_with_search_runtime_mappings");
indexData(sourceIndex, 300, 50);
String numericRuntimeField = NUMERICAL_FEATURE_FIELD + "_runtime";
String dependentVariableRuntimeField = DEPENDENT_VARIABLE_FIELD + "_runtime";
String predictedClassField = dependentVariableRuntimeField + "_prediction";
Map<String, Object> numericRuntimeFieldMapping = new HashMap<>();
numericRuntimeFieldMapping.put("type", "double");
numericRuntimeFieldMapping.put("script", "emit(doc['" + NUMERICAL_FEATURE_FIELD + "'].value)");
Map<String, Object> dependentVariableRuntimeFieldMapping = new HashMap<>();
dependentVariableRuntimeFieldMapping.put("type", "double");
dependentVariableRuntimeFieldMapping.put("script",
"if (doc['" + DEPENDENT_VARIABLE_FIELD + "'].size() > 0) { emit(doc['" + DEPENDENT_VARIABLE_FIELD + "'].value); }");
Map<String, Object> runtimeFields = new HashMap<>();
runtimeFields.put(numericRuntimeField, numericRuntimeFieldMapping);
runtimeFields.put(dependentVariableRuntimeField, dependentVariableRuntimeFieldMapping);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(jobId)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeFields))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalyzedFields(new FetchSourceContext(true, new String[] { numericRuntimeField, dependentVariableRuntimeField }, null))
.setAnalysis(new Regression(
dependentVariableRuntimeField,
BoostedTreeParams.builder().setNumTopFeatureImportanceValues(1).build(),
null,
null,
null,
null,
null,
null,
null))
.build();
putAnalytics(config);
assertIsStopped(jobId);
assertProgressIsZero(jobId);
startAnalytics(jobId);
waitUntilAnalyticsIsStopped(jobId);
SearchResponse destData = client().prepareSearch(destIndex).setTrackTotalHits(true).setSize(1000).get();
for (SearchHit hit : destData.getHits()) {
Map<String, Object> destDoc = hit.getSourceAsMap();
Map<String, Object> resultsObject = getMlResultsObjectFromDestDoc(destDoc);
assertThat(resultsObject.containsKey(predictedClassField), is(true));
assertThat(resultsObject.containsKey("is_training"), is(true));
assertThat(resultsObject.get("is_training"), is(destDoc.containsKey(DEPENDENT_VARIABLE_FIELD)));
@SuppressWarnings("unchecked")
List<Map<String, Object>> importanceArray = (List<Map<String, Object>>)resultsObject.get("feature_importance");
assertThat(importanceArray, hasSize(1));
assertThat(importanceArray.get(0), hasEntry("feature_name", numericRuntimeField));
}
assertProgressComplete(jobId);
assertThat(searchStoredProgress(jobId).getHits().getTotalHits().value, equalTo(1L));
assertModelStatePersisted(stateDocId());
assertExactlyOneInferenceModelPersisted(jobId);
assertMlResultsFieldMappings(destIndex, predictedClassField, "double");
assertThatAuditMessagesMatch(jobId,
"Created analytics with analysis type [regression]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + destIndex + "]",
"Started reindexing to destination index [" + destIndex + "]",
"Finished reindexing to destination index [" + destIndex + "]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}
private void initialize(String jobId) {
this.jobId = jobId;
this.sourceIndex = jobId + "_source_index";


@ -10,8 +10,6 @@ import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -29,15 +27,15 @@ import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;
import org.elasticsearch.xpack.core.ml.utils.PhaseProgress;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.emptyString;
import static org.hamcrest.Matchers.equalTo;
@ -391,7 +389,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_multiple_source_indices";
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(sourceIndex, null, null))
.setSource(new DataFrameAnalyticsSource(sourceIndex, null, null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
@ -513,7 +511,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = ByteSizeValue.ofMb(1);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)
@ -550,7 +548,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = ByteSizeValue.ofTb(1);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)
@ -834,6 +832,112 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
"Finished analysis");
}
public void testOutlierDetection_GivenSearchRuntimeMappings() throws Exception {
String sourceIndex = "test-outlier-detection-index-with-search-runtime-fields";
String mappings = "{\"enabled\": false}";
client().admin().indices().prepareCreate(sourceIndex)
.setMapping(mappings)
.get();
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 5; i++) {
IndexRequest indexRequest = new IndexRequest(sourceIndex);
// We insert one odd value out of 5 for one feature
String docId = i == 0 ? "outlier" : "normal" + i;
indexRequest.id(docId);
indexRequest.source("numeric", i == 0 ? 100.0 : 1.0);
bulkRequestBuilder.add(indexRequest);
}
BulkResponse bulkResponse = bulkRequestBuilder.get();
if (bulkResponse.hasFailures()) {
fail("Failed to index data: " + bulkResponse.buildFailureMessage());
}
String id = "test_outlier_detection_index_with_search_runtime_fields";
Map<String, Object> runtimeMappings = new HashMap<>();
Map<String, Object> numericFieldRuntimeMapping = new HashMap<>();
numericFieldRuntimeMapping.put("type", "double");
numericFieldRuntimeMapping.put("script", "emit(params._source.numeric)");
runtimeMappings.put("runtime_numeric", numericFieldRuntimeMapping);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null, runtimeMappings))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
putAnalytics(config);
assertIsStopped(id);
assertProgressIsZero(id);
startAnalytics(id);
waitUntilAnalyticsIsStopped(id);
GetDataFrameAnalyticsStatsAction.Response.Stats stats = getAnalyticsStats(id);
assertThat(stats.getDataCounts().getJobId(), equalTo(id));
assertThat(stats.getDataCounts().getTrainingDocsCount(), equalTo(5L));
assertThat(stats.getDataCounts().getTestDocsCount(), equalTo(0L));
assertThat(stats.getDataCounts().getSkippedDocsCount(), equalTo(0L));
SearchResponse sourceData = client().prepareSearch(sourceIndex).get();
double scoreOfOutlier = 0.0;
double scoreOfNonOutlier = -1.0;
for (SearchHit hit : sourceData.getHits()) {
GetResponse destDocGetResponse = client().prepareGet().setIndex(config.getDest().getIndex()).setId(hit.getId()).get();
assertThat(destDocGetResponse.isExists(), is(true));
Map<String, Object> sourceDoc = hit.getSourceAsMap();
Map<String, Object> destDoc = destDocGetResponse.getSource();
for (String field : sourceDoc.keySet()) {
assertThat(destDoc.containsKey(field), is(true));
assertThat(destDoc.get(field), equalTo(sourceDoc.get(field)));
}
assertThat(destDoc.containsKey("ml"), is(true));
@SuppressWarnings("unchecked")
Map<String, Object> resultsObject = (Map<String, Object>) destDoc.get("ml");
assertThat(resultsObject.containsKey("outlier_score"), is(true));
double outlierScore = (double) resultsObject.get("outlier_score");
assertThat(outlierScore, allOf(greaterThanOrEqualTo(0.0), lessThanOrEqualTo(1.0)));
if (hit.getId().equals("outlier")) {
scoreOfOutlier = outlierScore;
@SuppressWarnings("unchecked")
List<Map<String, Object>> featureInfluence = (List<Map<String, Object>>) resultsObject.get("feature_influence");
assertThat(featureInfluence.size(), equalTo(1));
assertThat(featureInfluence.get(0).get("feature_name"), equalTo("runtime_numeric"));
} else {
if (scoreOfNonOutlier < 0) {
scoreOfNonOutlier = outlierScore;
} else {
assertThat(outlierScore, equalTo(scoreOfNonOutlier));
}
}
}
assertThat(scoreOfOutlier, is(greaterThan(scoreOfNonOutlier)));
assertProgressComplete(id);
assertThat(searchStoredProgress(id).getHits().getTotalHits().value, equalTo(1L));
assertThatAuditMessagesMatch(id,
"Created analytics with analysis type [outlier_detection]",
"Estimated memory usage for this analytics to be",
"Starting analytics on node",
"Started analytics",
"Creating destination index [" + sourceIndex + "-results]",
"Started reindexing to destination index [" + sourceIndex + "-results]",
"Finished reindexing to destination index [" + sourceIndex + "-results]",
"Started loading data",
"Started analyzing",
"Started writing results",
"Finished analysis");
}
@Override
boolean supportsInference() {
return false;


@ -71,7 +71,7 @@ public class ChunkedTrainedModelPersisterIT extends MlSingleNodeTestCase {
String modelId = "stored-chunked-model";
DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(modelId)
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("my_dest", null))
.setAnalysis(new Regression("foo"))
.build();


@ -64,7 +64,7 @@ public class UnusedStatsRemoverIT extends BaseMlIntegTestCase {
PutDataFrameAnalyticsAction.Request request = new PutDataFrameAnalyticsAction.Request(new DataFrameAnalyticsConfig.Builder()
.setId("analytics-with-stats")
.setModelMemoryLimit(ByteSizeValue.ofGb(1))
.setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null))
.setSource(new DataFrameAnalyticsSource(new String[]{"foo"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("bar", null))
.setAnalysis(new Regression("prediction"))
.build());


@ -72,6 +72,7 @@ public final class DestinationIndex {
private static final String PROPERTIES = "properties";
private static final String META = "_meta";
private static final String RUNTIME = "runtime";
private static final String DFA_CREATOR = "data-frame-analytics";
@ -124,23 +125,7 @@ public final class DestinationIndex {
ActionListener<MappingMetadata> mappingsListener = ActionListener.wrap(
mappings -> {
mappingsHolder.set(mappings);
List<RequiredField> requiredFields = config.getAnalysis().getRequiredFields();
if (requiredFields.isEmpty()) {
fieldCapabilitiesListener.onResponse(null);
return;
}
FieldCapabilitiesRequest fieldCapabilitiesRequest =
new FieldCapabilitiesRequest()
.indices(config.getSource().getIndex())
.fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new));
ClientHelper.executeWithHeadersAsync(
config.getHeaders(),
ML_ORIGIN,
client,
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
fieldCapabilitiesListener);
getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener);
},
listener::onFailure
);
@ -167,6 +152,27 @@ public final class DestinationIndex {
config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE, getSettingsRequest, getSettingsResponseListener);
}
private static void getFieldCapsForRequiredFields(Client client, DataFrameAnalyticsConfig config,
ActionListener<FieldCapabilitiesResponse> listener) {
List<RequiredField> requiredFields = config.getAnalysis().getRequiredFields();
if (requiredFields.isEmpty()) {
listener.onResponse(null);
return;
}
FieldCapabilitiesRequest fieldCapabilitiesRequest =
new FieldCapabilitiesRequest()
.indices(config.getSource().getIndex())
.fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new))
.runtimeFields(config.getSource().getRuntimeMappings());
ClientHelper.executeWithHeadersAsync(
config.getHeaders(),
ML_ORIGIN,
client,
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
listener);
}
private static CreateIndexRequest createIndexRequest(Clock clock,
DataFrameAnalyticsConfig config,
Settings settings,
@ -179,6 +185,8 @@ public final class DestinationIndex {
properties.putAll(createAdditionalMappings(config, fieldCapabilitiesResponse));
Map<String, Object> metadata = getOrPutDefault(mappingsAsMap, META, HashMap::new);
metadata.putAll(createMetadata(config.getId(), clock, Version.CURRENT));
Map<String, Object> runtimeMappings = getOrPutDefault(mappingsAsMap, RUNTIME, HashMap::new);
runtimeMappings.putAll(config.getSource().getRuntimeMappings());
return new CreateIndexRequest(destinationIndex, settings).mapping(mappingsAsMap);
}
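
The net effect of createIndexRequest on the destination index mapping, as a hedged sketch: the source config's runtime mappings are copied verbatim into a top-level "runtime" section next to the existing "properties" and "_meta" sections. The field definition below is illustrative:

import java.util.Map;

// What this method contributes under the "runtime" key of mappingsAsMap, e.g.:
Map<String, Object> runtimeSection = Map.of(
    "my_runtime_field", // illustrative name
    Map.of("type", "keyword", "script", "emit(doc['f'].value)")); // assumed script
// The destination index therefore defines the same runtime fields as the
// source config, which is what keeps them usable after the job is deleted.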
@ -258,8 +266,12 @@ public final class DestinationIndex {
ActionListener<FieldCapabilitiesResponse> fieldCapabilitiesListener = ActionListener.wrap(
fieldCapabilitiesResponse -> {
Map<String, Object> addedMappings = new HashMap<>();
// Determine mappings to be added to the destination index
Map<String, Object> addedMappings = Map.of(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse));
addedMappings.put(PROPERTIES, createAdditionalMappings(config, fieldCapabilitiesResponse));
// Also add runtime mappings
addedMappings.put(RUNTIME, config.getSource().getRuntimeMappings());
// Add the mappings to the destination index
PutMappingRequest putMappingRequest =
@ -271,22 +283,7 @@ public final class DestinationIndex {
listener::onFailure
);
List<RequiredField> requiredFields = config.getAnalysis().getRequiredFields();
if (requiredFields.isEmpty()) {
fieldCapabilitiesListener.onResponse(null);
return;
}
FieldCapabilitiesRequest fieldCapabilitiesRequest =
new FieldCapabilitiesRequest()
.indices(config.getSource().getIndex())
.fields(requiredFields.stream().map(RequiredField::getName).toArray(String[]::new));
ClientHelper.executeWithHeadersAsync(
config.getHeaders(),
ML_ORIGIN,
client,
FieldCapabilitiesAction.INSTANCE,
fieldCapabilitiesRequest,
fieldCapabilitiesListener);
getFieldCapsForRequiredFields(client, config, fieldCapabilitiesListener);
}
private static void checkResultsFieldIsNotPresentInProperties(DataFrameAnalyticsConfig config, Map<String, Object> properties) {


@@ -176,6 +176,8 @@ public class DataFrameDataExtractor {
searchRequestBuilder.addDocValueField(docValueField.getSearchField(), docValueField.getDocValueFormat());
}
+ searchRequestBuilder.setRuntimeMappings(context.runtimeMappings);
return searchRequestBuilder;
}
@@ -341,7 +343,8 @@ public class DataFrameDataExtractor {
.setIndices(context.indices)
.setSize(0)
.setQuery(summaryQuery)
- .setTrackTotalHits(true);
+ .setTrackTotalHits(true)
+ .setRuntimeMappings(context.runtimeMappings);
}
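
Both the scroll search and the row-count search now carry the runtime section, so querying or extracting a runtime field behaves the same as for a mapped field. A self-contained sketch of such a search request, with invented index and field names:

import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class RuntimeMappingSearchSketch {
    static SearchRequest summarySearch(Map<String, Object> runtimeMappings) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(0)
            .trackTotalHits(true)
            .query(QueryBuilders.existsQuery("runtime_field")) // illustrative field name
            .runtimeMappings(runtimeMappings);
        return new SearchRequest("index-source").source(source);
    }
}
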
private QueryBuilder allExtractedFieldsExistQuery() {
@@ -26,9 +26,14 @@ public class DataFrameDataExtractorContext {
final boolean supportsRowsWithMissingValues;
final TrainTestSplitterFactory trainTestSplitterFactory;
+ // Runtime mappings are necessary while we are still querying the source indices.
+ // They should be empty when we're querying the destination index as the runtime
+ // fields should be mapped in the index.
+ final Map<String, Object> runtimeMappings;
DataFrameDataExtractorContext(String jobId, ExtractedFields extractedFields, List<String> indices, QueryBuilder query, int scrollSize,
Map<String, String> headers, boolean includeSource, boolean supportsRowsWithMissingValues,
- TrainTestSplitterFactory trainTestSplitterFactory) {
+ TrainTestSplitterFactory trainTestSplitterFactory, Map<String, Object> runtimeMappings) {
this.jobId = Objects.requireNonNull(jobId);
this.extractedFields = Objects.requireNonNull(extractedFields);
this.indices = indices.toArray(new String[indices.size()]);
@@ -38,5 +43,6 @@ public class DataFrameDataExtractorContext {
this.includeSource = includeSource;
this.supportsRowsWithMissingValues = supportsRowsWithMissingValues;
this.trainTestSplitterFactory = Objects.requireNonNull(trainTestSplitterFactory);
+ this.runtimeMappings = Objects.requireNonNull(runtimeMappings);
}
}
@@ -35,11 +35,12 @@ public class DataFrameDataExtractorFactory {
private final Map<String, String> headers;
private final boolean supportsRowsWithMissingValues;
private final TrainTestSplitterFactory trainTestSplitterFactory;
+ private final Map<String, Object> runtimeMappings;
private DataFrameDataExtractorFactory(Client client, String analyticsId, List<String> indices, QueryBuilder sourceQuery,
ExtractedFields extractedFields, List<RequiredField> requiredFields, Map<String, String> headers,
- boolean supportsRowsWithMissingValues,
- TrainTestSplitterFactory trainTestSplitterFactory) {
+ boolean supportsRowsWithMissingValues, TrainTestSplitterFactory trainTestSplitterFactory,
+ Map<String, Object> runtimeMappings) {
this.client = Objects.requireNonNull(client);
this.analyticsId = Objects.requireNonNull(analyticsId);
this.indices = Objects.requireNonNull(indices);
@@ -49,6 +50,7 @@ public class DataFrameDataExtractorFactory {
this.headers = headers;
this.supportsRowsWithMissingValues = supportsRowsWithMissingValues;
this.trainTestSplitterFactory = Objects.requireNonNull(trainTestSplitterFactory);
+ this.runtimeMappings = Objects.requireNonNull(runtimeMappings);
}
public DataFrameDataExtractor newExtractor(boolean includeSource) {
@@ -61,7 +63,8 @@ public class DataFrameDataExtractorFactory {
headers,
includeSource,
supportsRowsWithMissingValues,
- trainTestSplitterFactory
+ trainTestSplitterFactory,
+ runtimeMappings
);
return new DataFrameDataExtractor(client, context);
}
@@ -90,7 +93,8 @@ public class DataFrameDataExtractorFactory {
ExtractedFields extractedFields) {
return new DataFrameDataExtractorFactory(client, taskId, Arrays.asList(config.getSource().getIndex()),
config.getSource().getParsedQuery(), extractedFields, config.getAnalysis().getRequiredFields(), config.getHeaders(),
- config.getAnalysis().supportsMissingValues(), createTrainTestSplitterFactory(client, config, extractedFields));
+ config.getAnalysis().supportsMissingValues(), createTrainTestSplitterFactory(client, config, extractedFields),
+ config.getSource().getRuntimeMappings());
}
private static TrainTestSplitterFactory createTrainTestSplitterFactory(Client client, DataFrameAnalyticsConfig config,
@@ -124,7 +128,7 @@ public class DataFrameDataExtractorFactory {
DataFrameDataExtractorFactory extractorFactory = new DataFrameDataExtractorFactory(client, config.getId(),
Collections.singletonList(config.getDest().getIndex()), config.getSource().getParsedQuery(), extractedFields,
config.getAnalysis().getRequiredFields(), config.getHeaders(), config.getAnalysis().supportsMissingValues(),
- createTrainTestSplitterFactory(client, config, extractedFields));
+ createTrainTestSplitterFactory(client, config, extractedFields), Collections.emptyMap());
listener.onResponse(extractorFactory);
},
listener::onFailure
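
Note the two factory entry points above: the source-indices path hands the job's runtime mappings through, while the destination-index path passes Collections.emptyMap(), since by then the runtime fields are baked into the destination mapping. A hypothetical helper (not in the change itself) expressing that rule:

import java.util.Collections;
import java.util.Map;

final class RuntimeMappingsRule {
    // Hypothetical helper: only reads against the source indices need the
    // job's runtime mappings; the destination index has them mapped already.
    static Map<String, Object> runtimeMappingsFor(boolean readingSourceIndices,
                                                  Map<String, Object> jobRuntimeMappings) {
        return readingSourceIndices ? jobRuntimeMappings : Collections.emptyMap();
    }
}
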
@@ -124,7 +124,10 @@ public class ExtractedFieldsDetectorFactory {
listener::onFailure
);
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(0).query(config.getSource().getParsedQuery());
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+ .size(0)
+ .query(config.getSource().getParsedQuery())
+ .runtimeMappings(config.getSource().getRuntimeMappings());
for (FieldCardinalityConstraint constraint : fieldCardinalityConstraints) {
Map<String, FieldCapabilities> fieldCapsPerType = fieldCapabilitiesResponse.getField(constraint.getField());
if (fieldCapsPerType == null) {
@@ -171,6 +174,7 @@ public class ExtractedFieldsDetectorFactory {
fieldCapabilitiesRequest.indices(index);
fieldCapabilitiesRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
fieldCapabilitiesRequest.fields("*");
+ fieldCapabilitiesRequest.runtimeFields(config.getSource().getRuntimeMappings());
LOGGER.debug(() -> new ParameterizedMessage(
"[{}] Requesting field caps for index {}", config.getId(), Arrays.toString(index)));
ClientHelper.executeWithHeaders(config.getHeaders(), ML_ORIGIN, client, () -> {
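
The cardinality checks above now resolve runtime fields too, because the same runtime section travels with the SearchSourceBuilder. A hedged, standalone sketch of that kind of request; the field and index names are invented for illustration:

import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class CardinalitySketch {
    static SearchRequest dependentVariableCardinality(Map<String, Object> runtimeMappings) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .size(0)
            .query(QueryBuilders.matchAllQuery())
            .runtimeMappings(runtimeMappings)
            // "dep_var" is an invented dependent-variable field name.
            .aggregation(AggregationBuilders.cardinality("dep_var_card").field("dep_var"));
        return new SearchRequest("index-source").source(source);
    }
}
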
@@ -463,7 +463,7 @@ public class DestinationIndexTests extends ESTestCase {
private static DataFrameAnalyticsConfig createConfig(DataFrameAnalysis analysis) {
return new DataFrameAnalyticsConfig.Builder()
.setId(ANALYTICS_ID)
- .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null))
+ .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, null))
.setAnalysis(analysis)
.build();
@@ -251,11 +251,11 @@ public class MappingsMergerTests extends ESTestCase {
}
private static DataFrameAnalyticsSource newSource() {
- return new DataFrameAnalyticsSource(new String[] {"index"}, null, null);
+ return new DataFrameAnalyticsSource(new String[] {"index"}, null, null, null);
}
private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) {
return new DataFrameAnalyticsSource(new String[] {"index"}, null,
- new FetchSourceContext(true, null, excludes));
+ new FetchSourceContext(true, null, excludes), null);
}
}
@@ -565,7 +565,7 @@ public class DataFrameDataExtractorTests extends ESTestCase {
private TestExtractor createExtractor(boolean includeSource, boolean supportsRowsWithMissingValues) {
DataFrameDataExtractorContext context = new DataFrameDataExtractorContext(JOB_ID, extractedFields, indices, query, scrollSize,
- headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory);
+ headers, includeSource, supportsRowsWithMissingValues, trainTestSplitterFactory, Collections.emptyMap());
return new TestExtractor(client, context);
}
@@ -1137,7 +1137,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
private DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
- .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
+ .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new OutlierDetection.Builder().build())
@@ -1151,7 +1151,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
- .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
+ .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalysis(new Classification(dependentVariable))
.build();
@@ -1160,7 +1160,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, List<PreProcessor> featureprocessors) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
- .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
+ .setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new Regression(dependentVariable,
@@ -71,7 +71,7 @@ public class InferenceRunnerTests extends ESTestCase {
config = new DataFrameAnalyticsConfig.Builder()
.setId("test")
.setAnalysis(RegressionTests.createRandom())
- .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
+ .setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("dest_index", "test_results_field"))
.build();
progressTracker = ProgressTracker.fromZeroes(config.getAnalysis().getProgressPhases(), config.getAnalysis().supportsInference());
@@ -82,7 +82,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(JOB_ID)
.setDescription(JOB_DESCRIPTION)
- .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null))
+ .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("my_dest", null))
.setAnalysis(new Regression("foo"))
.build();
@@ -77,7 +77,7 @@ public class ChunkedTrainedModelPersisterTests extends ESTestCase {
DataFrameAnalyticsConfig analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(JOB_ID)
.setDescription(JOB_DESCRIPTION)
- .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null))
+ .setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null, null))
.setDest(new DataFrameAnalyticsDest("my_dest", null))
.setAnalysis(randomBoolean() ? new Regression("foo") : new Classification("foo"))
.build();
@@ -2100,3 +2100,39 @@ setup:
}}
- is_false: data_frame_analytics.0.create_time
- is_false: data_frame_analytics.0.version
+ ---
+ "Test put with runtime mappings":
+ - do:
+ ml.put_data_frame_analytics:
+ id: "with-runtime-mappings"
+ body: >
+ {
+ "source": {
+ "index": "index-source",
+ "runtime_mappings": {
+ "runtime_field": {
+ "type": "double",
+ "script": ""
+ }
+ }
+ },
+ "dest": {
+ "index": "index-dest"
+ },
+ "analysis": {
+ "outlier_detection": {
+ }
+ }
+ }
+ - match: { id: "with-runtime-mappings" }
+ - match: { source.index: ["index-source"] }
+ - match: { source.runtime_mappings: {
+ "runtime_field": {
+ "type": "double",
+ "script": ""
+ }
+ }
+ }
+ - match: { dest.index: "index-dest" }
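
For reference, the same request the YAML test issues, sketched with the low-level REST client. The endpoint and body mirror the test above; the localhost client setup is ordinary boilerplate, not part of this change:

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class PutDfaWithRuntimeMappings {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // PUT the job with a runtime field in its source config.
            Request request = new Request("PUT", "/_ml/data_frame/analytics/with-runtime-mappings");
            request.setJsonEntity(
                "{"
                    + "\"source\": {"
                    + "  \"index\": \"index-source\","
                    + "  \"runtime_mappings\": {"
                    + "    \"runtime_field\": { \"type\": \"double\", \"script\": \"\" }"
                    + "  }"
                    + "},"
                    + "\"dest\": { \"index\": \"index-dest\" },"
                    + "\"analysis\": { \"outlier_detection\": {} }"
                    + "}");
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}
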