[ML][Data Frame] Add optional defer_validation param to PUT (#44455)

* [ML][Data Frame] Add optional defer_validation param to PUT

* addressing PR comments

* reverting bad replace

* addressing pr comments

* Update put-transform.asciidoc

* Update put-transform.asciidoc

* Update put-transform.asciidoc
This commit is contained in:
Benjamin Trent 2019-07-22 09:07:53 -05:00 committed by GitHub
parent 73f8f1f46f
commit a9cc0e1520
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 321 additions and 122 deletions

View file

@ -39,6 +39,7 @@ import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TY
import static org.elasticsearch.client.RequestConverters.createEntity; import static org.elasticsearch.client.RequestConverters.createEntity;
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE; import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH; import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION;
final class DataFrameRequestConverters { final class DataFrameRequestConverters {
@ -51,6 +52,9 @@ final class DataFrameRequestConverters {
.build(); .build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint); Request request = new Request(HttpPut.METHOD_NAME, endpoint);
request.setEntity(createEntity(putRequest, REQUEST_BODY_CONTENT_TYPE)); request.setEntity(createEntity(putRequest, REQUEST_BODY_CONTENT_TYPE));
if (putRequest.getDeferValidation() != null) {
request.addParameter(DEFER_VALIDATION, Boolean.toString(putRequest.getDeferValidation()));
}
return request; return request;
} }

View file

@ -31,7 +31,9 @@ import java.util.Optional;
public class PutDataFrameTransformRequest implements ToXContentObject, Validatable { public class PutDataFrameTransformRequest implements ToXContentObject, Validatable {
public static final String DEFER_VALIDATION = "defer_validation";
private final DataFrameTransformConfig config; private final DataFrameTransformConfig config;
private Boolean deferValidation;
public PutDataFrameTransformRequest(DataFrameTransformConfig config) { public PutDataFrameTransformRequest(DataFrameTransformConfig config) {
this.config = config; this.config = config;
@ -41,6 +43,19 @@ public class PutDataFrameTransformRequest implements ToXContentObject, Validatab
return config; return config;
} }
public Boolean getDeferValidation() {
return deferValidation;
}
/**
* Indicates if deferrable validations should be skipped until the transform starts
*
* @param deferValidation {@code true} will cause validations to be deferred
*/
public void setDeferValidation(boolean deferValidation) {
this.deferValidation = deferValidation;
}
@Override @Override
public Optional<ValidationException> validate() { public Optional<ValidationException> validate() {
ValidationException validationException = new ValidationException(); ValidationException validationException = new ValidationException();

View file

@ -68,7 +68,7 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
PutDataFrameTransformRequest putRequest = new PutDataFrameTransformRequest( PutDataFrameTransformRequest putRequest = new PutDataFrameTransformRequest(
DataFrameTransformConfigTests.randomDataFrameTransformConfig()); DataFrameTransformConfigTests.randomDataFrameTransformConfig());
Request request = DataFrameRequestConverters.putDataFrameTransform(putRequest); Request request = DataFrameRequestConverters.putDataFrameTransform(putRequest);
assertThat(request.getParameters(), not(hasKey("defer_validation")));
assertEquals(HttpPut.METHOD_NAME, request.getMethod()); assertEquals(HttpPut.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + putRequest.getConfig().getId())); assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + putRequest.getConfig().getId()));
@ -76,6 +76,9 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
DataFrameTransformConfig parsedConfig = DataFrameTransformConfig.PARSER.apply(parser, null); DataFrameTransformConfig parsedConfig = DataFrameTransformConfig.PARSER.apply(parser, null);
assertThat(parsedConfig, equalTo(putRequest.getConfig())); assertThat(parsedConfig, equalTo(putRequest.getConfig()));
} }
putRequest.setDeferValidation(true);
request = DataFrameRequestConverters.putDataFrameTransform(putRequest);
assertThat(request.getParameters(), hasEntry("defer_validation", Boolean.toString(putRequest.getDeferValidation())));
} }
public void testDeleteDataFrameTransform() { public void testDeleteDataFrameTransform() {

View file

@ -180,6 +180,22 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found")); assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
} }
public void testCreateDeleteWithDefer() throws IOException {
String sourceIndex = "missing-source-index";
String id = "test-with-defer";
DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest");
DataFrameClient client = highLevelClient().dataFrame();
PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(transform);
request.setDeferValidation(true);
AcknowledgedResponse ack = execute(request, client::putDataFrameTransform, client::putDataFrameTransformAsync);
assertTrue(ack.isAcknowledged());
ack = execute(new DeleteDataFrameTransformRequest(transform.getId()), client::deleteDataFrameTransform,
client::deleteDataFrameTransformAsync);
assertTrue(ack.isAcknowledged());
}
public void testGetTransform() throws IOException { public void testGetTransform() throws IOException {
String sourceIndex = "transform-source"; String sourceIndex = "transform-source";
createIndex(sourceIndex); createIndex(sourceIndex);

View file

@ -166,6 +166,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
// tag::put-data-frame-transform-request // tag::put-data-frame-transform-request
PutDataFrameTransformRequest request = PutDataFrameTransformRequest request =
new PutDataFrameTransformRequest(transformConfig); // <1> new PutDataFrameTransformRequest(transformConfig); // <1>
request.setDeferValidation(false); // <2>
// end::put-data-frame-transform-request // end::put-data-frame-transform-request
// tag::put-data-frame-transform-execute // tag::put-data-frame-transform-execute

View file

@ -20,6 +20,10 @@ A +{request}+ requires the following argument:
include-tagged::{doc-tests-file}[{api}-request] include-tagged::{doc-tests-file}[{api}-request]
-------------------------------------------------- --------------------------------------------------
<1> The configuration of the {dataframe-transform} to create <1> The configuration of the {dataframe-transform} to create
<2> Whether to defer running deferrable validations until `_start` is called.
This option should be used with care, as the created {dataframe-transform} will run
with the privileges of the user creating it. If that user lacks the required privileges,
the resulting error will not surface until `_start` is called.
[id="{upid}-{api}-config"] [id="{upid}-{api}-config"]
==== Data Frame Transform Configuration ==== Data Frame Transform Configuration

View file

@ -45,6 +45,20 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and
underscores. It must start and end with alphanumeric characters. underscores. It must start and end with alphanumeric characters.
[[put-data-frame-transform-query-parms]]
==== {api-query-parms-title}
`defer_validation`::
(Optional, boolean) When `true`, this will cause deferrable validations to not run.
This behavior may be desired if the source index does not exist until
after the {dataframe-transform} is created.
Deferred validations are always run when the {dataframe-transform} is started,
with the exception of privilege checks. If the user who created the transform does
not have the required privileges on the source and destination indices, then the
transform will start but then fail when it attempts the unauthorized operation.
The default value is `false`.
[[put-data-frame-transform-request-body]] [[put-data-frame-transform-request-body]]
==== {api-request-body-title} ==== {api-request-body-title}

View file

@ -33,6 +33,7 @@ public final class DataFrameField {
public static final ParseField SYNC = new ParseField("sync"); public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME_BASED_SYNC = new ParseField("time"); public static final ParseField TIME_BASED_SYNC = new ParseField("time");
public static final ParseField DELAY = new ParseField("delay"); public static final ParseField DELAY = new ParseField("delay");
public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation");
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
/** /**

View file

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.action; package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.action.support.master.AcknowledgedRequest;
@ -13,8 +14,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xpack.core.dataframe.DataFrameField; import org.elasticsearch.xpack.core.dataframe.DataFrameField;
@ -41,21 +40,28 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
super(NAME, AcknowledgedResponse::new); super(NAME, AcknowledgedResponse::new);
} }
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject { public static class Request extends AcknowledgedRequest<Request> {
private final DataFrameTransformConfig config; private final DataFrameTransformConfig config;
private final boolean deferValidation;
public Request(DataFrameTransformConfig config) { public Request(DataFrameTransformConfig config, boolean deferValidation) {
this.config = config; this.config = config;
this.deferValidation = deferValidation;
} }
public Request(StreamInput in) throws IOException { public Request(StreamInput in) throws IOException {
super(in); super(in);
this.config = new DataFrameTransformConfig(in); this.config = new DataFrameTransformConfig(in);
if (in.getVersion().onOrAfter(Version.CURRENT)) {
this.deferValidation = in.readBoolean();
} else {
this.deferValidation = false;
}
} }
public static Request fromXContent(final XContentParser parser, final String id) throws IOException { public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
return new Request(DataFrameTransformConfig.fromXContent(parser, id, false)); return new Request(DataFrameTransformConfig.fromXContent(parser, id, false), deferValidation);
} }
/** /**
@ -111,24 +117,26 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
return validationException; return validationException;
} }
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return this.config.toXContent(builder, params);
}
public DataFrameTransformConfig getConfig() { public DataFrameTransformConfig getConfig() {
return config; return config;
} }
public boolean isDeferValidation() {
return deferValidation;
}
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
this.config.writeTo(out); this.config.writeTo(out);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeBoolean(this.deferValidation);
}
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(config); return Objects.hash(config, deferValidation);
} }
@Override @Override
@ -140,7 +148,7 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
return false; return false;
} }
Request other = (Request) obj; Request other = (Request) obj;
return Objects.equals(config, other.config); return Objects.equals(config, other.config) && this.deferValidation == other.deferValidation;
} }
} }

View file

@ -6,16 +6,24 @@
package org.elasticsearch.xpack.core.dataframe.action; package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request; import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests; import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.util.List;
public class PutDataFrameTransformActionRequestTests extends AbstractSerializingDataFrameTestCase<Request> { import static java.util.Collections.emptyList;
public class PutDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
private String transformId; private String transformId;
@Before @Before
@ -23,24 +31,24 @@ public class PutDataFrameTransformActionRequestTests extends AbstractSerializing
transformId = randomAlphaOfLengthBetween(1, 10); transformId = randomAlphaOfLengthBetween(1, 10);
} }
@Override
protected Request doParseInstance(XContentParser parser) throws IOException {
return Request.fromXContent(parser, transformId);
}
@Override @Override
protected Writeable.Reader<Request> instanceReader() { protected Writeable.Reader<Request> instanceReader() {
return Request::new; return Request::new;
} }
@Override @Override
protected boolean supportsUnknownFields() { protected Request createTestInstance() {
return false; DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config, randomBoolean());
} }
@Override @Override
protected Request createTestInstance() { protected NamedWriteableRegistry getNamedWriteableRegistry() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId); SearchModule searchModule = new SearchModule(Settings.EMPTY, emptyList());
return new Request(config);
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, DataFrameField.TIME_BASED_SYNC.getPreferredName(),
TimeSyncConfig::new));
return new NamedWriteableRegistry(namedWriteables);
} }
} }

View file

@ -57,8 +57,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final XPackLicenseState licenseState; private final XPackLicenseState licenseState;
private final Client client; private final Client client;
@ -93,8 +92,7 @@ public class TransportPutDataFrameTransformAction
} }
@Override @Override
protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener<AcknowledgedResponse> listener) protected void masterOperation(Task task, Request request, ClusterState clusterState, ActionListener<AcknowledgedResponse> listener) {
throws Exception {
if (!licenseState.isDataFrameAllowed()) { if (!licenseState.isDataFrameAllowed()) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME)); listener.onFailure(LicenseUtils.newComplianceException(XPackField.DATA_FRAME));
@ -121,14 +119,14 @@ public class TransportPutDataFrameTransformAction
return; return;
} }
try { try {
SourceDestValidator.check(config, clusterState, indexNameExpressionResolver); SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
} catch (ElasticsearchStatusException ex) { } catch (ElasticsearchStatusException ex) {
listener.onFailure(ex); listener.onFailure(ex);
return; return;
} }
// Early check to verify that the user can create the destination index and can read from the source // Early check to verify that the user can create the destination index and can read from the source
if (licenseState.isAuthAllowed()) { if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) {
final String destIndex = config.getDestination().getIndex(); final String destIndex = config.getDestination().getIndex();
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(), IndicesOptions.lenientExpandOpen(),
@ -163,12 +161,12 @@ public class TransportPutDataFrameTransformAction
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap( ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, config, r, listener), r -> handlePrivsResponse(username, request, r, listener),
listener::onFailure); listener::onFailure);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else { // No security enabled, just create the transform } else { // No security enabled, just create the transform
putDataFrame(config, listener); putDataFrame(request, listener);
} }
} }
@ -178,11 +176,11 @@ public class TransportPutDataFrameTransformAction
} }
private void handlePrivsResponse(String username, private void handlePrivsResponse(String username,
DataFrameTransformConfig config, Request request,
HasPrivilegesResponse privilegesResponse, HasPrivilegesResponse privilegesResponse,
ActionListener<AcknowledgedResponse> listener) throws IOException { ActionListener<AcknowledgedResponse> listener) {
if (privilegesResponse.isCompleteMatch()) { if (privilegesResponse.isCompleteMatch()) {
putDataFrame(config, listener); putDataFrame(request, listener);
} else { } else {
List<String> indices = privilegesResponse.getIndexPrivileges() List<String> indices = privilegesResponse.getIndexPrivileges()
.stream() .stream()
@ -191,14 +189,15 @@ public class TransportPutDataFrameTransformAction
listener.onFailure(Exceptions.authorizationError( listener.onFailure(Exceptions.authorizationError(
"Cannot create data frame transform [{}] because user {} lacks all the required permissions for indices: {}", "Cannot create data frame transform [{}] because user {} lacks all the required permissions for indices: {}",
config.getId(), request.getConfig().getId(),
username, username,
indices)); indices));
} }
} }
private void putDataFrame(DataFrameTransformConfig config, ActionListener<AcknowledgedResponse> listener) { private void putDataFrame(Request request, ActionListener<AcknowledgedResponse> listener) {
final DataFrameTransformConfig config = request.getConfig();
final Pivot pivot = new Pivot(config.getPivotConfig()); final Pivot pivot = new Pivot(config.getPivotConfig());
// <3> Return to the listener // <3> Return to the listener
@ -214,11 +213,23 @@ public class TransportPutDataFrameTransformAction
ActionListener<Boolean> pivotValidationListener = ActionListener.wrap( ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
validationException -> listener.onFailure( validationException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
validationException)) validationException))
); );
// <1> Validate our pivot try {
pivot.validate(client, config.getSource(), pivotValidationListener); pivot.validateConfig();
} catch (Exception e) {
listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
e));
return;
}
if (request.isDeferValidation()) {
pivotValidationListener.onResponse(true);
} else {
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
}
} }
} }

View file

@ -182,7 +182,7 @@ public class TransportStartDataFrameTransformAction extends
return; return;
} }
// Validate source and destination indices // Validate source and destination indices
SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver); SourceDestValidator.validate(config, clusterService.state(), indexNameExpressionResolver, false);
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency()));
final String destinationIndex = config.getDestination().getIndex(); final String destinationIndex = config.getDestination().getIndex();

View file

@ -141,7 +141,7 @@ public class TransportStopDataFrameTransformAction extends
} }
if (ids.contains(transformTask.getTransformId())) { if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we validate that none of the tasks are in a failed state earlier // This should not occur as we check that none of the tasks are in a failed state earlier
// Keep this check in here for insurance. // Keep this check in here for insurance.
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure( listener.onFailure(

View file

@ -35,7 +35,8 @@ public class RestPutDataFrameTransformAction extends BaseRestHandler {
String id = restRequest.param(DataFrameField.ID.getPreferredName()); String id = restRequest.param(DataFrameField.ID.getPreferredName());
XContentParser parser = restRequest.contentParser(); XContentParser parser = restRequest.contentParser();
PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id); boolean deferValidation = restRequest.paramAsBoolean(DataFrameField.DEFER_VALIDATION.getPreferredName(), false);
PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id, deferValidation);
return channel -> client.execute(PutDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); return channel -> client.execute(PutDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
} }

View file

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Set; import java.util.Set;
/** /**
@ -26,7 +27,14 @@ import java.util.Set;
*/ */
public final class SourceDestValidator { public final class SourceDestValidator {
private SourceDestValidator() {} interface SourceDestValidation {
boolean isDeferrable();
void validate(DataFrameTransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver);
}
private static final List<SourceDestValidation> VALIDATIONS = Arrays.asList(new SourceMissingValidation(),
new DestinationInSourceValidation(),
new DestinationSingleIndexValidation());
/** /**
* Validates the DataFrameTransformConfiguration source and destination indices. * Validates the DataFrameTransformConfiguration source and destination indices.
@ -41,52 +49,111 @@ public final class SourceDestValidator {
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
* @throws ElasticsearchStatusException when a validation fails * @throws ElasticsearchStatusException when a validation fails
*/ */
public static void check(DataFrameTransformConfig config, public static void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver,
boolean shouldDefer) {
for (SourceDestValidation validation : VALIDATIONS) {
if (shouldDefer && validation.isDeferrable()) {
continue;
}
validation.validate(config, clusterState, indexNameExpressionResolver);
}
}
static class SourceMissingValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return true;
}
@Override
public void validate(DataFrameTransformConfig config,
ClusterState clusterState, ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) { IndexNameExpressionResolver indexNameExpressionResolver) {
for(String src : config.getSource().getIndex()) {
final String destIndex = config.getDestination().getIndex(); String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
Set<String> concreteSourceIndexNames = new HashSet<>(); IndicesOptions.lenientExpandOpen(),
for(String src : config.getSource().getIndex()) { src);
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); if (concreteNames.length == 0) {
if (concreteNames.length == 0) { throw new ElasticsearchStatusException(
throw new ElasticsearchStatusException( DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), RestStatus.BAD_REQUEST);
RestStatus.BAD_REQUEST); }
} }
if (Regex.simpleMatch(src, destIndex)) { }
throw new ElasticsearchStatusException( }
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST); static class DestinationInSourceValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return true;
}
@Override
public void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
final String destIndex = config.getDestination().getIndex();
Set<String> concreteSourceIndexNames = new HashSet<>();
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
src);
if (Regex.simpleMatch(src, destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST);
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
} }
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
if (concreteSourceIndexNames.contains(destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
);
}
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
destIndex);
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
);
}
}
}
static class DestinationSingleIndexValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return false;
} }
if (concreteSourceIndexNames.contains(destIndex)) { @Override
throw new ElasticsearchStatusException( public void validate(DataFrameTransformConfig config,
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, ClusterState clusterState,
destIndex, IndexNameExpressionResolver indexNameExpressionResolver) {
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), final String destIndex = config.getDestination().getIndex();
RestStatus.BAD_REQUEST final String[] concreteDest =
); indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
}
final String[] concreteDest = if (concreteDest.length > 1) {
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
if (concreteDest.length > 1) { RestStatus.BAD_REQUEST
throw new ElasticsearchStatusException( );
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), }
RestStatus.BAD_REQUEST
);
}
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
);
} }
} }
} }

