[7.x] [Transform] Add _meta field to TransformConfig (#79003) (#79219)

This commit is contained in:
Przemysław Witek 2021-10-15 12:14:10 +02:00 committed by GitHub
parent 54c371603f
commit aee722dd39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 445 additions and 44 deletions

View file

@ -24,6 +24,7 @@ import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
@ -38,6 +39,7 @@ public class TransformConfig implements ToXContentObject {
public static final ParseField DESCRIPTION = new ParseField("description");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField METADATA = new ParseField("_meta");
public static final ParseField VERSION = new ParseField("version");
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
@ -51,6 +53,7 @@ public class TransformConfig implements ToXContentObject {
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final SettingsConfig settings;
private final Map<String, Object> metadata;
private final PivotConfig pivotConfig;
private final LatestConfig latestConfig;
private final String description;
@ -71,9 +74,11 @@ public class TransformConfig implements ToXContentObject {
LatestConfig latestConfig = (LatestConfig) args[6];
String description = (String) args[7];
SettingsConfig settings = (SettingsConfig) args[8];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[9];
Instant createTime = (Instant) args[10];
String transformVersion = (String) args[11];
@SuppressWarnings("unchecked")
Map<String, Object> metadata = (Map<String, Object>) args[9];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[10];
Instant createTime = (Instant) args[11];
String transformVersion = (String) args[12];
return new TransformConfig(
id,
source,
@ -84,6 +89,7 @@ public class TransformConfig implements ToXContentObject {
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
createTime,
transformVersion
@ -106,6 +112,7 @@ public class TransformConfig implements ToXContentObject {
PARSER.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p), LATEST_TRANSFORM);
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), SETTINGS);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapOrdered(), METADATA);
PARSER.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
@ -136,7 +143,7 @@ public class TransformConfig implements ToXContentObject {
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null, null);
return new TransformConfig(null, source, null, null, null, pivotConfig, null, null, null, null, null, null, null);
}
/**
@ -151,7 +158,7 @@ public class TransformConfig implements ToXContentObject {
* @return A TransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
*/
public static TransformConfig forPreview(final SourceConfig source, final LatestConfig latestConfig) {
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null, null);
return new TransformConfig(null, source, null, null, null, null, latestConfig, null, null, null, null, null, null);
}
TransformConfig(
@ -164,6 +171,7 @@ public class TransformConfig implements ToXContentObject {
final LatestConfig latestConfig,
final String description,
final SettingsConfig settings,
final Map<String, Object> metadata,
final RetentionPolicyConfig retentionPolicyConfig,
final Instant createTime,
final String version
@ -177,6 +185,7 @@ public class TransformConfig implements ToXContentObject {
this.latestConfig = latestConfig;
this.description = description;
this.settings = settings;
this.metadata = metadata;
this.retentionPolicyConfig = retentionPolicyConfig;
this.createTime = createTime == null ? null : Instant.ofEpochMilli(createTime.toEpochMilli());
this.transformVersion = version == null ? null : Version.fromString(version);
@ -228,6 +237,11 @@ public class TransformConfig implements ToXContentObject {
return settings;
}
@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}
@Nullable
public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
@ -265,6 +279,9 @@ public class TransformConfig implements ToXContentObject {
if (settings != null) {
builder.field(SETTINGS.getPreferredName(), settings);
}
if (metadata != null) {
builder.field(METADATA.getPreferredName(), metadata);
}
if (retentionPolicyConfig != null) {
builder.startObject(RETENTION_POLICY.getPreferredName());
builder.field(retentionPolicyConfig.getName(), retentionPolicyConfig);
@ -300,6 +317,7 @@ public class TransformConfig implements ToXContentObject {
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.transformVersion, that.transformVersion)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.metadata, that.metadata)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.pivotConfig, that.pivotConfig)
&& Objects.equals(this.latestConfig, that.latestConfig)
@ -315,6 +333,7 @@ public class TransformConfig implements ToXContentObject {
frequency,
syncConfig,
settings,
metadata,
createTime,
transformVersion,
pivotConfig,
@ -343,6 +362,7 @@ public class TransformConfig implements ToXContentObject {
private PivotConfig pivotConfig;
private LatestConfig latestConfig;
private SettingsConfig settings;
private Map<String, Object> metadata;
private String description;
private RetentionPolicyConfig retentionPolicyConfig;
@ -391,6 +411,11 @@ public class TransformConfig implements ToXContentObject {
return this;
}
public Builder setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
return this;
}
public Builder setRetentionPolicyConfig(RetentionPolicyConfig retentionPolicyConfig) {
this.retentionPolicyConfig = retentionPolicyConfig;
return this;
@ -407,6 +432,7 @@ public class TransformConfig implements ToXContentObject {
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
null,
null

View file

@ -17,6 +17,7 @@ import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
@ -39,8 +40,10 @@ public class TransformConfigUpdate implements ToXContentObject {
SyncConfig syncConfig = (SyncConfig) args[3];
String description = (String) args[4];
SettingsConfig settings = (SettingsConfig) args[5];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[6];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, retentionPolicyConfig);
@SuppressWarnings("unchecked")
Map<String, Object> metadata = (Map<String, Object>) args[6];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[7];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, metadata, retentionPolicyConfig);
}
);
@ -51,6 +54,7 @@ public class TransformConfigUpdate implements ToXContentObject {
PARSER.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), TransformConfig.SYNC);
PARSER.declareString(optionalConstructorArg(), TransformConfig.DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p), TransformConfig.SETTINGS);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapOrdered(), TransformConfig.METADATA);
PARSER.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
@ -64,6 +68,7 @@ public class TransformConfigUpdate implements ToXContentObject {
private final SyncConfig syncConfig;
private final String description;
private final SettingsConfig settings;
private final Map<String, Object> metadata;
public TransformConfigUpdate(
final SourceConfig source,
@ -72,6 +77,7 @@ public class TransformConfigUpdate implements ToXContentObject {
final SyncConfig syncConfig,
final String description,
final SettingsConfig settings,
final Map<String, Object> metadata,
final RetentionPolicyConfig retentionPolicyConfig
) {
this.source = source;
@ -80,6 +86,7 @@ public class TransformConfigUpdate implements ToXContentObject {
this.syncConfig = syncConfig;
this.description = description;
this.settings = settings;
this.metadata = metadata;
}
public SourceConfig getSource() {
@ -108,6 +115,11 @@ public class TransformConfigUpdate implements ToXContentObject {
return settings;
}
@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}
@Override
public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
builder.startObject();
@ -131,6 +143,9 @@ public class TransformConfigUpdate implements ToXContentObject {
if (settings != null) {
builder.field(TransformConfig.SETTINGS.getPreferredName(), settings);
}
if (metadata != null) {
builder.field(TransformConfig.METADATA.getPreferredName(), metadata);
}
builder.endObject();
return builder;
@ -153,12 +168,13 @@ public class TransformConfigUpdate implements ToXContentObject {
&& Objects.equals(this.frequency, that.frequency)
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings);
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.metadata, that.metadata);
}
@Override
public int hashCode() {
return Objects.hash(source, dest, frequency, syncConfig, description, settings);
return Objects.hash(source, dest, frequency, syncConfig, description, settings, metadata);
}
@Override
@ -182,6 +198,7 @@ public class TransformConfigUpdate implements ToXContentObject {
private SyncConfig syncConfig;
private String description;
private SettingsConfig settings;
private Map<String, Object> metdata;
private RetentionPolicyConfig retentionPolicyConfig;
public Builder setSource(SourceConfig source) {
@ -214,13 +231,18 @@ public class TransformConfigUpdate implements ToXContentObject {
return this;
}
public Builder setMetadata(Map<String, Object> metadata) {
this.metdata = metdata;
return this;
}
public Builder setRetentionPolicyConfig(RetentionPolicyConfig retentionPolicyConfig) {
this.retentionPolicyConfig = retentionPolicyConfig;
return this;
}
public TransformConfigUpdate build() {
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, retentionPolicyConfig);
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, metdata, retentionPolicyConfig);
}
}
}

