diff --git a/docs/changelog/94162.yaml b/docs/changelog/94162.yaml new file mode 100644 index 000000000000..69048419cf96 --- /dev/null +++ b/docs/changelog/94162.yaml @@ -0,0 +1,5 @@ +pr: 94162 +summary: Add `delete_destination_index` parameter to the `Delete Transform API` +area: Transform +type: enhancement +issues: [] diff --git a/docs/reference/transform/apis/delete-transform.asciidoc b/docs/reference/transform/apis/delete-transform.asciidoc index ae89fba19ea3..9a097407749a 100644 --- a/docs/reference/transform/apis/delete-transform.asciidoc +++ b/docs/reference/transform/apis/delete-transform.asciidoc @@ -36,6 +36,11 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id] current state. The default value is `false`, meaning that the {transform} must be `stopped` before it can be deleted. +`delete_dest_index`:: +(Optional, Boolean) When `true`, the destination index is deleted together with +the {transform}. The default value is `false`, meaning that the destination +index will not be deleted. + `timeout`:: (Optional, time) Period to wait for a response. If no response is received before the timeout diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/transform.delete_transform.json b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.delete_transform.json index 124efcff17ab..1af53ec0e8c3 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/transform.delete_transform.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/transform.delete_transform.json @@ -31,6 +31,11 @@ "required":false, "description":"When `true`, the transform is deleted regardless of its current state. The default value is `false`, meaning that the transform must be `stopped` before it can be deleted." }, + "delete_dest_index":{ + "type":"boolean", + "required":false, + "description":"When `true`, the destination index is deleted together with the transform. The default value is `false`, meaning that the destination index will not be deleted." + }, "timeout":{ "type":"time", "required":false, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index 709ec58935f5..61018b790c30 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -36,6 +36,7 @@ public final class TransformField { public static final ParseField METADATA = new ParseField("_meta"); public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); + public static final ParseField DELETE_DEST_INDEX = new ParseField("delete_dest_index"); public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second"); public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformAction.java index 05661bd28f39..96c3312dfeb0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformAction.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.core.transform.action; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -31,17 +32,24 @@ public class DeleteTransformAction extends ActionType { public static class Request extends AcknowledgedRequest { private final String id; private final boolean force; + private final boolean deleteDestIndex; - public Request(String id, boolean force, TimeValue timeout) { + public Request(String id, boolean force, boolean deleteDestIndex, TimeValue timeout) { super(timeout); this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName()); this.force = force; + this.deleteDestIndex = deleteDestIndex; } public Request(StreamInput in) throws IOException { super(in); id = in.readString(); force = in.readBoolean(); + if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + deleteDestIndex = in.readBoolean(); + } else { + deleteDestIndex = false; + } } public String getId() { @@ -52,11 +60,18 @@ public class DeleteTransformAction extends ActionType { return force; } + public boolean isDeleteDestIndex() { + return deleteDestIndex; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeString(id); out.writeBoolean(force); + if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) { + out.writeBoolean(deleteDestIndex); + } } @Override @@ -67,7 +82,7 @@ public class DeleteTransformAction extends ActionType { @Override public int hashCode() { // the base class does not implement hashCode, therefore we need to hash timeout ourselves - return Objects.hash(timeout(), id, force); + return Objects.hash(timeout(), id, force, deleteDestIndex); } @Override @@ -81,7 +96,10 @@ public class DeleteTransformAction extends ActionType { } Request other = (Request) obj; // the base class does not implement equals, therefore we need to check timeout ourselves - return Objects.equals(id, other.id) && force == other.force && timeout().equals(other.timeout()); + return Objects.equals(id, other.id) + && force == other.force + && deleteDestIndex == other.deleteDestIndex + && timeout().equals(other.timeout()); } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformActionRequestTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformActionRequestTests.java index e0fcd471e28e..dfbd5009932e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformActionRequestTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/action/DeleteTransformActionRequestTests.java @@ -15,7 +15,12 @@ import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction.Reque public class DeleteTransformActionRequestTests extends AbstractWireSerializingTestCase { @Override protected Request createTestInstance() { - return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), TimeValue.parseTimeValue(randomTimeValue(), "timeout")); + return new Request( + randomAlphaOfLengthBetween(1, 20), + randomBoolean(), + randomBoolean(), + TimeValue.parseTimeValue(randomTimeValue(), "timeout") + ); } @Override @@ -27,15 +32,17 @@ public class DeleteTransformActionRequestTests extends AbstractWireSerializingTe protected Request mutateInstance(Request instance) { String id = instance.getId(); boolean force = instance.isForce(); + boolean deleteDestIndex = instance.isDeleteDestIndex(); TimeValue timeout = instance.timeout(); - switch (between(0, 2)) { + switch (between(0, 3)) { case 0 -> id += randomAlphaOfLengthBetween(1, 5); case 1 -> force ^= true; - case 2 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit()); + case 2 -> deleteDestIndex ^= true; + case 3 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit()); default -> throw new AssertionError("Illegal randomization branch"); } - return new Request(id, force, timeout); + return new Request(id, force, deleteDestIndex, timeout); } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDeleteIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDeleteIT.java new file mode 100644 index 000000000000..4463b335a753 --- /dev/null +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDeleteIT.java @@ -0,0 +1,193 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.transform.integration; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.ResponseException; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.common.settings.Settings; +import org.junit.Before; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class TransformDeleteIT extends TransformRestTestCase { + + private static final String TEST_USER_NAME = "transform_user"; + private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1"; + private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue( + TEST_ADMIN_USER_NAME_1, + TEST_PASSWORD_SECURE_STRING + ); + private static final String DATA_ACCESS_ROLE = "test_data_access"; + + private static boolean indicesCreated = false; + + // preserve indices in order to reuse source indices in several test cases + @Override + protected boolean preserveIndicesUponCompletion() { + return true; + } + + @Override + protected boolean enableWarningsCheck() { + return false; + } + + @Override + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); + configureClient(builder, settings); + builder.setStrictDeprecationMode(false); + return builder.build(); + } + + @Before + public void createIndexes() throws IOException { + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME); + setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE)); + setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE)); + + // it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack + if (indicesCreated) { + return; + } + + createReviewsIndex(); + indicesCreated = true; + } + + public void testDeleteDoesNotDeleteDestinationIndexByDefault() throws Exception { + String transformId = "transform-1"; + String transformDest = transformId + "_idx"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); + + createTransform(transformId, transformDest); + assertFalse(indexExists(transformDest)); + + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + stopTransform(transformId, false); + assertTrue(indexExists(transformDest)); + + deleteTransform(transformId); + assertTrue(indexExists(transformDest)); + } + + public void testDeleteWithParamDeletesAutoCreatedDestinationIndex() throws Exception { + String transformId = "transform-2"; + String transformDest = transformId + "_idx"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); + + createTransform(transformId, transformDest); + assertFalse(indexExists(transformDest)); + + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + + stopTransform(transformId, false); + assertTrue(indexExists(transformDest)); + + deleteTransform(transformId, true); + assertFalse(indexExists(transformDest)); + } + + public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception { + String transformId = "transform-3"; + String transformDest = transformId + "_idx"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); + + createIndex(transformDest); + assertTrue(indexExists(transformDest)); + + createTransform(transformId, transformDest); + + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + + stopTransform(transformId, false); + assertTrue(indexExists(transformDest)); + + deleteTransform(transformId, true); + assertFalse(indexExists(transformDest)); + } + + public void testDeleteWithParamDoesNotDeleteAlias() throws Exception { + String transformId = "transform-4"; + String transformDest = transformId + "_idx"; + String transformDestAlias = transformId + "_alias"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias); + + createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }"); + assertTrue(indexExists(transformDest)); + assertTrue(indexExists(transformDestAlias)); + + createTransform(transformId, transformDestAlias); + + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + + stopTransform(transformId, false); + assertTrue(indexExists(transformDest)); + + ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, true)); + assertThat( + e.getMessage(), + containsString( + String.format( + Locale.ROOT, + "The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.", + transformDestAlias + ) + ) + ); + } + + private void createTransform(String transformId, String destIndex) throws IOException { + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 + ); + String config = String.format(Locale.ROOT, """ + { + "dest": { + "index": "%s" + }, + "source": { + "index": "%s" + }, + "pivot": { + "group_by": { + "reviewer": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "avg_rating": { + "avg": { + "field": "stars" + } + } + } + } + }""", destIndex, REVIEWS_INDEX_NAME); + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + } +} diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java index d3dbc2d53a59..86e386e78c6a 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformPivotRestSpecialCasesIT.java @@ -10,17 +10,22 @@ package org.elasticsearch.xpack.transform.integration; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction; import org.elasticsearch.xcontent.XContentBuilder; import org.junit.Before; import java.io.IOException; import java.util.List; +import java.util.Locale; import java.util.Map; import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -360,6 +365,83 @@ public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase { } } + public void testDataStreamUnsupportedAsDestIndex() throws Exception { + String transformId = "transform-data-stream-unsupported-as-dest"; + String sourceIndex = REVIEWS_INDEX_NAME; + String dataStreamIndexTemplate = transformId + "_it"; + String destDataStream = transformId + "_ds"; + + // Create transform + final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId); + createTransformRequest.setJsonEntity(Strings.format(""" + { + "source": { + "index": "%s" + }, + "dest": { + "index": "%s" + }, + "frequency": "1m", + "pivot": { + "group_by": { + "user_id": { + "terms": { + "field": "user_id" + } + } + }, + "aggregations": { + "stars_sum": { + "sum": { + "field": "stars" + } + }, + "bs": { + "bucket_selector": { + "buckets_path": { + "stars_sum": "stars_sum.value" + }, + "script": "params.stars_sum > 20" + } + } + } + } + }""", sourceIndex, destDataStream)); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // Create index template for data stream + Request createIndexTemplateRequest = new Request("PUT", "_index_template/" + dataStreamIndexTemplate); + createIndexTemplateRequest.setJsonEntity(String.format(Locale.ROOT, """ + { + "index_patterns": [ "%s*" ], + "data_stream": {} + } + """, destDataStream)); + Response createIndexTemplateResponse = client().performRequest(createIndexTemplateRequest); + assertThat(createIndexTemplateResponse.getStatusLine().getStatusCode(), is(equalTo(RestStatus.OK.getStatus()))); + + // Create data stream + Request createDataStreamRequest = new Request("PUT", "_data_stream/" + destDataStream); + Response createDataStreamResponse = client().performRequest(createDataStreamRequest); + assertThat(createDataStreamResponse.getStatusLine().getStatusCode(), is(equalTo(RestStatus.OK.getStatus()))); + + // Try starting the transform, it fails because destination index cannot be created from the data stream template + ResponseException e = expectThrows(ResponseException.class, () -> startTransform(transformId)); + assertThat( + e.getMessage(), + containsString( + String.format( + Locale.ROOT, + "cannot create index with name [%s], because it matches with template [%s] that creates data streams only, " + + "use create data stream api instead", + destDataStream, + dataStreamIndexTemplate + ) + ) + ); + } + private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount) throws Exception { String transformIndex = transformId; diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformResetIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformResetIT.java index caa4fc8d0b0a..dd7ad718812a 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformResetIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformResetIT.java @@ -69,9 +69,8 @@ public class TransformResetIT extends TransformRestTestCase { indicesCreated = true; } - @SuppressWarnings("unchecked") public void testReset() throws Exception { - String transformId = "old_transform"; + String transformId = "transform-1"; String transformDest = transformId + "_idx"; setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); @@ -80,7 +79,60 @@ public class TransformResetIT extends TransformRestTestCase { getTransformEndpoint() + transformId, BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 ); - String config = Strings.format(""" + String config = createConfig(transformDest); + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // Verify that reset works on a new transform + resetTransform(transformId, false); + + // Start the transform + startTransform(transformId); + + // Verify that reset doesn't work when the transform is running + ResponseException e = expectThrows(ResponseException.class, () -> resetTransform(transformId, false)); + assertThat(e.getMessage(), containsString("Cannot reset transform [transform-1] as the task is running. Stop the task first")); + + // Verify that reset with [force=true] works even when the transform is running + resetTransform(transformId, true); + + // Start the transform again + startTransform(transformId); + + // Verify that reset works on a stopped transform + stopTransform(transformId, false); + resetTransform(transformId, false); + } + + public void testResetDeletesDestinationIndex() throws Exception { + String transformId = "transform-2"; + String transformDest = transformId + "_idx"; + setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest); + + final Request createTransformRequest = createRequestWithAuth( + "PUT", + getTransformEndpoint() + transformId, + BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 + ); + String config = createConfig(transformDest); + createTransformRequest.setJsonEntity(config); + Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); + assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + assertFalse(indexExists(transformDest)); + + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + stopTransform(transformId, false); + assertTrue(indexExists(transformDest)); + + resetTransform(transformId, false); + assertFalse(indexExists(transformDest)); + } + + private static String createConfig(String transformDestIndex) { + return Strings.format(""" { "dest": { "index": "%s" @@ -104,29 +156,6 @@ public class TransformResetIT extends TransformRestTestCase { } } } - }""", transformDest, REVIEWS_INDEX_NAME); - createTransformRequest.setJsonEntity(config); - Map createTransformResponse = entityAsMap(client().performRequest(createTransformRequest)); - assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE)); - - // Verify that reset works on a new transform - resetTransform(transformId, false); - - // Start the transform - startTransform(transformId); - - // Verify that reset doesn't work when the transform is running - ResponseException e = expectThrows(ResponseException.class, () -> resetTransform(transformId, false)); - assertThat(e.getMessage(), containsString("Cannot reset transform [old_transform] as the task is running. Stop the task first")); - - // Verify that reset with [force=true] works even when the transform is running - resetTransform(transformId, true); - - // Start the transform again - startTransform(transformId); - - // Verify that reset works on a stopped transform - stopTransform(transformId, false); - resetTransform(transformId, false); + }""", transformDestIndex, REVIEWS_INDEX_NAME); } } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 5e741bd662e8..012b35a28afc 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -584,8 +584,15 @@ public abstract class TransformRestTestCase extends ESRestTestCase { } protected static void deleteTransform(String transformId) throws IOException { + deleteTransform(transformId, false); + } + + protected static void deleteTransform(String transformId, boolean deleteDestIndex) throws IOException { Request request = new Request("DELETE", getTransformEndpoint() + transformId); request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this + if (deleteDestIndex) { + request.addParameter(TransformField.DELETE_DEST_INDEX.getPreferredName(), Boolean.TRUE.toString()); + } adminClient().performRequest(request); } @@ -611,7 +618,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { Map transformStatsAsMap = (Map) ((List) entityAsMap(statsResponse).get("transforms")).get(0); // assert that the transform did not fail - assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap)); + assertNotEquals("Stats were: " + transformStatsAsMap, "failed", XContentMapValues.extractValue("state", transformStatsAsMap)); return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap); } @@ -623,7 +630,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase { "indices": [ { "names": [ %s ], - "privileges": [ "create_index", "read", "write", "view_index_metadata" ] + "privileges": [ "create_index", "delete_index", "read", "write", "view_index_metadata" ] } ] }""", indicesStr)); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java index 7d585868d09f..52a2bfc01119 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportDeleteTransformAction.java @@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction; @@ -20,20 +22,26 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.core.Tuple; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction.Request; import org.elasticsearch.xpack.core.transform.action.StopTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; +import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; import org.elasticsearch.xpack.transform.persistence.TransformConfigManager; import org.elasticsearch.xpack.transform.transforms.TransformTask; import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN; import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin; +import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync; public class TransportDeleteTransformAction extends AcknowledgedTransportMasterNodeAction { @@ -70,6 +78,7 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN @Override protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { + final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId()); final boolean transformIsRunning = TransformTask.getTransformTask(request.getId(), state) != null; if (transformIsRunning && request.isForce() == false) { listener.onFailure( @@ -81,8 +90,9 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN return; } - ActionListener stopTransformActionListener = ActionListener.wrap( - unusedStopResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { + // <3> Delete transform config + ActionListener deleteDestIndexListener = ActionListener.wrap( + unusedAcknowledgedResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> { logger.debug("[{}] deleted transform", request.getId()); auditor.info(request.getId(), "Deleted transform."); listener.onResponse(AcknowledgedResponse.of(r)); @@ -90,12 +100,63 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN listener::onFailure ); + // <2> Delete destination index if requested + ActionListener stopTransformActionListener = ActionListener.wrap(unusedStopResponse -> { + if (request.isDeleteDestIndex()) { + deleteDestinationIndex(parentTaskId, request.getId(), request.timeout(), deleteDestIndexListener); + } else { + deleteDestIndexListener.onResponse(null); + } + }, listener::onFailure); + + // <1> Stop transform if it's currently running + stopTransform(transformIsRunning, parentTaskId, request.getId(), request.timeout(), stopTransformActionListener); + } + + private void stopTransform( + boolean transformIsRunning, + TaskId parentTaskId, + String transformId, + TimeValue timeout, + ActionListener listener + ) { if (transformIsRunning == false) { - stopTransformActionListener.onResponse(null); + listener.onResponse(null); return; } - StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(request.getId(), true, true, null, true, false); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener); + StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(transformId, true, true, timeout, true, false); + stopTransformRequest.setParentTask(parentTaskId); + executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, listener); + } + + private void deleteDestinationIndex( + TaskId parentTaskId, + String transformId, + TimeValue timeout, + ActionListener listener + ) { + // <2> Delete destination index + ActionListener> getTransformConfigurationListener = ActionListener.wrap( + transformConfigAndVersion -> { + TransformConfig config = transformConfigAndVersion.v1(); + String destIndex = config.getDestination().getIndex(); + DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex); + deleteDestIndexRequest.timeout(timeout); + deleteDestIndexRequest.setParentTask(parentTaskId); + executeWithHeadersAsync( + config.getHeaders(), + TRANSFORM_ORIGIN, + client, + DeleteIndexAction.INSTANCE, + deleteDestIndexRequest, + listener + ); + }, + listener::onFailure + ); + + // <1> Fetch transform configuration + transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener); } @Override diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java index 354af554826b..f0e8855913f4 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportResetTransformAction.java @@ -102,9 +102,7 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo return; } - final SetOnce> transformConfigAndVersionHolder = new SetOnce<>(); - - // <6> Reset transform + // <4> Reset transform ActionListener updateTransformListener = ActionListener.wrap( unusedUpdateResult -> transformConfigManager.resetTransform(request.getId(), ActionListener.wrap(resetResponse -> { logger.debug("[{}] reset transform", request.getId()); @@ -114,55 +112,33 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo listener::onFailure ); - // <5> Upgrade transform to the latest version - ActionListener deleteDestIndexListener = ActionListener.wrap(unusedDeleteDestIndexResponse -> { - final ClusterState clusterState = clusterService.state(); - TransformUpdater.updateTransform( - securityContext, - indexNameExpressionResolver, - clusterState, - settings, - client, - transformConfigManager, - transformConfigAndVersionHolder.get().v1(), - TransformConfigUpdate.EMPTY, - transformConfigAndVersionHolder.get().v2(), - false, // defer validation - false, // dry run - false, // check access - request.timeout(), - updateTransformListener - ); - }, listener::onFailure); - - // <4> Delete destination index if it was created by transform. - ActionListener isDestinationIndexCreatedByTransformListener = ActionListener.wrap(isDestinationIndexCreatedByTransform -> { - if (isDestinationIndexCreatedByTransform == false) { - // Destination index was created outside of transform, we don't delete it and just move on. - deleteDestIndexListener.onResponse(AcknowledgedResponse.TRUE); - return; - } - String destIndex = transformConfigAndVersionHolder.get().v1().getDestination().getIndex(); - DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex); - executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteDestIndexRequest, deleteDestIndexListener); - }, listener::onFailure); - - // <3> Check if the destination index was created by transform - ActionListener> getTransformConfigurationListener = ActionListener.wrap( + // <3> Upgrade transform to the latest version + ActionListener> deleteDestIndexListener = ActionListener.wrap( transformConfigAndVersion -> { - transformConfigAndVersionHolder.set(transformConfigAndVersion); - String destIndex = transformConfigAndVersion.v1().getDestination().getIndex(); - TransformIndex.isDestinationIndexCreatedByTransform(client, destIndex, isDestinationIndexCreatedByTransformListener); + final ClusterState clusterState = clusterService.state(); + TransformUpdater.updateTransform( + securityContext, + indexNameExpressionResolver, + clusterState, + settings, + client, + transformConfigManager, + transformConfigAndVersion.v1(), + TransformConfigUpdate.EMPTY, + transformConfigAndVersion.v2(), + false, // defer validation + false, // dry run + false, // check access + request.timeout(), + updateTransformListener + ); }, listener::onFailure ); - // <2> Fetch transform configuration + // <2> Delete destination index if it was created by the transform ActionListener stopTransformActionListener = ActionListener.wrap( - unusedStopResponse -> transformConfigManager.getTransformConfigurationForUpdate( - request.getId(), - getTransformConfigurationListener - ), + unusedStopResponse -> deleteDestinationIndexIfCreatedByTheTransform(request.getId(), deleteDestIndexListener), listener::onFailure ); @@ -175,6 +151,44 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener); } + private void deleteDestinationIndexIfCreatedByTheTransform( + String transformId, + ActionListener> listener + ) { + final SetOnce> transformConfigAndVersionHolder = new SetOnce<>(); + + // <4> Send the fetched config to the caller + ActionListener finalListener = ActionListener.wrap( + unusedDeleteIndexResponse -> listener.onResponse(transformConfigAndVersionHolder.get()), + listener::onFailure + ); + + // <3> Delete destination index if it was created by transform + ActionListener isDestinationIndexCreatedByTransformListener = ActionListener.wrap(isDestinationIndexCreatedByTransform -> { + if (isDestinationIndexCreatedByTransform == false) { + // Destination index was created outside of transform, we don't delete it and just move on. + finalListener.onResponse(AcknowledgedResponse.TRUE); + return; + } + String destIndex = transformConfigAndVersionHolder.get().v1().getDestination().getIndex(); + DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex); + executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteDestIndexRequest, finalListener); + }, listener::onFailure); + + // <2> Check if the destination index was created by transform + ActionListener> getTransformConfigurationListener = ActionListener.wrap( + transformConfigAndVersion -> { + transformConfigAndVersionHolder.set(transformConfigAndVersion); + String destIndex = transformConfigAndVersion.v1().getDestination().getIndex(); + TransformIndex.isDestinationIndexCreatedByTransform(client, destIndex, isDestinationIndexCreatedByTransformListener); + }, + listener::onFailure + ); + + // <1> Fetch transform configuration + transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener); + } + @Override protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformAction.java index 74297a76083f..1f0889874482 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformAction.java @@ -34,9 +34,10 @@ public class RestDeleteTransformAction extends BaseRestHandler { String id = restRequest.param(TransformField.ID.getPreferredName()); boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false); + boolean deleteDestIndex = restRequest.paramAsBoolean(TransformField.DELETE_DEST_INDEX.getPreferredName(), false); TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT); - DeleteTransformAction.Request request = new DeleteTransformAction.Request(id, force, timeout); + DeleteTransformAction.Request request = new DeleteTransformAction.Request(id, force, deleteDestIndex, timeout); return channel -> client.execute(DeleteTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformActionTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformActionTests.java index 328e7421e30a..429366baa8a2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformActionTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/rest/action/RestDeleteTransformActionTests.java @@ -9,20 +9,47 @@ package org.elasticsearch.xpack.transform.rest.action; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.rest.RestChannel; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction; +import org.junit.After; +import org.junit.Before; + +import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class RestDeleteTransformActionTests extends ESTestCase { + private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(30); + + private final RestDeleteTransformAction handler = new RestDeleteTransformAction(); + private RestChannel channel; + private NodeClient client; + + @Before + public void initializeMocks() { + channel = mock(RestChannel.class); + client = mock(NodeClient.class); + } + + @After + public void verifyNoMoreInteractionsWithClient() { + verifyNoMoreInteractions(client); + } + public void testBodyRejection() throws Exception { - final RestDeleteTransformAction handler = new RestDeleteTransformAction(); try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); { @@ -33,12 +60,51 @@ public class RestDeleteTransformActionTests extends ESTestCase { new BytesArray(builder.toString()), XContentType.JSON ).build(); - IllegalArgumentException e = expectThrows( - IllegalArgumentException.class, - () -> handler.prepareRequest(request, mock(NodeClient.class)) - ); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> handler.prepareRequest(request, client)); assertThat(e.getMessage(), equalTo("delete transform requests can not have a request body")); } } + public void testDefaults() throws Exception { + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of("id", "my-id")).build(); + handler.handleRequest(request, channel, client); + + DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", false, false, DEFAULT_TIMEOUT); + verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any()); + } + + public void testForce() throws Exception { + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams( + Map.of("id", "my-id", "force", "true") + ).build(); + handler.handleRequest(request, channel, client); + + DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", true, false, DEFAULT_TIMEOUT); + verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any()); + } + + public void testDeleteDestIndex() throws Exception { + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams( + Map.of("id", "my-id", "delete_dest_index", "true") + ).build(); + handler.handleRequest(request, channel, client); + + DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", false, true, DEFAULT_TIMEOUT); + verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any()); + } + + public void testTimeout() throws Exception { + FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams( + Map.of("id", "my-id", "timeout", "45s") + ).build(); + handler.handleRequest(request, channel, client); + + DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request( + "my-id", + false, + false, + TimeValue.timeValueSeconds(45) + ); + verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any()); + } }