View file

@ -69,17 +69,28 @@ public class Pivot {
this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate; this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate;
} }
public void validate(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) { public void validateConfig() {
// step 1: check if used aggregations are supported
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
if (Aggregations.isSupportedByDataframe(agg.getType()) == false) { if (Aggregations.isSupportedByDataframe(agg.getType()) == false) {
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]")); throw new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]");
return;
} }
} }
}
// step 2: run a query to validate that config is valid public void validateQuery(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
runTestQuery(client, sourceConfig, listener); SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
if (response == null) {
listener.onFailure(new RuntimeException("Unexpected null response from test query"));
return;
}
if (response.status() != RestStatus.OK) {
listener.onFailure(new RuntimeException("Unexpected status from response of test query: "+ response.status()));
return;
}
listener.onResponse(true);
}, e -> listener.onFailure(new RuntimeException("Failed to test query", e))));
} }
public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener<Map<String, String>> listener) { public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener<Map<String, String>> listener) {
@ -164,24 +175,6 @@ public class Pivot {
dataFrameIndexerTransformStats); dataFrameIndexerTransformStats);
} }
private void runTestQuery(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
if (response == null) {
listener.onFailure(new RuntimeException("Unexpected null response from test query"));
return;
}
if (response.status() != RestStatus.OK) {
listener.onFailure(new RuntimeException("Unexpected status from response of test query: " + response.status()));
return;
}
listener.onResponse(true);
}, e->{
listener.onFailure(new RuntimeException("Failed to test query", e));
}));
}
public QueryBuilder filterBuckets(Map<String, Set<String>> changedBuckets) { public QueryBuilder filterBuckets(Map<String, Set<String>> changedBuckets) {
if (changedBuckets == null || changedBuckets.isEmpty()) { if (changedBuckets == null || changedBuckets.isEmpty()) {
@ -247,4 +240,5 @@ public class Pivot {
} }
return compositeAggregation; return compositeAggregation;
} }
} }