View file

@ -16,6 +16,7 @@ import org.elasticsearch.client.transform.transforms.pivot.PivotConfig;
import org.elasticsearch.client.transform.transforms.pivot.PivotConfigTests;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.search.SearchModule;
@ -25,6 +26,7 @@ import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import static org.elasticsearch.client.transform.transforms.DestConfigTests.randomDestConfig;
@ -52,6 +54,7 @@ public class TransformConfigTests extends AbstractXContentTestCase<TransformConf
latestConfig,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
SettingsConfigTests.randomSettingsConfig(),
randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
@ -66,6 +69,30 @@ public class TransformConfigTests extends AbstractXContentTestCase<TransformConf
return TimeRetentionPolicyConfigTests.randomTimeRetentionPolicyConfig();
}
public static Map<String, Object> randomMetadata() {
return randomMap(0, 10, () -> {
String key = randomAlphaOfLengthBetween(1, 10);
Object value;
switch (randomIntBetween(0, 3)) {
case 0:
value = null;
break;
case 1:
value = randomLong();
break;
case 2:
value = randomAlphaOfLengthBetween(1, 10);
break;
case 3:
value = randomMap(0, 10, () -> Tuple.tuple(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
break;
default:
throw new AssertionError();
}
return Tuple.tuple(key, value);
});
}
@Override
protected TransformConfig createTestInstance() {
return randomTransformConfig();

View file

@ -23,6 +23,7 @@ import java.util.List;
import static org.elasticsearch.client.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.client.transform.transforms.SettingsConfigTests.randomSettingsConfig;
import static org.elasticsearch.client.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.client.transform.transforms.TransformConfigTests.randomMetadata;
import static org.elasticsearch.client.transform.transforms.TransformConfigTests.randomRetentionPolicyConfig;
import static org.elasticsearch.client.transform.transforms.TransformConfigTests.randomSyncConfig;
@ -36,6 +37,7 @@ public class TransformConfigUpdateTests extends AbstractXContentTestCase<Transfo
randomBoolean() ? null : randomSyncConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : randomSettingsConfig(),
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig()
);
}

View file

@ -984,6 +984,10 @@ The `latest` method transforms the data by finding the latest document for each
unique key.
end::transform-latest[]
tag::transform-metadata[]
Defines optional {transform} metadata.
end::transform-metadata[]
tag::transform-retention[]
Defines a retention policy for the {transform}. Data that meets the defined
criteria is deleted from the destination index.

View file

@ -126,6 +126,12 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-unique-key]
====
//End latest
//Begin _meta
`_meta`::
(Optional, object)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-metadata]
//End _meta
//Begin pivot
`pivot`::
(Required^*^, object)

