[Transform] Add delete_dest_index parameter to the Delete Transform API (#94162)

This commit is contained in:
Przemysław Witek 2023-03-10 13:02:19 +01:00 committed by GitHub
parent ffc87137cf
commit a3f34a39c8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 587 additions and 93 deletions

View file

@ -0,0 +1,5 @@
pr: 94162
summary: Add `delete_destination_index` parameter to the `Delete Transform API`
area: Transform
type: enhancement
issues: []

View file

@ -36,6 +36,11 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id]
current state. The default value is `false`, meaning that the {transform} must be
`stopped` before it can be deleted.
`delete_dest_index`::
(Optional, Boolean) When `true`, the destination index is deleted together with
the {transform}. The default value is `false`, meaning that the destination
index will not be deleted.
`timeout`::
(Optional, time)
Period to wait for a response. If no response is received before the timeout

View file

@ -31,6 +31,11 @@
"required":false,
"description":"When `true`, the transform is deleted regardless of its current state. The default value is `false`, meaning that the transform must be `stopped` before it can be deleted."
},
"delete_dest_index":{
"type":"boolean",
"required":false,
"description":"When `true`, the destination index is deleted together with the transform. The default value is `false`, meaning that the destination index will not be deleted."
},
"timeout":{
"type":"time",
"required":false,

View file

@ -36,6 +36,7 @@ public final class TransformField {
public static final ParseField METADATA = new ParseField("_meta");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField DELETE_DEST_INDEX = new ParseField("delete_dest_index");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");

View file

@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
@ -31,17 +32,24 @@ public class DeleteTransformAction extends ActionType<AcknowledgedResponse> {
public static class Request extends AcknowledgedRequest<Request> {
private final String id;
private final boolean force;
private final boolean deleteDestIndex;
public Request(String id, boolean force, TimeValue timeout) {
public Request(String id, boolean force, boolean deleteDestIndex, TimeValue timeout) {
super(timeout);
this.id = ExceptionsHelper.requireNonNull(id, TransformField.ID.getPreferredName());
this.force = force;
this.deleteDestIndex = deleteDestIndex;
}
public Request(StreamInput in) throws IOException {
super(in);
id = in.readString();
force = in.readBoolean();
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
deleteDestIndex = in.readBoolean();
} else {
deleteDestIndex = false;
}
}
public String getId() {
@ -52,11 +60,18 @@ public class DeleteTransformAction extends ActionType<AcknowledgedResponse> {
return force;
}
public boolean isDeleteDestIndex() {
return deleteDestIndex;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeBoolean(force);
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
out.writeBoolean(deleteDestIndex);
}
}
@Override
@ -67,7 +82,7 @@ public class DeleteTransformAction extends ActionType<AcknowledgedResponse> {
@Override
public int hashCode() {
// the base class does not implement hashCode, therefore we need to hash timeout ourselves
return Objects.hash(timeout(), id, force);
return Objects.hash(timeout(), id, force, deleteDestIndex);
}
@Override
@ -81,7 +96,10 @@ public class DeleteTransformAction extends ActionType<AcknowledgedResponse> {
}
Request other = (Request) obj;
// the base class does not implement equals, therefore we need to check timeout ourselves
return Objects.equals(id, other.id) && force == other.force && timeout().equals(other.timeout());
return Objects.equals(id, other.id)
&& force == other.force
&& deleteDestIndex == other.deleteDestIndex
&& timeout().equals(other.timeout());
}
}
}

View file

@ -15,7 +15,12 @@ import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction.Reque
public class DeleteTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Request createTestInstance() {
return new Request(randomAlphaOfLengthBetween(1, 20), randomBoolean(), TimeValue.parseTimeValue(randomTimeValue(), "timeout"));
return new Request(
randomAlphaOfLengthBetween(1, 20),
randomBoolean(),
randomBoolean(),
TimeValue.parseTimeValue(randomTimeValue(), "timeout")
);
}
@Override
@ -27,15 +32,17 @@ public class DeleteTransformActionRequestTests extends AbstractWireSerializingTe
protected Request mutateInstance(Request instance) {
String id = instance.getId();
boolean force = instance.isForce();
boolean deleteDestIndex = instance.isDeleteDestIndex();
TimeValue timeout = instance.timeout();
switch (between(0, 2)) {
switch (between(0, 3)) {
case 0 -> id += randomAlphaOfLengthBetween(1, 5);
case 1 -> force ^= true;
case 2 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
case 2 -> deleteDestIndex ^= true;
case 3 -> timeout = new TimeValue(timeout.duration() + randomLongBetween(1, 5), timeout.timeUnit());
default -> throw new AssertionError("Illegal randomization branch");
}
return new Request(id, force, timeout);
return new Request(id, force, deleteDestIndex, timeout);
}
}

View file

@ -0,0 +1,193 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.integration;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.settings.Settings;
import org.junit.Before;
import java.io.IOException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
public class TransformDeleteIT extends TransformRestTestCase {
private static final String TEST_USER_NAME = "transform_user";
private static final String TEST_ADMIN_USER_NAME_1 = "transform_admin_1";
private static final String BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1 = basicAuthHeaderValue(
TEST_ADMIN_USER_NAME_1,
TEST_PASSWORD_SECURE_STRING
);
private static final String DATA_ACCESS_ROLE = "test_data_access";
private static boolean indicesCreated = false;
// preserve indices in order to reuse source indices in several test cases
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}
@Override
protected boolean enableWarningsCheck() {
return false;
}
@Override
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
RestClientBuilder builder = RestClient.builder(hosts);
configureClient(builder, settings);
builder.setStrictDeprecationMode(false);
return builder.build();
}
@Before
public void createIndexes() throws IOException {
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME);
setupUser(TEST_USER_NAME, Arrays.asList("transform_user", DATA_ACCESS_ROLE));
setupUser(TEST_ADMIN_USER_NAME_1, Arrays.asList("transform_admin", DATA_ACCESS_ROLE));
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
if (indicesCreated) {
return;
}
createReviewsIndex();
indicesCreated = true;
}
public void testDeleteDoesNotDeleteDestinationIndexByDefault() throws Exception {
String transformId = "transform-1";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
createTransform(transformId, transformDest);
assertFalse(indexExists(transformDest));
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));
deleteTransform(transformId);
assertTrue(indexExists(transformDest));
}
public void testDeleteWithParamDeletesAutoCreatedDestinationIndex() throws Exception {
String transformId = "transform-2";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
createTransform(transformId, transformDest);
assertFalse(indexExists(transformDest));
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));
deleteTransform(transformId, true);
assertFalse(indexExists(transformDest));
}
public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception {
String transformId = "transform-3";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
createIndex(transformDest);
assertTrue(indexExists(transformDest));
createTransform(transformId, transformDest);
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));
deleteTransform(transformId, true);
assertFalse(indexExists(transformDest));
}
public void testDeleteWithParamDoesNotDeleteAlias() throws Exception {
String transformId = "transform-4";
String transformDest = transformId + "_idx";
String transformDestAlias = transformId + "_alias";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }");
assertTrue(indexExists(transformDest));
assertTrue(indexExists(transformDestAlias));
createTransform(transformId, transformDestAlias);
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));
ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, true));
assertThat(
e.getMessage(),
containsString(
String.format(
Locale.ROOT,
"The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.",
transformDestAlias
)
)
);
}
private void createTransform(String transformId, String destIndex) throws IOException {
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = String.format(Locale.ROOT, """
{
"dest": {
"index": "%s"
},
"source": {
"index": "%s"
},
"pivot": {
"group_by": {
"reviewer": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"avg_rating": {
"avg": {
"field": "stars"
}
}
}
}
}""", destIndex, REVIEWS_INDEX_NAME);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
}
}