View file

@ -65,43 +65,47 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null));
SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()); SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false);
} }
public void testCheck_GivenMissingConcreteSourceIndex() { public void testCheck_GivenMissingConcreteSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing] does not exist")); assertThat(e.getMessage(), equalTo("Source index [missing] does not exist"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
public void testCheck_GivenMissingWildcardSourceIndex() { public void testCheck_GivenMissingWildcardSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist")); assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
public void testCheck_GivenDestIndexSameAsSourceIndex() { public void testCheck_GivenDestIndexSameAsSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]")); assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
public void testCheck_GivenDestIndexMatchesSourceIndex() { public void testCheck_GivenDestIndexMatchesSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
@ -109,16 +113,23 @@ public class SourceDestValidatorTests extends ESTestCase {
new DestConfig(SOURCE_2, null)); new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should refer to a single index"));
e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should refer to a single index")); equalTo("Destination index [dest-alias] should refer to a single index"));
@ -128,10 +139,12 @@ public class SourceDestValidatorTests extends ESTestCase {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null)); DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), assertThat(e.getMessage(),
equalTo("Destination index [source-1] is included in source expression [source-1]")); equalTo("Destination index [source-1] is included in source expression [source-1]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
} }
private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) { private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) {

View file

@ -47,7 +47,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class PivotTests extends ESTestCase { public class PivotTests extends ESTestCase {
@ -140,10 +142,10 @@ public class PivotTests extends ESTestCase {
public void testValidateAllUnsupportedAggregations() throws Exception { public void testValidateAllUnsupportedAggregations() throws Exception {
for (String agg : unsupportedAggregations) { for (String agg : unsupportedAggregations) {
AggregationConfig aggregationConfig = getAggregationConfig(agg); AggregationConfig aggregationConfig = getAggregationConfig(agg);
SourceConfig source = new SourceConfig(new String[]{"existing_source"}, QueryConfig.matchAll());
Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig)); Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig));
assertInvalidTransform(client, source, pivot); RuntimeException ex = expectThrows(RuntimeException.class, pivot::validateConfig);
assertThat("expected aggregations to be unsupported, but they were", ex, is(notNullValue()));
} }
} }
@ -248,7 +250,7 @@ public class PivotTests extends ESTestCase {
private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception { private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception {
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionHolder = new AtomicReference<>(); final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
pivot.validate(client, source, ActionListener.wrap(validity -> { pivot.validateQuery(client, source, ActionListener.wrap(validity -> {
assertEquals(expectValid, validity); assertEquals(expectValid, validity);
latch.countDown(); latch.countDown();
}, e -> { }, e -> {

View file

@ -11,6 +11,13 @@
"required": true, "required": true,
"description": "The id of the new transform." "description": "The id of the new transform."
} }
},
"params": {
"defer_validation": {
"type": "boolean",
"required": false,
"description": "If validations should be deferred until data frame transform starts, defaults to false."
}
} }
}, },
"body": { "body": {

View file

@ -91,6 +91,20 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}} "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
} }
} }
- do:
data_frame.put_data_frame_transform:
transform_id: "missing-source-transform"
defer_validation: true
body: >
{
"source": { "index": "missing-index" },
"dest": { "index": "missing-source-dest" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }
--- ---
"Test basic transform crud": "Test basic transform crud":
- do: - do:
@ -316,6 +330,22 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}} "aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
} }
} }
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
defer_validation: true
body: >
{
"source": {
"index": ["airline-data*"]
},
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
--- ---
"Test alias scenarios": "Test alias scenarios":
- do: - do: