mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-25 07:37:19 -04:00
[ML] Data frame analytics max_num_threads setting (#59254)
This adds a setting to data frame analytics jobs called `max_number_threads`. The setting expects a positive integer. When used the user specifies the max number of threads that may be used by the analysis. Note that the actual number of threads used is limited by the number of processors on the node where the job is assigned. Also, the process may use a couple more threads for operational functionality that is not the analysis itself. This setting may also be updated for a stopped job. More threads may reduce the time it takes to complete the job at the cost of using more CPU.
This commit is contained in:
parent
650f20eb0d
commit
da0249f6c2
16 changed files with 206 additions and 29 deletions
|
@ -57,6 +57,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
static final ParseField CREATE_TIME = new ParseField("create_time");
|
||||
static final ParseField VERSION = new ParseField("version");
|
||||
static final ParseField ALLOW_LAZY_START = new ParseField("allow_lazy_start");
|
||||
static final ParseField MAX_NUM_THREADS = new ParseField("max_num_threads");
|
||||
|
||||
private static final ObjectParser<Builder, Void> PARSER = new ObjectParser<>("data_frame_analytics_config", true, Builder::new);
|
||||
|
||||
|
@ -80,6 +81,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
ValueType.VALUE);
|
||||
PARSER.declareString(Builder::setVersion, Version::fromString, VERSION);
|
||||
PARSER.declareBoolean(Builder::setAllowLazyStart, ALLOW_LAZY_START);
|
||||
PARSER.declareInt(Builder::setMaxNumThreads, MAX_NUM_THREADS);
|
||||
}
|
||||
|
||||
private static DataFrameAnalysis parseAnalysis(XContentParser parser) throws IOException {
|
||||
|
@ -100,11 +102,13 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
private final Instant createTime;
|
||||
private final Version version;
|
||||
private final Boolean allowLazyStart;
|
||||
private final Integer maxNumThreads;
|
||||
|
||||
private DataFrameAnalyticsConfig(@Nullable String id, @Nullable String description, @Nullable DataFrameAnalyticsSource source,
|
||||
@Nullable DataFrameAnalyticsDest dest, @Nullable DataFrameAnalysis analysis,
|
||||
@Nullable FetchSourceContext analyzedFields, @Nullable ByteSizeValue modelMemoryLimit,
|
||||
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart) {
|
||||
@Nullable Instant createTime, @Nullable Version version, @Nullable Boolean allowLazyStart,
|
||||
@Nullable Integer maxNumThreads) {
|
||||
this.id = id;
|
||||
this.description = description;
|
||||
this.source = source;
|
||||
|
@ -115,6 +119,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());;
|
||||
this.version = version;
|
||||
this.allowLazyStart = allowLazyStart;
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -157,6 +162,10 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
return allowLazyStart;
|
||||
}
|
||||
|
||||
public Integer getMaxNumThreads() {
|
||||
return maxNumThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -193,6 +202,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
if (allowLazyStart != null) {
|
||||
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
|
||||
}
|
||||
if (maxNumThreads != null) {
|
||||
builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -212,12 +224,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
&& Objects.equals(modelMemoryLimit, other.modelMemoryLimit)
|
||||
&& Objects.equals(createTime, other.createTime)
|
||||
&& Objects.equals(version, other.version)
|
||||
&& Objects.equals(allowLazyStart, other.allowLazyStart);
|
||||
&& Objects.equals(allowLazyStart, other.allowLazyStart)
|
||||
&& Objects.equals(maxNumThreads, other.maxNumThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart);
|
||||
return Objects.hash(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime, version, allowLazyStart,
|
||||
maxNumThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -237,6 +251,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
private Instant createTime;
|
||||
private Version version;
|
||||
private Boolean allowLazyStart;
|
||||
private Integer maxNumThreads;
|
||||
|
||||
private Builder() {}
|
||||
|
||||
|
@ -290,9 +305,14 @@ public class DataFrameAnalyticsConfig implements ToXContentObject {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxNumThreads(Integer maxNumThreads) {
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataFrameAnalyticsConfig build() {
|
||||
return new DataFrameAnalyticsConfig(id, description, source, dest, analysis, analyzedFields, modelMemoryLimit, createTime,
|
||||
version, allowLazyStart);
|
||||
version, allowLazyStart, maxNumThreads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,22 +51,25 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
|
||||
VALUE);
|
||||
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
|
||||
|
||||
PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS);
|
||||
}
|
||||
|
||||
private final String id;
|
||||
private final String description;
|
||||
private final ByteSizeValue modelMemoryLimit;
|
||||
private final Boolean allowLazyStart;
|
||||
private final Integer maxNumThreads;
|
||||
|
||||
private DataFrameAnalyticsConfigUpdate(String id,
|
||||
@Nullable String description,
|
||||
@Nullable ByteSizeValue modelMemoryLimit,
|
||||
@Nullable Boolean allowLazyStart) {
|
||||
@Nullable Boolean allowLazyStart,
|
||||
@Nullable Integer maxNumThreads) {
|
||||
this.id = id;
|
||||
this.description = description;
|
||||
this.modelMemoryLimit = modelMemoryLimit;
|
||||
this.allowLazyStart = allowLazyStart;
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -85,6 +88,10 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
return allowLazyStart;
|
||||
}
|
||||
|
||||
public Integer getMaxNumThreads() {
|
||||
return maxNumThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -98,6 +105,9 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
if (allowLazyStart != null) {
|
||||
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
|
||||
}
|
||||
if (maxNumThreads != null) {
|
||||
builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -117,12 +127,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
return Objects.equals(this.id, that.id)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
|
||||
&& Objects.equals(this.allowLazyStart, that.allowLazyStart);
|
||||
&& Objects.equals(this.allowLazyStart, that.allowLazyStart)
|
||||
&& Objects.equals(this.maxNumThreads, that.maxNumThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart);
|
||||
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
@ -131,6 +142,7 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
private String description;
|
||||
private ByteSizeValue modelMemoryLimit;
|
||||
private Boolean allowLazyStart;
|
||||
private Integer maxNumThreads;
|
||||
|
||||
private Builder() {}
|
||||
|
||||
|
@ -158,8 +170,13 @@ public class DataFrameAnalyticsConfigUpdate implements ToXContentObject {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxNumThreads(Integer maxNumThreads) {
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataFrameAnalyticsConfigUpdate build() {
|
||||
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart);
|
||||
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1308,6 +1308,7 @@ public class MachineLearningIT extends ESRestHighLevelClientTestCase {
|
|||
assertThat(createdConfig.getAnalyzedFields(), equalTo(config.getAnalyzedFields()));
|
||||
assertThat(createdConfig.getModelMemoryLimit(), equalTo(ByteSizeValue.parseBytesSizeValue("1gb", ""))); // default value
|
||||
assertThat(createdConfig.getDescription(), equalTo("some description"));
|
||||
assertThat(createdConfig.getMaxNumThreads(), equalTo(1));
|
||||
}
|
||||
|
||||
public void testPutDataFrameAnalyticsConfig_GivenRegression() throws Exception {
|
||||
|
|
|
@ -3040,6 +3040,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
.setAnalyzedFields(analyzedFields) // <5>
|
||||
.setModelMemoryLimit(new ByteSizeValue(5, ByteSizeUnit.MB)) // <6>
|
||||
.setDescription("this is an example description") // <7>
|
||||
.setMaxNumThreads(1) // <8>
|
||||
.build();
|
||||
// end::put-data-frame-analytics-config
|
||||
|
||||
|
@ -3096,6 +3097,7 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
|
|||
.setId("my-analytics-config") // <1>
|
||||
.setDescription("new description") // <2>
|
||||
.setModelMemoryLimit(new ByteSizeValue(128, ByteSizeUnit.MB)) // <3>
|
||||
.setMaxNumThreads(4) // <4>
|
||||
.build();
|
||||
// end::update-data-frame-analytics-config-update
|
||||
|
||||
|
|
|
@ -69,6 +69,9 @@ public class DataFrameAnalyticsConfigTests extends AbstractXContentTestCase<Data
|
|||
if (randomBoolean()) {
|
||||
builder.setAllowLazyStart(randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setMaxNumThreads(randomIntBetween(1, 20));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -46,6 +46,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractXContentTestCas
|
|||
if (randomBoolean()) {
|
||||
builder.setAllowLazyStart(randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setMaxNumThreads(randomIntBetween(1, 20));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,7 @@ include-tagged::{doc-tests-file}[{api}-config]
|
|||
<5> The fields to be included in / excluded from the analysis
|
||||
<6> The memory limit for the model created as part of the analysis process
|
||||
<7> Optionally, a human-readable description
|
||||
<8> The maximum number of threads to be used by the analysis. Defaults to 1.
|
||||
|
||||
[id="{upid}-{api}-query-config"]
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ include-tagged::{doc-tests-file}[{api}-config-update]
|
|||
<1> The {dfanalytics-job} ID
|
||||
<2> The human-readable description
|
||||
<3> The memory limit for the model created as part of the analysis process
|
||||
<4> The maximum number of threads to be used by the analysis
|
||||
|
||||
[id="{upid}-{api}-query-config"]
|
||||
|
||||
|
|
|
@ -327,6 +327,14 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
|
|||
(Required, object)
|
||||
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=dest]
|
||||
|
||||
`max_num_threads`::
|
||||
(Optional, integer)
|
||||
The maximum number of threads to be used by the analysis.
|
||||
The default value is `1`. Using more threads may decrease the time
|
||||
necessary to complete the analysis at the cost of using more CPU.
|
||||
Note that the process may use additional threads for operational
|
||||
functionality other than the analysis itself.
|
||||
|
||||
`model_memory_limit`::
|
||||
(Optional, string)
|
||||
The approximate maximum amount of memory resources that are permitted for
|
||||
|
@ -507,7 +515,8 @@ The API returns the following result:
|
|||
"model_memory_limit": "1gb",
|
||||
"create_time" : 1562265491319,
|
||||
"version" : "8.0.0",
|
||||
"allow_lazy_start" : false
|
||||
"allow_lazy_start" : false,
|
||||
"max_num_threads": 1
|
||||
}
|
||||
----
|
||||
// TESTRESPONSE[s/1562265491319/$body.$_path/]
|
||||
|
|
|
@ -71,6 +71,14 @@ the `starting` state until sufficient {ml} node capacity is available.
|
|||
(Optional, string)
|
||||
include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=description-dfa]
|
||||
|
||||
`max_num_threads`::
|
||||
(Optional, integer)
|
||||
The maximum number of threads to be used by the analysis.
|
||||
The default value is `1`. Using more threads may decrease the time
|
||||
necessary to complete the analysis at the cost of using more CPU.
|
||||
Note that the process may use additional threads for operational
|
||||
functionality other than the analysis itself.
|
||||
|
||||
`model_memory_limit`::
|
||||
(Optional, string)
|
||||
The approximate maximum amount of memory resources that are permitted for
|
||||
|
|
|
@ -124,7 +124,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
private final Instant createTime;
|
||||
private final Version version;
|
||||
private final boolean allowLazyStart;
|
||||
private final Integer maxNumThreads;
|
||||
private final int maxNumThreads;
|
||||
|
||||
private DataFrameAnalyticsConfig(String id, String description, DataFrameAnalyticsSource source, DataFrameAnalyticsDest dest,
|
||||
DataFrameAnalysis analysis, Map<String, String> headers, ByteSizeValue modelMemoryLimit,
|
||||
|
@ -141,7 +141,11 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
|
||||
this.version = version;
|
||||
this.allowLazyStart = allowLazyStart;
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
|
||||
if (maxNumThreads != null && maxNumThreads < 1) {
|
||||
throw ExceptionsHelper.badRequestException("[{}] must be a positive integer", MAX_NUM_THREADS.getPreferredName());
|
||||
}
|
||||
this.maxNumThreads = maxNumThreads == null ? 1 : maxNumThreads;
|
||||
}
|
||||
|
||||
public DataFrameAnalyticsConfig(StreamInput in) throws IOException {
|
||||
|
@ -170,9 +174,9 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
allowLazyStart = false;
|
||||
}
|
||||
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
|
||||
maxNumThreads = in.readOptionalVInt();
|
||||
maxNumThreads = in.readVInt();
|
||||
} else {
|
||||
maxNumThreads = null;
|
||||
maxNumThreads = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -256,9 +260,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
builder.field(VERSION.getPreferredName(), version);
|
||||
}
|
||||
builder.field(ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
|
||||
if (maxNumThreads != null) {
|
||||
builder.field(MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -288,7 +290,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
out.writeBoolean(allowLazyStart);
|
||||
}
|
||||
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
|
||||
out.writeOptionalVInt(maxNumThreads);
|
||||
out.writeVInt(maxNumThreads);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -309,7 +311,7 @@ public class DataFrameAnalyticsConfig implements ToXContentObject, Writeable {
|
|||
&& Objects.equals(createTime, other.createTime)
|
||||
&& Objects.equals(version, other.version)
|
||||
&& Objects.equals(allowLazyStart, other.allowLazyStart)
|
||||
&& Objects.equals(maxNumThreads, other.maxNumThreads);
|
||||
&& maxNumThreads == other.maxNumThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.dataframe;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -13,6 +14,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
|
|||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
@ -33,22 +35,30 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
DataFrameAnalyticsConfig.MODEL_MEMORY_LIMIT,
|
||||
VALUE);
|
||||
PARSER.declareBoolean(Builder::setAllowLazyStart, DataFrameAnalyticsConfig.ALLOW_LAZY_START);
|
||||
|
||||
PARSER.declareInt(Builder::setMaxNumThreads, DataFrameAnalyticsConfig.MAX_NUM_THREADS);
|
||||
}
|
||||
|
||||
private final String id;
|
||||
private final String description;
|
||||
private final ByteSizeValue modelMemoryLimit;
|
||||
private final Boolean allowLazyStart;
|
||||
private final Integer maxNumThreads;
|
||||
|
||||
private DataFrameAnalyticsConfigUpdate(String id,
|
||||
@Nullable String description,
|
||||
@Nullable ByteSizeValue modelMemoryLimit,
|
||||
@Nullable Boolean allowLazyStart) {
|
||||
@Nullable Boolean allowLazyStart,
|
||||
@Nullable Integer maxNumThreads) {
|
||||
this.id = id;
|
||||
this.description = description;
|
||||
this.modelMemoryLimit = modelMemoryLimit;
|
||||
this.allowLazyStart = allowLazyStart;
|
||||
|
||||
if (maxNumThreads != null && maxNumThreads < 1) {
|
||||
throw ExceptionsHelper.badRequestException("[{}] must be a positive integer",
|
||||
DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName());
|
||||
}
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
}
|
||||
|
||||
public DataFrameAnalyticsConfigUpdate(StreamInput in) throws IOException {
|
||||
|
@ -56,6 +66,11 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
this.description = in.readOptionalString();
|
||||
this.modelMemoryLimit = in.readOptionalWriteable(ByteSizeValue::new);
|
||||
this.allowLazyStart = in.readOptionalBoolean();
|
||||
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
|
||||
this.maxNumThreads = in.readOptionalVInt();
|
||||
} else {
|
||||
this.maxNumThreads = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -64,6 +79,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
out.writeOptionalString(description);
|
||||
out.writeOptionalWriteable(modelMemoryLimit);
|
||||
out.writeOptionalBoolean(allowLazyStart);
|
||||
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
|
||||
out.writeOptionalVInt(maxNumThreads);
|
||||
}
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -82,6 +100,10 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
return allowLazyStart;
|
||||
}
|
||||
|
||||
public Integer getMaxNumThreads() {
|
||||
return maxNumThreads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
@ -95,6 +117,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
if (allowLazyStart != null) {
|
||||
builder.field(DataFrameAnalyticsConfig.ALLOW_LAZY_START.getPreferredName(), allowLazyStart);
|
||||
}
|
||||
if (maxNumThreads != null) {
|
||||
builder.field(DataFrameAnalyticsConfig.MAX_NUM_THREADS.getPreferredName(), maxNumThreads);
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -120,6 +145,9 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
if (allowLazyStart != null) {
|
||||
builder.setAllowLazyStart(allowLazyStart);
|
||||
}
|
||||
if (maxNumThreads != null) {
|
||||
builder.setMaxNumThreads(maxNumThreads);
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
|
@ -127,7 +155,8 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
* Whether this update applied to the given source config requires analytics task restart.
|
||||
*/
|
||||
public boolean requiresRestart(DataFrameAnalyticsConfig source) {
|
||||
return getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false;
|
||||
return (getModelMemoryLimit() != null && getModelMemoryLimit().equals(source.getModelMemoryLimit()) == false)
|
||||
|| (getMaxNumThreads() != null && getMaxNumThreads().equals(source.getMaxNumThreads()) == false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -145,12 +174,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
return Objects.equals(this.id, that.id)
|
||||
&& Objects.equals(this.description, that.description)
|
||||
&& Objects.equals(this.modelMemoryLimit, that.modelMemoryLimit)
|
||||
&& Objects.equals(this.allowLazyStart, that.allowLazyStart);
|
||||
&& Objects.equals(this.allowLazyStart, that.allowLazyStart)
|
||||
&& Objects.equals(this.maxNumThreads, that.maxNumThreads);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart);
|
||||
return Objects.hash(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
@ -159,6 +189,7 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
private String description;
|
||||
private ByteSizeValue modelMemoryLimit;
|
||||
private Boolean allowLazyStart;
|
||||
private Integer maxNumThreads;
|
||||
|
||||
public Builder(String id) {
|
||||
this.id = id;
|
||||
|
@ -188,8 +219,13 @@ public class DataFrameAnalyticsConfigUpdate implements Writeable, ToXContentObje
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxNumThreads(Integer maxNumThreads) {
|
||||
this.maxNumThreads = maxNumThreads;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DataFrameAnalyticsConfigUpdate build() {
|
||||
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart);
|
||||
return new DataFrameAnalyticsConfigUpdate(id, description, modelMemoryLimit, allowLazyStart, maxNumThreads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.index.query.MatchAllQueryBuilder;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
|
||||
import org.elasticsearch.xpack.core.ml.AbstractBWCSerializationTestCase;
|
||||
|
@ -496,6 +497,32 @@ public class DataFrameAnalyticsConfigTests extends AbstractBWCSerializationTestC
|
|||
assertThat(DataFrameAnalyticsConfig.extractJobIdFromDocId("foo"), is(nullValue()));
|
||||
}
|
||||
|
||||
public void testCtor_GivenMaxNumThreadsIsZero() {
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
|
||||
.setId("test_config")
|
||||
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
|
||||
.setDest(new DataFrameAnalyticsDest("dest_index", null))
|
||||
.setAnalysis(new Regression("foo"))
|
||||
.setMaxNumThreads(0)
|
||||
.build());
|
||||
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
|
||||
}
|
||||
|
||||
public void testCtor_GivenMaxNumThreadsIsNegative() {
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> new DataFrameAnalyticsConfig.Builder()
|
||||
.setId("test_config")
|
||||
.setSource(new DataFrameAnalyticsSource(new String[] {"source_index"}, null, null))
|
||||
.setDest(new DataFrameAnalyticsDest("dest_index", null))
|
||||
.setAnalysis(new Regression("foo"))
|
||||
.setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0))
|
||||
.build());
|
||||
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
|
||||
}
|
||||
|
||||
private static void assertTooSmall(ElasticsearchStatusException e) {
|
||||
assertThat(e.getMessage(), startsWith("model_memory_limit must be at least 1kb."));
|
||||
}
|
||||
|
|
|
@ -5,9 +5,11 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.core.ml.dataframe;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.AbstractSerializingTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -47,6 +49,9 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
|
|||
if (randomBoolean()) {
|
||||
builder.setAllowLazyStart(randomBoolean());
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
builder.setMaxNumThreads(randomIntBetween(1, 20));
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
@ -81,6 +86,15 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
|
|||
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setAllowLazyStart(true).build())));
|
||||
}
|
||||
|
||||
public void testMergeWithConfig_UpdatedMaxNumThreads() {
|
||||
String id = randomValidId();
|
||||
DataFrameAnalyticsConfig config = DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(3).build();
|
||||
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(5).build();
|
||||
assertThat(
|
||||
update.mergeWithConfig(config).build(),
|
||||
is(equalTo(new DataFrameAnalyticsConfig.Builder(config).setMaxNumThreads(5).build())));
|
||||
}
|
||||
|
||||
public void testMergeWithConfig_UpdatedAllUpdatableProperties() {
|
||||
String id = randomValidId();
|
||||
DataFrameAnalyticsConfig config =
|
||||
|
@ -88,12 +102,14 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
|
|||
.setDescription("old description")
|
||||
.setModelMemoryLimit(new ByteSizeValue(1024))
|
||||
.setAllowLazyStart(false)
|
||||
.setMaxNumThreads(1)
|
||||
.build();
|
||||
DataFrameAnalyticsConfigUpdate update =
|
||||
new DataFrameAnalyticsConfigUpdate.Builder(id)
|
||||
.setDescription("new description")
|
||||
.setModelMemoryLimit(new ByteSizeValue(2048))
|
||||
.setAllowLazyStart(true)
|
||||
.setMaxNumThreads(4)
|
||||
.build();
|
||||
assertThat(
|
||||
update.mergeWithConfig(config).build(),
|
||||
|
@ -102,6 +118,7 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
|
|||
.setDescription("new description")
|
||||
.setModelMemoryLimit(new ByteSizeValue(2048))
|
||||
.setAllowLazyStart(true)
|
||||
.setMaxNumThreads(4)
|
||||
.build())));
|
||||
}
|
||||
|
||||
|
@ -155,9 +172,35 @@ public class DataFrameAnalyticsConfigUpdateTests extends AbstractSerializingTest
|
|||
assertThat(update.requiresRestart(config), is(true));
|
||||
}
|
||||
|
||||
public void testRequiresRestart_MaxNumThreadsUpdateRequiresRestart() {
|
||||
String id = randomValidId();
|
||||
DataFrameAnalyticsConfig config =
|
||||
DataFrameAnalyticsConfigTests.createRandomBuilder(id).setMaxNumThreads(1).build();
|
||||
DataFrameAnalyticsConfigUpdate update = new DataFrameAnalyticsConfigUpdate.Builder(id).setMaxNumThreads(8).build();
|
||||
|
||||
assertThat(update.requiresRestart(config), is(true));
|
||||
}
|
||||
|
||||
public void testCtor_GivenMaxNumberThreadsIsZero() {
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
||||
() -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(0).build());
|
||||
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
|
||||
}
|
||||
|
||||
public void testCtor_GivenMaxNumberThreadsIsNegative() {
|
||||
ElasticsearchException e = expectThrows(ElasticsearchException.class,
|
||||
() -> new DataFrameAnalyticsConfigUpdate.Builder("test").setMaxNumThreads(randomIntBetween(Integer.MIN_VALUE, 0)).build());
|
||||
|
||||
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
|
||||
assertThat(e.getMessage(), equalTo("[max_num_threads] must be a positive integer"));
|
||||
}
|
||||
|
||||
private boolean isNoop(DataFrameAnalyticsConfig config, DataFrameAnalyticsConfigUpdate update) {
|
||||
return (update.getDescription() == null || Objects.equals(config.getDescription(), update.getDescription()))
|
||||
&& (update.getModelMemoryLimit() == null || Objects.equals(config.getModelMemoryLimit(), update.getModelMemoryLimit()))
|
||||
&& (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart()));
|
||||
&& (update.isAllowLazyStart() == null || Objects.equals(config.isAllowLazyStart(), update.isAllowLazyStart()))
|
||||
&& (update.getMaxNumThreads() == null || Objects.equals(config.getMaxNumThreads(), update.getMaxNumThreads()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -473,7 +473,7 @@ public class AnalyticsProcessManager {
|
|||
ExtractedFields extractedFields) {
|
||||
DataFrameDataExtractor.DataSummary dataSummary = dataExtractor.collectDataSummary();
|
||||
Set<String> categoricalFields = dataExtractor.getCategoricalFields(config.getAnalysis());
|
||||
int threads = config.getMaxNumThreads() == null ? 1 : Math.min(config.getMaxNumThreads(), numAllocatedProcessors);
|
||||
int threads = Math.min(config.getMaxNumThreads(), numAllocatedProcessors);
|
||||
return new AnalyticsProcessConfig(
|
||||
config.getId(),
|
||||
dataSummary.rows,
|
||||
|
|
|
@ -2118,12 +2118,14 @@ setup:
|
|||
"analysis": {"outlier_detection":{}},
|
||||
"description": "before update",
|
||||
"model_memory_limit": "20mb",
|
||||
"allow_lazy_start": false
|
||||
"allow_lazy_start": false,
|
||||
"max_num_threads": 1
|
||||
}
|
||||
- match: { id: "update-test-job" }
|
||||
- match: { description: "before update" }
|
||||
- match: { model_memory_limit: "20mb" }
|
||||
- match: { allow_lazy_start: false }
|
||||
- match: { max_num_threads: 1 }
|
||||
|
||||
- do:
|
||||
ml.update_data_frame_analytics:
|
||||
|
@ -2132,12 +2134,14 @@ setup:
|
|||
{
|
||||
"description": "after update",
|
||||
"model_memory_limit": "30mb",
|
||||
"allow_lazy_start": true
|
||||
"allow_lazy_start": true,
|
||||
"max_num_threads": 2
|
||||
}
|
||||
- match: { id: "update-test-job" }
|
||||
- match: { description: "after update" }
|
||||
- match: { model_memory_limit: "30mb" }
|
||||
- match: { allow_lazy_start: true }
|
||||
- match: { max_num_threads: 2 }
|
||||
|
||||
---
|
||||
"Test update given missing analytics":
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue