[ML] relax throttling on expired data cleanup (#56711)

Throttling nightly cleanup as much as we do has been over cautious.

Night cleanup should be more lenient in its throttling. We still
keep the same batch size, but now the requests per second scale
with the number of data nodes. If we have more than 5 data nodes,
we don't throttle at all.

Additionally, the API now has `requests_per_second` and `timeout` set.
So users calling the API directly can set the throttling.

This commit also adds a new setting `xpack.ml.nightly_maintenance_requests_per_second`.
This will allow users to adjust throttling of the nightly maintenance.
This commit is contained in:
Benjamin Trent 2020-05-18 07:21:06 -04:00 committed by GitHub
parent 79a69cb676
commit 8fed077b0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 577 additions and 181 deletions

View file

@ -167,13 +167,13 @@ final class MLRequestConverters {
return request; return request;
} }
static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) { static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) throws IOException {
String endpoint = new EndpointBuilder() String endpoint = new EndpointBuilder()
.addPathPartAsIs("_ml") .addPathPartAsIs("_ml")
.addPathPartAsIs("_delete_expired_data") .addPathPartAsIs("_delete_expired_data")
.build(); .build();
Request request = new Request(HttpDelete.METHOD_NAME, endpoint); Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE));
return request; return request;
} }

View file

@ -19,16 +19,78 @@
package org.elasticsearch.client.ml; package org.elasticsearch.client.ml;
import org.elasticsearch.client.Validatable; import org.elasticsearch.client.Validatable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
/** /**
* Request to delete expired model snapshots and forecasts * Request to delete expired model snapshots and forecasts
*/ */
public class DeleteExpiredDataRequest implements Validatable { public class DeleteExpiredDataRequest implements Validatable, ToXContentObject {
static final String REQUESTS_PER_SECOND = "requests_per_second";
static final String TIMEOUT = "timeout";
private final Float requestsPerSecond;
private final TimeValue timeout;
/** /**
* Create a new request to delete expired data * Create a new request to delete expired data
*/ */
public DeleteExpiredDataRequest() { public DeleteExpiredDataRequest() {
this(null, null);
} }
public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) {
this.requestsPerSecond = requestsPerSecond;
this.timeout = timeout;
}
/**
* The requests allowed per second in the underlying Delete by Query requests executed.
*
* `-1.0f` indicates that the standard nightly cleanup behavior should be ran.
* Throttling scales according to the number of data nodes.
* `null` is default and means no throttling will occur.
*/
public Float getRequestsPerSecond() {
return requestsPerSecond;
}
/**
* Indicates how long the deletion request will run until it timesout.
*
* Default value is 8 hours.
*/
public TimeValue getTimeout() {
return timeout;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o;
return Objects.equals(requestsPerSecond, that.requestsPerSecond) &&
Objects.equals(timeout, that.timeout);
}
@Override
public int hashCode() {
return Objects.hash(requestsPerSecond, timeout);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (requestsPerSecond != null) {
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
}
if (timeout != null) {
builder.field(TIMEOUT, timeout.getStringRep());
}
builder.endObject();
return builder;
}
} }

View file

@ -214,12 +214,16 @@ public class MLRequestConvertersTests extends ESTestCase {
requestEntityToString(request)); requestEntityToString(request));
} }
public void testDeleteExpiredData() { public void testDeleteExpiredData() throws Exception {
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(); float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false);
DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(
requestsPerSec,
TimeValue.timeValueHours(1));
Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest); Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest);
assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals("/_ml/_delete_expired_data", request.getEndpoint()); assertEquals("/_ml/_delete_expired_data", request.getEndpoint());
assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request));
} }
public void testDeleteJob() { public void testDeleteJob() {

View file

@ -2035,7 +2035,11 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase {
MachineLearningIT.buildJob(jobId); MachineLearningIT.buildJob(jobId);
{ {
// tag::delete-expired-data-request // tag::delete-expired-data-request
DeleteExpiredDataRequest request = new DeleteExpiredDataRequest(); // <1> DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1>
1000.0f, // <2>
TimeValue.timeValueHours(12) // <3>
);
// end::delete-expired-data-request // end::delete-expired-data-request
// tag::delete-expired-data-execute // tag::delete-expired-data-execute

View file

@ -0,0 +1,62 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.ml;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.AbstractXContentTestCase;
import java.io.IOException;
public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase<DeleteExpiredDataRequest> {
private static ConstructingObjectParser<DeleteExpiredDataRequest, Void> PARSER = new ConstructingObjectParser<>(
"delete_expired_data_request",
true,
(a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1])
);
static {
PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(),
new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND));
PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), DeleteExpiredDataRequest.TIMEOUT),
new ParseField(DeleteExpiredDataRequest.TIMEOUT),
ObjectParser.ValueType.STRING);
}
@Override
protected DeleteExpiredDataRequest createTestInstance() {
return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test"));
}
@Override
protected DeleteExpiredDataRequest doParseInstance(XContentParser parser) throws IOException {
return PARSER.apply(parser, null);
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
}

View file

