diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index 54dd11bf6caf..db64869c337a 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -167,13 +167,13 @@ final class MLRequestConverters { return request; } - static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) { + static Request deleteExpiredData(DeleteExpiredDataRequest deleteExpiredDataRequest) throws IOException { String endpoint = new EndpointBuilder() .addPathPartAsIs("_ml") .addPathPartAsIs("_delete_expired_data") .build(); Request request = new Request(HttpDelete.METHOD_NAME, endpoint); - + request.setEntity(createEntity(deleteExpiredDataRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java index 4da2d52ee357..c8ae8a01f9b5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteExpiredDataRequest.java @@ -19,16 +19,78 @@ package org.elasticsearch.client.ml; import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; /** * Request to delete expired model snapshots and forecasts */ -public class DeleteExpiredDataRequest implements Validatable { +public class DeleteExpiredDataRequest implements Validatable, ToXContentObject { + static final String REQUESTS_PER_SECOND = "requests_per_second"; + static final String TIMEOUT = "timeout"; + private final Float requestsPerSecond; + private final TimeValue timeout; /** * Create a new request to delete expired data */ public DeleteExpiredDataRequest() { + this(null, null); } + public DeleteExpiredDataRequest(Float requestsPerSecond, TimeValue timeout) { + this.requestsPerSecond = requestsPerSecond; + this.timeout = timeout; + } + + /** + * The requests allowed per second in the underlying Delete by Query requests executed. + * + * `-1.0f` indicates that the standard nightly cleanup behavior should be ran. + * Throttling scales according to the number of data nodes. + * `null` is default and means no throttling will occur. + */ + public Float getRequestsPerSecond() { + return requestsPerSecond; + } + + /** + * Indicates how long the deletion request will run until it timesout. + * + * Default value is 8 hours. + */ + public TimeValue getTimeout() { + return timeout; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeleteExpiredDataRequest that = (DeleteExpiredDataRequest) o; + return Objects.equals(requestsPerSecond, that.requestsPerSecond) && + Objects.equals(timeout, that.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(requestsPerSecond, timeout); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + if (requestsPerSecond != null) { + builder.field(REQUESTS_PER_SECOND, requestsPerSecond); + } + if (timeout != null) { + builder.field(TIMEOUT, timeout.getStringRep()); + } + builder.endObject(); + return builder; + } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index 86b14a7e8a74..bd552d5ff7de 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -214,12 +214,16 @@ public class MLRequestConvertersTests extends ESTestCase { requestEntityToString(request)); } - public void testDeleteExpiredData() { - DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest(); + public void testDeleteExpiredData() throws Exception { + float requestsPerSec = randomBoolean() ? -1.0f : (float)randomDoubleBetween(0.0, 100000.0, false); + DeleteExpiredDataRequest deleteExpiredDataRequest = new DeleteExpiredDataRequest( + requestsPerSec, + TimeValue.timeValueHours(1)); Request request = MLRequestConverters.deleteExpiredData(deleteExpiredDataRequest); assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals("/_ml/_delete_expired_data", request.getEndpoint()); + assertEquals("{\"requests_per_second\":" + requestsPerSec + ",\"timeout\":\"1h\"}", requestEntityToString(request)); } public void testDeleteJob() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 576e34365bd7..c5e2c02dea16 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -2035,7 +2035,11 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { MachineLearningIT.buildJob(jobId); { // tag::delete-expired-data-request - DeleteExpiredDataRequest request = new DeleteExpiredDataRequest(); // <1> + DeleteExpiredDataRequest request = new DeleteExpiredDataRequest( // <1> + 1000.0f, // <2> + TimeValue.timeValueHours(12) // <3> + ); + // end::delete-expired-data-request // tag::delete-expired-data-execute diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java new file mode 100644 index 000000000000..b5ed7a1e94b0 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/DeleteExpiredDataRequestTests.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client.ml; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + + +public class DeleteExpiredDataRequestTests extends AbstractXContentTestCase { + + private static ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "delete_expired_data_request", + true, + (a) -> new DeleteExpiredDataRequest((Float) a[0], (TimeValue) a[1]) + ); + static { + PARSER.declareFloat(ConstructingObjectParser.optionalConstructorArg(), + new ParseField(DeleteExpiredDataRequest.REQUESTS_PER_SECOND)); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), DeleteExpiredDataRequest.TIMEOUT), + new ParseField(DeleteExpiredDataRequest.TIMEOUT), + ObjectParser.ValueType.STRING); + } + + @Override + protected DeleteExpiredDataRequest createTestInstance() { + return new DeleteExpiredDataRequest(randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test")); + } + + @Override + protected DeleteExpiredDataRequest doParseInstance(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } +} diff --git a/docs/java-rest/high-level/ml/delete-expired-data.asciidoc b/docs/java-rest/high-level/ml/delete-expired-data.asciidoc index 8dc47750cbef..e51e44c5549c 100644 --- a/docs/java-rest/high-level/ml/delete-expired-data.asciidoc +++ b/docs/java-rest/high-level/ml/delete-expired-data.asciidoc @@ -21,6 +21,10 @@ A `DeleteExpiredDataRequest` object does not require any arguments. include-tagged::{doc-tests-file}[{api}-request] --------------------------------------------------- <1> Constructing a new request. +<2> Providing requests per second throttling for the + deletion processes. Default is no throttling. +<3> Setting how long the deletion processes will be allowed + to run before they are canceled. Default value is `8h` (8 hours). [id="{upid}-{api}-response"] ==== Delete Expired Data Response diff --git a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc index c1450ec43f8e..44679911ab25 100644 --- a/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc +++ b/docs/reference/ml/anomaly-detection/apis/delete-expired-data.asciidoc @@ -27,6 +27,17 @@ Deletes all job results, model snapshots and forecast data that have exceeded their `retention days` period. Machine learning state documents that are not associated with any job are also deleted. +[[ml-delete-expired-data-request-body]] +==== {api-request-body-title} + +`requests_per_second`:: +(Optional, float) The desired requests per second for the deletion processes. +The default behavior is no throttling. + +`timeout`:: +(Optional, string) How long can the underlying delete processes run until they are canceled. +The default value is `8h` (8 hours). + [[ml-delete-expired-data-example]] ==== {api-examples-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java index eb2d3e94c908..db4a10cd447c 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionRequestValidationException; @@ -14,6 +15,8 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -31,20 +34,94 @@ public class DeleteExpiredDataAction extends ActionType PARSER = new ObjectParser<>( + "delete_expired_data_request", + false, + Request::new); + + static { + PARSER.declareFloat(Request::setRequestsPerSecond, REQUESTS_PER_SECOND); + PARSER.declareString((obj, value) -> obj.setTimeout(TimeValue.parseTimeValue(value, TIMEOUT.getPreferredName())), + TIMEOUT); + } + + private Float requestsPerSecond; + private TimeValue timeout; + public Request() {} + public Request(Float requestsPerSecond, TimeValue timeValue) { + this.requestsPerSecond = requestsPerSecond; + this.timeout = timeValue; + } + public Request(StreamInput in) throws IOException { super(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.requestsPerSecond = in.readOptionalFloat(); + this.timeout = in.readOptionalTimeValue(); + } else { + this.requestsPerSecond = null; + this.timeout = null; + } + } + + public Float getRequestsPerSecond() { + return requestsPerSecond; + } + + public TimeValue getTimeout() { + return timeout; + } + + public Request setRequestsPerSecond(Float requestsPerSecond) { + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public Request setTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; } @Override public ActionRequestValidationException validate() { + if (this.requestsPerSecond != null && this.requestsPerSecond != -1.0f && this.requestsPerSecond <= 0) { + ActionRequestValidationException requestValidationException = new ActionRequestValidationException(); + requestValidationException.addValidationError("[requests_per_second] must either be -1 or greater than 0"); + return requestValidationException; + } return null; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(requestsPerSecond, request.requestsPerSecond) + && Objects.equals(timeout, request.timeout); + } + + @Override + public int hashCode() { + return Objects.hash(requestsPerSecond, timeout); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeOptionalFloat(requestsPerSecond); + out.writeOptionalTimeValue(timeout); + } + } } static class RequestBuilder extends ActionRequestBuilder { - RequestBuilder(ElasticsearchClient client, DeleteExpiredDataAction action) { super(client, action, new Request()); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java new file mode 100644 index 000000000000..aee8e62566a1 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/DeleteExpiredDataActionRequestTests.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.AbstractBWCWireSerializationTestCase; +import org.elasticsearch.xpack.core.ml.action.DeleteExpiredDataAction.Request; + +public class DeleteExpiredDataActionRequestTests extends AbstractBWCWireSerializationTestCase { + + @Override + protected Request createTestInstance() { + return new Request( + randomBoolean() ? null : randomFloat(), + randomBoolean() ? null : TimeValue.parseTimeValue(randomTimeValue(), "test") + ); + } + + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request mutateInstanceForVersion(Request instance, Version version) { + if (version.before(Version.V_8_0_0)) { + return new Request(); + } + return instance; + } +} diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 612fe23edbda..06ee0e48b6fd 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -95,11 +95,57 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } - public void testDeleteExpiredData() throws Exception { + public void testDeleteExpiredDataNoThrottle() throws Exception { + testExpiredDeletion(null, 10010); + } + + /** + * Verifies empty state indices deletion. Here is the summary of indices used by the test: + * + * +------------------+--------+----------+-------------------------+ + * | index name | empty? | current? | expected to be removed? | + * +------------------+--------+----------+-------------------------+ + * | .ml-state | yes | no | yes | + * | .ml-state-000001 | no | no | no | + * | .ml-state-000003 | yes | no | yes | + * | .ml-state-000005 | no | no | no | + * | .ml-state-000007 | yes | yes | no | + * +------------------+--------+----------+-------------------------+ + */ + public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception { + client().admin().indices().prepareCreate(".ml-state").get(); + client().admin().indices().prepareCreate(".ml-state-000001").get(); + client().prepareIndex(".ml-state-000001").setSource("field_1", "value_1").get(); + client().admin().indices().prepareCreate(".ml-state-000003").get(); + client().admin().indices().prepareCreate(".ml-state-000005").get(); + client().prepareIndex(".ml-state-000005").setSource("field_5", "value_5").get(); + client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get(); + refresh(); + + GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get(); + assertThat(Strings.toString(getIndexResponse), + getIndexResponse.getIndices(), + is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007"))); + + client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); + refresh(); + + getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get(); + assertThat(Strings.toString(getIndexResponse), + getIndexResponse.getIndices(), + // Only non-empty or current indices should survive deletion process + is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); + } + + public void testDeleteExpiredDataWithStandardThrottle() throws Exception { + testExpiredDeletion(-1.0f, 100); + } + + private void testExpiredDeletion(Float customThrottle, int numUnusedState) throws Exception { // Index some unused state documents (more than 10K to test scrolling works) String mlStateIndexName = AnomalyDetectorsIndexFields.STATE_INDEX_PREFIX + "-000001"; BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (int i = 0; i < 10010; i++) { + for (int i = 0; i < numUnusedState; i++) { String docId = "non_existing_job_" + randomFrom("model_state_1234567#" + i, "quantiles", "categorizer_state#" + i); IndexRequest indexRequest = new IndexRequest(mlStateIndexName) @@ -165,10 +211,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { // We must set a very small value for expires_in to keep this testable as the deletion cutoff point is the moment // the DeleteExpiredDataAction is called. - String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.timeValueSeconds(1)); + String forecastShortExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.timeValueSeconds(1)); shortExpiryForecastIds.add(forecastShortExpiryId); - String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), null); - String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(3), TimeValue.ZERO); + String forecastDefaultExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), null); + String forecastNoExpiryId = forecast(job.getId(), TimeValue.timeValueHours(1), TimeValue.ZERO); waitForecastToFinish(job.getId(), forecastShortExpiryId); waitForecastToFinish(job.getId(), forecastDefaultExpiryId); waitForecastToFinish(job.getId(), forecastNoExpiryId); @@ -195,9 +241,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { retainAllSnapshots("snapshots-retention-with-retain"); long totalModelSizeStatsBeforeDelete = client().prepareSearch("*") - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get().getHits().getTotalHits().value; + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get().getHits().getTotalHits().value; long totalNotificationsCountBeforeDelete = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; assertThat(totalModelSizeStatsBeforeDelete, greaterThan(0L)); @@ -214,7 +260,7 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { assertThat(indexUnusedStateDocsResponse.get().status(), equalTo(RestStatus.OK)); // Now call the action under test - assertThat(deleteExpiredData().isDeleted(), is(true)); + assertThat(deleteExpiredData(customThrottle).isDeleted(), is(true)); // no-retention job should have kept all data assertThat(getBuckets("no-retention").size(), is(greaterThanOrEqualTo(70))); @@ -244,9 +290,9 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { assertThat(getModelSnapshots("results-and-snapshots-retention").size(), equalTo(1)); long totalModelSizeStatsAfterDelete = client().prepareSearch("*") - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) - .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) - .get().getHits().getTotalHits().value; + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) + .setQuery(QueryBuilders.termQuery("result_type", "model_size_stats")) + .get().getHits().getTotalHits().value; long totalNotificationsCountAfterDelete = client().prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).get().getHits().getTotalHits().value; assertThat(totalModelSizeStatsAfterDelete, equalTo(totalModelSizeStatsBeforeDelete)); @@ -266,10 +312,10 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { // Verify .ml-state doesn't contain unused state documents SearchResponse stateDocsResponse = client().prepareSearch(AnomalyDetectorsIndex.jobStateIndexPattern()) - .setFetchSource(false) - .setTrackTotalHits(true) - .setSize(10000) - .get(); + .setFetchSource(false) + .setTrackTotalHits(true) + .setSize(10000) + .get(); // Assert at least one state doc for each job assertThat(stateDocsResponse.getHits().getTotalHits().value, greaterThanOrEqualTo(5L)); @@ -288,44 +334,6 @@ public class DeleteExpiredDataIT extends MlNativeAutodetectIntegTestCase { nonExistingJobDocsCount, equalTo(0)); } - /** - * Verifies empty state indices deletion. Here is the summary of indices used by the test: - * - * +------------------+--------+----------+-------------------------+ - * | index name | empty? | current? | expected to be removed? | - * +------------------+--------+----------+-------------------------+ - * | .ml-state | yes | no | yes | - * | .ml-state-000001 | no | no | no | - * | .ml-state-000003 | yes | no | yes | - * | .ml-state-000005 | no | no | no | - * | .ml-state-000007 | yes | yes | no | - * +------------------+--------+----------+-------------------------+ - */ - public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Exception { - client().admin().indices().prepareCreate(".ml-state").get(); - client().admin().indices().prepareCreate(".ml-state-000001").get(); - client().prepareIndex(".ml-state-000001").setSource("field_1", "value_1").get(); - client().admin().indices().prepareCreate(".ml-state-000003").get(); - client().admin().indices().prepareCreate(".ml-state-000005").get(); - client().prepareIndex(".ml-state-000005").setSource("field_5", "value_5").get(); - client().admin().indices().prepareCreate(".ml-state-000007").addAlias(new Alias(".ml-state-write").isHidden(true)).get(); - refresh(); - - GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get(); - assertThat(Strings.toString(getIndexResponse), - getIndexResponse.getIndices(), - is(arrayContaining(".ml-state", ".ml-state-000001", ".ml-state-000003", ".ml-state-000005", ".ml-state-000007"))); - - client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); - refresh(); - - getIndexResponse = client().admin().indices().prepareGetIndex().setIndices(".ml-state*").get(); - assertThat(Strings.toString(getIndexResponse), - getIndexResponse.getIndices(), - // Only non-empty or current indices should survive deletion process - is(arrayContaining(".ml-state-000001", ".ml-state-000005", ".ml-state-000007"))); - } - private static Job.Builder newJobBuilder(String id) { Detector.Builder detector = new Detector.Builder(); detector.setFunction("count"); diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java index 2a3bd0f00c00..f413e5b9f44f 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/test/java/org/elasticsearch/xpack/ml/integration/MlNativeIntegTestCase.java @@ -221,6 +221,16 @@ abstract class MlNativeIntegTestCase extends ESIntegTestCase { return response; } + protected DeleteExpiredDataAction.Response deleteExpiredData(Float customThrottle) throws Exception { + DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(); + request.setRequestsPerSecond(customThrottle); + DeleteExpiredDataAction.Response response = client().execute(DeleteExpiredDataAction.INSTANCE, request).get(); + // We need to refresh to ensure the deletion is visible + refresh("*"); + + return response; + } + protected PutFilterAction.Response putMlFilter(MlFilter filter) { return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet(); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index edcfbc048d4c..1c31d8c6be0a 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -421,6 +421,23 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys public static final Setting MIN_DISK_SPACE_OFF_HEAP = Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); + // Requests per second throttling for the nightly maintenance task + public static final Setting NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND = + new Setting<>( + "xpack.ml.nightly_maintenance_requests_per_second", + (s) -> Float.toString(-1.0f), + (s) -> { + float value = Float.parseFloat(s); + if (value <= 0.0f && value != -1.0f) { + throw new IllegalArgumentException("Failed to parse value [" + + s + "] for setting [xpack.ml.nightly_maintenance_requests_per_second] must be > 0.0 or exactly equal to -1.0"); + } + return value; + }, + Property.Dynamic, + Property.NodeScope + ); + private static final Logger logger = LogManager.getLogger(MachineLearning.class); private final Settings settings; @@ -466,7 +483,8 @@ public class MachineLearning extends Plugin implements SystemIndexPlugin, Analys InferenceProcessor.MAX_INFERENCE_PROCESSORS, ModelLoadingService.INFERENCE_MODEL_CACHE_SIZE, ModelLoadingService.INFERENCE_MODEL_CACHE_TTL, - ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES + ResultsPersisterService.PERSIST_RESULTS_MAX_RETRIES, + NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java index 1a9ff34522f3..b529495b7c3f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceService.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; @@ -51,19 +52,25 @@ public class MlDailyMaintenanceService implements Releasable { private final Supplier schedulerProvider; private volatile Scheduler.Cancellable cancellable; + private volatile float deleteExpiredDataRequestsPerSecond; - MlDailyMaintenanceService(ThreadPool threadPool, Client client, ClusterService clusterService, + MlDailyMaintenanceService(Settings settings, ThreadPool threadPool, Client client, ClusterService clusterService, MlAssignmentNotifier mlAssignmentNotifier, Supplier scheduleProvider) { this.threadPool = Objects.requireNonNull(threadPool); this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); this.schedulerProvider = Objects.requireNonNull(scheduleProvider); + this.deleteExpiredDataRequestsPerSecond = MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND.get(settings); } - public MlDailyMaintenanceService(ClusterName clusterName, ThreadPool threadPool, Client client, ClusterService clusterService, - MlAssignmentNotifier mlAssignmentNotifier) { - this(threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); + public MlDailyMaintenanceService(Settings settings, ClusterName clusterName, ThreadPool threadPool, + Client client, ClusterService clusterService, MlAssignmentNotifier mlAssignmentNotifier) { + this(settings, threadPool, client, clusterService, mlAssignmentNotifier, () -> delayToNextTime(clusterName)); + } + + void setDeleteExpiredDataRequestsPerSecond(float value) { + this.deleteExpiredDataRequestsPerSecond = value; } /** @@ -101,7 +108,7 @@ public class MlDailyMaintenanceService implements Releasable { } } - public boolean isStarted() { + boolean isStarted() { return cancellable != null; } @@ -129,7 +136,10 @@ public class MlDailyMaintenanceService implements Releasable { return; } LOGGER.info("triggering scheduled [ML] maintenance tasks"); - executeAsyncWithOrigin(client, ML_ORIGIN, DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request(), + executeAsyncWithOrigin(client, + ML_ORIGIN, + DeleteExpiredDataAction.INSTANCE, + new DeleteExpiredDataAction.Request(deleteExpiredDataRequestsPerSecond, TimeValue.timeValueHours(8)), ActionListener.wrap( response -> { if (response.isDeleted()) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 67b4e1e36b7e..2859bd0c6f73 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -26,34 +26,56 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi private static final Logger logger = LogManager.getLogger(MlInitializationService.class); - private final Settings settings; - private final ThreadPool threadPool; - private final ClusterService clusterService; private final Client client; - private final MlAssignmentNotifier mlAssignmentNotifier; private final AtomicBoolean isIndexCreationInProgress = new AtomicBoolean(false); - private volatile MlDailyMaintenanceService mlDailyMaintenanceService; + private final MlDailyMaintenanceService mlDailyMaintenanceService; MlInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, Client client, MlAssignmentNotifier mlAssignmentNotifier) { - this.settings = Objects.requireNonNull(settings); - this.threadPool = Objects.requireNonNull(threadPool); - this.clusterService = Objects.requireNonNull(clusterService); + this(client, + new MlDailyMaintenanceService( + settings, + Objects.requireNonNull(clusterService).getClusterName(), + threadPool, + client, + clusterService, + mlAssignmentNotifier + ), + clusterService); + } + + // For testing + MlInitializationService(Client client, MlDailyMaintenanceService dailyMaintenanceService, ClusterService clusterService) { this.client = Objects.requireNonNull(client); - this.mlAssignmentNotifier = Objects.requireNonNull(mlAssignmentNotifier); + this.mlDailyMaintenanceService = dailyMaintenanceService; clusterService.addListener(this); clusterService.addLocalNodeMasterListener(this); + clusterService.addLifecycleListener(new LifecycleListener() { + @Override + public void afterStart() { + clusterService.getClusterSettings().addSettingsUpdateConsumer( + MachineLearning.NIGHTLY_MAINTENANCE_REQUESTS_PER_SECOND, + mlDailyMaintenanceService::setDeleteExpiredDataRequestsPerSecond + ); + } + + @Override + public void beforeStop() { + offMaster(); + } + }); } + @Override public void onMaster() { - installDailyMaintenanceService(); + mlDailyMaintenanceService.start(); } @Override public void offMaster() { - uninstallDailyMaintenanceService(); + mlDailyMaintenanceService.stop(); } @Override @@ -85,35 +107,10 @@ class MlInitializationService implements LocalNodeMasterListener, ClusterStateLi return ThreadPool.Names.GENERIC; } - private synchronized void installDailyMaintenanceService() { - if (mlDailyMaintenanceService == null) { - mlDailyMaintenanceService = - new MlDailyMaintenanceService(clusterService.getClusterName(), threadPool, client, clusterService, mlAssignmentNotifier); - mlDailyMaintenanceService.start(); - clusterService.addLifecycleListener(new LifecycleListener() { - @Override - public void beforeStop() { - uninstallDailyMaintenanceService(); - } - }); - } - } - - private synchronized void uninstallDailyMaintenanceService() { - if (mlDailyMaintenanceService != null) { - mlDailyMaintenanceService.stop(); - mlDailyMaintenanceService = null; - } - } - /** For testing */ MlDailyMaintenanceService getDailyMaintenanceService() { return mlDailyMaintenanceService; } - /** For testing */ - synchronized void setDailyMaintenanceService(MlDailyMaintenanceService service) { - mlDailyMaintenanceService = service; - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java index fe3d1a73c89d..4357fa438ba7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataAction.java @@ -15,6 +15,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -43,8 +44,7 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction listener) { logger.info("Deleting expired data"); - Instant timeoutTime = Instant.now(clock).plus(MAX_DURATION); + Instant timeoutTime = Instant.now(clock).plus( + request.getTimeout() == null ? DEFAULT_MAX_DURATION : Duration.ofMillis(request.getTimeout().millis()) + ); + Supplier isTimedOutSupplier = () -> Instant.now(clock).isAfter(timeoutTime); - threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> deleteExpiredData(listener, isTimedOutSupplier)); + threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute( + () -> deleteExpiredData(request, listener, isTimedOutSupplier) + ); } - private void deleteExpiredData(ActionListener listener, + private void deleteExpiredData(DeleteExpiredDataAction.Request request, + ActionListener listener, Supplier isTimedOutSupplier) { AnomalyDetectionAuditor auditor = new AnomalyDetectionAuditor(client, clusterService.getNodeName()); List dataRemovers = Arrays.asList( @@ -89,24 +95,43 @@ public class TransportDeleteExpiredDataAction extends HandledTransportAction dataRemoversIterator = new VolatileCursorIterator<>(dataRemovers); - deleteExpiredData(dataRemoversIterator, listener, isTimedOutSupplier, true); + // If there is no throttle provided, default to none + float requestsPerSec = request.getRequestsPerSecond() == null ? Float.POSITIVE_INFINITY : request.getRequestsPerSecond(); + int numberOfDatanodes = Math.max(clusterService.state().getNodes().getDataNodes().size(), 1); + if (requestsPerSec == -1.0f) { + // With DEFAULT_SCROLL_SIZE = 1000 and a single data node this implies we spread deletion of + // 1 million documents over 5000 seconds ~= 83 minutes. + // If we have > 5 data nodes, we don't set our throttling. + requestsPerSec = numberOfDatanodes < 5 ? + (float)(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5) * numberOfDatanodes : + Float.POSITIVE_INFINITY; + } + deleteExpiredData(dataRemoversIterator, requestsPerSec, listener, isTimedOutSupplier, true); } void deleteExpiredData(Iterator mlDataRemoversIterator, + float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier, boolean haveAllPreviousDeletionsCompleted) { if (haveAllPreviousDeletionsCompleted && mlDataRemoversIterator.hasNext()) { MlDataRemover remover = mlDataRemoversIterator.next(); ActionListener nextListener = ActionListener.wrap( - booleanResponse -> deleteExpiredData(mlDataRemoversIterator, listener, isTimedOutSupplier, booleanResponse), + booleanResponse -> + deleteExpiredData( + mlDataRemoversIterator, + requestsPerSecond, + listener, + isTimedOutSupplier, + booleanResponse + ), listener::onFailure); // Removing expired ML data and artifacts requires multiple operations. // These are queued up and executed sequentially in the action listener, // the chained calls must all run the ML utility thread pool NOT the thread // the previous action returned in which in the case of a transport_client_boss // thread is a disaster. - remover.remove(new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), + remover.remove(requestsPerSecond, new ThreadedActionListener<>(logger, threadPool, executor, nextListener, false), isTimedOutSupplier); } else { if (haveAllPreviousDeletionsCompleted) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java index 0173f5b27d0d..322466b23dd6 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemover.java @@ -38,11 +38,15 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { - removeData(newJobIterator(), listener, isTimedOutSupplier); + public void remove(float requestsPerSecond, + ActionListener listener, + Supplier isTimedOutSupplier) { + removeData(newJobIterator(), requestsPerSecond, listener, isTimedOutSupplier); } - private void removeData(WrappedBatchedJobsIterator jobIterator, ActionListener listener, + private void removeData(WrappedBatchedJobsIterator jobIterator, + float requestsPerSecond, + ActionListener listener, Supplier isTimedOutSupplier) { if (jobIterator.hasNext() == false) { listener.onResponse(true); @@ -62,17 +66,17 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { Long retentionDays = getRetentionDays(job); if (retentionDays == null) { - removeData(jobIterator, listener, isTimedOutSupplier); + removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier); return; } calcCutoffEpochMs(job.getId(), retentionDays, ActionListener.wrap( response -> { if (response == null) { - removeData(jobIterator, listener, isTimedOutSupplier); + removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier); } else { - removeDataBefore(job, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( - r -> removeData(jobIterator, listener, isTimedOutSupplier), + removeDataBefore(job, requestsPerSecond, response.latestTimeMs, response.cutoffEpochMs, ActionListener.wrap( + r -> removeData(jobIterator, requestsPerSecond, listener, isTimedOutSupplier), listener::onFailure)); } }, @@ -93,7 +97,13 @@ abstract class AbstractExpiredJobDataRemover implements MlDataRemover { * Template method to allow implementation details of various types of data (e.g. results, model snapshots). * Implementors need to call {@code listener.onResponse} when they are done in order to continue to the next job. */ - abstract void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener); + abstract void removeDataBefore( + Job job, + float requestsPerSecond, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ); static BoolQueryBuilder createQuery(String jobId, long cutoffEpochMs) { return QueryBuilders.boolQuery() diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java index 8b8fbe0f2632..586b259e5ad9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemover.java @@ -32,7 +32,7 @@ public class EmptyStateIndexRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { try { if (isTimedOutSupplier.get()) { listener.onResponse(false); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 40611438fda5..f4f85dc70436 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -73,10 +73,10 @@ public class ExpiredForecastsRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { LOGGER.debug("Removing forecasts that expire before [{}]", cutoffEpochMs); ActionListener forecastStatsHandler = ActionListener.wrap( - searchResponse -> deleteForecasts(searchResponse, listener, isTimedOutSupplier), + searchResponse -> deleteForecasts(searchResponse, requestsPerSec, listener, isTimedOutSupplier), e -> listener.onFailure(new ElasticsearchException("An error occurred while searching forecasts to delete", e))); SearchSourceBuilder source = new SearchSourceBuilder(); @@ -95,7 +95,12 @@ public class ExpiredForecastsRemover implements MlDataRemover { MachineLearning.UTILITY_THREAD_POOL_NAME, forecastStatsHandler, false)); } - private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { + private void deleteForecasts( + SearchResponse searchResponse, + float requestsPerSec, + ActionListener listener, + Supplier isTimedOutSupplier + ) { List forecastsToDelete; try { forecastsToDelete = findForecastsToDelete(searchResponse); @@ -109,7 +114,9 @@ public class ExpiredForecastsRemover implements MlDataRemover { return; } - DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete); + DeleteByQueryRequest request = buildDeleteByQuery(forecastsToDelete) + .setRequestsPerSecond(requestsPerSec) + .setAbortOnVersionConflict(false); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener() { @Override public void onResponse(BulkByScrollResponse bulkByScrollResponse) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index d6808af8bd1a..8056c87e954e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -133,7 +133,13 @@ public class ExpiredModelSnapshotsRemover extends AbstractExpiredJobDataRemover } @Override - protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSec, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { // TODO: delete this test if we ever allow users to revert a job to no model snapshot, e.g. to recover from data loss if (job.getModelSnapshotId() == null) { // No snapshot to remove diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java index 54b0d1f54753..13a5823483c1 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemover.java @@ -84,9 +84,15 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { } @Override - protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSecond, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { LOGGER.debug("Removing results of job [{}] that have a timestamp before [{}]", job.getId(), cutoffEpochMs); - DeleteByQueryRequest request = createDBQRequest(job, cutoffEpochMs); + DeleteByQueryRequest request = createDBQRequest(job, requestsPerSecond, cutoffEpochMs); client.execute(DeleteByQueryAction.INSTANCE, request, new ActionListener<>() { @Override @@ -108,14 +114,14 @@ public class ExpiredResultsRemover extends AbstractExpiredJobDataRemover { }); } - private DeleteByQueryRequest createDBQRequest(Job job, long cutoffEpochMs) { + DeleteByQueryRequest createDBQRequest(Job job, float requestsPerSec, long cutoffEpochMs) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); - // Delete the documents gradually. - // With DEFAULT_SCROLL_SIZE = 1000 this implies we spread deletion of 1 million documents over 5000 seconds ~= 83 minutes. - request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE); - request.setRequestsPerSecond(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE / 5); + request.setBatchSize(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) + // We are deleting old data, we should simply proceed as a version conflict could mean that another deletion is taking place + .setAbortOnVersionConflict(false) + .setRequestsPerSecond(requestsPerSec); request.indices(AnomalyDetectorsIndex.jobResultsAliasedName(job.getId())); QueryBuilder excludeFilter = QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22..34a5335da8c7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -10,5 +10,5 @@ import org.elasticsearch.action.ActionListener; import java.util.function.Supplier; public interface MlDataRemover { - void remove(ActionListener listener, Supplier isTimedOutSupplier); + void remove(float requestsPerSecond, ActionListener listener, Supplier isTimedOutSupplier); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java index 6d0a9de1569d..723902073916 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/UnusedStateRemover.java @@ -58,14 +58,14 @@ public class UnusedStateRemover implements MlDataRemover { } @Override - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove(float requestsPerSec, ActionListener listener, Supplier isTimedOutSupplier) { try { List unusedStateDocIds = findUnusedStateDocIds(); if (isTimedOutSupplier.get()) { listener.onResponse(false); } else { if (unusedStateDocIds.size() > 0) { - executeDeleteUnusedStateDocs(unusedStateDocIds, listener); + executeDeleteUnusedStateDocs(unusedStateDocIds, requestsPerSec, listener); } else { listener.onResponse(true); } @@ -98,12 +98,12 @@ public class UnusedStateRemover implements MlDataRemover { private Set getJobIds() { Set jobIds = new HashSet<>(); - jobIds.addAll(getAnamalyDetectionJobIds()); + jobIds.addAll(getAnomalyDetectionJobIds()); jobIds.addAll(getDataFrameAnalyticsJobIds()); return jobIds; } - private Set getAnamalyDetectionJobIds() { + private Set getAnomalyDetectionJobIds() { Set jobIds = new HashSet<>(); // TODO Once at 8.0, we can stop searching for jobs in cluster state @@ -130,11 +130,13 @@ public class UnusedStateRemover implements MlDataRemover { return jobIds; } - private void executeDeleteUnusedStateDocs(List unusedDocIds, ActionListener listener) { + private void executeDeleteUnusedStateDocs(List unusedDocIds, float requestsPerSec, ActionListener listener) { LOGGER.info("Found [{}] unused state documents; attempting to delete", unusedDocIds.size()); DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(AnomalyDetectorsIndex.jobStateIndexPattern()) .setIndicesOptions(IndicesOptions.lenientExpandOpen()) + .setAbortOnVersionConflict(false) + .setRequestsPerSecond(requestsPerSec) .setQuery(QueryBuilders.idsQuery().addIds(unusedDocIds.toArray(new String[0]))); // _doc is the most efficient sort order and will also disable scoring diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java index e6eb50f8b041..dca079e3c556 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/RestDeleteExpiredDataAction.java @@ -41,7 +41,9 @@ public class RestDeleteExpiredDataAction extends BaseRestHandler { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { - DeleteExpiredDataAction.Request request = new DeleteExpiredDataAction.Request(); + DeleteExpiredDataAction.Request request = restRequest.hasContent() ? + DeleteExpiredDataAction.Request.PARSER.apply(restRequest.contentParser(), null) : + new DeleteExpiredDataAction.Request(); return channel -> client.execute(DeleteExpiredDataAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java index b3718366983b..c7bc4b50679b 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlDailyMaintenanceServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.persistent.PersistentTasksCustomMetadata; import org.elasticsearch.test.ESTestCase; @@ -83,7 +84,7 @@ public class MlDailyMaintenanceServiceTests extends ESTestCase { } private MlDailyMaintenanceService createService(CountDownLatch latch, Client client) { - return new MlDailyMaintenanceService(threadPool, client, clusterService, mlAssignmentNotifier, () -> { + return new MlDailyMaintenanceService(Settings.EMPTY, threadPool, client, clusterService, mlAssignmentNotifier, () -> { latch.countDown(); return TimeValue.timeValueMillis(100); }); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 860f46132753..5dba0453124e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -18,7 +18,6 @@ import java.util.concurrent.ExecutorService; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -65,31 +64,17 @@ public class MlInitializationServiceTests extends ESTestCase { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); initializationService.offMaster(); - assertThat(initializationService.getDailyMaintenanceService(), is(nullValue())); - } - - public void testInitialize_alreadyInitialized() { - MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); - MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); - initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); - initializationService.onMaster(); - - assertSame(initialDailyMaintenanceService, initializationService.getDailyMaintenanceService()); + assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(false)); } public void testNodeGoesFromMasterToNonMasterAndBack() { - MlInitializationService initializationService = - new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client, mlAssignmentNotifier); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); - initializationService.setDailyMaintenanceService(initialDailyMaintenanceService); + MlInitializationService initializationService = new MlInitializationService(client, initialDailyMaintenanceService, clusterService); initializationService.offMaster(); verify(initialDailyMaintenanceService).stop(); initializationService.onMaster(); - MlDailyMaintenanceService finalDailyMaintenanceService = initializationService.getDailyMaintenanceService(); - assertNotSame(initialDailyMaintenanceService, finalDailyMaintenanceService); - assertThat(initializationService.getDailyMaintenanceService().isStarted(), is(true)); + verify(initialDailyMaintenanceService).start(); } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java index f3b180a9cf01..3937e5f8e0c5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportDeleteExpiredDataActionTests.java @@ -39,7 +39,11 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase { */ private static class DummyDataRemover implements MlDataRemover { - public void remove(ActionListener listener, Supplier isTimedOutSupplier) { + public void remove( + float requestsPerSec, + ActionListener listener, + Supplier isTimedOutSupplier + ) { listener.onResponse(isTimedOutSupplier.get() == false); } } @@ -73,7 +77,7 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase { Supplier isTimedOutSupplier = () -> false; - transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true); assertTrue(succeeded.get()); } @@ -93,7 +97,7 @@ public class TransportDeleteExpiredDataActionTests extends ESTestCase { Supplier isTimedOutSupplier = () -> (removersRemaining.getAndDecrement() <= 0); - transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), finalListener, isTimedOutSupplier, true); + transportDeleteExpiredDataAction.deleteExpiredData(removers.iterator(), 1.0f, finalListener, isTimedOutSupplier, true); assertFalse(succeeded.get()); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 8dc5bec70fbe..432d8ffa3e41 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -69,7 +69,13 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { } @Override - protected void removeDataBefore(Job job, long latestTimeMs, long cutoffEpochMs, ActionListener listener) { + protected void removeDataBefore( + Job job, + float requestsPerSec, + long latestTimeMs, + long cutoffEpochMs, + ActionListener listener + ) { listener.onResponse(Boolean.TRUE); } } @@ -118,7 +124,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> false); + remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -157,7 +163,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> false); + remover.remove(1.0f,listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -181,7 +187,7 @@ public class AbstractExpiredJobDataRemoverTests extends ESTestCase { TestListener listener = new TestListener(); ConcreteExpiredJobDataRemover remover = new ConcreteExpiredJobDataRemover(originSettingClient); - remover.remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + remover.remove(1.0f,listener, () -> attemptsLeft.getAndDecrement() <= 0); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java index 7b5e44f404fc..8cf4ac4cc199 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/EmptyStateIndexRemoverTests.java @@ -68,7 +68,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { } public void testRemove_TimedOut() { - remover.remove(listener, () -> true); + remover.remove(1.0f, listener, () -> true); InOrder inOrder = inOrder(client, listener); inOrder.verify(listener).onResponse(false); @@ -79,7 +79,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { when(indicesStatsResponse.getIndices()).thenReturn(Map.of()); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(any(), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -96,7 +96,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { ".ml-state-d", indexStats(".ml-state-d", 2))).when(indicesStatsResponse).getIndices(); doAnswer(withResponse(indicesStatsResponse)).when(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -120,7 +120,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { AcknowledgedResponse deleteIndexResponse = new AcknowledgedResponse(acknowledged); doAnswer(withResponse(deleteIndexResponse)).when(client).execute(eq(DeleteIndexAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); @@ -148,7 +148,7 @@ public class EmptyStateIndexRemoverTests extends ESTestCase { GetIndexResponse getIndexResponse = new GetIndexResponse(new String[] { ".ml-state-a" }, null, null, null, null, null); doAnswer(withResponse(getIndexResponse)).when(client).execute(eq(GetIndexAction.INSTANCE), any(), any()); - remover.remove(listener, () -> false); + remover.remove(1.0f, listener, () -> false); InOrder inOrder = inOrder(client, listener); inOrder.verify(client).execute(eq(IndicesStatsAction.INSTANCE), any(), any()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index f48ad5a7e122..9ee4cb177aaf 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -74,7 +74,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { givenClientRequestsSucceed(responses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -104,7 +104,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.emptyList())); givenClientRequestsSucceed(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(true)); @@ -141,7 +141,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); - createExpiredModelSnapshotsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0)); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -156,7 +156,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { ))); givenClientSearchRequestsFail(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); @@ -192,7 +192,7 @@ public class ExpiredModelSnapshotsRemoverTests extends ESTestCase { searchResponses.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(Collections.singletonList(snapshot2_2))); givenClientDeleteModelSnapshotRequestsFail(searchResponses); - createExpiredModelSnapshotsRemover().remove(listener, () -> false); + createExpiredModelSnapshotsRemover().remove(1.0f, listener, () -> false); listener.waitToCompletion(); assertThat(listener.success, is(false)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java index b1691baca0c7..92b9e85f661c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredResultsRemoverTests.java @@ -65,7 +65,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { givenDBQRequestsSucceed(); AbstractExpiredJobDataRemoverTests.givenJobs(client, Collections.emptyList()); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); verify(listener).onResponse(true); @@ -79,7 +79,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("bar").build() )); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); verify(listener).onResponse(true); verify(client).execute(eq(SearchAction.INSTANCE), any(), any()); @@ -94,7 +94,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), new Bucket("id_not_important", new Date(), 60)); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(2)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); @@ -114,7 +114,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { final int timeoutAfter = randomIntBetween(0, 1); AtomicInteger attemptsLeft = new AtomicInteger(timeoutAfter); - createExpiredResultsRemover().remove(listener, () -> (attemptsLeft.getAndDecrement() <= 0)); + createExpiredResultsRemover().remove(1.0f, listener, () -> (attemptsLeft.getAndDecrement() <= 0)); assertThat(capturedDeleteByQueryRequests.size(), equalTo(timeoutAfter)); verify(listener).onResponse(false); @@ -129,7 +129,7 @@ public class ExpiredResultsRemoverTests extends ESTestCase { JobTests.buildJobBuilder("results-2").setResultsRetentionDays(20L).build()), new Bucket("id_not_important", new Date(), 60)); - createExpiredResultsRemover().remove(listener, () -> false); + createExpiredResultsRemover().remove(1.0f, listener, () -> false); assertThat(capturedDeleteByQueryRequests.size(), equalTo(1)); DeleteByQueryRequest dbqRequest = capturedDeleteByQueryRequests.get(0); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json index ac38a7a34493..4c55f853a742 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_expired_data.json @@ -14,6 +14,9 @@ ] } ] + }, + "body":{ + "description":"deleting expired data parameters" } } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml new file mode 100644 index 000000000000..319c564b2669 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ml/delete_expired_data.yml @@ -0,0 +1,36 @@ +setup: + - skip: + features: headers + - do: + headers: + Authorization: "Basic eF9wYWNrX3Jlc3RfdXNlcjp4LXBhY2stdGVzdC1wYXNzd29yZA==" # run as x_pack_rest_user, i.e. the test setup superuser + ml.put_job: + job_id: delete-expired-data + body: > + { + "job_id": "delete-expired-data", + "description":"Analysis of response time by airline", + "analysis_config" : { + "bucket_span" : "1h", + "detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}] + }, + "data_description" : { + "field_delimiter":",", + "time_field":"time", + "time_format":"yyyy-MM-dd HH:mm:ssX" + } + } + +--- +"Test delete expired data with no body": + - do: + ml.delete_expired_data: {} + + - match: { deleted: true} +--- +"Test delete expired data with body parameters": + - do: + ml.delete_expired_data: + body: > + { "timeout": "10h", "requests_per_second": 100000.0 } + - match: { deleted: true}