View file

@ -98,6 +98,12 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-pipeline]
(Optional, <<time-units, time units>>)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=frequency]
//Begin _meta
`_meta`::
(Optional, object)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-metadata]
//End _meta
//Begin retention policy
`retention_policy`::
(Optional, object)

View file

@ -32,6 +32,7 @@ public final class TransformField {
public static final ParseField CREATE_TIME = new ParseField("create_time");
public static final ParseField DESTINATION = new ParseField("dest");
public static final ParseField SETTINGS = new ParseField("settings");
public static final ParseField METADATA = new ParseField("_meta");
public static final ParseField FREQUENCY = new ParseField("frequency");
public static final ParseField FORCE = new ParseField("force");
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");

View file

@ -84,6 +84,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
private final TimeValue frequency;
private final SyncConfig syncConfig;
private final SettingsConfig settings;
private final Map<String, Object> metadata;
private final RetentionPolicyConfig retentionPolicyConfig;
private final String description;
// headers store the user context from the creating user, which allows us to run the transform as this user
@ -127,8 +128,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
if (lenient == false) {
// on strict parsing do not allow injection of headers, transform version, or create time
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
validateStrictParsingParams(args[12], TransformField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[13], TransformField.VERSION.getPreferredName());
validateStrictParsingParams(args[13], TransformField.CREATE_TIME.getPreferredName());
validateStrictParsingParams(args[14], TransformField.VERSION.getPreferredName());
// exactly one function must be defined
if ((args[7] == null) == (args[8] == null)) {
throw new IllegalArgumentException(TransformMessages.TRANSFORM_CONFIGURATION_BAD_FUNCTION_COUNT);
@ -143,7 +144,13 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
LatestConfig latestConfig = (LatestConfig) args[8];
String description = (String) args[9];
SettingsConfig settings = (SettingsConfig) args[10];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[11];
@SuppressWarnings("unchecked")
Map<String, Object> metadata = (Map<String, Object>) args[11];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[12];
Instant createTime = (Instant) args[13];
String version = (String) args[14];
return new TransformConfig(
id,
@ -156,9 +163,10 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
(Instant) args[12],
(String) args[13]
createTime,
version
);
});
@ -173,6 +181,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
parser.declareObject(optionalConstructorArg(), (p, c) -> LatestConfig.fromXContent(p, lenient), Function.LATEST.getParseField());
parser.declareString(optionalConstructorArg(), TransformField.DESCRIPTION);
parser.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p, lenient), TransformField.SETTINGS);
parser.declareObject(optionalConstructorArg(), (p, c) -> p.mapOrdered(), TransformField.METADATA);
parser.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
@ -203,6 +212,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
final LatestConfig latestConfig,
final String description,
final SettingsConfig settings,
final Map<String, Object> metadata,
final RetentionPolicyConfig retentionPolicyConfig,
final Instant createTime,
final String version
@ -217,6 +227,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
this.latestConfig = latestConfig;
this.description = description;
this.settings = settings == null ? new SettingsConfig() : settings;
this.metadata = metadata;
this.retentionPolicyConfig = retentionPolicyConfig;
if (this.description != null && this.description.length() > MAX_DESCRIPTION_LENGTH) {
throw new IllegalArgumentException("[description] must be less than 1000 characters in length.");
@ -256,6 +267,11 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
} else {
settings = new SettingsConfig();
}
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
metadata = in.readMap();
} else {
metadata = null;
}
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
retentionPolicyConfig = in.readOptionalNamedWriteable(RetentionPolicyConfig.class);
} else {
@ -328,6 +344,10 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return settings;
}
public Map<String, Object> getMetadata() {
return metadata;
}
@Nullable
public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
@ -432,6 +452,9 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
settings.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
out.writeMap(metadata);
}
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeOptionalNamedWriteable(retentionPolicyConfig);
}
@ -483,6 +506,9 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
builder.field(TransformField.DESCRIPTION.getPreferredName(), description);
}
builder.field(TransformField.SETTINGS.getPreferredName(), settings);
if (metadata != null) {
builder.field(TransformField.METADATA.getPreferredName(), metadata);
}
if (retentionPolicyConfig != null) {
builder.startObject(TransformField.RETENTION_POLICY.getPreferredName());
builder.field(retentionPolicyConfig.getWriteableName(), retentionPolicyConfig);
@ -514,6 +540,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
&& Objects.equals(this.latestConfig, that.latestConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.metadata, that.metadata)
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.transformVersion, that.transformVersion);
@ -532,6 +559,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
createTime,
transformVersion
@ -639,6 +667,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
private PivotConfig pivotConfig;
private LatestConfig latestConfig;
private SettingsConfig settings;
private Map<String, Object> metadata;
private RetentionPolicyConfig retentionPolicyConfig;
public Builder() {}
@ -655,6 +684,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
this.pivotConfig = config.pivotConfig;
this.latestConfig = config.latestConfig;
this.settings = config.settings;
this.metadata = config.metadata;
this.retentionPolicyConfig = config.retentionPolicyConfig;
}
@ -721,6 +751,15 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
return settings;
}
public Builder setMetadata(Map<String, Object> metadata) {
this.metadata = metadata;
return this;
}
Map<String, Object> getMetadata() {
return metadata;
}
public Builder setHeaders(Map<String, String> headers) {
this.headers = headers;
return this;
@ -774,6 +813,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
createTime,
transformVersion == null ? null : transformVersion.toString()
@ -802,6 +842,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
&& Objects.equals(this.latestConfig, that.latestConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.metadata, that.metadata)
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig)
&& Objects.equals(this.createTime, that.createTime)
&& Objects.equals(this.transformVersion, that.transformVersion);
@ -820,6 +861,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
latestConfig,
description,
settings,
metadata,
retentionPolicyConfig,
createTime,
transformVersion