@ -21,6 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments.
include-tagged::{doc-tests-file}[{api}-request] include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------- ---------------------------------------------------
<1> Constructing a new request. <1> Constructing a new request.
<2> Providing requests per second throttling for the
deletion processes. Default is no throttling.
<3> Setting how long the deletion processes will be allowed
to run before they are canceled. Default value is `8h` (8 hours).
[id="{upid}-{api}-response"] [id="{upid}-{api}-response"]
==== Delete Expired Data Response ==== Delete Expired Data Response

View file

@ -27,6 +27,17 @@ Deletes all job results, model snapshots and forecast data that have exceeded
their `retention days` period. Machine learning state documents that are not their `retention days` period. Machine learning state documents that are not
associated with any job are also deleted. associated with any job are also deleted.
[[ml-delete-expired-data-request-body]]
==== {api-request-body-title}
`requests_per_second`::
(Optional, float) The desired requests per second for the deletion processes.
The default behavior is no throttling.
`timeout`::
(Optional, string) How long can the underlying delete processes run until they are canceled.
The default value is `8h` (8 hours).
[[ml-delete-expired-data-example]] [[ml-delete-expired-data-example]]
==== {api-examples-title} ==== {api-examples-title}

View file

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.core.ml.action; package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
@ -14,6 +15,8 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -31,20 +34,94 @@ public class DeleteExpiredDataAction extends ActionType<DeleteExpiredDataAction.
public static class Request extends ActionRequest { public static class Request extends ActionRequest {
public static final ParseField REQUESTS_PER_SECOND = new ParseField("requests_per_second");
public static final ParseField TIMEOUT = new ParseField("timeout");
public static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(
"delete_expired_data_request",
false,
Request::new);
static {
PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND);
PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())),
TIMEOUT);
}
private Float requestsPerSecond;
private TimeValue timeout;
public Request() {} public Request() {}
public Request(Float requestsPerSecond, TimeValue timeValue) {
this.requestsPerSecond = requestsPerSecond;
this.timeout = timeValue;
}
public Request(StreamInput in) throws IOException { public Request(StreamInput in) throws IOException {
super(in); super(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
this.requestsPerSecond = in.readOptionalFloat();
this.timeout = in.readOptionalTimeValue();
} else {
this.requestsPerSecond = null;
this.timeout = null;
}
}
public Float getRequestsPerSecond() {
return requestsPerSecond;
}
public TimeValue getTimeout() {
return timeout;
}
public Request setRequestsPerSecond(Float requestsPerSecond) {
this.requestsPerSecond = requestsPerSecond;
return this;
}
public Request setTimeout(TimeValue timeout) {
this.timeout = timeout;
return this;
} }
@Override @Override
public ActionRequestValidationException validate() { public ActionRequestValidationException validate() {
if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) {
ActionRequestValidationException requestValidationException = new ActionRequestValidationException();
requestValidationException.addValidationError("[requests_per_second] must either be -1 or greater than 0");
return requestValidationException;
}
return null; return null;
} }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(requestsPerSecond, request.requestsPerSecond)
&& Objects.equals(timeout, request.timeout);
}
@Override
public int hashCode() {
return Objects.hash(requestsPerSecond, timeout);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeOptionalFloat(requestsPerSecond);
out.writeOptionalTimeValue(timeout);
}
}
} }
static class RequestBuilder extends ActionRequestBuilder<Request, Response> { static class RequestBuilder extends ActionRequestBuilder<Request, Response> {
RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) { RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) {
super(client, action, new Request()); super(client, action, new Request());
} }

View file

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ml.action;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase;
import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction.Request;
public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializationTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")
);
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request mutateInstanceForVersion(Request instance, Version version) {
if (version.before(Version.V_8_0_0)) {
return new Request();
}
return instance;
}
}

View file