View file

@ -10,17 +10,22 @@ package org.elasticsearch.xpack.transform.integration;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.RestPutIndexTemplateAction;
import org.elasticsearch.xcontent.XContentBuilder;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -360,6 +365,83 @@ public class TransformPivotRestSpecialCasesIT extends TransformRestTestCase {
}
}
public void testDataStreamUnsupportedAsDestIndex() throws Exception {
String transformId = "transform-data-stream-unsupported-as-dest";
String sourceIndex = REVIEWS_INDEX_NAME;
String dataStreamIndexTemplate = transformId + "_it";
String destDataStream = transformId + "_ds";
// Create transform
final Request createTransformRequest = new Request("PUT", getTransformEndpoint() + transformId);
createTransformRequest.setJsonEntity(Strings.format("""
{
"source": {
"index": "%s"
},
"dest": {
"index": "%s"
},
"frequency": "1m",
"pivot": {
"group_by": {
"user_id": {
"terms": {
"field": "user_id"
}
}
},
"aggregations": {
"stars_sum": {
"sum": {
"field": "stars"
}
},
"bs": {
"bucket_selector": {
"buckets_path": {
"stars_sum": "stars_sum.value"
},
"script": "params.stars_sum > 20"
}
}
}
}
}""", sourceIndex, destDataStream));
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// Create index template for data stream
Request createIndexTemplateRequest = new Request("PUT", "_index_template/" + dataStreamIndexTemplate);
createIndexTemplateRequest.setJsonEntity(String.format(Locale.ROOT, """
{
"index_patterns": [ "%s*" ],
"data_stream": {}
}
""", destDataStream));
Response createIndexTemplateResponse = client().performRequest(createIndexTemplateRequest);
assertThat(createIndexTemplateResponse.getStatusLine().getStatusCode(), is(equalTo(RestStatus.OK.getStatus())));
// Create data stream
Request createDataStreamRequest = new Request("PUT", "_data_stream/" + destDataStream);
Response createDataStreamResponse = client().performRequest(createDataStreamRequest);
assertThat(createDataStreamResponse.getStatusLine().getStatusCode(), is(equalTo(RestStatus.OK.getStatus())));
// Try starting the transform, it fails because destination index cannot be created from the data stream template
ResponseException e = expectThrows(ResponseException.class, () -> startTransform(transformId));
assertThat(
e.getMessage(),
containsString(
String.format(
Locale.ROOT,
"cannot create index with name [%s], because it matches with template [%s] that creates data streams only, "
+ "use create data stream api instead",
destDataStream,
dataStreamIndexTemplate
)
)
);
}
private void verifyDestIndexHitsCount(String sourceIndex, String transformId, int maxPageSearchSize, long expectedDestIndexCount)
throws Exception {
String transformIndex = transformId;

View file

@ -69,9 +69,8 @@ public class TransformResetIT extends TransformRestTestCase {
indicesCreated = true;
}
@SuppressWarnings("unchecked")
public void testReset() throws Exception {
String transformId = "old_transform";
String transformId = "transform-1";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
@ -80,7 +79,60 @@ public class TransformResetIT extends TransformRestTestCase {
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = Strings.format("""
String config = createConfig(transformDest);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// Verify that reset works on a new transform
resetTransform(transformId, false);
// Start the transform
startTransform(transformId);
// Verify that reset doesn't work when the transform is running
ResponseException e = expectThrows(ResponseException.class, () -> resetTransform(transformId, false));
assertThat(e.getMessage(), containsString("Cannot reset transform [transform-1] as the task is running. Stop the task first"));
// Verify that reset with [force=true] works even when the transform is running
resetTransform(transformId, true);
// Start the transform again
startTransform(transformId);
// Verify that reset works on a stopped transform
stopTransform(transformId, false);
resetTransform(transformId, false);
}
public void testResetDeletesDestinationIndex() throws Exception {
String transformId = "transform-2";
String transformDest = transformId + "_idx";
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
final Request createTransformRequest = createRequestWithAuth(
"PUT",
getTransformEndpoint() + transformId,
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
);
String config = createConfig(transformDest);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
assertFalse(indexExists(transformDest));
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);
stopTransform(transformId, false);
assertTrue(indexExists(transformDest));
resetTransform(transformId, false);
assertFalse(indexExists(transformDest));
}
private static String createConfig(String transformDestIndex) {
return Strings.format("""
{
"dest": {
"index": "%s"
@ -104,29 +156,6 @@ public class TransformResetIT extends TransformRestTestCase {
}
}
}
}""", transformDest, REVIEWS_INDEX_NAME);
createTransformRequest.setJsonEntity(config);
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
// Verify that reset works on a new transform
resetTransform(transformId, false);
// Start the transform
startTransform(transformId);
// Verify that reset doesn't work when the transform is running
ResponseException e = expectThrows(ResponseException.class, () -> resetTransform(transformId, false));
assertThat(e.getMessage(), containsString("Cannot reset transform [old_transform] as the task is running. Stop the task first"));
// Verify that reset with [force=true] works even when the transform is running
resetTransform(transformId, true);
// Start the transform again
startTransform(transformId);
// Verify that reset works on a stopped transform
stopTransform(transformId, false);
resetTransform(transformId, false);
}""", transformDestIndex, REVIEWS_INDEX_NAME);
}
}