View file

@ -12,11 +12,11 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
@ -34,7 +34,7 @@ public class TransformConfigUpdate implements Writeable {
public static final String NAME = "data_frame_transform_config_update";
public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null);
public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null, null);
private static final ConstructingObjectParser<TransformConfigUpdate, String> PARSER = new ConstructingObjectParser<>(
NAME,
@ -48,8 +48,10 @@ public class TransformConfigUpdate implements Writeable {
SyncConfig syncConfig = (SyncConfig) args[3];
String description = (String) args[4];
SettingsConfig settings = (SettingsConfig) args[5];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[6];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, retentionPolicyConfig);
@SuppressWarnings("unchecked")
Map<String, Object> metadata = (Map<String, Object>) args[6];
RetentionPolicyConfig retentionPolicyConfig = (RetentionPolicyConfig) args[7];
return new TransformConfigUpdate(source, dest, frequency, syncConfig, description, settings, metadata, retentionPolicyConfig);
}
);
@ -60,6 +62,7 @@ public class TransformConfigUpdate implements Writeable {
PARSER.declareNamedObject(optionalConstructorArg(), (p, c, n) -> p.namedObject(SyncConfig.class, n, c), TransformField.SYNC);
PARSER.declareString(optionalConstructorArg(), TransformField.DESCRIPTION);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> SettingsConfig.fromXContent(p, false), TransformField.SETTINGS);
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapOrdered(), TransformField.METADATA);
PARSER.declareNamedObject(
optionalConstructorArg(),
(p, c, n) -> p.namedObject(RetentionPolicyConfig.class, n, c),
@ -73,6 +76,7 @@ public class TransformConfigUpdate implements Writeable {
private final SyncConfig syncConfig;
private final String description;
private final SettingsConfig settings;
private final Map<String, Object> metadata;
private final RetentionPolicyConfig retentionPolicyConfig;
private Map<String, String> headers;
@ -83,6 +87,7 @@ public class TransformConfigUpdate implements Writeable {
final SyncConfig syncConfig,
final String description,
final SettingsConfig settings,
final Map<String, Object> metadata,
final RetentionPolicyConfig retentionPolicyConfig
) {
this.source = source;
@ -94,6 +99,7 @@ public class TransformConfigUpdate implements Writeable {
throw new IllegalArgumentException("[description] must be less than 1000 characters in length.");
}
this.settings = settings;
this.metadata = metadata;
this.retentionPolicyConfig = retentionPolicyConfig;
}
@ -111,6 +117,11 @@ public class TransformConfigUpdate implements Writeable {
} else {
settings = null;
}
if (in.getVersion().onOrAfter(Version.V_7_16_0)) {
metadata = in.readMap();
} else {
metadata = null;
}
if (in.getVersion().onOrAfter(Version.V_7_12_0)) {
retentionPolicyConfig = in.readOptionalNamedWriteable(RetentionPolicyConfig.class);
} else {
@ -144,6 +155,11 @@ public class TransformConfigUpdate implements Writeable {
return settings;
}
@Nullable
public Map<String, Object> getMetadata() {
return metadata;
}
@Nullable
public RetentionPolicyConfig getRetentionPolicyConfig() {
return retentionPolicyConfig;
@ -173,6 +189,9 @@ public class TransformConfigUpdate implements Writeable {
if (out.getVersion().onOrAfter(Version.V_7_8_0)) {
out.writeOptionalWriteable(settings);
}
if (out.getVersion().onOrAfter(Version.V_7_16_0)) {
out.writeMap(metadata);
}
if (out.getVersion().onOrAfter(Version.V_7_12_0)) {
out.writeOptionalNamedWriteable(retentionPolicyConfig);
}
@ -196,13 +215,14 @@ public class TransformConfigUpdate implements Writeable {
&& Objects.equals(this.syncConfig, that.syncConfig)
&& Objects.equals(this.description, that.description)
&& Objects.equals(this.settings, that.settings)
&& Objects.equals(this.metadata, that.metadata)
&& Objects.equals(this.retentionPolicyConfig, that.retentionPolicyConfig)
&& Objects.equals(this.headers, that.headers);
}
@Override
public int hashCode() {
return Objects.hash(source, dest, frequency, syncConfig, description, settings, retentionPolicyConfig, headers);
return Objects.hash(source, dest, frequency, syncConfig, description, settings, metadata, retentionPolicyConfig, headers);
}
public static TransformConfigUpdate fromXContent(final XContentParser parser) {
@ -220,6 +240,7 @@ public class TransformConfigUpdate implements Writeable {
&& isNullOrEqual(syncConfig, config.getSyncConfig())
&& isNullOrEqual(description, config.getDescription())
&& isNullOrEqual(settings, config.getSettings())
&& isNullOrEqual(metadata, config.getMetadata())
&& isNullOrEqual(retentionPolicyConfig, config.getRetentionPolicyConfig())
&& isNullOrEqual(headers, config.getHeaders());
}
@ -273,6 +294,10 @@ public class TransformConfigUpdate implements Writeable {
settingsBuilder.update(settings);
builder.setSettings(settingsBuilder.build());
}
if (metadata != null) {
// Unlike with settings, we fully replace the old metadata with the new metadata
builder.setMetadata(metadata);
}
if (retentionPolicyConfig != null) {
builder.setRetentionPolicyConfig(retentionPolicyConfig);
}

View file

@ -57,6 +57,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans
null,
null,
null,
null,
null
);
return new Request(config);

View file

@ -10,15 +10,19 @@ package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.DeprecationHandler;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.RemoteClusterMinimumVersionValidation;
import org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SourceDestValidation;
import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
@ -84,6 +88,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
latestConfig,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig(),
null,
null
@ -128,6 +133,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
latestConfig,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
version == null ? null : version.toString()
@ -157,6 +163,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
latestConfig,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig(),
null,
null
@ -173,6 +180,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig(),
null,
null
@ -187,6 +195,30 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
return TimeRetentionPolicyConfigTests.randomTimeRetentionPolicyConfig();
}
public static Map<String, Object> randomMetadata() {
return randomMap(0, 10, () -> {
String key = randomAlphaOfLengthBetween(1, 10);
Object value;
switch (randomIntBetween(0, 3)) {
case 0:
value = null;
break;
case 1:
value = randomLong();
break;
case 2:
value = randomAlphaOfLengthBetween(1, 10);
break;
case 3:
value = randomMap(0, 10, () -> Tuple.tuple(randomAlphaOfLengthBetween(1, 10), randomAlphaOfLengthBetween(1, 10)));
break;
default:
throw new AssertionError();
}
return Tuple.tuple(key, value);
});
}
@Before
public void setUpOptionalId() {
transformId = randomAlphaOfLengthBetween(1, 10);
@ -390,6 +422,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
null,
null,
null,
null
)
);
@ -408,6 +441,7 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
null,
null,
null,
null,
null
);
assertThat(description, equalTo(config.getDescription()));
@ -775,7 +809,55 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
)
)
);
}
public void testSerializingMetadataPreservesOrder() throws IOException {
String json = "{"
+ " \"id\" : \""
+ transformId
+ "\","
+ " \"_meta\": {"
+ " \"d\": 4,"
+ " \"a\": 1,"
+ " \"c\": 3,"
+ " \"e\": 5,"
+ " \"b\": 2"
+ "},"
+ " \"source\" : {\"index\":\"src\"},"
+ " \"dest\" : {\"index\": \"dest\"},"
+ " \"pivot\" : {"
+ " \"group_by\": {"
+ " \"time\": {"
+ " \"date_histogram\": {"
+ " \"field\": \"timestamp\","
+ " \"fixed_interval\": \"1d\""
+ "} } },"
+ " \"aggs\": {"
+ " \"avg\": {"
+ " \"avg\": {"
+ " \"field\": \"points\""
+ "} } } } }";
// Read TransformConfig from JSON and verify that metadata keys are in the same order as in JSON
TransformConfig transformConfig = createTransformConfigFromString(json, transformId, true);
assertThat(
new ArrayList<>(transformConfig.getMetadata().keySet()), is(equalTo(org.elasticsearch.core.List.of("d", "a", "c", "e", "b"))));
// Write TransformConfig to JSON, read it again and verify that metadata keys are still in the same order
json = XContentHelper.toXContent(transformConfig, XContentType.JSON, TO_XCONTENT_PARAMS, false).utf8ToString();
transformConfig = createTransformConfigFromString(json, transformId, true);
assertThat(
new ArrayList<>(transformConfig.getMetadata().keySet()), is(equalTo(org.elasticsearch.core.List.of("d", "a", "c", "e", "b"))));
// Write TransformConfig to wire, read it again and verify that metadata keys are still in the same order
try (BytesStreamOutput output = new BytesStreamOutput()) {
transformConfig.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), getNamedWriteableRegistry())) {
transformConfig = new TransformConfig(in);
}
}
assertThat(
new ArrayList<>(transformConfig.getMetadata().keySet()), is(equalTo(org.elasticsearch.core.List.of("d", "a", "c", "e", "b"))));
}
private TransformConfig createTransformConfigFromString(String json, String id) throws IOException {
@ -787,5 +869,4 @@ public class TransformConfigTests extends AbstractSerializingTransformTestCase<T
.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json);
return TransformConfig.fromXContent(parser, id, lenient);
}
}