@ -95,11 +95,57 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
} }
public void testDeleteExpiredData() throws Exception { public void testDeleteExpiredDataNoThrottle() throws Exception {
testExpiredDeletion(null, 10010);
}
/**
* Verifies empty state indices deletion. Here is the summary of indices used by the test:
*
* +------------------+--------+----------+-------------------------+
* | index name | empty? | current? | expected to be removed? |
* +------------------+--------+----------+-------------------------+
* | .ml-state | yes | no | yes |
* | .ml-state-000001 | no | no | no |
* | .ml-state-000003 | yes | no | yes |
* | .ml-state-000005 | no | no | no |
* | .ml-state-000007 | yes | yes | no |
* +------------------+--------+----------+-------------------------+
*/
public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception {
client().admin().indices().prepareCreate(".ml-state").get();
client().admin().indices().prepareCreate(".ml-state-000001").get();
client().prepareIndex(".ml-state-000001").setSource("field_1", "value_1").get();
client().admin().indices().prepareCreate(".ml-state-000003").get();
client().admin().indices().prepareCreate(".ml-state-000005").get();
client().prepareIndex(".ml-state-000005").setSource("field_5", "value_5").get();
client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get();
refresh();
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007")));
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
refresh();
getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
// Only non-empty or current indices should survive deletion process
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}
public void testDeleteExpiredDataWithStandardThrottle() throws Exception {
testExpiredDeletion(-1.0f, 100);
}
private void testExpiredDeletion(Float customThrottle, int numUnusedState) throws Exception {
// Index some unused state documents (more than 10K to test scrolling works) // Index some unused state documents (more than 10K to test scrolling works)
String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001";
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < 10010; i++) { for (int i = 0; i < numUnusedState; i++) {
String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i);
IndexRequest indexRequest = IndexRequest indexRequest =
new IndexRequest(mlStateIndexName) new IndexRequest(mlStateIndexName)
@ -165,10 +211,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment
// the DeleteExpiredDataAction is called. // the DeleteExpiredDataAction is called.
String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1)); String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1));
shortExpiryForecastIds.add(forecastShortExpiryId); shortExpiryForecastIds.add(forecastShortExpiryId);
String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null); String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), null);
String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO); String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO);
waitForecastToFinish(job.getId(), forecastShortExpiryId); waitForecastToFinish(job.getId(), forecastShortExpiryId);
waitForecastToFinish(job.getId(), forecastDefaultExpiryId); waitForecastToFinish(job.getId(), forecastDefaultExpiryId);
waitForecastToFinish(job.getId(), forecastNoExpiryId); waitForecastToFinish(job.getId(), forecastNoExpiryId);
@ -195,9 +241,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
retainAllSnapshots("snapshots-retention-with-retain"); retainAllSnapshots("snapshots-retention-with-retain");
long totalModelSizeStatsBeforeDelete = client().prepareSearch("*") long totalModelSizeStatsBeforeDelete = client().prepareSearch("*")
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value; .get().getHits().getTotalHits().value;
long totalNotificationsCountBeforeDelete = long totalNotificationsCountBeforeDelete =
client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L)); assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L));
@ -214,7 +260,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK)); assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK));
// Now call the action under test // Now call the action under test
assertThat(deleteExpiredData().isDeleted(), is(true)); assertThat(deleteExpiredData(customThrottle).isDeleted(), is(true));
// no-retention job should have kept all data // no-retention job should have kept all data
assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70)));
@ -244,9 +290,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1)); assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1));
long totalModelSizeStatsAfterDelete = client().prepareSearch("*") long totalModelSizeStatsAfterDelete = client().prepareSearch("*")
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats"))
.get().getHits().getTotalHits().value; .get().getHits().getTotalHits().value;
long totalNotificationsCountAfterDelete = long totalNotificationsCountAfterDelete =
client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value;
assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete)); assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete));
@ -266,10 +312,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
// Verify .ml-state doesn't contain unused state documents // Verify .ml-state doesn't contain unused state documents
SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern())
.setFetchSource(false) .setFetchSource(false)
.setTrackTotalHits(true) .setTrackTotalHits(true)
.setSize(10000) .setSize(10000)
.get(); .get();
// Assert at least one state doc for each job // Assert at least one state doc for each job
assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L)); assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L));
@ -288,44 +334,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase {
nonExistingJobDocsCount, equalTo(0)); nonExistingJobDocsCount, equalTo(0));
} }
/**
* Verifies empty state indices deletion. Here is the summary of indices used by the test:
*
* +------------------+--------+----------+-------------------------+
* | index name | empty? | current? | expected to be removed? |
* +------------------+--------+----------+-------------------------+
* | .ml-state | yes | no | yes |
* | .ml-state-000001 | no | no | no |
* | .ml-state-000003 | yes | no | yes |
* | .ml-state-000005 | no | no | no |
* | .ml-state-000007 | yes | yes | no |
* +------------------+--------+----------+-------------------------+
*/
public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception {
client().admin().indices().prepareCreate(".ml-state").get();
client().admin().indices().prepareCreate(".ml-state-000001").get();
client().prepareIndex(".ml-state-000001").setSource("field_1", "value_1").get();
client().admin().indices().prepareCreate(".ml-state-000003").get();
client().admin().indices().prepareCreate(".ml-state-000005").get();
client().prepareIndex(".ml-state-000005").setSource("field_5", "value_5").get();
client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get();
refresh();
GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007")));
client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get();
refresh();
getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get();
assertThat(Strings.toString(getIndexResponse),
getIndexResponse.getIndices(),
// Only non-empty or current indices should survive deletion process
is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007")));
}
private static Job.Builder newJobBuilder(String id) { private static Job.Builder newJobBuilder(String id) {
Detector.Builder detector = new Detector.Builder(); Detector.Builder detector = new Detector.Builder();
detector.setFunction("count"); detector.setFunction("count");

View file

@ -221,6 +221,16 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase {
return response; return response;
} }
protected DeleteExpiredDataAction.Response deleteExpiredData(Float customThrottle) throws Exception {
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request();
request.setRequestsPerSecond(customThrottle);
DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, request).get();
// We need to refresh to ensure the deletion is visible
refresh("*");
return response;
}
protected PutFilterAction.Response putMlFilter(MlFilter filter) { protected PutFilterAction.Response putMlFilter(MlFilter filter) {
return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet();
} }

View file

@ -421,6 +421,23 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP = public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
// Requests per second throttling for the nightly maintenance task
public static final Setting<Float> NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND =
new Setting<>(
"xpack.ml.nightly_maintenance_requests_per_second",
(s) -> Float.toString(-1.0f),
(s) -> {
float value = Float.parseFloat(s);
if (value <= 0.0f && value != -1.0f) {
throw new IllegalArgumentException("Failed to parse value [" +
s + "] for setting [xpack.ml.nightly_maintenance_requests_per_second] must be > 0.0 or exactly equal to -1.0");
}
return value;
},
Property.Dynamic,
Property.NodeScope
);
private static final Logger logger = LogManager.getLogger(MachineLearning.class); private static final Logger logger = LogManager.getLogger(MachineLearning.class);
private final Settings settings; private final Settings settings;
@ -466,7 +483,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys
InferenceProcessor.MAX_INFERENCE_PROCESSORS, InferenceProcessor.MAX_INFERENCE_PROCESSORS,
ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE, ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE,
ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, ModelLoadingService.INFERENCE_MODEL_CACHE_TTL,
ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES,
NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND
); );
} }

