[ML] Add optional source filtering during data frame reindexing (#49690)

This adds a `_source` setting under the `source` setting of a data
frame analytics config. The new `_source` reuses the structure of a
`FetchSourceContext`, just like `analyzed_fields` does. Specifying
`includes` and `excludes` patterns for `_source` selects which fields
get reindexed and, consequently, which fields are available in the
destination index.

Closes #49531
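
For illustration, a minimal sketch of configuring the new setting through the Java high-level REST client, using the builder API added in this commit; the index name and field patterns below are hypothetical:

    import org.elasticsearch.client.ml.dataframe.DataFrameAnalyticsSource;
    import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

    // Build a source config whose _source filtering keeps "field_to_keep*"
    // and drops "field_to_drop". The first argument (fetchSource) must be
    // true; a config with source._source disabled is rejected.
    DataFrameAnalyticsSource source = DataFrameAnalyticsSource.builder()
        .setIndex("my-source-index")
        .setSourceFiltering(new FetchSourceContext(true,
            new String[] { "field_to_keep*" },   // includes
            new String[] { "field_to_drop" }))   // excludes
        .build();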
Dimitris Athanasiou committed 2019-11-29 14:20:31 +02:00 (via GitHub)
parent f8e39d2ff1
commit bad07b76f7
28 changed files with 521 additions and 127 deletions

View file

@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
import java.util.Arrays;
@ -44,20 +45,27 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private static final ParseField INDEX = new ParseField("index");
private static final ParseField QUERY = new ParseField("query");
public static final ParseField _SOURCE = new ParseField("_source");
private static ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_source", true, Builder::new);
static {
PARSER.declareStringArray(Builder::setIndex, INDEX);
PARSER.declareObject(Builder::setQueryConfig, (p, c) -> QueryConfig.fromXContent(p), QUERY);
PARSER.declareField(Builder::setSourceFiltering,
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
}
private final String[] index;
private final QueryConfig queryConfig;
private final FetchSourceContext sourceFiltering;
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig) {
private DataFrameAnalyticsSource(String[] index, @Nullable QueryConfig queryConfig, @Nullable FetchSourceContext sourceFiltering) {
this.index = Objects.requireNonNull(index);
this.queryConfig = queryConfig;
this.sourceFiltering = sourceFiltering;
}
public String[] getIndex() {
@ -68,6 +76,10 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return queryConfig;
}
public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -75,6 +87,9 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
if (queryConfig != null) {
builder.field(QUERY.getPreferredName(), queryConfig.getQuery());
}
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
builder.endObject();
return builder;
}
@ -86,12 +101,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryConfig, other.queryConfig);
&& Objects.equals(queryConfig, other.queryConfig)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryConfig);
return Objects.hash(Arrays.asList(index), queryConfig, sourceFiltering);
}
@Override
@ -103,6 +119,7 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
private String[] index;
private QueryConfig queryConfig;
private FetchSourceContext sourceFiltering;
private Builder() {}
@ -121,8 +138,13 @@ public class DataFrameAnalyticsSource implements ToXContentObject {
return this;
}
public Builder setSourceFiltering(FetchSourceContext sourceFiltering) {
this.sourceFiltering = sourceFiltering;
return this;
}
public DataFrameAnalyticsSource build() {
return new DataFrameAnalyticsSource(index, queryConfig);
return new DataFrameAnalyticsSource(index, queryConfig, sourceFiltering);
}
}
}

View file

@ -2939,6 +2939,9 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
DataFrameAnalyticsSource sourceConfig = DataFrameAnalyticsSource.builder() // <1>
.setIndex("put-test-source-index") // <2>
.setQueryConfig(queryConfig) // <3>
.setSourceFiltering(new FetchSourceContext(true,
new String[] { "included_field_1", "included_field_2" },
new String[] { "excluded_field" })) // <4>
.build();
// end::put-data-frame-analytics-source-config

View file

@ -23,6 +23,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
@ -35,9 +36,17 @@ import static org.elasticsearch.client.ml.dataframe.QueryConfigTests.randomQuery
public class DataFrameAnalyticsSourceTests extends AbstractXContentTestCase<DataFrameAnalyticsSource> {
public static DataFrameAnalyticsSource randomSourceConfig() {
FetchSourceContext sourceFiltering = null;
if (randomBoolean()) {
sourceFiltering = new FetchSourceContext(true,
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}
return DataFrameAnalyticsSource.builder()
.setIndex(generateRandomStringArray(10, 10, false, false))
.setQueryConfig(randomBoolean() ? null : randomQueryConfig())
.setSourceFiltering(sourceFiltering)
.build();
}

View file

@ -52,6 +52,7 @@ include-tagged::{doc-tests-file}[{api}-source-config]
<1> Constructing a new DataFrameAnalyticsSource
<2> The source index
<3> The query from which to gather the data. If `query` is not set, a `match_all` query is used by default.
<4> Source filtering to select which fields will exist in the destination index.
===== QueryConfig

View file

@ -16,17 +16,18 @@
<<dfanalytics-types>>.
`analyzed_fields`::
(object) You can specify `includes` and/or `excludes` patterns. If
`analyzed_fields` is not set, only the relevant fields will be included. For
example, all the numeric fields for {oldetection}. For the supported field
types, see <<ml-put-dfanalytics-supported-fields>>.
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
See also <<explain-dfanalytics>>, which helps you understand field selection.
`includes`:::
(array) An array of strings that defines the fields that will be included in
(Optional, array) An array of strings that defines the fields that will be included in
the analysis.
`excludes`:::
(array) An array of strings that defines the fields that will be excluded
(Optional, array) An array of strings that defines the fields that will be excluded
from the analysis.
@ -81,8 +82,8 @@ PUT _ml/data_frame/analytics/loganalytics
that setting. For more information, see <<ml-settings>>.
`source`::
(object) The source configuration, consisting of an `index` and optionally a
`query` object.
(object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
`index`:::
(Required, string or array) Index or indices on which to perform the
@ -96,6 +97,19 @@ PUT _ml/data_frame/analytics/loganalytics
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.
`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.
`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.
[[dfanalytics-types]]
==== Analysis objects

View file

@ -101,12 +101,12 @@ single number. For example, in case of age ranges, you can model the values as
<<dfanalytics-types>>.
`analyzed_fields`::
(Optional, object) You can specify `includes` and/or `excludes` patterns.
If `analyzed_fields` is not set, only the relevant fields will be included.
For example, all the numeric fields for {oldetection}. For the supported field
types, see <<ml-put-dfanalytics-supported-fields>>. If you specify fields
either in `includes` or in `excludes` that have a data type that is not
supported, an error occurs.
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be included in the analysis. If `analyzed_fields` is not set,
only the relevant fields will be included. For example, all the numeric fields
for {oldetection}. For the supported field types, see <<ml-put-dfanalytics-supported-fields>>.
See also <<explain-dfanalytics>>, which helps you understand field selection.
`includes`:::
(Optional, array) An array of strings that defines the fields that will be
@ -142,8 +142,8 @@ single number. For example, in case of age ranges, you can model the values as
that setting. For more information, see <<ml-settings>>.
`source`::
(Required, object) The source configuration, consisting of `index` and
optionally a `query`.
(Required, object) The configuration of how to source the analysis data. It requires an `index`.
Optionally, `query` and `_source` may be specified.
`index`:::
(Required, string or array) Index or indices on which to perform the
@ -157,6 +157,19 @@ single number. For example, in case of age ranges, you can model the values as
as this object is passed verbatim to {es}. By default, this property has
the following value: `{"match_all": {}}`.
`_source`:::
(Optional, object) Specify `includes` and/or `excludes` patterns to select
which fields will be present in the destination. Fields that are excluded
cannot be included in the analysis.
`includes`::::
(array) An array of strings that defines the fields that will be included in
the destination.
`excludes`::::
(array) An array of strings that defines the fields that will be excluded
from the destination.
`allow_lazy_start`::
(Optional, boolean) Whether this job should be allowed to start when there
is insufficient {ml} node capacity for it to be immediately assigned to a node.

View file

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
@ -18,6 +19,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import java.io.IOException;
@ -87,6 +89,24 @@ public class PutDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalytic
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException error = null;
error = checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(config, error);
return error;
}
private ActionRequestValidationException checkNoIncludedAnalyzedFieldsAreExcludedBySourceFiltering(
DataFrameAnalyticsConfig config, ActionRequestValidationException error) {
if (config.getAnalyzedFields() == null) {
return null;
}
for (String analyzedInclude : config.getAnalyzedFields().includes()) {
if (config.getSource().isFieldExcluded(analyzedInclude)) {
return ValidateActions.addValidationError("field [" + analyzedInclude + "] is included in ["
+ DataFrameAnalyticsConfig.ANALYZED_FIELDS.getPreferredName() + "] but not in ["
+ DataFrameAnalyticsConfig.SOURCE.getPreferredName() + "."
+ DataFrameAnalyticsSource._SOURCE.getPreferredName() + "]", error);
}
}
return null;
}

View file

@ -127,7 +127,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
private final Version version;
private final boolean allowLazyStart;
public DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
FetchSourceContext analyzedFields, Instant createTime, Version version, boolean allowLazyStart) {
this.id = ExceptionsHelper.requireNonNull(id, ID);

View file

@ -6,17 +6,21 @@
package org.elasticsearch.xpack.core.ml.dataframe;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
@ -33,20 +37,29 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
public static final ParseField INDEX = new ParseField("index");
public static final ParseField QUERY = new ParseField("query");
public static final ParseField _SOURCE = new ParseField("_source");
public static ConstructingObjectParser<DataFrameAnalyticsSource, Void> createParser(boolean ignoreUnknownFields) {
ConstructingObjectParser<DataFrameAnalyticsSource, Void> parser = new ConstructingObjectParser<>("data_frame_analytics_source",
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(((List<String>) a[0]).toArray(new String[0]), (QueryProvider) a[1]));
ignoreUnknownFields, a -> new DataFrameAnalyticsSource(
((List<String>) a[0]).toArray(new String[0]),
(QueryProvider) a[1],
(FetchSourceContext) a[2]));
parser.declareStringArray(ConstructingObjectParser.constructorArg(), INDEX);
parser.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> QueryProvider.fromXContent(p, ignoreUnknownFields, Messages.DATA_FRAME_ANALYTICS_BAD_QUERY_FORMAT), QUERY);
parser.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FetchSourceContext.fromXContent(p),
_SOURCE,
ObjectParser.ValueType.OBJECT_ARRAY_BOOLEAN_OR_STRING);
return parser;
}
private final String[] index;
private final QueryProvider queryProvider;
private final FetchSourceContext sourceFiltering;
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider) {
public DataFrameAnalyticsSource(String[] index, @Nullable QueryProvider queryProvider, @Nullable FetchSourceContext sourceFiltering) {
this.index = ExceptionsHelper.requireNonNull(index, INDEX);
if (index.length == 0) {
throw new IllegalArgumentException("source.index must specify at least one index");
@ -55,22 +68,36 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
throw new IllegalArgumentException("source.index must contain non-null and non-empty strings");
}
this.queryProvider = queryProvider == null ? QueryProvider.defaultQuery() : queryProvider;
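// _source filtering may narrow the set of fields, but fetching of _source itself must stay enabled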
if (sourceFiltering != null && sourceFiltering.fetchSource() == false) {
throw new IllegalArgumentException("source._source cannot be disabled");
}
this.sourceFiltering = sourceFiltering;
}
public DataFrameAnalyticsSource(StreamInput in) throws IOException {
index = in.readStringArray();
queryProvider = QueryProvider.fromStream(in);
if (in.getVersion().onOrAfter(Version.CURRENT)) {
sourceFiltering = in.readOptionalWriteable(FetchSourceContext::new);
} else {
sourceFiltering = null;
}
}
public DataFrameAnalyticsSource(DataFrameAnalyticsSource other) {
this.index = Arrays.copyOf(other.index, other.index.length);
this.queryProvider = new QueryProvider(other.queryProvider);
this.sourceFiltering = other.sourceFiltering == null ? null : new FetchSourceContext(
other.sourceFiltering.fetchSource(), other.sourceFiltering.includes(), other.sourceFiltering.excludes());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeStringArray(index);
queryProvider.writeTo(out);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(sourceFiltering);
}
}
@Override
@ -78,6 +105,9 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
builder.startObject();
builder.array(INDEX.getPreferredName(), index);
builder.field(QUERY.getPreferredName(), queryProvider.getQuery());
if (sourceFiltering != null) {
builder.field(_SOURCE.getPreferredName(), sourceFiltering);
}
builder.endObject();
return builder;
}
@ -89,12 +119,13 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
DataFrameAnalyticsSource other = (DataFrameAnalyticsSource) o;
return Arrays.equals(index, other.index)
&& Objects.equals(queryProvider, other.queryProvider);
&& Objects.equals(queryProvider, other.queryProvider)
&& Objects.equals(sourceFiltering, other.sourceFiltering);
}
@Override
public int hashCode() {
return Objects.hash(Arrays.asList(index), queryProvider);
return Objects.hash(Arrays.asList(index), queryProvider, sourceFiltering);
}
public String[] getIndex() {
@ -118,6 +149,10 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
return queryProvider.getParsedQuery();
}
public FetchSourceContext getSourceFiltering() {
return sourceFiltering;
}
Exception getQueryParsingException() {
return queryProvider.getParsingException();
}
@ -147,4 +182,47 @@ public class DataFrameAnalyticsSource implements Writeable, ToXContentObject {
Map<String, Object> getQuery() {
return queryProvider.getQuery();
}
public boolean isFieldExcluded(String path) {
if (sourceFiltering == null) {
return false;
}
// First check the excludes, as they are applied last
for (String exclude : sourceFiltering.excludes()) {
if (pathMatchesSourcePattern(path, exclude)) {
return true;
}
}
// Now check the includes; an empty includes array means no further exclusions
if (sourceFiltering.includes().length == 0) {
return false;
}
for (String include : sourceFiltering.includes()) {
if (pathMatchesSourcePattern(path, include)) {
return false;
}
}
return true;
}
private static boolean pathMatchesSourcePattern(String path, String sourcePattern) {
if (sourcePattern.equals(path)) {
return true;
}
if (Regex.isSimpleMatchPattern(sourcePattern)) {
return Regex.simpleMatch(sourcePattern, path);
}
// At this stage sourcePattern is a concrete field name and path is not equal to it.
// We should check if path is a nested field of pattern.
// Let us take "foo" as an example.
// Fields that are "foo.*" should also be matched.
return Regex.simpleMatch(sourcePattern + ".*", path);
}
}

View file

@ -418,6 +418,9 @@ public class ElasticsearchMappings {
.startObject(DataFrameAnalyticsSource.QUERY.getPreferredName())
.field(ENABLED, false)
.endObject()
.startObject(DataFrameAnalyticsSource._SOURCE.getPreferredName())
.field(ENABLED, false)
.endObject()
.endObject()
.endObject()
.startObject(DataFrameAnalyticsConfig.DEST.getPreferredName())

View file

@ -303,6 +303,7 @@ public final class ReservedFieldNames {
DataFrameAnalyticsDest.RESULTS_FIELD.getPreferredName(),
DataFrameAnalyticsSource.INDEX.getPreferredName(),
DataFrameAnalyticsSource.QUERY.getPreferredName(),
DataFrameAnalyticsSource._SOURCE.getPreferredName(),
OutlierDetection.NAME.getPreferredName(),
OutlierDetection.N_NEIGHBORS.getPreferredName(),
OutlierDetection.METHOD.getPreferredName(),

View file

@ -11,16 +11,25 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.action.PutDataFrameAnalyticsAction.Request;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfigTests;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.MlDataFrameAnalysisNamedXContentProvider;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetectionTests;
import org.junit.Before;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializingTestCase<Request> {
private String id;
@ -65,4 +74,39 @@ public class PutDataFrameAnalyticsActionRequestTests extends AbstractSerializing
protected Request doParseInstance(XContentParser parser) {
return Request.parseRequest(id, parser);
}
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsExcludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, null, new String[] {"excluded"}));
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"excluded"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(source)
.setAnalysis(OutlierDetectionTests.createRandom())
.setAnalyzedFields(analyzedFields)
.buildForExplain();
Request request = new Request(config);
Exception e = request.validate();
assertThat(e, is(notNullValue()));
assertThat(e.getMessage(), containsString("field [excluded] is included in [analyzed_fields] but not in [source._source]"));
}
public void testValidate_GivenRequestWithIncludedAnalyzedFieldThatIsIncludedInSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, new String[] {"included"}, null));
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"included"}, null);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(source)
.setAnalysis(OutlierDetectionTests.createRandom())
.setAnalyzedFields(analyzedFields)
.buildForExplain();
Request request = new Request(config);
Exception e = request.validate();
assertThat(e, is(nullValue()));
}
}

View file

@ -12,12 +12,18 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.AbstractSerializingTestCase;
import org.elasticsearch.xpack.core.ml.utils.QueryProvider;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<DataFrameAnalyticsSource> {
@ -46,6 +52,7 @@ public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<D
public static DataFrameAnalyticsSource createRandom() {
String[] index = generateRandomStringArray(10, 10, false, false);
QueryProvider queryProvider = null;
FetchSourceContext sourceFiltering = null;
if (randomBoolean()) {
try {
queryProvider = QueryProvider.fromParsedQuery(QueryBuilders.termQuery(randomAlphaOfLength(10), randomAlphaOfLength(10)));
@ -54,11 +61,75 @@ public class DataFrameAnalyticsSourceTests extends AbstractSerializingTestCase<D
throw new UncheckedIOException(e);
}
}
return new DataFrameAnalyticsSource(index, queryProvider);
if (randomBoolean()) {
sourceFiltering = new FetchSourceContext(true,
generateRandomStringArray(10, 10, false, false),
generateRandomStringArray(10, 10, false, false));
}
return new DataFrameAnalyticsSource(index, queryProvider, sourceFiltering);
}
@Override
protected Writeable.Reader<DataFrameAnalyticsSource> instanceReader() {
return DataFrameAnalyticsSource::new;
}
public void testConstructor_GivenDisabledSource() {
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> new DataFrameAnalyticsSource(
new String[] {"index"}, null, new FetchSourceContext(false, null, null)));
assertThat(e.getMessage(), equalTo("source._source cannot be disabled"));
}
public void testIsFieldExcluded_GivenNoSourceFiltering() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null, null);
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
public void testIsFieldExcluded_GivenSourceFilteringWithNulls() {
DataFrameAnalyticsSource source = new DataFrameAnalyticsSource(new String[] { "index" }, null,
new FetchSourceContext(true, null, null));
assertThat(source.isFieldExcluded(randomAlphaOfLength(10)), is(false));
}
public void testIsFieldExcluded_GivenExcludes() {
assertThat(newSourceWithExcludes("foo").isFieldExcluded("bar"), is(false));
assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo"), is(true));
assertThat(newSourceWithExcludes("foo").isFieldExcluded("foo.bar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foobar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo.bar"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("foo*"), is(true));
assertThat(newSourceWithExcludes("foo*").isFieldExcluded("fo*"), is(false));
}
public void testIsFieldExcluded_GivenIncludes() {
assertThat(newSourceWithIncludes("foo").isFieldExcluded("bar"), is(true));
assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo"), is(false));
assertThat(newSourceWithIncludes("foo").isFieldExcluded("foo.bar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foobar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo.bar"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("foo*"), is(false));
assertThat(newSourceWithIncludes("foo*").isFieldExcluded("fo*"), is(true));
}
public void testIsFieldExcluded_GivenIncludesAndExcludes() {
// Excludes take precedence
assertThat(newSourceWithIncludesExcludes(Collections.singletonList("foo"), Collections.singletonList("foo"))
.isFieldExcluded("foo"), is(true));
}
private static DataFrameAnalyticsSource newSourceWithIncludes(String... includes) {
return newSourceWithIncludesExcludes(Arrays.asList(includes), Collections.emptyList());
}
private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) {
return newSourceWithIncludesExcludes(Collections.emptyList(), Arrays.asList(excludes));
}
private static DataFrameAnalyticsSource newSourceWithIncludesExcludes(List<String> includes, List<String> excludes) {
FetchSourceContext sourceFiltering = new FetchSourceContext(true,
includes.toArray(new String[0]), excludes.toArray(new String[0]));
return new DataFrameAnalyticsSource(new String[] { "index" }, null, sourceFiltering);
}
}

View file

@ -52,6 +52,7 @@ integTest.runner {
'ml/data_frame_analytics_crud/Test put config with dest index included in source via alias',
'ml/data_frame_analytics_crud/Test put config with unknown top level field',
'ml/data_frame_analytics_crud/Test put config with unknown field in outlier detection analysis',
'ml/data_frame_analytics_crud/Test put config given analyzed_fields include field excluded by source',
'ml/data_frame_analytics_crud/Test put config given missing source',
'ml/data_frame_analytics_crud/Test put config given source with empty index array',
'ml/data_frame_analytics_crud/Test put config given source with empty string in index array',

View file

@ -53,7 +53,8 @@ public class ExplainDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsInteg
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex },
QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one"))))
QueryProvider.fromParsedQuery(QueryBuilders.termQuery("categorical", "only-one")),
null))
.setAnalysis(new Classification("categorical"))
.buildForExplain();

View file

@ -164,7 +164,7 @@ abstract class MlNativeDataFrameAnalyticsIntegTestCase extends MlNativeIntegTest
@Nullable String resultsField, DataFrameAnalysis analysis) {
return new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, resultsField))
.setAnalysis(analysis)
.build();

View file

@ -356,7 +356,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
String id = "test_outlier_detection_with_multiple_source_indices";
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(sourceIndex, null))
.setSource(new DataFrameAnalyticsSource(sourceIndex, null, null))
.setDest(new DataFrameAnalyticsDest(destIndex, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();
@ -472,7 +472,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.MB);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)
@ -516,7 +516,7 @@ public class RunDataFrameAnalyticsIT extends MlNativeDataFrameAnalyticsIntegTest
ByteSizeValue modelMemoryLimit = new ByteSizeValue(1, ByteSizeUnit.TB);
DataFrameAnalyticsConfig config = new DataFrameAnalyticsConfig.Builder()
.setId(id)
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null))
.setSource(new DataFrameAnalyticsSource(new String[] { sourceIndex }, null, null))
.setDest(new DataFrameAnalyticsDest(sourceIndex + "-results", null))
.setAnalysis(new OutlierDetection.Builder().build())
.setModelMemoryLimit(modelMemoryLimit)

View file

@ -237,7 +237,7 @@ public class TransportStartDataFrameAnalyticsAction
// Step 5. Validate mappings can be merged
ActionListener<StartContext> toValidateMappingsListener = ActionListener.wrap(
startContext -> MappingsMerger.mergeMappings(client, startContext.config.getHeaders(),
startContext.config.getSource().getIndex(), ActionListener.wrap(
startContext.config.getSource(), ActionListener.wrap(
mappings -> validateMappingsMergeListener.onResponse(startContext), finalListener::onFailure)),
finalListener::onFailure
);

View file

@ -84,8 +84,6 @@ public final class DataFrameAnalyticsIndex {
ActionListener<CreateIndexRequest> listener) {
AtomicReference<Settings> settingsHolder = new AtomicReference<>();
String[] sourceIndex = config.getSource().getIndex();
ActionListener<ImmutableOpenMap<String, MappingMetaData>> mappingsListener = ActionListener.wrap(
mappings -> listener.onResponse(createIndexRequest(clock, config, settingsHolder.get(), mappings)),
listener::onFailure
@ -94,7 +92,7 @@ public final class DataFrameAnalyticsIndex {
ActionListener<Settings> settingsListener = ActionListener.wrap(
settings -> {
settingsHolder.set(settings);
MappingsMerger.mergeMappings(client, config.getHeaders(), sourceIndex, mappingsListener);
MappingsMerger.mergeMappings(client, config.getHeaders(), config.getSource(), mappingsListener);
},
listener::onFailure
);
@ -105,7 +103,7 @@ public final class DataFrameAnalyticsIndex {
);
GetSettingsRequest getSettingsRequest = new GetSettingsRequest();
getSettingsRequest.indices(sourceIndex);
getSettingsRequest.indices(config.getSource().getIndex());
getSettingsRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
getSettingsRequest.names(PRESERVED_SETTINGS);
ClientHelper.executeWithHeadersAsync(config.getHeaders(), ML_ORIGIN, client, GetSettingsAction.INSTANCE,

View file

@ -179,6 +179,7 @@ public class DataFrameAnalyticsManager {
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(config.getSource().getIndex());
reindexRequest.setSourceQuery(config.getSource().getParsedQuery());
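// Apply the config's _source filtering so that excluded fields are not reindexed into the destination index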
reindexRequest.getSearchRequest().source().fetchSource(config.getSource().getSourceFiltering());
reindexRequest.setDestIndex(config.getDest().getIndex());
reindexRequest.setScript(new Script("ctx._source." + DataFrameAnalyticsIndex.ID_COPY + " = ctx._id"));

View file

@ -15,6 +15,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import java.util.Collections;
@ -32,22 +33,22 @@ public final class MappingsMerger {
private MappingsMerger() {}
public static void mergeMappings(Client client, Map<String, String> headers, String[] index,
public static void mergeMappings(Client client, Map<String, String> headers, DataFrameAnalyticsSource source,
ActionListener<ImmutableOpenMap<String, MappingMetaData>> listener) {
ActionListener<GetMappingsResponse> mappingsListener = ActionListener.wrap(
getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(getMappingsResponse)),
getMappingsResponse -> listener.onResponse(MappingsMerger.mergeMappings(source, getMappingsResponse)),
listener::onFailure
);
GetMappingsRequest getMappingsRequest = new GetMappingsRequest();
getMappingsRequest.indices(index);
getMappingsRequest.indices(source.getIndex());
ClientHelper.executeWithHeadersAsync(headers, ML_ORIGIN, client, GetMappingsAction.INSTANCE, getMappingsRequest, mappingsListener);
}
static ImmutableOpenMap<String, MappingMetaData> mergeMappings(GetMappingsResponse getMappingsResponse) {
static ImmutableOpenMap<String, MappingMetaData> mergeMappings(DataFrameAnalyticsSource source,
GetMappingsResponse getMappingsResponse) {
ImmutableOpenMap<String, MappingMetaData> indexToMappings = getMappingsResponse.getMappings();
String type = null;
Map<String, Object> mergedMappings = new HashMap<>();
Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = indexToMappings.iterator();
@ -61,13 +62,16 @@ public final class MappingsMerger {
Map<String, Object> fieldMappings = (Map<String, Object>) currentMappings.get("properties");
for (Map.Entry<String, Object> fieldMapping : fieldMappings.entrySet()) {
if (mergedMappings.containsKey(fieldMapping.getKey())) {
if (mergedMappings.get(fieldMapping.getKey()).equals(fieldMapping.getValue()) == false) {
throw ExceptionsHelper.badRequestException("cannot merge mappings because of differences for field [{}]",
fieldMapping.getKey());
String field = fieldMapping.getKey();
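// Skip fields excluded by source filtering; their mappings should not be carried over to the destination index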
if (source.isFieldExcluded(field) == false) {
if (mergedMappings.containsKey(field)) {
if (mergedMappings.get(field).equals(fieldMapping.getValue()) == false) {
throw ExceptionsHelper.badRequestException(
"cannot merge mappings because of differences for field [{}]", field);
}
} else {
mergedMappings.put(fieldMapping.getKey(), fieldMapping.getValue());
mergedMappings.put(field, fieldMapping.getValue());
}
}
}
}

View file

@ -85,6 +85,7 @@ public class ExtractedFieldsDetector {
fields.removeAll(IGNORE_FIELDS);
checkResultsFieldIsNotPresent();
removeFieldsUnderResultsField(fields);
applySourceFiltering(fields);
FetchSourceContext analyzedFields = config.getAnalyzedFields();
// If the user has not explicitly included fields we'll include all compatible fields
@ -132,6 +133,16 @@ public class ExtractedFieldsDetector {
}
}
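// Fields excluded via source._source will not exist in the destination index, so they cannot be extracted for analysis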
private void applySourceFiltering(Set<String> fields) {
Iterator<String> fieldsIterator = fields.iterator();
while (fieldsIterator.hasNext()) {
String field = fieldsIterator.next();
if (config.getSource().isFieldExcluded(field)) {
fieldsIterator.remove();
}
}
}
private void addExcludedField(String field, String reason, Set<FieldSelection> fieldSelection) {
fieldSelection.add(FieldSelection.excluded(field, getMappingTypes(field), reason));
}

View file

@ -58,7 +58,7 @@ public class DataFrameAnalyticsIndexTests extends ESTestCase {
private static final DataFrameAnalyticsConfig ANALYTICS_CONFIG =
new DataFrameAnalyticsConfig.Builder()
.setId(ANALYTICS_ID)
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, null))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, null))
.setAnalysis(new OutlierDetection.Builder().build())
.build();

View file

@ -10,9 +10,10 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.containsInAnyOrder;
@ -21,7 +22,7 @@ import static org.hamcrest.Matchers.is;
public class MappingsMergerTests extends ESTestCase {
public void testMergeMappings_GivenIndicesWithIdenticalMappings() throws IOException {
public void testMergeMappings_GivenIndicesWithIdenticalMappings() {
Map<String, Object> index1Mappings = Map.of("properties", Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings"));
MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings);
@ -34,14 +35,14 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse);
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
assertThat(mergedMappings.valuesIt().next().getSourceAsMap(), equalTo(index1Mappings));
}
public void testMergeMappings_GivenFieldWithDifferentMapping() throws IOException {
public void testMergeMappings_GivenFieldWithDifferentMapping() {
Map<String, Object> index1Mappings = Map.of("properties", Map.of("field_1", "field_1_mappings"));
MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings);
@ -55,12 +56,12 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> MappingsMerger.mergeMappings(getMappingsResponse));
() -> MappingsMerger.mergeMappings(newSource(), getMappingsResponse));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("cannot merge mappings because of differences for field [field_1]"));
}
public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() throws IOException {
public void testMergeMappings_GivenIndicesWithDifferentMappingsButNoConflicts() {
Map<String, Object> index1Mappings = Map.of("properties",
Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings"));
MappingMetaData index1MappingMetaData = new MappingMetaData("_doc", index1Mappings);
@ -75,7 +76,7 @@ public class MappingsMergerTests extends ESTestCase {
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(getMappingsResponse);
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(newSource(), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
@ -92,4 +93,35 @@ public class MappingsMergerTests extends ESTestCase {
assertThat(fieldMappings.get("field_2"), equalTo("field_2_mappings"));
assertThat(fieldMappings.get("field_3"), equalTo("field_3_mappings"));
}
public void testMergeMappings_GivenSourceFiltering() {
Map<String, Object> indexMappings = Map.of("properties", Map.of("field_1", "field_1_mappings", "field_2", "field_2_mappings"));
MappingMetaData indexMappingMetaData = new MappingMetaData("_doc", indexMappings);
ImmutableOpenMap.Builder<String, MappingMetaData> mappings = ImmutableOpenMap.builder();
mappings.put("index", indexMappingMetaData);
GetMappingsResponse getMappingsResponse = new GetMappingsResponse(mappings.build());
ImmutableOpenMap<String, MappingMetaData> mergedMappings = MappingsMerger.mergeMappings(
newSourceWithExcludes("field_1"), getMappingsResponse);
assertThat(mergedMappings.size(), equalTo(1));
assertThat(mergedMappings.containsKey("_doc"), is(true));
Map<String, Object> mappingsAsMap = mergedMappings.valuesIt().next().getSourceAsMap();
@SuppressWarnings("unchecked")
Map<String, Object> fieldMappings = (Map<String, Object>) mappingsAsMap.get("properties");
assertThat(fieldMappings.size(), equalTo(1));
assertThat(fieldMappings.containsKey("field_2"), is(true));
}
private static DataFrameAnalyticsSource newSource() {
return new DataFrameAnalyticsSource(new String[] {"index"}, null, null);
}
private static DataFrameAnalyticsSource newSourceWithExcludes(String... excludes) {
return new DataFrameAnalyticsSource(new String[] {"index"}, null,
new FetchSourceContext(true, null, excludes));
}
}

View file

@ -183,6 +183,6 @@ public class SourceDestValidatorTests extends ESTestCase {
}
private static DataFrameAnalyticsSource createSource(String... index) {
return new DataFrameAnalyticsSource(index, null);
return new DataFrameAnalyticsSource(index, null, null);
}
}

View file

@ -45,6 +45,9 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
private static final String DEST_INDEX = "dest_index";
private static final String RESULTS_FIELD = "ml";
private FetchSourceContext sourceFiltering;
private FetchSourceContext analyzedFields;
public void testDetect_GivenFloatField() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("some_float", "float").build();
@ -87,7 +90,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]." +
" Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -99,7 +102,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -171,7 +174,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -183,11 +186,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("some_keyword", "keyword")
.addAggregatableField("foo", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"});
analyzedFields = new FetchSourceContext(true, new String[0], new String[] {"foo"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -199,11 +202,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("some_keyword", "keyword")
.addAggregatableField("foo", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[] {"some_float", "some_keyword"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("required field [foo] is missing; analysis requires fields [foo]"));
}
@ -213,10 +216,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("foo", "float")
.addAggregatableField("bar", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"});
analyzedFields = new FetchSourceContext(true, new String[] {"foo", "bar"}, new String[] {"foo"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
@ -239,7 +242,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("foo"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("invalid types [keyword] for required field [foo]; " +
"expected types are [byte, double, float, half_float, integer, long, scaled_float, short]"));
@ -255,7 +258,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildClassificationConfig("some_float"), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("invalid types [float] for required field [some_float]; " +
"expected types are [boolean, byte, integer, ip, keyword, long, short, text]"));
@ -270,7 +273,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(SOURCE_INDEX,
buildClassificationConfig("some_keyword"), false, 100, fieldCapabilities, Collections.singletonMap("some_keyword", 3L));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("Field [some_keyword] must have at most [2] distinct values but there were at least [3]"));
}
@ -281,7 +284,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@ -291,11 +294,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("_id", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{"_id"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [_id] could be detected"));
}
@ -304,11 +307,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("foo", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"});
analyzedFields = new FetchSourceContext(true, new String[]{"*"}, new String[] {"bar"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [bar] could be detected"));
}
@ -318,10 +321,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("numeric", "float")
.addAggregatableField("categorical", "keyword")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, null, new String[] {"categorical"});
analyzedFields = new FetchSourceContext(true, null, new String[] {"categorical"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
@ -366,11 +369,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("my_field2", "float")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{"your_field1", "my*"}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [your_field1] could be detected"));
}
@ -381,11 +384,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("my_field2", "float")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
analyzedFields = new FetchSourceContext(true, new String[0], new String[]{"my_*"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No compatible fields could be detected in index [source_index]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
}
@ -397,10 +400,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<String> extractedFieldNames = fieldExtraction.v1().getAllFields().stream().map(ExtractedField::getName)
@@ -422,11 +425,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext desiredFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
analyzedFields = new FetchSourceContext(true, new String[]{"your*", "my_*"}, new String[]{"*nope"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(desiredFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("field [your_keyword] has unsupported type [keyword]. " +
"Supported types are [boolean, byte, double, float, half_float, integer, long, scaled_float, short]."));
@@ -442,7 +445,7 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
"please set a different results_field"));
@@ -479,11 +482,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("A field that matches the dest.results_field [ml] already exists; " +
"please set a different results_field"));
@@ -496,11 +499,11 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("your_field2", "float")
.addAggregatableField("your_keyword", "keyword")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[]{RESULTS_FIELD}, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(analyzedFields), true, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, () -> extractedFieldsDetector.detect());
SOURCE_INDEX, buildOutlierDetectionConfig(), true, 100, fieldCapabilities, Collections.emptyMap());
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, extractedFieldsDetector::detect);
assertThat(e.getMessage(), equalTo("No field [ml] could be detected"));
}
@@ -814,10 +817,10 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
.addAggregatableField("field_1.keyword", "keyword")
.addAggregatableField("field_2", "float")
.build();
FetchSourceContext analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
analyzedFields = new FetchSourceContext(true, new String[] { "field_1", "field_2" }, new String[0]);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildRegressionConfig("field_2", analyzedFields), false, 100, fieldCapabilities, Collections.emptyMap());
SOURCE_INDEX, buildRegressionConfig("field_2"), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
assertThat(fieldExtraction.v1().getAllFields().size(), equalTo(2));
@@ -832,38 +835,76 @@ public class ExtractedFieldsDetectorTests extends ESTestCase {
);
}
private static DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
return buildOutlierDetectionConfig(null);
public void testDetect_GivenSourceFilteringWithIncludes() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("field_11", "float")
.addAggregatableField("field_12", "float")
.addAggregatableField("field_21", "float")
.addAggregatableField("field_22", "float").build();
sourceFiltering = new FetchSourceContext(true, new String[] {"field_1*"}, null);
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
assertThat(allFields.size(), equalTo(2));
assertThat(allFields.get(0).getName(), equalTo("field_11"));
assertThat(allFields.get(1).getName(), equalTo("field_12"));
assertFieldSelectionContains(fieldExtraction.v2(),
FieldSelection.included("field_11", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL),
FieldSelection.included("field_12", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL));
}
private static DataFrameAnalyticsConfig buildOutlierDetectionConfig(FetchSourceContext analyzedFields) {
public void testDetect_GivenSourceFilteringWithExcludes() {
FieldCapabilitiesResponse fieldCapabilities = new MockFieldCapsResponseBuilder()
.addAggregatableField("field_11", "float")
.addAggregatableField("field_12", "float")
.addAggregatableField("field_21", "float")
.addAggregatableField("field_22", "float").build();
sourceFiltering = new FetchSourceContext(true, null, new String[] {"field_1*"});
ExtractedFieldsDetector extractedFieldsDetector = new ExtractedFieldsDetector(
SOURCE_INDEX, buildOutlierDetectionConfig(), false, 100, fieldCapabilities, Collections.emptyMap());
Tuple<ExtractedFields, List<FieldSelection>> fieldExtraction = extractedFieldsDetector.detect();
List<ExtractedField> allFields = fieldExtraction.v1().getAllFields();
assertThat(allFields.size(), equalTo(2));
assertThat(allFields.get(0).getName(), equalTo("field_21"));
assertThat(allFields.get(1).getName(), equalTo("field_22"));
assertFieldSelectionContains(fieldExtraction.v2(),
FieldSelection.included("field_21", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL),
FieldSelection.included("field_22", Collections.singleton("float"), false, FieldSelection.FeatureType.NUMERICAL));
}
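
The two tests above pin down the semantics of the new source filtering: the pattern `field_1*` under `includes` keeps only `field_11` and `field_12`, while the same pattern under `excludes` keeps `field_21` and `field_22` instead. A minimal standalone sketch of that behaviour, reusing only the `FetchSourceContext` constructor and accessors that appear in this diff (the field names are the test fixtures, not a real index mapping):

import java.util.Arrays;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

// A hedged sketch of the include/exclude semantics exercised by the two tests above.
public class SourceFilteringSemanticsSketch {

    public static void main(String[] args) {
        // "field_1*" under includes: only field_11 and field_12 survive reindexing.
        FetchSourceContext includesOnly = new FetchSourceContext(true, new String[] {"field_1*"}, null);

        // The same pattern under excludes: field_21 and field_22 survive instead.
        FetchSourceContext excludesOnly = new FetchSourceContext(true, null, new String[] {"field_1*"});

        System.out.println(Arrays.toString(includesOnly.includes())); // [field_1*]
        System.out.println(Arrays.toString(excludesOnly.excludes())); // [field_1*]
    }
}
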
private DataFrameAnalyticsConfig buildOutlierDetectionConfig() {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new OutlierDetection.Builder().build())
.build();
}
private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) {
return buildRegressionConfig(dependentVariable, null);
}
private static DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable, FetchSourceContext analyzedFields) {
private DataFrameAnalyticsConfig buildRegressionConfig(String dependentVariable) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalyzedFields(analyzedFields)
.setAnalysis(new Regression(dependentVariable))
.build();
}
private static DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) {
private DataFrameAnalyticsConfig buildClassificationConfig(String dependentVariable) {
return new DataFrameAnalyticsConfig.Builder()
.setId("foo")
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null))
.setSource(new DataFrameAnalyticsSource(SOURCE_INDEX, null, sourceFiltering))
.setDest(new DataFrameAnalyticsDest(DEST_INDEX, RESULTS_FIELD))
.setAnalysis(new Classification(dependentVariable))
.build();
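
All three config builders above now thread the optional source filtering into the three-argument `DataFrameAnalyticsSource` constructor, with `null` meaning no filtering. A hedged sketch of how the pieces compose end to end, reusing only the constructors and builder methods visible in this diff; the job id, index names, and field names are illustrative placeholders, and the import paths are assumed to be the x-pack core packages:

import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsDest;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsSource;
import org.elasticsearch.xpack.core.ml.dataframe.analyses.OutlierDetection;

// A hedged sketch, not the canonical API usage: all names below are hypothetical.
public class SourceFilteredConfigSketch {

    static DataFrameAnalyticsConfig build() {
        // Reindex only metrics.*, except the debug fields.
        FetchSourceContext sourceFiltering =
            new FetchSourceContext(true, new String[] {"metrics.*"}, new String[] {"metrics.debug_*"});

        return new DataFrameAnalyticsConfig.Builder()
            .setId("sensor-outliers")
            // The third argument is the new optional source filtering; null disables it.
            .setSource(new DataFrameAnalyticsSource(new String[] {"sensor-*"}, null, sourceFiltering))
            .setDest(new DataFrameAnalyticsDest("sensor-results", null))
            // analyzed_fields may only select fields that source._source retains,
            // as the REST test added below enforces.
            .setAnalyzedFields(new FetchSourceContext(true, new String[] {"metrics.reading"}, null))
            .setAnalysis(new OutlierDetection.Builder().build())
            .build();
    }
}
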
View file
@@ -71,7 +71,7 @@ public class AnalyticsResultProcessorTests extends ESTestCase {
analyticsConfig = new DataFrameAnalyticsConfig.Builder()
.setId(JOB_ID)
.setDescription(JOB_DESCRIPTION)
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null))
.setSource(new DataFrameAnalyticsSource(new String[] {"my_source"}, null, null))
.setDest(new DataFrameAnalyticsDest("my_dest", null))
.setAnalysis(new Regression("foo"))
.build();
View file
@@ -41,7 +41,8 @@ setup:
{
"source": {
"index": "index-source",
"query": {"term" : { "user" : "Kimchy" }}
"query": {"term" : { "user" : "Kimchy" }},
"_source": [ "obj1.*", "obj2.*" ]
},
"dest": {
"index": "index-dest"
@@ -1852,3 +1853,28 @@
}}
- is_true: create_time
- is_true: version
---
"Test put config given analyzed_fields include field excluded by source":
- do:
catch: /field \[excluded\] is included in \[analyzed_fields\] but not in \[source._source\]/
ml.put_data_frame_analytics:
id: "analyzed_fields-include-field-excluded-by-source"
body: >
{
"source": {
"index": "index-source",
"query": {"term" : { "user" : "Kimchy" }},
"_source": {
"excludes": ["excluded"]
}
},
"dest": {
"index": "index-dest"
},
"analysis": {"outlier_detection":{}},
"analyzed_fields": {
"includes": ["excluded"]
}
}
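
The array shorthand used in the first hunk of this file (`"_source": [ "obj1.*", "obj2.*" ]`) and the object form used in the new test (`"_source": {"excludes": ["excluded"]}`) are two spellings of the same include/exclude pair. A hedged sketch of that correspondence in terms of the `FetchSourceContext` constructor used throughout this commit; the equivalence is an assumption inferred from that constructor's include/exclude arguments:

import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

// A hedged sketch of how the two REST forms of "_source" above map to FetchSourceContext.
public class SourceSyntaxSketch {

    // Array shorthand, "_source": ["obj1.*", "obj2.*"] -> includes only (assumed mapping):
    static final FetchSourceContext ARRAY_FORM =
        new FetchSourceContext(true, new String[] {"obj1.*", "obj2.*"}, null);

    // Object form, "_source": {"excludes": ["excluded"]} -> excludes only:
    static final FetchSourceContext OBJECT_FORM =
        new FetchSourceContext(true, null, new String[] {"excluded"});
}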