View file

@ -21,11 +21,15 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
import static org.elasticsearch.xpack.core.transform.transforms.DestConfigTests.randomDestConfig;
import static org.elasticsearch.xpack.core.transform.transforms.SourceConfigTests.randomSourceConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomMetadata;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomRetentionPolicyConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomSyncConfig;
import static org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests.randomTransformConfig;
import static org.hamcrest.Matchers.equalTo;
@ -39,6 +43,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
randomBoolean() ? null : randomSyncConfig(),
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
randomBoolean() ? null : SettingsConfigTests.randomSettingsConfig(),
randomBoolean() ? null : randomMetadata(),
randomBoolean() ? null : randomRetentionPolicyConfig()
);
}
@ -65,6 +70,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
config.getSyncConfig(),
config.getDescription(),
config.getSettings(),
config.getMetadata(),
config.getRetentionPolicyConfig()
);
assertTrue("equal update is not noop", update.isNoop(config));
@ -76,6 +82,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
config.getSyncConfig(),
"this is a new description",
config.getSettings(),
config.getMetadata(),
config.getRetentionPolicyConfig()
);
assertFalse("true update is noop", update.isNoop(config));
@ -94,11 +101,12 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomMetadata(),
randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString()
);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null);
assertThat(config, equalTo(update.apply(config)));
SourceConfig sourceConfig = new SourceConfig("the_new_index");
@ -107,6 +115,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30));
String newDescription = "new description";
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true);
Map<String, Object> newMetadata = randomMetadata();
RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000));
update = new TransformConfigUpdate(
sourceConfig,
@ -115,6 +124,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
syncConfig,
newDescription,
settings,
newMetadata,
retentionPolicyConfig
);
@ -128,6 +138,8 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getSyncConfig(), equalTo(syncConfig));
assertThat(updatedConfig.getDescription(), equalTo(newDescription));
assertThat(updatedConfig.getSettings(), equalTo(settings));
// We only check for the existence of new entries. The map can also contain the old (random) entries.
assertThat(updatedConfig.getMetadata(), equalTo(newMetadata));
assertThat(updatedConfig.getRetentionPolicyConfig(), equalTo(retentionPolicyConfig));
assertThat(updatedConfig.getHeaders(), equalTo(headers));
assertThat(updatedConfig.getVersion(), equalTo(Version.CURRENT));
@ -145,13 +157,14 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomMetadata(),
randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString()
);
TransformConfigUpdate update =
new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(4_000, null, (Boolean) null, null), null);
new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(4_000, null, (Boolean) null, null), null, null);
TransformConfig updatedConfig = update.apply(config);
// for settings we allow partial updates, so changing 1 setting should not overwrite the other
@ -161,7 +174,8 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis()));
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(null, 43.244F, (Boolean) null, null), null);
update =
new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(null, 43.244F, (Boolean) null, null), null, null);
updatedConfig = update.apply(updatedConfig);
assertThat(updatedConfig.getSettings().getMaxPageSearchSize(), equalTo(4_000));
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F));
@ -169,14 +183,14 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
// now reset to default using the magic -1
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null, null), null);
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null, null), null, null);
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F));
assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis()));
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null, null), null);
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null, null), null, null);
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertNull(updatedConfig.getSettings().getDocsPerSecond());
@ -184,6 +198,37 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
}
public void testApplyMetadata() {
Map<String, Object> oldMetadata = new HashMap<String, Object>();
oldMetadata.put("foo", 123);
oldMetadata.put("bar", 456);
TransformConfig config = new TransformConfig(
"time-transform",
randomSourceConfig(),
randomDestConfig(),
TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
randomSyncConfig(),
Collections.singletonMap("key", "value"),
PivotConfigTests.randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
oldMetadata,
randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.V_7_2_0.toString()
);
Map<String, Object> newMetadata = new HashMap<String, Object>();
newMetadata.put("bar", 789);
newMetadata.put("baz", 1000);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, newMetadata, null);
TransformConfig updatedConfig = update.apply(config);
// For metadata we apply full replace rather than partial update, so "foo" disappears.
assertThat(updatedConfig.getMetadata(), equalTo(newMetadata));
}
public void testApplyWithSyncChange() {
TransformConfig batchConfig = new TransformConfig(
"batch-transform",
@ -196,6 +241,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomMetadata(),
randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
@ -208,6 +254,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
TimeSyncConfigTests.randomTimeSyncConfig(),
null,
null,
null,
null
);
@ -228,12 +275,13 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
SettingsConfigTests.randomNonEmptySettingsConfig(),
randomMetadata(),
randomRetentionPolicyConfig(),
randomBoolean() ? null : Instant.now(),
randomBoolean() ? null : Version.CURRENT.toString()
);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null, null, null, new FooSync(), null, null, null);
TransformConfigUpdate fooSyncUpdate = new TransformConfigUpdate(null, null, null, new FooSync(), null, null, null, null);
ex = expectThrows(ElasticsearchStatusException.class, () -> fooSyncUpdate.apply(timeSyncedConfig));
assertThat(
ex.getMessage(),
@ -273,6 +321,9 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
if (update.getSettings() != null) {
builder.field(TransformField.SETTINGS.getPreferredName(), update.getSettings());
}
if (update.getMetadata() != null) {
builder.field(TransformField.METADATA.getPreferredName(), update.getMetadata());
}
if (update.getRetentionPolicyConfig() != null) {
builder.startObject(TransformField.RETENTION_POLICY.getPreferredName());
builder.field(update.getRetentionPolicyConfig().getWriteableName(), update.getRetentionPolicyConfig());
@ -282,14 +333,6 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
builder.endObject();
}
private static SyncConfig randomSyncConfig() {
return TimeSyncConfigTests.randomTimeSyncConfig();
}
private static RetentionPolicyConfig randomRetentionPolicyConfig() {
return TimeRetentionPolicyConfigTests.randomTimeRetentionPolicyConfig();
}
static class FooSync implements SyncConfig {
@Override

View file

@ -194,6 +194,11 @@
}
}
},
"_meta": {
"$id": "#root/_meta",
"title": "Metadata",
"type": "object"
},
"retention_policy": {
"$id": "#root/retention_policy",
"additionalProperties": false,

View file

@ -315,3 +315,75 @@ setup:
{
"dest": { "index": "destination#dest" }
}
---
"Test update transform metadata":
- do:
transform.get_transform:
transform_id: "updating-airline-transform"
- match: { count: 1 }
- match: { transforms.0.id: "updating-airline-transform" }
- is_false: transforms.0._meta # no metadata exists yet
- do:
transform.update_transform:
transform_id: "updating-airline-transform"
body: >
{
"_meta": {
"foo": 123,
"bar": 456,
"baz": {
"a1": 11,
"a2": 22
}
}
}
- match: { id: "updating-airline-transform" }
- match: { _meta.foo: 123 }
- match: { _meta.bar: 456 }
- match: { _meta.baz.a1: 11 }
- match: { _meta.baz.a2: 22 }
- is_false: _meta.baz.a3
- do:
transform.get_transform:
transform_id: "updating-airline-transform"
- match: { count: 1 }
- match: { transforms.0.id: "updating-airline-transform" }
- match: { transforms.0._meta.foo: 123 }
- match: { transforms.0._meta.bar: 456 }
- match: { transforms.0._meta.baz.a1: 11 }
- match: { transforms.0._meta.baz.a2: 22 }
- is_false: transforms.0._meta.baz.a3
- do:
transform.update_transform:
transform_id: "updating-airline-transform"
body: >
{
"_meta": {
"bar": "some bar note",
"baz": {
"a2": 222,
"a3": 333
}
}
}
- match: { id: "updating-airline-transform" }
- is_false: _meta.foo # "foo" disappeared as the metadata update is implemented as full replace
- match: { _meta.bar: "some bar note" } # "bar" value type has changed from int to string
- is_false: _meta.baz.a1 # "baz.a1" disappeared as the metadata update is implemented as full replace
- match: { _meta.baz.a2: 222 }
- match: { _meta.baz.a3: 333 }
- do:
transform.get_transform:
transform_id: "updating-airline-transform"
- match: { count: 1 }
- match: { transforms.0.id: "updating-airline-transform" }
- is_false: transforms.0._meta.foo
- match: { transforms.0._meta.bar: "some bar note" }
- is_false: transforms.0._meta.baz.a1
- match: { transforms.0._meta.baz.a2: 222 }
- match: { transforms.0._meta.baz.a3: 333 }

View file

@ -165,6 +165,7 @@ public class TransformProgressIT extends ESRestTestCase {
null,
null,
null,
null,
null
);

View file

@ -85,7 +85,7 @@ public class TransformInternalIndexIT extends TransformSingleNodeTestCase {
assertThat(getTransformResponse.getTransformConfigurations().get(0).getId(), equalTo(transformId));
UpdateTransformAction.Request updateTransformActionRequest = new UpdateTransformAction.Request(
new TransformConfigUpdate(null, null, null, null, "updated", null, null),
new TransformConfigUpdate(null, null, null, null, "updated", null, null, null),
transformId, false);
UpdateTransformAction.Response updateTransformActionResponse =
client().execute(UpdateTransformAction.INSTANCE, updateTransformActionRequest).actionGet();

View file

@ -78,7 +78,7 @@ public class TransformNoRemoteClusterClientNodeIT extends TransformSingleNodeTes
}
TransformConfigUpdate update =
new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null);
new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null, null);
UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, true);
client().execute(UpdateTransformAction.INSTANCE, request).actionGet();
}
@ -93,7 +93,7 @@ public class TransformNoRemoteClusterClientNodeIT extends TransformSingleNodeTes
}
TransformConfigUpdate update =
new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null);
new TransformConfigUpdate(new SourceConfig("remote_cluster:my-index"), null, null, null, null, null, null, null);
UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, false);
ElasticsearchStatusException e =
expectThrows(

View file

@ -93,7 +93,7 @@ public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
}
TransformConfigUpdate update =
new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null);
new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null, null);
UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, true);
client().execute(UpdateTransformAction.INSTANCE, request).actionGet();
@ -111,7 +111,7 @@ public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
}
TransformConfigUpdate update =
new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null);
new TransformConfigUpdate(new SourceConfig("my-index", "my-index-2"), null, null, null, null, null, null, null);
UpdateTransformAction.Request request = new UpdateTransformAction.Request(update, transformId, false);
ElasticsearchStatusException e =
expectThrows(

View file

@ -161,7 +161,7 @@ public class TransportUpgradeTransformsAction extends TransportMasterNodeAction<
final ClusterState clusterState = clusterService.state();
transformConfigManager.getTransformConfigurationForUpdate(id, ActionListener.wrap(configAndVersion -> {
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null);
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null, null);
TransformConfig config = configAndVersion.v1();
/*

View file

@ -354,6 +354,9 @@ public final class TransformInternalIndex {
.startObject(TransformConfig.Function.LATEST.getParseField().getPreferredName())
.field(TYPE, FLATTENED)
.endObject()
.startObject(TransformField.METADATA.getPreferredName())
.field(TYPE, FLATTENED)
.endObject()
.startObject(TransformField.RETENTION_POLICY.getPreferredName())
.field(TYPE, FLATTENED)
.endObject()

View file

@ -8,10 +8,12 @@
package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.PutTransformAction;
@ -23,6 +25,15 @@ import static org.elasticsearch.rest.RestRequest.Method.PUT;
public class RestPutTransformAction extends BaseRestHandler {
/**
* Maximum allowed size of the REST request.
*
* It is set so that the user is able to provide elaborate painless scripts but not able to provide TransformConfig._meta map of
* arbitrary size. Such transform configs of an arbitrary size could be a problem upon fetch so it's better to prevent them on Put and
* Update actions.
*/
static final ByteSizeValue MAX_REQUEST_SIZE = ByteSizeValue.ofMb(5);
@Override
public List<Route> routes() {
return singletonList(new Route(PUT, TransformField.REST_BASE_PATH_TRANSFORMS_BY_ID));
@ -35,6 +46,11 @@ public class RestPutTransformAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.contentLength() > MAX_REQUEST_SIZE.getBytes()) {
throw ExceptionsHelper.badRequestException(
"Request is too large: was [{}b], expected at most [{}]", restRequest.contentLength(), MAX_REQUEST_SIZE);
}
String id = restRequest.param(TransformField.ID.getPreferredName());
XContentParser parser = restRequest.contentParser();

View file

@ -12,6 +12,7 @@ import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
@ -20,6 +21,7 @@ import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.xpack.transform.rest.action.RestPutTransformAction.MAX_REQUEST_SIZE;
public class RestUpdateTransformAction extends BaseRestHandler {
@ -35,6 +37,11 @@ public class RestUpdateTransformAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.contentLength() > MAX_REQUEST_SIZE.getBytes()) {
throw ExceptionsHelper.badRequestException(
"Request is too large: was [{}b], expected at most [{}]", restRequest.contentLength(), MAX_REQUEST_SIZE);
}
String id = restRequest.param(TransformField.ID.getPreferredName());
boolean deferValidation = restRequest.paramAsBoolean(TransformField.DEFER_VALIDATION.getPreferredName(), false);
XContentParser parser = restRequest.contentParser();

View file

@ -312,6 +312,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
new SettingsConfig(pageSize, null, (Boolean) null, null),
null,
null,
null,
null
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
@ -386,6 +387,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
new SettingsConfig(pageSize, null, (Boolean) null, null),
null,
null,
null,
null
);
SearchResponse searchResponse = new SearchResponse(
@ -449,6 +451,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
new SettingsConfig(pageSize, null, (Boolean) null, null),
null,
null,
null,
null
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
@ -530,6 +533,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null,
new TimeRetentionPolicyConfig(randomAlphaOfLength(10), TimeValue.timeValueSeconds(10)),
null,
null
@ -631,6 +635,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null,
new TimeRetentionPolicyConfig(randomAlphaOfLength(10), TimeValue.timeValueSeconds(10)),
null,
null
@ -732,6 +737,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
null,
null,
null,
null,
null
);

View file

@ -276,6 +276,7 @@ public class TransformIndexerStateTests extends ESTestCase {
null,
null,
null,
null,
null
);
@ -508,6 +509,7 @@ public class TransformIndexerStateTests extends ESTestCase {
new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null),
null,
null,
null,
null
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);

View file

@ -300,6 +300,7 @@ public class TransformIndexerTests extends ESTestCase {
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
null,
null,
TimeRetentionPolicyConfigTests.randomTimeRetentionPolicyConfig(),
null,
null
@ -344,6 +345,7 @@ public class TransformIndexerTests extends ESTestCase {
null,
null,
null,
null,
null
);
@ -397,6 +399,7 @@ public class TransformIndexerTests extends ESTestCase {
null,
null,
null,
null,
null
);
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STARTED);