View file

@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
@ -51,19 +52,25 @@ public class MlDailyMaintenanceService implements Releasable {
private final Supplier<TimeValue> schedulerProvider; private final Supplier<TimeValue> schedulerProvider;
private volatile Scheduler.Cancellable cancellable; private volatile Scheduler.Cancellable cancellable;
private volatile float deleteExpiredDataRequestsPerSecond;
MlDailyMaintenanceService(ThreadPool threadPool, Client client, ClusterService clusterService, MlDailyMaintenanceService(Settings settings, ThreadPool threadPool, Client client, ClusterService clusterService,
MlAssignmentNotifier mlAssignmentNotifier, Supplier<TimeValue> scheduleProvider) { MlAssignmentNotifier mlAssignmentNotifier, Supplier<TimeValue> scheduleProvider) {
this.threadPool = Objects.requireNonNull(threadPool); this.threadPool = Objects.requireNonNull(threadPool);
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService); this.clusterService = Objects.requireNonNull(clusterService);
this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier);
this.schedulerProvider = Objects.requireNonNull(scheduleProvider); this.schedulerProvider = Objects.requireNonNull(scheduleProvider);
this.deleteExpiredDataRequestsPerSecond = MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND.get(settings);
} }
public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client, ClusterService clusterService, public MlDailyMaintenanceService(Settings settings, ClusterName clusterName, ThreadPool threadPool,
MlAssignmentNotifier mlAssignmentNotifier) { Client client, ClusterService clusterService, MlAssignmentNotifier mlAssignmentNotifier) {
this(threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); this(settings, threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName));
}
void setDeleteExpiredDataRequestsPerSecond(float value) {
this.deleteExpiredDataRequestsPerSecond = value;
} }
/** /**
@ -101,7 +108,7 @@ public class MlDailyMaintenanceService implements Releasable {
} }
} }
public boolean isStarted() { boolean isStarted() {
return cancellable != null; return cancellable != null;
} }
@ -129,7 +136,10 @@ public class MlDailyMaintenanceService implements Releasable {
return; return;
} }
LOGGER.info("triggering scheduled [ML] maintenance tasks"); LOGGER.info("triggering scheduled [ML] maintenance tasks");
executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), executeAsyncWithOrigin(client,
ML_ORIGIN,
DeleteExpiredDataAction.INSTANCE,
new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)),
ActionListener.wrap( ActionListener.wrap(
response -> { response -> {
if (response.isDeleted()) { if (response.isDeleted()) {

View file

@ -26,34 +26,56 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi
private static final Logger logger = LogManager.getLogger(MlInitializationService.class); private static final Logger logger = LogManager.getLogger(MlInitializationService.class);
private final Settings settings;
private final ThreadPool threadPool;
private final ClusterService clusterService;
private final Client client; private final Client client;
private final MlAssignmentNotifier mlAssignmentNotifier;
private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false); private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false);
private volatile MlDailyMaintenanceService mlDailyMaintenanceService; private final MlDailyMaintenanceService mlDailyMaintenanceService;
MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client,
MlAssignmentNotifier mlAssignmentNotifier) { MlAssignmentNotifier mlAssignmentNotifier) {
this.settings = Objects.requireNonNull(settings); this(client,
this.threadPool = Objects.requireNonNull(threadPool); new MlDailyMaintenanceService(
this.clusterService = Objects.requireNonNull(clusterService); settings,
Objects.requireNonNull(clusterService).getClusterName(),
threadPool,
client,
clusterService,
mlAssignmentNotifier
),
clusterService);
}
// For testing
MlInitializationService(Client client, MlDailyMaintenanceService dailyMaintenanceService, ClusterService clusterService) {
this.client = Objects.requireNonNull(client); this.client = Objects.requireNonNull(client);
this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); this.mlDailyMaintenanceService = dailyMaintenanceService;
clusterService.addListener(this); clusterService.addListener(this);
clusterService.addLocalNodeMasterListener(this); clusterService.addLocalNodeMasterListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void afterStart() {
clusterService.getClusterSettings().addSettingsUpdateConsumer(
MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND,
mlDailyMaintenanceService::setDeleteExpiredDataRequestsPerSecond
);
}
@Override
public void beforeStop() {
offMaster();
}
});
} }
@Override @Override
public void onMaster() { public void onMaster() {
installDailyMaintenanceService(); mlDailyMaintenanceService.start();
} }
@Override @Override
public void offMaster() { public void offMaster() {
uninstallDailyMaintenanceService(); mlDailyMaintenanceService.stop();
} }
@Override @Override
@ -85,35 +107,10 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi
return ThreadPool.Names.GENERIC; return ThreadPool.Names.GENERIC;
} }
private synchronized void installDailyMaintenanceService() {
if (mlDailyMaintenanceService == null) {
mlDailyMaintenanceService =
new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client, clusterService, mlAssignmentNotifier);
mlDailyMaintenanceService.start();
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
uninstallDailyMaintenanceService();
}
});
}
}
private synchronized void uninstallDailyMaintenanceService() {
if (mlDailyMaintenanceService != null) {
mlDailyMaintenanceService.stop();
mlDailyMaintenanceService = null;
}
}
/** For testing */ /** For testing */
MlDailyMaintenanceService getDailyMaintenanceService() { MlDailyMaintenanceService getDailyMaintenanceService() {
return mlDailyMaintenanceService; return mlDailyMaintenanceService;
} }
/** For testing */
synchronized void setDailyMaintenanceService(MlDailyMaintenanceService service) {
mlDailyMaintenanceService = service;
}
} }