View file

@ -584,8 +584,15 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
}
protected static void deleteTransform(String transformId) throws IOException {
deleteTransform(transformId, false);
}
protected static void deleteTransform(String transformId, boolean deleteDestIndex) throws IOException {
Request request = new Request("DELETE", getTransformEndpoint() + transformId);
request.addParameter("ignore", "404"); // Ignore 404s because they imply someone was racing us to delete this
if (deleteDestIndex) {
request.addParameter(TransformField.DELETE_DEST_INDEX.getPreferredName(), Boolean.TRUE.toString());
}
adminClient().performRequest(request);
}
@ -611,7 +618,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
Map<?, ?> transformStatsAsMap = (Map<?, ?>) ((List<?>) entityAsMap(statsResponse).get("transforms")).get(0);
// assert that the transform did not fail
assertNotEquals("failed", XContentMapValues.extractValue("state", transformStatsAsMap));
assertNotEquals("Stats were: " + transformStatsAsMap, "failed", XContentMapValues.extractValue("state", transformStatsAsMap));
return (int) XContentMapValues.extractValue("checkpointing.last.checkpoint", transformStatsAsMap);
}
@ -623,7 +630,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
"indices": [
{
"names": [ %s ],
"privileges": [ "create_index", "read", "write", "view_index_metadata" ]
"privileges": [ "create_index", "delete_index", "read", "write", "view_index_metadata" ]
}
]
}""", indicesStr));

View file

@ -10,6 +10,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
@ -20,20 +22,26 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
import static org.elasticsearch.xpack.core.ClientHelper.executeWithHeadersAsync;
public class TransportDeleteTransformAction extends AcknowledgedTransportMasterNodeAction<Request> {
@ -70,6 +78,7 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN
@Override
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), task.getId());
final boolean transformIsRunning = TransformTask.getTransformTask(request.getId(), state) != null;
if (transformIsRunning && request.isForce() == false) {
listener.onFailure(
@ -81,8 +90,9 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN
return;
}
ActionListener<StopTransformAction.Response> stopTransformActionListener = ActionListener.wrap(
unusedStopResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
// <3> Delete transform config
ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(
unusedAcknowledgedResponse -> transformConfigManager.deleteTransform(request.getId(), ActionListener.wrap(r -> {
logger.debug("[{}] deleted transform", request.getId());
auditor.info(request.getId(), "Deleted transform.");
listener.onResponse(AcknowledgedResponse.of(r));
@ -90,12 +100,63 @@ public class TransportDeleteTransformAction extends AcknowledgedTransportMasterN
listener::onFailure
);
// <2> Delete destination index if requested
ActionListener<StopTransformAction.Response> stopTransformActionListener = ActionListener.wrap(unusedStopResponse -> {
if (request.isDeleteDestIndex()) {
deleteDestinationIndex(parentTaskId, request.getId(), request.timeout(), deleteDestIndexListener);
} else {
deleteDestIndexListener.onResponse(null);
}
}, listener::onFailure);
// <1> Stop transform if it's currently running
stopTransform(transformIsRunning, parentTaskId, request.getId(), request.timeout(), stopTransformActionListener);
}
private void stopTransform(
boolean transformIsRunning,
TaskId parentTaskId,
String transformId,
TimeValue timeout,
ActionListener<StopTransformAction.Response> listener
) {
if (transformIsRunning == false) {
stopTransformActionListener.onResponse(null);
listener.onResponse(null);
return;
}
StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(request.getId(), true, true, null, true, false);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener);
StopTransformAction.Request stopTransformRequest = new StopTransformAction.Request(transformId, true, true, timeout, true, false);
stopTransformRequest.setParentTask(parentTaskId);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, listener);
}
private void deleteDestinationIndex(
TaskId parentTaskId,
String transformId,
TimeValue timeout,
ActionListener<AcknowledgedResponse> listener
) {
// <2> Delete destination index
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
transformConfigAndVersion -> {
TransformConfig config = transformConfigAndVersion.v1();
String destIndex = config.getDestination().getIndex();
DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
deleteDestIndexRequest.timeout(timeout);
deleteDestIndexRequest.setParentTask(parentTaskId);
executeWithHeadersAsync(
config.getHeaders(),
TRANSFORM_ORIGIN,
client,
DeleteIndexAction.INSTANCE,
deleteDestIndexRequest,
listener
);
},
listener::onFailure
);
// <1> Fetch transform configuration
transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener);
}
@Override

View file

@ -102,9 +102,7 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
return;
}
final SetOnce<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> transformConfigAndVersionHolder = new SetOnce<>();
// <6> Reset transform
// <4> Reset transform
ActionListener<TransformUpdater.UpdateResult> updateTransformListener = ActionListener.wrap(
unusedUpdateResult -> transformConfigManager.resetTransform(request.getId(), ActionListener.wrap(resetResponse -> {
logger.debug("[{}] reset transform", request.getId());
@ -114,8 +112,9 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
listener::onFailure
);
// <5> Upgrade transform to the latest version
ActionListener<AcknowledgedResponse> deleteDestIndexListener = ActionListener.wrap(unusedDeleteDestIndexResponse -> {
// <3> Upgrade transform to the latest version
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> deleteDestIndexListener = ActionListener.wrap(
transformConfigAndVersion -> {
final ClusterState clusterState = clusterService.state();
TransformUpdater.updateTransform(
securityContext,
@ -124,45 +123,22 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
settings,
client,
transformConfigManager,
transformConfigAndVersionHolder.get().v1(),
transformConfigAndVersion.v1(),
TransformConfigUpdate.EMPTY,
transformConfigAndVersionHolder.get().v2(),
transformConfigAndVersion.v2(),
false, // defer validation
false, // dry run
false, // check access
request.timeout(),
updateTransformListener
);
}, listener::onFailure);
// <4> Delete destination index if it was created by transform.
ActionListener<Boolean> isDestinationIndexCreatedByTransformListener = ActionListener.wrap(isDestinationIndexCreatedByTransform -> {
if (isDestinationIndexCreatedByTransform == false) {
// Destination index was created outside of transform, we don't delete it and just move on.
deleteDestIndexListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String destIndex = transformConfigAndVersionHolder.get().v1().getDestination().getIndex();
DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteDestIndexRequest, deleteDestIndexListener);
}, listener::onFailure);
// <3> Check if the destination index was created by transform
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
transformConfigAndVersion -> {
transformConfigAndVersionHolder.set(transformConfigAndVersion);
String destIndex = transformConfigAndVersion.v1().getDestination().getIndex();
TransformIndex.isDestinationIndexCreatedByTransform(client, destIndex, isDestinationIndexCreatedByTransformListener);
},
listener::onFailure
);
// <2> Fetch transform configuration
// <2> Delete destination index if it was created by the transform
ActionListener<StopTransformAction.Response> stopTransformActionListener = ActionListener.wrap(
unusedStopResponse -> transformConfigManager.getTransformConfigurationForUpdate(
request.getId(),
getTransformConfigurationListener
),
unusedStopResponse -> deleteDestinationIndexIfCreatedByTheTransform(request.getId(), deleteDestIndexListener),
listener::onFailure
);
@ -175,6 +151,44 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, StopTransformAction.INSTANCE, stopTransformRequest, stopTransformActionListener);
}
private void deleteDestinationIndexIfCreatedByTheTransform(
String transformId,
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> listener
) {
final SetOnce<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> transformConfigAndVersionHolder = new SetOnce<>();
// <4> Send the fetched config to the caller
ActionListener<AcknowledgedResponse> finalListener = ActionListener.wrap(
unusedDeleteIndexResponse -> listener.onResponse(transformConfigAndVersionHolder.get()),
listener::onFailure
);
// <3> Delete destination index if it was created by transform
ActionListener<Boolean> isDestinationIndexCreatedByTransformListener = ActionListener.wrap(isDestinationIndexCreatedByTransform -> {
if (isDestinationIndexCreatedByTransform == false) {
// Destination index was created outside of transform, we don't delete it and just move on.
finalListener.onResponse(AcknowledgedResponse.TRUE);
return;
}
String destIndex = transformConfigAndVersionHolder.get().v1().getDestination().getIndex();
DeleteIndexRequest deleteDestIndexRequest = new DeleteIndexRequest(destIndex);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteDestIndexRequest, finalListener);
}, listener::onFailure);
// <2> Check if the destination index was created by transform
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> getTransformConfigurationListener = ActionListener.wrap(
transformConfigAndVersion -> {
transformConfigAndVersionHolder.set(transformConfigAndVersion);
String destIndex = transformConfigAndVersion.v1().getDestination().getIndex();
TransformIndex.isDestinationIndexCreatedByTransform(client, destIndex, isDestinationIndexCreatedByTransformListener);
},
listener::onFailure
);
// <1> Fetch transform configuration
transformConfigManager.getTransformConfigurationForUpdate(transformId, getTransformConfigurationListener);
}
@Override
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);

View file

@ -34,9 +34,10 @@ public class RestDeleteTransformAction extends BaseRestHandler {
String id = restRequest.param(TransformField.ID.getPreferredName());
boolean force = restRequest.paramAsBoolean(TransformField.FORCE.getPreferredName(), false);
boolean deleteDestIndex = restRequest.paramAsBoolean(TransformField.DELETE_DEST_INDEX.getPreferredName(), false);
TimeValue timeout = restRequest.paramAsTime(TransformField.TIMEOUT.getPreferredName(), AcknowledgedRequest.DEFAULT_ACK_TIMEOUT);
DeleteTransformAction.Request request = new DeleteTransformAction.Request(id, force, timeout);
DeleteTransformAction.Request request = new DeleteTransformAction.Request(id, force, deleteDestIndex, timeout);
return channel -> client.execute(DeleteTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View file

@ -9,20 +9,47 @@ package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.core.transform.action.DeleteTransformAction;
import org.junit.After;
import org.junit.Before;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
public class RestDeleteTransformActionTests extends ESTestCase {
private static final TimeValue DEFAULT_TIMEOUT = TimeValue.timeValueSeconds(30);
private final RestDeleteTransformAction handler = new RestDeleteTransformAction();
private RestChannel channel;
private NodeClient client;
@Before
public void initializeMocks() {
channel = mock(RestChannel.class);
client = mock(NodeClient.class);
}
@After
public void verifyNoMoreInteractionsWithClient() {
verifyNoMoreInteractions(client);
}
public void testBodyRejection() throws Exception {
final RestDeleteTransformAction handler = new RestDeleteTransformAction();
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();
{
@ -33,12 +60,51 @@ public class RestDeleteTransformActionTests extends ESTestCase {
new BytesArray(builder.toString()),
XContentType.JSON
).build();
IllegalArgumentException e = expectThrows(
IllegalArgumentException.class,
() -> handler.prepareRequest(request, mock(NodeClient.class))
);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> handler.prepareRequest(request, client));
assertThat(e.getMessage(), equalTo("delete transform requests can not have a request body"));
}
}
public void testDefaults() throws Exception {
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(Map.of("id", "my-id")).build();
handler.handleRequest(request, channel, client);
DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", false, false, DEFAULT_TIMEOUT);
verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any());
}
public void testForce() throws Exception {
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(
Map.of("id", "my-id", "force", "true")
).build();
handler.handleRequest(request, channel, client);
DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", true, false, DEFAULT_TIMEOUT);
verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any());
}
public void testDeleteDestIndex() throws Exception {
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(
Map.of("id", "my-id", "delete_dest_index", "true")
).build();
handler.handleRequest(request, channel, client);
DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request("my-id", false, true, DEFAULT_TIMEOUT);
verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any());
}
public void testTimeout() throws Exception {
FakeRestRequest request = new FakeRestRequest.Builder(NamedXContentRegistry.EMPTY).withParams(
Map.of("id", "my-id", "timeout", "45s")
).build();
handler.handleRequest(request, channel, client);
DeleteTransformAction.Request expectedActionRequest = new DeleteTransformAction.Request(
"my-id",
false,
false,
TimeValue.timeValueSeconds(45)
);
verify(client).execute(eq(DeleteTransformAction.INSTANCE), eq(expectedActionRequest), any());
}
}