diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java index bf05815144bc..ec6b2da719bf 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java @@ -712,6 +712,9 @@ final class MLRequestConverters { if (deleteRequest.getForce() != null) { params.putParam("force", Boolean.toString(deleteRequest.getForce())); } + if (deleteRequest.getTimeout() != null) { + params.withTimeout(deleteRequest.getTimeout()); + } request.addParameters(params.asMap()); return request; diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java index b7300430b55b..dc69a1728159 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/DeleteDataFrameAnalyticsRequest.java @@ -21,6 +21,7 @@ package org.elasticsearch.client.ml; import org.elasticsearch.client.Validatable; import org.elasticsearch.client.ValidationException; +import org.elasticsearch.common.unit.TimeValue; import java.util.Objects; import java.util.Optional; @@ -32,6 +33,7 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable { private final String id; private Boolean force; + private TimeValue timeout; public DeleteDataFrameAnalyticsRequest(String id) { this.id = id; @@ -55,6 +57,19 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable { this.force = force; } + public TimeValue getTimeout() { + return timeout; + } + + /** + * Sets the time to wait until the job is deleted. + * + * @param timeout The time to wait until the job is deleted. + */ + public void setTimeout(TimeValue timeout) { + this.timeout = timeout; + } + @Override public Optional validate() { if (id == null) { @@ -69,11 +84,13 @@ public class DeleteDataFrameAnalyticsRequest implements Validatable { if (o == null || getClass() != o.getClass()) return false; DeleteDataFrameAnalyticsRequest other = (DeleteDataFrameAnalyticsRequest) o; - return Objects.equals(id, other.id) && Objects.equals(force, other.force); + return Objects.equals(id, other.id) + && Objects.equals(force, other.force) + && Objects.equals(timeout, other.timeout); } @Override public int hashCode() { - return Objects.hash(id, force); + return Objects.hash(id, force, timeout); } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java index bd552d5ff7de..83ceeb1a9b7a 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/MLRequestConvertersTests.java @@ -816,9 +816,21 @@ public class MLRequestConvertersTests extends ESTestCase { assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint()); assertNull(request.getEntity()); + assertThat(request.getParameters().size(), equalTo(1)); assertEquals(Boolean.toString(true), request.getParameters().get("force")); } + public void testDeleteDataFrameAnalytics_WithTimeout() { + DeleteDataFrameAnalyticsRequest deleteRequest = new DeleteDataFrameAnalyticsRequest(randomAlphaOfLength(10)); + deleteRequest.setTimeout(TimeValue.timeValueSeconds(10)); + Request request = MLRequestConverters.deleteDataFrameAnalytics(deleteRequest); + assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); + assertEquals("/_ml/data_frame/analytics/" + deleteRequest.getId(), request.getEndpoint()); + assertNull(request.getEntity()); + assertThat(request.getParameters().size(), equalTo(1)); + assertEquals(request.getParameters().get("timeout"), "10s"); + } + public void testEvaluateDataFrame() throws IOException { EvaluateDataFrameRequest evaluateRequest = EvaluateDataFrameRequestTests.createRandom(); Request request = MLRequestConverters.evaluateDataFrame(evaluateRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 96d993f3556d..018fa125c077 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -3090,9 +3090,10 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { DeleteDataFrameAnalyticsRequest request = new DeleteDataFrameAnalyticsRequest("my-analytics-config"); // <1> // end::delete-data-frame-analytics-request - //tag::delete-data-frame-analytics-request-force + //tag::delete-data-frame-analytics-request-options request.setForce(false); // <1> - //end::delete-data-frame-analytics-request-force + request.setTimeout(TimeValue.timeValueMinutes(1)); // <2> + //end::delete-data-frame-analytics-request-options // tag::delete-data-frame-analytics-execute AcknowledgedResponse response = client.machineLearning().deleteDataFrameAnalytics(request, RequestOptions.DEFAULT); diff --git a/docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc b/docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc index d57b37ff2174..2874414465fb 100644 --- a/docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc +++ b/docs/java-rest/high-level/ml/delete-data-frame-analytics.asciidoc @@ -27,10 +27,11 @@ The following arguments are optional: ["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- -include-tagged::{doc-tests-file}[{api}-request-force] +include-tagged::{doc-tests-file}[{api}-request-options] --------------------------------------------------- <1> Use to forcefully delete a job that is not stopped. This method is quicker than stopping and deleting the job. Defaults to `false`. +<2> Use to set the time to wait until the job is deleted. Defaults to 1 minute. include::../execution.asciidoc[] diff --git a/docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc b/docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc index 576f366b7dc2..8fdc5c7f59f3 100644 --- a/docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc +++ b/docs/reference/ml/df-analytics/apis/delete-dfanalytics.asciidoc @@ -44,6 +44,11 @@ include::{es-repo-dir}/ml/ml-shared.asciidoc[tag=job-id-data-frame-analytics] (Optional, boolean) If `true`, it deletes a job that is not stopped; this method is quicker than stopping and deleting the job. +`timeout`:: + (Optional, <>) The time to wait for the job to be deleted. + Defaults to 1 minute. + + [[ml-delete-dfanalytics-example]] ==== {api-examples-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java index 74a473ec6832..a8bd73733358 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/DeleteDataFrameAnalyticsAction.java @@ -15,11 +15,13 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsConfig; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.TimeUnit; public class DeleteDataFrameAnalyticsAction extends ActionType { @@ -33,6 +35,10 @@ public class DeleteDataFrameAnalyticsAction extends ActionType { public static final ParseField FORCE = new ParseField("force"); + public static final ParseField TIMEOUT = new ParseField("timeout"); + + // Default timeout matches that of delete by query + private static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES); private String id; private boolean force; @@ -47,9 +53,12 @@ public class DeleteDataFrameAnalyticsAction extends ActionType listener) { - String id = request.getId(); TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); ParentTaskAssigningClient parentTaskClient = new ParentTaskAssigningClient(client, taskId); if (request.isForce()) { - forceDelete(parentTaskClient, id, listener); + forceDelete(parentTaskClient, request, listener); } else { - normalDelete(parentTaskClient, state, id, listener); + normalDelete(parentTaskClient, state, request, listener); } } - private void forceDelete(ParentTaskAssigningClient parentTaskClient, String id, + private void forceDelete(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request, ActionListener listener) { - logger.debug("[{}] Force deleting data frame analytics job", id); + logger.debug("[{}] Force deleting data frame analytics job", request.getId()); ActionListener stopListener = ActionListener.wrap( - stopResponse -> normalDelete(parentTaskClient, clusterService.state(), id, listener), + stopResponse -> normalDelete(parentTaskClient, clusterService.state(), request, listener), listener::onFailure ); - StopDataFrameAnalyticsAction.Request request = new StopDataFrameAnalyticsAction.Request(id); - request.setForce(true); - executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, request, stopListener); + stopJob(parentTaskClient, request, stopListener); } - private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, String id, - ActionListener listener) { + private void stopJob(ParentTaskAssigningClient parentTaskClient, DeleteDataFrameAnalyticsAction.Request request, + ActionListener listener) { + // We first try to stop the job normally. Normal stop returns after the job was stopped. + // If that fails then we proceed to force stopping which returns as soon as the persistent task is removed. + // If we just did force stopping, then there is a chance we proceed to delete the config while it's + // still used from the running task which results in logging errors. + + StopDataFrameAnalyticsAction.Request stopRequest = new StopDataFrameAnalyticsAction.Request(request.getId()); + stopRequest.setTimeout(request.timeout()); + + ActionListener normalStopListener = ActionListener.wrap( + listener::onResponse, + normalStopFailure -> { + stopRequest.setForce(true); + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, ActionListener.wrap( + listener::onResponse, + forceStopFailure -> { + logger.error(new ParameterizedMessage("[{}] Failed to stop normally", request.getId()), normalStopFailure); + logger.error(new ParameterizedMessage("[{}] Failed to stop forcefully", request.getId()), forceStopFailure); + listener.onFailure(forceStopFailure); + } + )); + } + ); + + executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, StopDataFrameAnalyticsAction.INSTANCE, stopRequest, normalStopListener); + } + + private void normalDelete(ParentTaskAssigningClient parentTaskClient, ClusterState state, + DeleteDataFrameAnalyticsAction.Request request, ActionListener listener) { + String id = request.getId(); PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); DataFrameAnalyticsState taskState = MlTasks.getDataFrameAnalyticsState(id, tasks); if (taskState != DataFrameAnalyticsState.STOPPED) { @@ -172,14 +200,14 @@ public class TransportDeleteDataFrameAnalyticsAction logger.warn("[{}] DBQ failure: {}", id, failure); } } - deleteStats(parentTaskClient, id, deleteStatsHandler); + deleteStats(parentTaskClient, id, request.timeout(), deleteStatsHandler); }, listener::onFailure ); // Step 2. Delete state ActionListener configListener = ActionListener.wrap( - config -> deleteState(parentTaskClient, config, deleteStateHandler), + config -> deleteState(parentTaskClient, config, request.timeout(), deleteStateHandler), listener::onFailure ); @@ -208,6 +236,7 @@ public class TransportDeleteDataFrameAnalyticsAction private void deleteState(ParentTaskAssigningClient parentTaskClient, DataFrameAnalyticsConfig config, + TimeValue timeout, ActionListener listener) { List ids = new ArrayList<>(); ids.add(StoredProgress.documentId(config.getId())); @@ -218,22 +247,25 @@ public class TransportDeleteDataFrameAnalyticsAction parentTaskClient, AnomalyDetectorsIndex.jobStateIndexPattern(), QueryBuilders.idsQuery().addIds(ids.toArray(String[]::new)), + timeout, listener ); } private void deleteStats(ParentTaskAssigningClient parentTaskClient, String jobId, + TimeValue timeout, ActionListener listener) { executeDeleteByQuery( parentTaskClient, MlStatsIndex.indexPattern(), QueryBuilders.termQuery(Fields.JOB_ID.getPreferredName(), jobId), + timeout, listener ); } - private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, + private void executeDeleteByQuery(ParentTaskAssigningClient parentTaskClient, String index, QueryBuilder query, TimeValue timeout, ActionListener listener) { DeleteByQueryRequest request = new DeleteByQueryRequest(index); request.setQuery(query); @@ -241,6 +273,7 @@ public class TransportDeleteDataFrameAnalyticsAction request.setSlices(AbstractBulkByScrollRequest.AUTO_SLICES); request.setAbortOnVersionConflict(false); request.setRefresh(true); + request.setTimeout(timeout); executeAsyncWithOrigin(parentTaskClient, ML_ORIGIN, DeleteByQueryAction.INSTANCE, request, listener); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java index 3b898e870ad4..925952eb02ad 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/dataframe/RestDeleteDataFrameAnalyticsAction.java @@ -37,6 +37,7 @@ public class RestDeleteDataFrameAnalyticsAction extends BaseRestHandler { String id = restRequest.param(DataFrameAnalyticsConfig.ID.getPreferredName()); DeleteDataFrameAnalyticsAction.Request request = new DeleteDataFrameAnalyticsAction.Request(id); request.setForce(restRequest.paramAsBoolean(DeleteDataFrameAnalyticsAction.Request.FORCE.getPreferredName(), request.isForce())); + request.timeout(restRequest.paramAsTime(DeleteDataFrameAnalyticsAction.Request.TIMEOUT.getPreferredName(), request.timeout())); return channel -> client.execute(DeleteDataFrameAnalyticsAction.INSTANCE, request, new RestToXContentListener<>(channel)); } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json index 4a077b375e77..7c3308c664d0 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ml.delete_data_frame_analytics.json @@ -26,6 +26,10 @@ "type":"boolean", "description":"True if the job should be forcefully deleted", "default":false + }, + "timeout":{ + "type":"time", + "description":"Controls the time to wait until a job is deleted. Defaults to 1 minute" } } }