View file

@ -15,6 +15,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
@ -43,8 +44,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
private static final Logger logger = LogManager.getLogger(TransportDeleteExpiredDataAction.class); private static final Logger logger = LogManager.getLogger(TransportDeleteExpiredDataAction.class);
// TODO: make configurable in the request static final Duration DEFAULT_MAX_DURATION = Duration.ofHours(8);
static final Duration MAX_DURATION = Duration.ofHours(8);
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final String executor; private final String executor;
@ -73,12 +73,18 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
protected void doExecute(Task task, DeleteExpiredDataAction.Request request, protected void doExecute(Task task, DeleteExpiredDataAction.Request request,
ActionListener<DeleteExpiredDataAction.Response> listener) { ActionListener<DeleteExpiredDataAction.Response> listener) {
logger.info("Deleting expired data"); logger.info("Deleting expired data");
Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION); Instant timeoutTime = Instant.now(clock).plus(
request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis())
);
Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); Supplier<Boolean> isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime);
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier)); threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(
() -> deleteExpiredData(request, listener, isTimedOutSupplier)
);
} }
private void deleteExpiredData(ActionListener<DeleteExpiredDataAction.Response> listener, private void deleteExpiredData(DeleteExpiredDataAction.Request request,
ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier) { Supplier<Boolean> isTimedOutSupplier) {
AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName());
List<MlDataRemover> dataRemovers = Arrays.asList( List<MlDataRemover> dataRemovers = Arrays.asList(
@ -89,24 +95,43 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction<Del
new EmptyStateIndexRemover(client) new EmptyStateIndexRemover(client)
); );
Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); Iterator<MlDataRemover> dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers);
deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); // If there is no throttle provided, default to none
float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond();
int numberOfDatanodes = Math.max(clusterService.state().getNodes().getDataNodes().size(), 1);
if (requestsPerSec == -1.0f) {
// With DEFAULT_SCROLL_SIZE = 1000 and a single data node this implies we spread deletion of
// 1 million documents over 5000 seconds ~= 83 minutes.
// If we have > 5 data nodes, we don't set our throttling.
requestsPerSec = numberOfDatanodes < 5 ?
(float)(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes :
Float.POSITIVE_INFINITY;
}
deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true);
} }
void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator, void deleteExpiredData(Iterator<MlDataRemover> mlDataRemoversIterator,
float requestsPerSecond,
ActionListener<DeleteExpiredDataAction.Response> listener, ActionListener<DeleteExpiredDataAction.Response> listener,
Supplier<Boolean> isTimedOutSupplier, Supplier<Boolean> isTimedOutSupplier,
boolean haveAllPreviousDeletionsCompleted) { boolean haveAllPreviousDeletionsCompleted) {
if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) { if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) {
MlDataRemover remover = mlDataRemoversIterator.next(); MlDataRemover remover = mlDataRemoversIterator.next();
ActionListener<Boolean> nextListener = ActionListener.wrap( ActionListener<Boolean> nextListener = ActionListener.wrap(
booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse), booleanResponse ->
deleteExpiredData(
mlDataRemoversIterator,
requestsPerSecond,
listener,
isTimedOutSupplier,
booleanResponse
),
listener::onFailure); listener::onFailure);
// Removing expired ML data and artifacts requires multiple operations. // Removing expired ML data and artifacts requires multiple operations.
// These are queued up and executed sequentially in the action listener, // These are queued up and executed sequentially in the action listener,
// the chained calls must all run the ML utility thread pool NOT the thread // the chained calls must all run the ML utility thread pool NOT the thread
// the previous action returned in which in the case of a transport_client_boss // the previous action returned in which in the case of a transport_client_boss
// thread is a disaster. // thread is a disaster.
remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), remover.remove(requestsPerSecond, new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false),
isTimedOutSupplier); isTimedOutSupplier);
} else { } else {
if (haveAllPreviousDeletionsCompleted) { if (haveAllPreviousDeletionsCompleted) {

View file

@ -38,11 +38,15 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
} }
@Override @Override
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { public void remove(float requestsPerSecond,
removeData(newJobIterator(), listener, isTimedOutSupplier); ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) {
removeData(newJobIterator(), requestsPerSecond, listener, isTimedOutSupplier);
} }
private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener<Boolean> listener, private void removeData(WrappedBatchedJobsIterator jobIterator,
float requestsPerSecond,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier) { Supplier<Boolean> isTimedOutSupplier) {
if (jobIterator.hasNext() == false) { if (jobIterator.hasNext() == false) {
listener.onResponse(true); listener.onResponse(true);
@ -62,17 +66,17 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
Long retentionDays = getRetentionDays(job); Long retentionDays = getRetentionDays(job);
if (retentionDays == null) { if (retentionDays == null) {
removeData(jobIterator, listener, isTimedOutSupplier); removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier);
return; return;
} }
calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap( calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap(
response -> { response -> {
if (response == null) { if (response == null) {
removeData(jobIterator, listener, isTimedOutSupplier); removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier);
} else { } else {
removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( removeDataBefore(job, requestsPerSecond, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap(
r -> removeData(jobIterator, listener, isTimedOutSupplier), r -> removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier),
listener::onFailure)); listener::onFailure));
} }
}, },
@ -93,7 +97,13 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover {
* Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Template method to allow implementation details of various types of data (e.g. results, model snapshots).
* Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job.
*/ */
abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener); abstract void removeDataBefore(
Job job,
float requestsPerSecond,
long latestTimeMs,
long cutoffEpochMs,
ActionListener<Boolean> listener
);
static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) {
return QueryBuilders.boolQuery() return QueryBuilders.boolQuery()

View file

@ -32,7 +32,7 @@ public class EmptyStateIndexRemover implements MlDataRemover {
} }
@Override @Override
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { public void remove(float requestsPerSec, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
try { try {
if (isTimedOutSupplier.get()) { if (isTimedOutSupplier.get()) {
listener.onResponse(false); listener.onResponse(false);

View file

@ -73,10 +73,10 @@ public class ExpiredForecastsRemover implements MlDataRemover {
} }
@Override @Override
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { public void remove(float requestsPerSec, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs); LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs);
ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap( ActionListener<SearchResponse> forecastStatsHandler = ActionListener.wrap(
searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier), searchResponse -> deleteForecasts(searchResponse, requestsPerSec, listener, isTimedOutSupplier),
e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e)));
SearchSourceBuilder source = new SearchSourceBuilder(); SearchSourceBuilder source = new SearchSourceBuilder();
@ -95,7 +95,12 @@ public class ExpiredForecastsRemover implements MlDataRemover {
MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false));
} }
private void deleteForecasts(SearchResponse searchResponse, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { private void deleteForecasts(
SearchResponse searchResponse,
float requestsPerSec,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier
) {
List<ForecastRequestStats> forecastsToDelete; List<ForecastRequestStats> forecastsToDelete;
try { try {
forecastsToDelete = findForecastsToDelete(searchResponse); forecastsToDelete = findForecastsToDelete(searchResponse);
@ -109,7 +114,9 @@ public class ExpiredForecastsRemover implements MlDataRemover {
return; return;
} }
DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete); DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete)
.setRequestsPerSecond(requestsPerSec)
.setAbortOnVersionConflict(false);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() { client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<BulkByScrollResponse>() {
@Override @Override
public void onResponse(BulkByScrollResponse bulkByScrollResponse) { public void onResponse(BulkByScrollResponse bulkByScrollResponse) {

View file

@ -133,7 +133,13 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover
} }
@Override @Override
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) { protected void removeDataBefore(
Job job,
float requestsPerSec,
long latestTimeMs,
long cutoffEpochMs,
ActionListener<Boolean> listener
) {
// TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss // TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss
if (job.getModelSnapshotId() == null) { if (job.getModelSnapshotId() == null) {
// No snapshot to remove // No snapshot to remove

View file

@ -84,9 +84,15 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
} }
@Override @Override
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) { protected void removeDataBefore(
Job job,
float requestsPerSecond,
long latestTimeMs,
long cutoffEpochMs,
ActionListener<Boolean> listener
) {
LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs);
DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs);
client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() {
@Override @Override
@ -108,14 +114,14 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover {
}); });
} }
private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) {
DeleteByQueryRequest request = new DeleteByQueryRequest(); DeleteByQueryRequest request = new DeleteByQueryRequest();
request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES);
// Delete the documents gradually. request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE)
// With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes. // We are deleting old data, we should simply proceed as a version conflict could mean that another deletion is taking place
request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE); .setAbortOnVersionConflict(false)
request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5); .setRequestsPerSecond(requestsPerSec);
request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId()));
QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(),

View file

@ -10,5 +10,5 @@ import org.elasticsearch.action.ActionListener;
import java.util.function.Supplier; import java.util.function.Supplier;
public interface MlDataRemover { public interface MlDataRemover {
void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier); void remove(float requestsPerSecond, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier);
} }

View file

@ -58,14 +58,14 @@ public class UnusedStateRemover implements MlDataRemover {
} }
@Override @Override
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { public void remove(float requestsPerSec, ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) {
try { try {
List<String> unusedStateDocIds = findUnusedStateDocIds(); List<String> unusedStateDocIds = findUnusedStateDocIds();
if (isTimedOutSupplier.get()) { if (isTimedOutSupplier.get()) {
listener.onResponse(false); listener.onResponse(false);
} else { } else {
if (unusedStateDocIds.size() > 0) { if (unusedStateDocIds.size() > 0) {
executeDeleteUnusedStateDocs(unusedStateDocIds, listener); executeDeleteUnusedStateDocs(unusedStateDocIds, requestsPerSec, listener);
} else { } else {
listener.onResponse(true); listener.onResponse(true);
} }
@ -98,12 +98,12 @@ public class UnusedStateRemover implements MlDataRemover {
private Set<String> getJobIds() { private Set<String> getJobIds() {
Set<String> jobIds = new HashSet<>(); Set<String> jobIds = new HashSet<>();
jobIds.addAll(getAnamalyDetectionJobIds()); jobIds.addAll(getAnomalyDetectionJobIds());
jobIds.addAll(getDataFrameAnalyticsJobIds()); jobIds.addAll(getDataFrameAnalyticsJobIds());
return jobIds; return jobIds;
} }
private Set<String> getAnamalyDetectionJobIds() { private Set<String> getAnomalyDetectionJobIds() {
Set<String> jobIds = new HashSet<>(); Set<String> jobIds = new HashSet<>();
// TODO Once at 8.0, we can stop searching for jobs in cluster state // TODO Once at 8.0, we can stop searching for jobs in cluster state
@ -130,11 +130,13 @@ public class UnusedStateRemover implements MlDataRemover {
return jobIds; return jobIds;
} }
private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, ActionListener<Boolean> listener) { private void executeDeleteUnusedStateDocs(List<String> unusedDocIds, float requestsPerSec, ActionListener<Boolean> listener) {
LOGGER.info("Found [{}] unused state documents; attempting to delete", LOGGER.info("Found [{}] unused state documents; attempting to delete",
unusedDocIds.size()); unusedDocIds.size());
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern())
.setIndicesOptions(IndicesOptions.lenientExpandOpen()) .setIndicesOptions(IndicesOptions.lenientExpandOpen())
.setAbortOnVersionConflict(false)
.setRequestsPerSecond(requestsPerSec)
.setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0]))); .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0])));
// _doc is the most efficient sort order and will also disable scoring // _doc is the most efficient sort order and will also disable scoring

View file

@ -41,7 +41,9 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler {
@Override @Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(); DeleteExpiredDataAction.Request request = restRequest.hasContent() ?
DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) :
new DeleteExpiredDataAction.Request();
return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel));
} }
} }

View file

@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -83,7 +84,7 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase {
} }
private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) {
return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> { return new MlDailyMaintenanceService(Settings.EMPTY, threadPool, client, clusterService, mlAssignmentNotifier, () -> {
latch.countDown(); latch.countDown();
return TimeValue.timeValueMillis(100); return TimeValue.timeValueMillis(100);
}); });

View file

@ -18,7 +18,6 @@ import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -65,31 +64,17 @@ public class MlInitializationServiceTests extends ESTestCase {
MlInitializationService initializationService = MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
initializationService.offMaster(); initializationService.offMaster();
assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(false));
}
public void testInitialize_alreadyInitialized() {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
initializationService.onMaster();
assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService());
} }
public void testNodeGoesFromMasterToNonMasterAndBack() { public void testNodeGoesFromMasterToNonMasterAndBack() {
MlInitializationService initializationService =
new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
initializationService.setDailyMaintenanceService(initialDailyMaintenanceService);
MlInitializationService initializationService = new MlInitializationService(client, initialDailyMaintenanceService, clusterService);
initializationService.offMaster(); initializationService.offMaster();
verify(initialDailyMaintenanceService).stop(); verify(initialDailyMaintenanceService).stop();
initializationService.onMaster(); initializationService.onMaster();
MlDailyMaintenanceService finalDailyMaintenanceService = initializationService.getDailyMaintenanceService(); verify(initialDailyMaintenanceService).start();
assertNotSame(initialDailyMaintenanceService, finalDailyMaintenanceService);
assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true));
} }
} }

View file

@ -39,7 +39,11 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
*/ */
private static class DummyDataRemover implements MlDataRemover { private static class DummyDataRemover implements MlDataRemover {
public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOutSupplier) { public void remove(
float requestsPerSec,
ActionListener<Boolean> listener,
Supplier<Boolean> isTimedOutSupplier
) {
listener.onResponse(isTimedOutSupplier.get() == false); listener.onResponse(isTimedOutSupplier.get() == false);
} }
} }
@ -73,7 +77,7 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
Supplier<Boolean> isTimedOutSupplier = () -> false; Supplier<Boolean> isTimedOutSupplier = () -> false;
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
assertTrue(succeeded.get()); assertTrue(succeeded.get());
} }
@ -93,7 +97,7 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase {
Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0); Supplier<Boolean> isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0);
transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true);
assertFalse(succeeded.get()); assertFalse(succeeded.get());
} }

View file

@ -69,7 +69,13 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
} }
@Override @Override
protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener<Boolean> listener) { protected void removeDataBefore(
Job job,
float requestsPerSec,
long latestTimeMs,
long cutoffEpochMs,
ActionListener<Boolean> listener
) {
listener.onResponse(Boolean.TRUE); listener.onResponse(Boolean.TRUE);
} }
} }
@ -118,7 +124,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
TestListener listener = new TestListener(); TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
remover.remove(listener, () -> false); remover.remove(1.0f,listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
@ -157,7 +163,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
TestListener listener = new TestListener(); TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
remover.remove(listener, () -> false); remover.remove(1.0f,listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
@ -181,7 +187,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase {
TestListener listener = new TestListener(); TestListener listener = new TestListener();
ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient);
remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));

View file

@ -68,7 +68,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
} }
public void testRemove_TimedOut() { public void testRemove_TimedOut() {
remover.remove(listener, () -> true); remover.remove(1.0f, listener, () -> true);
InOrder inOrder = inOrder(client, listener); InOrder inOrder = inOrder(client, listener);
inOrder.verify(listener).onResponse(false); inOrder.verify(listener).onResponse(false);
@ -79,7 +79,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
when(indicesStatsResponse.getIndices()).thenReturn(Map.of()); when(indicesStatsResponse.getIndices()).thenReturn(Map.of());
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any()); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any());
remover.remove(listener, () -> false); remover.remove(1.0f, listener, () -> false);
InOrder inOrder = inOrder(client, listener); InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
@ -96,7 +96,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
".ml-state-d", indexStats(".ml-state-d", 2))).when(indicesStatsResponse).getIndices(); ".ml-state-d", indexStats(".ml-state-d", 2))).when(indicesStatsResponse).getIndices();
doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
remover.remove(listener, () -> false); remover.remove(1.0f, listener, () -> false);
InOrder inOrder = inOrder(client, listener); InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
@ -120,7 +120,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged); AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged);
doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any()); doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any());
remover.remove(listener, () -> false); remover.remove(1.0f, listener, () -> false);
InOrder inOrder = inOrder(client, listener); InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());
@ -148,7 +148,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase {
GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null, null); GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null, null);
doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any());
remover.remove(listener, () -> false); remover.remove(1.0f, listener, () -> false);
InOrder inOrder = inOrder(client, listener); InOrder inOrder = inOrder(client, listener);
inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any());

View file

@ -74,7 +74,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
givenClientRequestsSucceed(responses); givenClientRequestsSucceed(responses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList()));
givenClientRequestsSucceed(searchResponses); givenClientRequestsSucceed(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(true)); assertThat(listener.success, is(true));
@ -141,7 +141,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
final int timeoutAfter = randomIntBetween(0, 1); final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0));
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));
@ -156,7 +156,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
))); )));
givenClientSearchRequestsFail(searchResponses); givenClientSearchRequestsFail(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));
@ -192,7 +192,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase {
searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2)));
givenClientDeleteModelSnapshotRequestsFail(searchResponses); givenClientDeleteModelSnapshotRequestsFail(searchResponses);
createExpiredModelSnapshotsRemover().remove(listener, () -> false); createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false);
listener.waitToCompletion(); listener.waitToCompletion();
assertThat(listener.success, is(false)); assertThat(listener.success, is(false));

View file

@ -65,7 +65,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
givenDBQRequestsSucceed(); givenDBQRequestsSucceed();
AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList());
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(1.0f, listener, () -> false);
verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
verify(listener).onResponse(true); verify(listener).onResponse(true);
@ -79,7 +79,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("bar").build() JobTests.buildJobBuilder("bar").build()
)); ));
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(1.0f, listener, () -> false);
verify(listener).onResponse(true); verify(listener).onResponse(true);
verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(client).execute(eq(SearchAction.INSTANCE), any(), any());
@ -94,7 +94,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
new Bucket("id_not_important", new Date(), 60)); new Bucket("id_not_important", new Date(), 60));
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(1.0f, listener, () -> false);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);
@ -114,7 +114,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
final int timeoutAfter = randomIntBetween(0, 1); final int timeoutAfter = randomIntBetween(0, 1);
AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter);
createExpiredResultsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); createExpiredResultsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0));
assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter));
verify(listener).onResponse(false); verify(listener).onResponse(false);
@ -129,7 +129,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase {
JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()),
new Bucket("id_not_important", new Date(), 60)); new Bucket("id_not_important", new Date(), 60));
createExpiredResultsRemover().remove(listener, () -> false); createExpiredResultsRemover().remove(1.0f, listener, () -> false);
assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1));
DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0);

View file

@ -14,6 +14,9 @@
] ]
} }
] ]
},
"body":{
"description":"deleting expired data parameters"
} }
} }
} }

View file

@ -0,0 +1,36 @@
setup:
- skip:
features: headers
- do:
headers:
Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser
ml.put_job:
job_id: delete-expired-data
body: >
{
"job_id": "delete-expired-data",
"description":"Analysis of response time by airline",
"analysis_config" : {
"bucket_span" : "1h",
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"field_delimiter":",",
"time_field":"time",
"time_format":"yyyy-MM-dd HH:mm:ssX"
}
}
---
"Test delete expired data with no body":
- do:
ml.delete_expired_data: {}
- match: { deleted: true}
---
"Test delete expired data with body parameters":
- do:
ml.delete_expired_data:
body: >
{ "timeout": "10h", "requests_per_second": 100000.0 }
- match: { deleted: true}