mirror of
https://github.com/elastic/elasticsearch.git
synced 2025-04-24 23:27:25 -04:00
[Transform] Allow specifying destination index aliases in the Transform's dest config (#94943)
This commit is contained in:
parent
32c17d79c5
commit
2b70165ffd
41 changed files with 1076 additions and 299 deletions
5
docs/changelog/94943.yaml
Normal file
5
docs/changelog/94943.yaml
Normal file
|
@ -0,0 +1,5 @@
|
|||
pr: 94943
|
||||
summary: Allow specifying destination index aliases in the Transform's `dest` config
|
||||
area: Transform
|
||||
type: enhancement
|
||||
issues: []
|
|
@ -193,6 +193,25 @@ mappings for the destination index are undesirable, use the
|
|||
<<indices-create-index,Create index API>> prior to starting the {transform}.
|
||||
end::dest-index[]
|
||||
|
||||
tag::dest-aliases[]
|
||||
The aliases that the destination index for the {transform} should have.
|
||||
Aliases are manipulated using the stored credentials of the transform, which means the secondary credentials supplied
|
||||
at creation time (if both primary and secondary credentials are specified).
|
||||
|
||||
The destination index is added to the aliases regardless whether the destination
|
||||
index was created by the transform or pre-created by the user.
|
||||
end::dest-aliases[]
|
||||
|
||||
tag::dest-aliases-alias[]
|
||||
The name of the alias.
|
||||
end::dest-aliases-alias[]
|
||||
|
||||
tag::dest-aliases-move-on-creation[]
|
||||
Whether or not the destination index should be the **only** index in this alias.
|
||||
If `true`, all the other indices will be removed from this alias before adding the destination index to this alias.
|
||||
Defaults to `false`.
|
||||
end::dest-aliases-move-on-creation[]
|
||||
|
||||
tag::dest-pipeline[]
|
||||
The unique identifier for an <<ingest,ingest pipeline>>.
|
||||
end::dest-pipeline[]
|
||||
|
|
|
@ -111,6 +111,26 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest]
|
|||
(Required, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
|
||||
|
||||
//Begin aliases
|
||||
`aliases`:::
|
||||
(Optional, array of objects)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases]
|
||||
+
|
||||
.Properties of `aliases`
|
||||
[%collapsible%open]
|
||||
=====
|
||||
|
||||
`alias`::::
|
||||
(Required, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases-alias]
|
||||
|
||||
`move_on_creation`::::
|
||||
(Optional, boolean)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases-move-on-creation]
|
||||
|
||||
=====
|
||||
//End aliases
|
||||
|
||||
`pipeline`:::
|
||||
(Optional, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-pipeline]
|
||||
|
|
|
@ -92,6 +92,26 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest]
|
|||
(Required, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-index]
|
||||
|
||||
//Begin aliases
|
||||
`aliases`:::
|
||||
(Optional, array of objects)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases]
|
||||
+
|
||||
.Properties of `aliases`
|
||||
[%collapsible%open]
|
||||
=====
|
||||
|
||||
`alias`::::
|
||||
(Required, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases-alias]
|
||||
|
||||
`move_on_creation`::::
|
||||
(Optional, boolean)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-aliases-move-on-creation]
|
||||
|
||||
=====
|
||||
//End aliases
|
||||
|
||||
`pipeline`:::
|
||||
(Optional, string)
|
||||
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=dest-pipeline]
|
||||
|
|
|
@ -40,6 +40,8 @@ public class TransformMessages {
|
|||
+ "Use force stop and then restart the transform once error is resolved.";
|
||||
|
||||
public static final String FAILED_TO_CREATE_DESTINATION_INDEX = "Could not create destination index [{0}] for transform [{1}]";
|
||||
public static final String FAILED_TO_SET_UP_DESTINATION_ALIASES =
|
||||
"Could not set up aliases for destination index [{0}] for transform [{1}]";
|
||||
public static final String FAILED_TO_RELOAD_TRANSFORM_CONFIGURATION = "Failed to reload transform configuration for transform [{0}]";
|
||||
public static final String FAILED_TO_LOAD_TRANSFORM_CONFIGURATION = "Failed to load transform configuration for transform [{0}]";
|
||||
public static final String FAILED_TO_PARSE_TRANSFORM_CONFIGURATION = "Failed to parse transform configuration for transform [{0}]";
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.transform.transforms;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.xcontent.ConstructingObjectParser;
|
||||
import org.elasticsearch.xcontent.ParseField;
|
||||
import org.elasticsearch.xcontent.ToXContentObject;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
|
||||
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
|
||||
|
||||
public class DestAlias implements Writeable, ToXContentObject {
|
||||
|
||||
public static final ParseField ALIAS = new ParseField("alias");
|
||||
public static final ParseField MOVE_ON_CREATION = new ParseField("move_on_creation");
|
||||
|
||||
public static final ConstructingObjectParser<DestAlias, Void> STRICT_PARSER = createParser(false);
|
||||
public static final ConstructingObjectParser<DestAlias, Void> LENIENT_PARSER = createParser(true);
|
||||
|
||||
private static ConstructingObjectParser<DestAlias, Void> createParser(boolean lenient) {
|
||||
ConstructingObjectParser<DestAlias, Void> parser = new ConstructingObjectParser<>(
|
||||
"data_frame_config_dest_alias",
|
||||
lenient,
|
||||
args -> new DestAlias((String) args[0], (Boolean) args[1])
|
||||
);
|
||||
parser.declareString(constructorArg(), ALIAS);
|
||||
parser.declareBoolean(optionalConstructorArg(), MOVE_ON_CREATION);
|
||||
return parser;
|
||||
}
|
||||
|
||||
private final String alias;
|
||||
private final boolean moveOnCreation;
|
||||
|
||||
public DestAlias(String alias, Boolean moveOnCreation) {
|
||||
this.alias = ExceptionsHelper.requireNonNull(alias, ALIAS.getPreferredName());
|
||||
this.moveOnCreation = moveOnCreation != null ? moveOnCreation : false;
|
||||
}
|
||||
|
||||
public DestAlias(final StreamInput in) throws IOException {
|
||||
alias = in.readString();
|
||||
moveOnCreation = in.readBoolean();
|
||||
}
|
||||
|
||||
public String getAlias() {
|
||||
return alias;
|
||||
}
|
||||
|
||||
public boolean isMoveOnCreation() {
|
||||
return moveOnCreation;
|
||||
}
|
||||
|
||||
public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
|
||||
if (alias.isEmpty()) {
|
||||
validationException = addValidationError("dest.aliases.alias must not be empty", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(alias);
|
||||
out.writeBoolean(moveOnCreation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(ALIAS.getPreferredName(), alias);
|
||||
builder.field(MOVE_ON_CREATION.getPreferredName(), moveOnCreation);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (other == this) {
|
||||
return true;
|
||||
}
|
||||
if (other == null || other.getClass() != getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DestAlias that = (DestAlias) other;
|
||||
return Objects.equals(alias, that.alias) && moveOnCreation == that.moveOnCreation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(alias, moveOnCreation);
|
||||
}
|
||||
|
||||
public static DestAlias fromXContent(final XContentParser parser, boolean lenient) throws IOException {
|
||||
return lenient ? LENIENT_PARSER.apply(parser, null) : STRICT_PARSER.apply(parser, null);
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@
|
|||
|
||||
package org.elasticsearch.xpack.core.transform.transforms;
|
||||
|
||||
import org.elasticsearch.TransportVersion;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -21,6 +22,7 @@ import org.elasticsearch.xpack.core.deprecation.DeprecationIssue;
|
|||
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
|
@ -31,32 +33,42 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
|
|||
public class DestConfig implements Writeable, ToXContentObject {
|
||||
|
||||
public static final ParseField INDEX = new ParseField("index");
|
||||
public static final ParseField ALIASES = new ParseField("aliases");
|
||||
public static final ParseField PIPELINE = new ParseField("pipeline");
|
||||
|
||||
public static final ConstructingObjectParser<DestConfig, Void> STRICT_PARSER = createParser(false);
|
||||
public static final ConstructingObjectParser<DestConfig, Void> LENIENT_PARSER = createParser(true);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static ConstructingObjectParser<DestConfig, Void> createParser(boolean lenient) {
|
||||
ConstructingObjectParser<DestConfig, Void> parser = new ConstructingObjectParser<>(
|
||||
"data_frame_config_dest",
|
||||
lenient,
|
||||
args -> new DestConfig((String) args[0], (String) args[1])
|
||||
args -> new DestConfig((String) args[0], (List<DestAlias>) args[1], (String) args[2])
|
||||
);
|
||||
parser.declareString(constructorArg(), INDEX);
|
||||
parser.declareObjectArray(optionalConstructorArg(), lenient ? DestAlias.LENIENT_PARSER : DestAlias.STRICT_PARSER, ALIASES);
|
||||
parser.declareString(optionalConstructorArg(), PIPELINE);
|
||||
return parser;
|
||||
}
|
||||
|
||||
private final String index;
|
||||
private final List<DestAlias> aliases;
|
||||
private final String pipeline;
|
||||
|
||||
public DestConfig(String index, String pipeline) {
|
||||
public DestConfig(String index, List<DestAlias> aliases, String pipeline) {
|
||||
this.index = ExceptionsHelper.requireNonNull(index, INDEX.getPreferredName());
|
||||
this.aliases = aliases;
|
||||
this.pipeline = pipeline;
|
||||
}
|
||||
|
||||
public DestConfig(final StreamInput in) throws IOException {
|
||||
index = in.readString();
|
||||
if (in.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
|
||||
aliases = in.readOptionalList(DestAlias::new);
|
||||
} else {
|
||||
aliases = null;
|
||||
}
|
||||
pipeline = in.readOptionalString();
|
||||
}
|
||||
|
||||
|
@ -64,6 +76,10 @@ public class DestConfig implements Writeable, ToXContentObject {
|
|||
return index;
|
||||
}
|
||||
|
||||
public List<DestAlias> getAliases() {
|
||||
return aliases != null ? aliases : List.of();
|
||||
}
|
||||
|
||||
public String getPipeline() {
|
||||
return pipeline;
|
||||
}
|
||||
|
@ -80,6 +96,9 @@ public class DestConfig implements Writeable, ToXContentObject {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(index);
|
||||
if (out.getTransportVersion().onOrAfter(TransportVersion.V_8_8_0)) {
|
||||
out.writeOptionalCollection(aliases);
|
||||
}
|
||||
out.writeOptionalString(pipeline);
|
||||
}
|
||||
|
||||
|
@ -87,6 +106,9 @@ public class DestConfig implements Writeable, ToXContentObject {
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
builder.field(INDEX.getPreferredName(), index);
|
||||
if (aliases != null) {
|
||||
builder.field(ALIASES.getPreferredName(), aliases);
|
||||
}
|
||||
if (pipeline != null) {
|
||||
builder.field(PIPELINE.getPreferredName(), pipeline);
|
||||
}
|
||||
|
@ -104,12 +126,12 @@ public class DestConfig implements Writeable, ToXContentObject {
|
|||
}
|
||||
|
||||
DestConfig that = (DestConfig) other;
|
||||
return Objects.equals(index, that.index) && Objects.equals(pipeline, that.pipeline);
|
||||
return Objects.equals(index, that.index) && Objects.equals(aliases, that.aliases) && Objects.equals(pipeline, that.pipeline);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(index, pipeline);
|
||||
return Objects.hash(index, aliases, pipeline);
|
||||
}
|
||||
|
||||
public static DestConfig fromXContent(final XContentParser parser, boolean lenient) throws IOException {
|
||||
|
|
|
@ -55,7 +55,7 @@ public class PreviewTransformActionRequestTests extends AbstractSerializingTrans
|
|||
TransformConfig config = new TransformConfig(
|
||||
"transform-preview",
|
||||
randomSourceConfig(),
|
||||
new DestConfig("unused-transform-preview-index", null),
|
||||
new DestConfig("unused-transform-preview-index", null, null),
|
||||
null,
|
||||
randomBoolean() ? TransformConfigTests.randomSyncConfig() : null,
|
||||
null,
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.core.transform.transforms;
|
||||
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.xpack.core.transform.AbstractSerializingTransformTestCase;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.emptyString;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class DestAliasTests extends AbstractSerializingTransformTestCase<DestAlias> {
|
||||
|
||||
private boolean lenient;
|
||||
|
||||
public static DestAlias randomDestAlias() {
|
||||
return new DestAlias(randomAlphaOfLength(10), randomBoolean());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setRandomFeatures() {
|
||||
lenient = randomBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DestAlias doParseInstance(XContentParser parser) throws IOException {
|
||||
return DestAlias.fromXContent(parser, lenient);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean supportsUnknownFields() {
|
||||
return lenient;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DestAlias createTestInstance() {
|
||||
return randomDestAlias();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected DestAlias mutateInstance(DestAlias instance) {
|
||||
return new DestAlias(instance.getAlias() + "-x", instance.isMoveOnCreation() == false);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Reader<DestAlias> instanceReader() {
|
||||
return DestAlias::new;
|
||||
}
|
||||
|
||||
public void testFailOnEmptyAlias() throws IOException {
|
||||
boolean lenient2 = randomBoolean();
|
||||
String json = "{ \"alias\": \"\" }";
|
||||
try (XContentParser parser = createParser(JsonXContent.jsonXContent, json)) {
|
||||
DestAlias destAlias = DestAlias.fromXContent(parser, lenient2);
|
||||
assertThat(destAlias.getAlias(), is(emptyString()));
|
||||
ValidationException validationException = destAlias.validate(null);
|
||||
assertThat(validationException, is(notNullValue()));
|
||||
assertThat(validationException.getMessage(), containsString("dest.aliases.alias must not be empty"));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -26,7 +26,11 @@ public class DestConfigTests extends AbstractSerializingTransformTestCase<DestCo
|
|||
private boolean lenient;
|
||||
|
||||
public static DestConfig randomDestConfig() {
|
||||
return new DestConfig(randomAlphaOfLength(10), randomBoolean() ? null : randomAlphaOfLength(10));
|
||||
return new DestConfig(
|
||||
randomAlphaOfLength(10),
|
||||
randomBoolean() ? null : randomList(5, DestAliasTests::randomDestAlias),
|
||||
randomBoolean() ? null : randomAlphaOfLength(10)
|
||||
);
|
||||
}
|
||||
|
||||
@Before
|
||||
|
|
|
@ -21,6 +21,7 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.PivotConfigTests;
|
|||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
|
@ -146,7 +147,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
|
|||
|
||||
assertThat(config, equalTo(update.apply(config)));
|
||||
SourceConfig sourceConfig = new SourceConfig("the_new_index");
|
||||
DestConfig destConfig = new DestConfig("the_new_dest", "my_new_pipeline");
|
||||
DestConfig destConfig = new DestConfig("the_new_dest", List.of(new DestAlias("my_new_alias", false)), "my_new_pipeline");
|
||||
TimeValue frequency = TimeValue.timeValueSeconds(10);
|
||||
SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30));
|
||||
String newDescription = "new description";
|
||||
|
|
|
@ -48,7 +48,8 @@ testClusters.matching { it.name == 'javaRestTest' }.configureEach {
|
|||
rolesFile file('roles.yml')
|
||||
user username: "x_pack_rest_user", password: "x-pack-test-password"
|
||||
user username: "john_junior", password: "x-pack-test-password", role: "transform_admin"
|
||||
user username: "bill_senior", password: "x-pack-test-password", role: "transform_admin,source_index_access"
|
||||
user username: "not_a_transform_admin", password: "x-pack-test-password", role: "source_index_access"
|
||||
user username: "fleet_access", password: "x-pack-test-password", role: "transform_admin,source_index_access,fleet_index_access"
|
||||
user username: "bill_senior", password: "x-pack-test-password", role: "transform_admin,source_index_access,dest_index_access"
|
||||
user username: "source_and_dest_index_access_only", password: "x-pack-test-password", role: "source_index_access,dest_index_access"
|
||||
user username: "transform_user_but_not_admin", password: "x-pack-test-password", role: "transform_user,source_index_access,dest_index_access"
|
||||
user username: "fleet_access", password: "x-pack-test-password", role: "transform_admin,fleet_index_access,dest_index_access"
|
||||
}
|
||||
|
|
|
@ -4,9 +4,21 @@ source_index_access:
|
|||
- cluster:monitor/main
|
||||
- cluster:monitor/tasks/lists
|
||||
indices:
|
||||
# Give access to the source and destination indices because the transform roles alone do not provide access to
|
||||
# non-transform indices
|
||||
- names: [ 'transform-permissions-*' ]
|
||||
# Give access to the source indices because the transform roles alone do not provide access to non-transform indices
|
||||
- names: [ 'transform-permissions-*-index' ]
|
||||
privileges:
|
||||
- create_index
|
||||
- indices:admin/refresh
|
||||
- read
|
||||
- write
|
||||
- view_index_metadata
|
||||
- indices:data/write/bulk
|
||||
- indices:data/write/index
|
||||
|
||||
dest_index_access:
|
||||
indices:
|
||||
# Give access to the destination indices
|
||||
- names: [ 'transform-permissions-*-dest' ]
|
||||
privileges:
|
||||
- create_index
|
||||
- indices:admin/refresh
|
||||
|
|
|
@ -16,8 +16,11 @@ import org.elasticsearch.core.TimeValue;
|
|||
import org.elasticsearch.search.aggregations.AggregationBuilders;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.QueryConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.pivot.SingleGroupSource;
|
||||
|
@ -25,10 +28,12 @@ import org.elasticsearch.xpack.core.transform.transforms.pivot.TermsGroupSource;
|
|||
import org.junit.After;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
|
||||
|
@ -42,16 +47,24 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
|
||||
public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
||||
|
||||
private static final String TEST_ADMIN_USERNAME = "x_pack_rest_user";
|
||||
private static final String TEST_ADMIN_HEADER = basicAuthHeaderValue(TEST_ADMIN_USERNAME, TEST_PASSWORD_SECURE_STRING);
|
||||
private static final String JUNIOR_USERNAME = "john_junior";
|
||||
private static final String JUNIOR_HEADER = basicAuthHeaderValue(JUNIOR_USERNAME, TEST_PASSWORD_SECURE_STRING);
|
||||
private static final String SENIOR_USERNAME = "bill_senior";
|
||||
private static final String SENIOR_HEADER = basicAuthHeaderValue(SENIOR_USERNAME, TEST_PASSWORD_SECURE_STRING);
|
||||
private static final String NOT_A_TRANSFORM_ADMIN = "not_a_transform_admin";
|
||||
private static final String NOT_A_TRANSFORM_ADMIN_HEADER = basicAuthHeaderValue(NOT_A_TRANSFORM_ADMIN, TEST_PASSWORD_SECURE_STRING);
|
||||
private static final String FLEET_ACCESS_USERNAME = "fleet_access";
|
||||
private static final String FLEET_ACCESS_HEADER = basicAuthHeaderValue(FLEET_ACCESS_USERNAME, TEST_PASSWORD_SECURE_STRING);
|
||||
private enum Users {
|
||||
TEST_ADMIN("x_pack_rest_user", List.of()),
|
||||
JUNIOR("john_junior", List.of("transform_admin")),
|
||||
SENIOR("bill_senior", List.of("transform_admin", "source_index_access", "dest_index_access")),
|
||||
SOURCE_AND_DEST_INDEX_ACCESS_ONLY("source_and_dest_index_access_only", List.of("source_index_access", "dest_index_access")),
|
||||
TRANSFORM_USER_BUT_NOT_ADMIN("transform_user_but_not_admin", List.of("transform_user", "source_index_access", "dest_index_access")),
|
||||
FLEET_ACCESS("fleet_access", List.of("transform_admin", "fleet_index_access", "dest_index_access"));
|
||||
|
||||
private final String username;
|
||||
private final String effectiveRoles;
|
||||
private final String header;
|
||||
|
||||
Users(String username, List<String> effectiveRoles) {
|
||||
this.username = username;
|
||||
this.effectiveRoles = effectiveRoles.stream().sorted().collect(Collectors.joining(","));
|
||||
this.header = basicAuthHeaderValue(username, TEST_PASSWORD_SECURE_STRING);
|
||||
}
|
||||
}
|
||||
|
||||
private static final int NUM_USERS = 28;
|
||||
|
||||
|
@ -85,10 +98,10 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
private void testTransformPermissionsNoDefer(boolean unattended) throws Exception {
|
||||
String transformId = "transform-permissions-nodefer-" + (unattended ? 1 : 0);
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, unattended);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, unattended);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
|
@ -96,7 +109,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
transformId,
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder()
|
||||
.addHeader(AUTH_KEY, JUNIOR_HEADER)
|
||||
.addHeader(AUTH_KEY, Users.JUNIOR.header)
|
||||
.addParameter("defer_validation", String.valueOf(false))
|
||||
.build()
|
||||
)
|
||||
|
@ -107,11 +120,11 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
containsString(
|
||||
Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[create_index, index, read]]",
|
||||
+ "[%s:[create_index, index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
)
|
||||
)
|
||||
);
|
||||
|
@ -120,7 +133,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
transformId,
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder()
|
||||
.addHeader(AUTH_KEY, SENIOR_HEADER)
|
||||
.addHeader(AUTH_KEY, Users.SENIOR.header)
|
||||
.addParameter("defer_validation", String.valueOf(false))
|
||||
.build()
|
||||
);
|
||||
|
@ -128,6 +141,44 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertGreen(transformId);
|
||||
}
|
||||
|
||||
public void testTransformWithDestinationAlias() throws Exception {
|
||||
String transformId = "transform-permissions-with-destination-alias";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = transformId + "-dest";
|
||||
String destAliasName = transformId + "-alias";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(
|
||||
transformId,
|
||||
sourceIndexName,
|
||||
destIndexName,
|
||||
List.of(new DestAlias(destAliasName, false)),
|
||||
false
|
||||
);
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build()
|
||||
)
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(
|
||||
e.getMessage(),
|
||||
containsString(
|
||||
Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions [%s:[manage], %s:[manage], %s:[]]",
|
||||
transformId,
|
||||
Users.SENIOR.username,
|
||||
destAliasName,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = true
|
||||
* unattended = false
|
||||
|
@ -136,10 +187,10 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
public void testTransformPermissionsDeferNoUnattendedNoDest() throws Exception {
|
||||
String transformId = "transform-permissions-defer-nounattended";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, false);
|
||||
putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
|
@ -147,17 +198,17 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
);
|
||||
String authIssue = Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[create_index, index, read]]",
|
||||
+ "[%s:[create_index, index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
);
|
||||
assertRed(transformId, authIssue);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build())
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.JUNIOR.header).build())
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(e.getMessage(), containsString(authIssue));
|
||||
|
@ -166,7 +217,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
|
||||
e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build())
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build())
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(e.getMessage(), containsString(authIssue));
|
||||
|
@ -174,7 +225,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertRed(transformId, authIssue);
|
||||
|
||||
// update transform's credentials so that the transform has permission to access source/dest indices
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build());
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
|
||||
|
||||
assertGreen(transformId);
|
||||
|
||||
|
@ -193,12 +244,12 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
public void testTransformPermissionsDeferNoUnattendedDest() throws Exception {
|
||||
String transformId = "transform-permissions-defer-nounattended-dest-exists";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
createIndex(adminClient(), destIndexName, Settings.EMPTY);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, false);
|
||||
putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
|
@ -206,17 +257,17 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
);
|
||||
String authIssue = Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[index, read]]",
|
||||
+ "[%s:[index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
);
|
||||
assertRed(transformId, authIssue);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build())
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.JUNIOR.header).build())
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(e.getMessage(), containsString(authIssue));
|
||||
|
@ -225,7 +276,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
|
||||
e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build())
|
||||
() -> startTransform(config.getId(), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build())
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(e.getMessage(), containsString(authIssue));
|
||||
|
@ -233,7 +284,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertRed(transformId, authIssue);
|
||||
|
||||
// update transform's credentials so that the transform has permission to access source/dest indices
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build());
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
|
||||
|
||||
assertGreen(transformId);
|
||||
|
||||
|
@ -244,37 +295,101 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertGreen(transformId);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = false
|
||||
* unattended = false
|
||||
*/
|
||||
public void testNoTransformAdminRoleNoDeferNoUnattended() throws Exception {
|
||||
testNoTransformAdminRole(false, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = false
|
||||
* unattended = true
|
||||
*/
|
||||
public void testNoTransformAdminRoleNoDeferUnattended() throws Exception {
|
||||
testNoTransformAdminRole(false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = true
|
||||
* unattended = false
|
||||
*/
|
||||
public void testNoTransformAdminRoleDeferNoUnattended() throws Exception {
|
||||
testNoTransformAdminRole(true, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = true
|
||||
* unattended = true
|
||||
*/
|
||||
public void testNoTransformAdminRoleDeferUnattended() throws Exception {
|
||||
testNoTransformAdminRole(true, true);
|
||||
}
|
||||
|
||||
private void testNoTransformAdminRole(boolean deferValidation, boolean unattended) throws Exception {
|
||||
Users user = randomFrom(Users.SOURCE_AND_DEST_INDEX_ACCESS_ONLY, Users.TRANSFORM_USER_BUT_NOT_ADMIN);
|
||||
String transformId = "transform-permissions-no-transform-role";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, unattended);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder()
|
||||
.addHeader(AUTH_KEY, user.header)
|
||||
.addParameter("defer_validation", String.valueOf(deferValidation))
|
||||
.build()
|
||||
)
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(
|
||||
e.getMessage(),
|
||||
containsString(
|
||||
Strings.format(
|
||||
"action [cluster:admin/transform/put] is unauthorized for user [%s] with effective roles [%s], "
|
||||
+ "this action is granted by the cluster privileges [manage_data_frame_transforms,manage_transform,manage,all]",
|
||||
user.username,
|
||||
user.effectiveRoles
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* defer_validation = true
|
||||
* unattended = false
|
||||
*/
|
||||
public void testNoTransformAdminRoleInSecondaryAuth() throws Exception {
|
||||
String transformId = "transform-permissions-no-admin-role";
|
||||
Users user = randomFrom(Users.SOURCE_AND_DEST_INDEX_ACCESS_ONLY, Users.TRANSFORM_USER_BUT_NOT_ADMIN);
|
||||
String transformId = "transform-permissions-no-transform-role-in-sec-auth";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, false);
|
||||
|
||||
// PUT with defer_validation should work even though the secondary auth does not have transform_admin role
|
||||
// PUT with defer_validation should work even though the secondary auth does not have transform_admin nor transform_user role
|
||||
putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder()
|
||||
.addHeader(SECONDARY_AUTH_KEY, NOT_A_TRANSFORM_ADMIN_HEADER)
|
||||
.addHeader(SECONDARY_AUTH_KEY, user.header)
|
||||
.addParameter("defer_validation", String.valueOf(true))
|
||||
.build()
|
||||
);
|
||||
|
||||
// _update should work even though the secondary auth does not have transform_admin role
|
||||
updateConfig(
|
||||
transformId,
|
||||
"{}",
|
||||
RequestOptions.DEFAULT.toBuilder().addHeader(SECONDARY_AUTH_KEY, NOT_A_TRANSFORM_ADMIN_HEADER).build()
|
||||
);
|
||||
// _update should work even though the secondary auth does not have transform_admin nor transform_user role
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(SECONDARY_AUTH_KEY, user.header).build());
|
||||
|
||||
// _start works because user not_a_transform_admin has data access
|
||||
// _start works because user source_and_dest_index_access_only does have data access
|
||||
startTransform(config.getId(), RequestOptions.DEFAULT);
|
||||
waitUntilCheckpoint(transformId, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -285,10 +400,10 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
public void testTransformPermissionsDeferUnattendedNoDest() throws Exception {
|
||||
String transformId = "transform-permissions-defer-unattended";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, true);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, true);
|
||||
putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
|
@ -296,11 +411,11 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
);
|
||||
String authIssue = Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[create_index, index, read]]",
|
||||
+ "[%s:[create_index, index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
);
|
||||
assertRed(transformId, authIssue);
|
||||
|
||||
|
@ -311,7 +426,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertBusy(() -> assertRed(transformId, authIssue, noSuchIndexIssue), 10, TimeUnit.SECONDS);
|
||||
|
||||
// update transform's credentials so that the transform has permission to access source/dest indices
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build());
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
|
||||
waitUntilCheckpoint(transformId, 1);
|
||||
|
||||
// transform is green again
|
||||
|
@ -326,12 +441,12 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
public void testTransformPermissionsDeferUnattendedDest() throws Exception {
|
||||
String transformId = "transform-permissions-defer-unattended-dest-exists";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
createIndex(adminClient(), destIndexName, Settings.EMPTY);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, true);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, true);
|
||||
putTransform(
|
||||
transformId,
|
||||
Strings.toString(config),
|
||||
|
@ -339,11 +454,11 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
);
|
||||
String authIssue = Strings.format(
|
||||
"Cannot create transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[index, read]]",
|
||||
+ "[%s:[index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
);
|
||||
assertRed(transformId, authIssue);
|
||||
|
||||
|
@ -353,7 +468,7 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
assertRed(transformId, authIssue);
|
||||
|
||||
// update transform's credentials so that the transform has permission to access source/dest indices
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build());
|
||||
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
|
||||
waitUntilCheckpoint(transformId, 1);
|
||||
|
||||
// transform is green again
|
||||
|
@ -363,14 +478,17 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
public void testPreviewRequestFailsPermissionsCheck() throws Exception {
|
||||
String transformId = "transform-permissions-preview";
|
||||
String sourceIndexName = transformId + "-index";
|
||||
String destIndexName = sourceIndexName + "-dest";
|
||||
String destIndexName = transformId + "-dest";
|
||||
createReviewsIndex(sourceIndexName, 10, NUM_USERS, TransformIT::getUserIdForRow, TransformIT::getDateStringForRow);
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, false);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexName, destIndexName, null, false);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> previewTransform(Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, JUNIOR_HEADER).build())
|
||||
() -> previewTransform(
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.JUNIOR.header).build()
|
||||
)
|
||||
);
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(403)));
|
||||
assertThat(
|
||||
|
@ -378,16 +496,16 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
containsString(
|
||||
Strings.format(
|
||||
"Cannot preview transform [%s] because user %s lacks the required permissions "
|
||||
+ "[%s:[read, view_index_metadata], %s:[create_index, index, read]]",
|
||||
+ "[%s:[create_index, index, read], %s:[read, view_index_metadata]]",
|
||||
transformId,
|
||||
JUNIOR_USERNAME,
|
||||
sourceIndexName,
|
||||
destIndexName
|
||||
Users.JUNIOR.username,
|
||||
destIndexName,
|
||||
sourceIndexName
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
previewTransform(Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, SENIOR_HEADER).build());
|
||||
previewTransform(Strings.toString(config), RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
|
||||
}
|
||||
|
||||
public void testFleetIndicesAccess() throws Exception {
|
||||
|
@ -395,33 +513,38 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
String sourceIndexPattern = ".fleet-agents*";
|
||||
String destIndexName = transformId + "-dest";
|
||||
|
||||
TransformConfig config = createConfig(transformId, sourceIndexPattern, destIndexName, false);
|
||||
TransformConfig config = createConfig(transformId, sourceIndexPattern, destIndexName, null, false);
|
||||
|
||||
ResponseException e = expectThrows(
|
||||
ResponseException.class,
|
||||
() -> previewTransform(
|
||||
Strings.toString(config),
|
||||
RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, FLEET_ACCESS_HEADER).build()
|
||||
RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.FLEET_ACCESS.header).build()
|
||||
)
|
||||
);
|
||||
// The _preview request got past the authorization step (which is what interests us in this test) but failed because the referenced
|
||||
// source indices do not exist.
|
||||
assertThat(e.getResponse().getStatusLine().getStatusCode(), is(equalTo(400)));
|
||||
assertThat("Error was: " + e.getMessage(), e.getResponse().getStatusLine().getStatusCode(), is(equalTo(400)));
|
||||
assertThat(e.getMessage(), containsString("Source indices have been deleted or closed."));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings restAdminSettings() {
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", TEST_ADMIN_HEADER).build();
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", Users.TEST_ADMIN.header).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings restClientSettings() {
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", JUNIOR_HEADER).build();
|
||||
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", Users.JUNIOR.header).build();
|
||||
}
|
||||
|
||||
private TransformConfig createConfig(String transformId, String sourceIndexName, String destIndexName, boolean unattended)
|
||||
throws Exception {
|
||||
private TransformConfig createConfig(
|
||||
String transformId,
|
||||
String sourceIndexName,
|
||||
String destIndexName,
|
||||
List<DestAlias> aliases,
|
||||
boolean unattended
|
||||
) throws Exception {
|
||||
Map<String, SingleGroupSource> groups = Map.of(
|
||||
"by-day",
|
||||
createDateHistogramGroupSourceWithCalendarInterval("timestamp", DateHistogramInterval.DAY, null),
|
||||
|
@ -435,7 +558,12 @@ public class TransformInsufficientPermissionsIT extends TransformRestTestCase {
|
|||
.addAggregator(AggregationBuilders.avg("review_score").field("stars"))
|
||||
.addAggregator(AggregationBuilders.max("timestamp").field("timestamp"));
|
||||
|
||||
TransformConfig config = createTransformConfigBuilder(transformId, destIndexName, QueryConfig.matchAll(), sourceIndexName)
|
||||
TransformConfig config = TransformConfig.builder()
|
||||
.setId(transformId)
|
||||
.setSource(new SourceConfig(new String[] { sourceIndexName }, QueryConfig.matchAll(), Collections.emptyMap()))
|
||||
.setDest(new DestConfig(destIndexName, aliases, null))
|
||||
.setFrequency(TimeValue.timeValueSeconds(10))
|
||||
.setDescription("Test transform config id: " + transformId)
|
||||
.setPivotConfig(createPivotConfig(groups, aggs))
|
||||
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
|
||||
.setSettings(new SettingsConfig.Builder().setAlignCheckpoints(false).setUnattended(unattended).build())
|
||||
|
|
|
@ -391,7 +391,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
return TransformConfig.builder()
|
||||
.setId(id)
|
||||
.setSource(new SourceConfig(sourceIndices, queryConfig, Collections.emptyMap()))
|
||||
.setDest(new DestConfig(destinationIndex, null))
|
||||
.setDest(new DestConfig(destinationIndex, null, null))
|
||||
.setFrequency(TimeValue.timeValueSeconds(10))
|
||||
.setDescription("Test transform config id: " + id);
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ public class DateHistogramGroupByIT extends ContinuousTestCase {
|
|||
}
|
||||
|
||||
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, null, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setId(NAME);
|
||||
|
||||
var groupConfig = TransformRestTestCase.createGroupConfig(
|
||||
|
|
|
@ -62,7 +62,7 @@ public class DateHistogramGroupByOtherTimeFieldIT extends ContinuousTestCase {
|
|||
transformConfigBuilder.setSettings(addCommonSettings(new SettingsConfig.Builder()).setDatesAsEpochMillis(true).build());
|
||||
}
|
||||
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, null, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setId(NAME);
|
||||
|
||||
Map<String, SingleGroupSource> groupSource = new HashMap<>();
|
||||
|
|
|
@ -47,7 +47,7 @@ public class HistogramGroupByIT extends ContinuousTestCase {
|
|||
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
|
||||
addCommonBuilderParameters(transformConfigBuilder);
|
||||
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, null, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setId(NAME);
|
||||
|
||||
var groupConfig = TransformRestTestCase.createGroupConfig(
|
||||
|
|
|
@ -58,7 +58,7 @@ public class LatestContinuousIT extends ContinuousTestCase {
|
|||
public TransformConfig createConfig() {
|
||||
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder().setId(NAME)
|
||||
.setSource(new SourceConfig(new String[] { CONTINUOUS_EVENTS_SOURCE_INDEX }, QueryConfig.matchAll(), RUNTIME_MAPPINGS))
|
||||
.setDest(new DestConfig(NAME, INGEST_PIPELINE))
|
||||
.setDest(new DestConfig(NAME, null, INGEST_PIPELINE))
|
||||
.setLatestConfig(new LatestConfig(List.of(eventField), timestampField));
|
||||
addCommonBuilderParameters(transformConfigBuilder);
|
||||
return transformConfigBuilder.build();
|
||||
|
|
|
@ -45,7 +45,7 @@ public class TermsGroupByIT extends ContinuousTestCase {
|
|||
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
|
||||
addCommonBuilderParameters(transformConfigBuilder);
|
||||
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, null, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setId(NAME);
|
||||
|
||||
var groupConfig = TransformRestTestCase.createGroupConfig(
|
||||
|
|
|
@ -54,7 +54,7 @@ public class TermsOnDateGroupByIT extends ContinuousTestCase {
|
|||
TransformConfig.Builder transformConfigBuilder = new TransformConfig.Builder();
|
||||
addCommonBuilderParameters(transformConfigBuilder);
|
||||
transformConfigBuilder.setSource(new SourceConfig(CONTINUOUS_EVENTS_SOURCE_INDEX));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setDest(new DestConfig(NAME, null, INGEST_PIPELINE));
|
||||
transformConfigBuilder.setId(NAME);
|
||||
|
||||
var groupConfig = TransformRestTestCase.createGroupConfig(
|
||||
|
|
|
@ -12,12 +12,12 @@ import org.elasticsearch.client.Request;
|
|||
import org.elasticsearch.client.ResponseException;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -72,59 +72,74 @@ public class TransformDeleteIT extends TransformRestTestCase {
|
|||
public void testDeleteDoesNotDeleteDestinationIndexByDefault() throws Exception {
|
||||
String transformId = "transform-1";
|
||||
String transformDest = transformId + "_idx";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
|
||||
String transformDestAlias = transformId + "_alias";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
|
||||
|
||||
createTransform(transformId, transformDest);
|
||||
createTransform(transformId, transformDest, transformDestAlias);
|
||||
assertFalse(indexExists(transformDest));
|
||||
assertFalse(aliasExists(transformDestAlias));
|
||||
|
||||
startTransform(transformId);
|
||||
waitForTransformCheckpoint(transformId, 1);
|
||||
|
||||
stopTransform(transformId, false);
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
|
||||
deleteTransform(transformId);
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
}
|
||||
|
||||
public void testDeleteWithParamDeletesAutoCreatedDestinationIndex() throws Exception {
|
||||
String transformId = "transform-2";
|
||||
String transformDest = transformId + "_idx";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
|
||||
String transformDestAlias = transformId + "_alias";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
|
||||
|
||||
createTransform(transformId, transformDest);
|
||||
createTransform(transformId, transformDest, transformDestAlias);
|
||||
assertFalse(indexExists(transformDest));
|
||||
assertFalse(aliasExists(transformDestAlias));
|
||||
|
||||
startTransform(transformId);
|
||||
waitForTransformCheckpoint(transformId, 1);
|
||||
|
||||
stopTransform(transformId, false);
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
|
||||
deleteTransform(transformId, true);
|
||||
assertFalse(indexExists(transformDest));
|
||||
assertFalse(aliasExists(transformDest));
|
||||
}
|
||||
|
||||
public void testDeleteWithParamDeletesManuallyCreatedDestinationIndex() throws Exception {
|
||||
String transformId = "transform-3";
|
||||
String transformDest = transformId + "_idx";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest);
|
||||
String transformDestAlias = transformId + "_alias";
|
||||
setupDataAccessRole(DATA_ACCESS_ROLE, REVIEWS_INDEX_NAME, transformDest, transformDestAlias);
|
||||
|
||||
createIndex(transformDest);
|
||||
assertTrue(indexExists(transformDest));
|
||||
// The alias does not exist yet, it will be created when the transform starts
|
||||
assertFalse(aliasExists(transformDestAlias));
|
||||
|
||||
createTransform(transformId, transformDest);
|
||||
createTransform(transformId, transformDest, transformDestAlias);
|
||||
assertFalse(aliasExists(transformDestAlias));
|
||||
|
||||
startTransform(transformId);
|
||||
waitForTransformCheckpoint(transformId, 1);
|
||||
|
||||
stopTransform(transformId, false);
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
|
||||
deleteTransform(transformId, true);
|
||||
assertFalse(indexExists(transformDest));
|
||||
assertFalse(aliasExists(transformDestAlias));
|
||||
}
|
||||
|
||||
public void testDeleteWithParamDoesNotDeleteAlias() throws Exception {
|
||||
public void testDeleteWithParamDoesNotDeleteManuallySetUpAlias() throws Exception {
|
||||
String transformId = "transform-4";
|
||||
String transformDest = transformId + "_idx";
|
||||
String transformDestAlias = transformId + "_alias";
|
||||
|
@ -132,22 +147,22 @@ public class TransformDeleteIT extends TransformRestTestCase {
|
|||
|
||||
createIndex(transformDest, null, null, "\"" + transformDestAlias + "\": { \"is_write_index\": true }");
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(indexExists(transformDestAlias));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
|
||||
createTransform(transformId, transformDestAlias);
|
||||
createTransform(transformId, transformDestAlias, null);
|
||||
|
||||
startTransform(transformId);
|
||||
waitForTransformCheckpoint(transformId, 1);
|
||||
|
||||
stopTransform(transformId, false);
|
||||
assertTrue(indexExists(transformDest));
|
||||
assertTrue(aliasExists(transformDestAlias));
|
||||
|
||||
ResponseException e = expectThrows(ResponseException.class, () -> deleteTransform(transformId, true));
|
||||
assertThat(
|
||||
e.getMessage(),
|
||||
containsString(
|
||||
String.format(
|
||||
Locale.ROOT,
|
||||
Strings.format(
|
||||
"The provided expression [%s] matches an alias, specify the corresponding concrete indices instead.",
|
||||
transformDestAlias
|
||||
)
|
||||
|
@ -155,16 +170,20 @@ public class TransformDeleteIT extends TransformRestTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
private void createTransform(String transformId, String destIndex) throws IOException {
|
||||
private void createTransform(String transformId, String destIndex, String destAlias) throws IOException {
|
||||
final Request createTransformRequest = createRequestWithAuth(
|
||||
"PUT",
|
||||
getTransformEndpoint() + transformId,
|
||||
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_1
|
||||
);
|
||||
String config = String.format(Locale.ROOT, """
|
||||
String destAliases = destAlias != null ? Strings.format("""
|
||||
, "aliases": [{"alias": "%s"}]
|
||||
""", destAlias) : "";
|
||||
String config = Strings.format("""
|
||||
{
|
||||
"dest": {
|
||||
"index": "%s"
|
||||
%s
|
||||
},
|
||||
"source": {
|
||||
"index": "%s"
|
||||
|
@ -185,7 +204,7 @@ public class TransformDeleteIT extends TransformRestTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
}""", destIndex, REVIEWS_INDEX_NAME);
|
||||
}""", destIndex, destAliases, REVIEWS_INDEX_NAME);
|
||||
createTransformRequest.setJsonEntity(config);
|
||||
Map<String, Object> createTransformResponse = entityAsMap(client().performRequest(createTransformRequest));
|
||||
assertThat(createTransformResponse.get("acknowledged"), equalTo(Boolean.TRUE));
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class TransformDestIndexIT extends TransformRestTestCase {
|
||||
|
||||
private static boolean indicesCreated = false;
|
||||
|
||||
// preserve indices in order to reuse source indices in several test cases
|
||||
@Override
|
||||
protected boolean preserveIndicesUponCompletion() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createIndexes() throws IOException {
|
||||
|
||||
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
|
||||
if (indicesCreated) {
|
||||
return;
|
||||
}
|
||||
|
||||
createReviewsIndex();
|
||||
indicesCreated = true;
|
||||
}
|
||||
|
||||
public void testTransformDestIndexMetadata() throws Exception {
|
||||
long testStarted = System.currentTimeMillis();
|
||||
String transformId = "test_meta";
|
||||
createPivotReviewsTransform(transformId, "pivot_reviews", null);
|
||||
startAndWaitForTransform(transformId, "pivot_reviews");
|
||||
|
||||
Response mappingResponse = client().performRequest(new Request("GET", "pivot_reviews/_mapping"));
|
||||
|
||||
Map<?, ?> mappingAsMap = entityAsMap(mappingResponse);
|
||||
assertEquals(
|
||||
Version.CURRENT.toString(),
|
||||
XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.version.created", mappingAsMap)
|
||||
);
|
||||
assertTrue(
|
||||
(Long) XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.creation_date_in_millis", mappingAsMap) < System
|
||||
.currentTimeMillis()
|
||||
);
|
||||
assertTrue(
|
||||
(Long) XContentMapValues.extractValue(
|
||||
"pivot_reviews.mappings._meta._transform.creation_date_in_millis",
|
||||
mappingAsMap
|
||||
) > testStarted
|
||||
);
|
||||
assertEquals(transformId, XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.transform", mappingAsMap));
|
||||
assertEquals("transform", XContentMapValues.extractValue("pivot_reviews.mappings._meta.created_by", mappingAsMap));
|
||||
|
||||
Response aliasesResponse = client().performRequest(new Request("GET", "pivot_reviews/_alias"));
|
||||
Map<?, ?> aliasesAsMap = entityAsMap(aliasesResponse);
|
||||
assertEquals(Map.of(), XContentMapValues.extractValue("pivot_reviews.aliases", aliasesAsMap));
|
||||
}
|
||||
|
||||
public void testTransformDestIndexAliases() throws Exception {
|
||||
String transformId = "test_aliases";
|
||||
String destIndex1 = transformId + ".1";
|
||||
String destIndex2 = transformId + ".2";
|
||||
String destAliasAll = transformId + ".all";
|
||||
String destAliasLatest = transformId + ".latest";
|
||||
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));
|
||||
|
||||
// Create the transform
|
||||
createPivotReviewsTransform(transformId, destIndex1, null, null, destAliases, null, null, null, REVIEWS_INDEX_NAME);
|
||||
startAndWaitForTransform(transformId, destIndex1);
|
||||
|
||||
// Verify that both aliases are configured on the dest index
|
||||
assertAliases(destIndex1, destAliasAll, destAliasLatest);
|
||||
|
||||
// Verify that the search results are the same, regardless whether we use index or alias
|
||||
assertHitsAreTheSame(destIndex1, destAliasAll, destAliasLatest);
|
||||
|
||||
// Delete the transform
|
||||
deleteTransform(transformId);
|
||||
assertAliases(destIndex1, destAliasAll, destAliasLatest);
|
||||
|
||||
// Create the transform again (this time with a different destination index)
|
||||
createPivotReviewsTransform(transformId, destIndex2, null, null, destAliases, null, null, null, REVIEWS_INDEX_NAME);
|
||||
startAndWaitForTransform(transformId, destIndex2);
|
||||
|
||||
// Verify that destAliasLatest no longer points at destIndex1 but it now points at destIndex2
|
||||
assertAliases(destIndex1, destAliasAll);
|
||||
assertAliases(destIndex2, destAliasAll, destAliasLatest);
|
||||
}
|
||||
|
||||
public void testTransformDestIndexCreatedDuringUpdate() throws Exception {
|
||||
String transformId = "test_dest_index_on_update";
|
||||
String destIndex = transformId + "-dest";
|
||||
|
||||
assertFalse(indexExists(destIndex));
|
||||
|
||||
// Create and start the unattended transform
|
||||
createPivotReviewsTransform(
|
||||
transformId,
|
||||
destIndex,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new SettingsConfig.Builder().setUnattended(true).build(),
|
||||
null,
|
||||
null,
|
||||
REVIEWS_INDEX_NAME
|
||||
);
|
||||
startTransform(transformId);
|
||||
|
||||
// Verify that the destination index does not exist
|
||||
assertFalse(indexExists(destIndex));
|
||||
|
||||
// Update the unattended transform. This will trigger destination index creation.
|
||||
// The update has to change something in the config (here, max_page_search_size). Otherwise it would have been optimized away.
|
||||
updateTransform(transformId, """
|
||||
{ "settings": { "max_page_search_size": 123 } }""");
|
||||
|
||||
// Verify that the destination index now exists
|
||||
assertTrue(indexExists(destIndex));
|
||||
}
|
||||
|
||||
private static void assertAliases(String index, String... aliases) throws IOException {
|
||||
Map<String, Map<?, ?>> expectedAliases = Arrays.stream(aliases).collect(Collectors.toMap(a -> a, a -> Map.of()));
|
||||
Response aliasesResponse = client().performRequest(new Request("GET", index + "/_alias"));
|
||||
assertEquals(expectedAliases, XContentMapValues.extractValue(index + ".aliases", entityAsMap(aliasesResponse)));
|
||||
}
|
||||
|
||||
private static void assertHitsAreTheSame(String index, String... aliases) throws IOException {
|
||||
Response indexSearchResponse = client().performRequest(new Request("GET", index + "/_search"));
|
||||
Object indexSearchHits = XContentMapValues.extractValue("hits", entityAsMap(indexSearchResponse));
|
||||
for (String alias : aliases) {
|
||||
Response aliasSearchResponse = client().performRequest(new Request("GET", alias + "/_search"));
|
||||
Object aliasSearchHits = XContentMapValues.extractValue("hits", entityAsMap(aliasSearchResponse));
|
||||
assertEquals("index = " + index + ", alias = " + alias, indexSearchHits, aliasSearchHits);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,67 +0,0 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License
|
||||
* 2.0; you may not use this file except in compliance with the Elastic License
|
||||
* 2.0.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.transform.integration;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.client.Request;
|
||||
import org.elasticsearch.client.Response;
|
||||
import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class TransformDestIndexMetadataIT extends TransformRestTestCase {
|
||||
|
||||
private boolean indicesCreated = false;
|
||||
|
||||
// preserve indices in order to reuse source indices in several test cases
|
||||
@Override
|
||||
protected boolean preserveIndicesUponCompletion() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void createIndexes() throws IOException {
|
||||
|
||||
// it's not possible to run it as @BeforeClass as clients aren't initialized then, so we need this little hack
|
||||
if (indicesCreated) {
|
||||
return;
|
||||
}
|
||||
|
||||
createReviewsIndex();
|
||||
indicesCreated = true;
|
||||
}
|
||||
|
||||
public void testTransformDestIndexMetadata() throws Exception {
|
||||
long testStarted = System.currentTimeMillis();
|
||||
createPivotReviewsTransform("test_meta", "pivot_reviews", null);
|
||||
startAndWaitForTransform("test_meta", "pivot_reviews");
|
||||
|
||||
Response mappingResponse = client().performRequest(new Request("GET", "pivot_reviews/_mapping"));
|
||||
|
||||
Map<?, ?> mappingAsMap = entityAsMap(mappingResponse);
|
||||
assertEquals(
|
||||
Version.CURRENT.toString(),
|
||||
XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.version.created", mappingAsMap)
|
||||
);
|
||||
assertTrue(
|
||||
(Long) XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.creation_date_in_millis", mappingAsMap) < System
|
||||
.currentTimeMillis()
|
||||
);
|
||||
assertTrue(
|
||||
(Long) XContentMapValues.extractValue(
|
||||
"pivot_reviews.mappings._meta._transform.creation_date_in_millis",
|
||||
mappingAsMap
|
||||
) > testStarted
|
||||
);
|
||||
assertEquals("test_meta", XContentMapValues.extractValue("pivot_reviews.mappings._meta._transform.transform", mappingAsMap));
|
||||
assertEquals("transform", XContentMapValues.extractValue("pivot_reviews.mappings._meta.created_by", mappingAsMap));
|
||||
}
|
||||
|
||||
}
|
|
@ -109,6 +109,8 @@ public class TransformPivotRestIT extends TransformRestTestCase {
|
|||
transformIndex,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
BASIC_AUTH_VALUE_NO_ACCESS,
|
||||
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
|
||||
REVIEWS_INDEX_NAME
|
||||
|
@ -138,6 +140,9 @@ public class TransformPivotRestIT extends TransformRestTestCase {
|
|||
transformIndex,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
BASIC_AUTH_VALUE_TRANSFORM_ADMIN_WITH_SOME_DATA_ACCESS,
|
||||
indexName
|
||||
);
|
||||
|
|
|
@ -22,6 +22,8 @@ import org.elasticsearch.common.xcontent.support.XContentMapValues;
|
|||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -237,12 +239,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
}
|
||||
|
||||
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query) throws IOException {
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, null);
|
||||
}
|
||||
|
||||
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline)
|
||||
throws IOException {
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, null);
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, null, null);
|
||||
}
|
||||
|
||||
protected void createReviewsIndexNano() throws IOException {
|
||||
|
@ -293,30 +290,29 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
String transformIndex,
|
||||
String query,
|
||||
String pipeline,
|
||||
String authHeader,
|
||||
String sourceIndex
|
||||
) throws IOException {
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, sourceIndex);
|
||||
}
|
||||
|
||||
protected void createPivotReviewsTransform(
|
||||
String transformId,
|
||||
String transformIndex,
|
||||
String query,
|
||||
String pipeline,
|
||||
List<DestAlias> destAliases,
|
||||
SettingsConfig settings,
|
||||
String authHeader,
|
||||
String secondaryAuthHeader,
|
||||
String sourceIndex
|
||||
) throws IOException {
|
||||
String config = "{";
|
||||
|
||||
String destConfig = Strings.format("""
|
||||
"dest": {"index":"%s"
|
||||
""", transformIndex);
|
||||
if (pipeline != null) {
|
||||
config += Strings.format("""
|
||||
"dest": {"index":"%s", "pipeline":"%s"},""", transformIndex, pipeline);
|
||||
} else {
|
||||
config += Strings.format("""
|
||||
"dest": {"index":"%s"},""", transformIndex);
|
||||
destConfig += Strings.format("""
|
||||
, "pipeline":"%s"
|
||||
""", pipeline);
|
||||
}
|
||||
if (destAliases != null && destAliases.isEmpty() == false) {
|
||||
destConfig += ", \"aliases\":[";
|
||||
destConfig += String.join(",", destAliases.stream().map(Strings::toString).toList());
|
||||
destConfig += "]";
|
||||
}
|
||||
destConfig += "},";
|
||||
config += destConfig;
|
||||
|
||||
if (query != null) {
|
||||
config += Strings.format("""
|
||||
|
@ -326,6 +322,10 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
"source": {"index":"%s"},""", sourceIndex);
|
||||
}
|
||||
|
||||
if (settings != null) {
|
||||
config += "\"settings\": " + Strings.toString(settings) + ",";
|
||||
}
|
||||
|
||||
config += """
|
||||
"pivot": {
|
||||
"group_by": {
|
||||
|
@ -355,7 +355,6 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
},
|
||||
"frequency": "1s"
|
||||
}""";
|
||||
|
||||
createReviewsTransform(transformId, authHeader, secondaryAuthHeader, config);
|
||||
}
|
||||
|
||||
|
@ -396,7 +395,19 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
|
||||
protected void createPivotReviewsTransform(String transformId, String transformIndex, String query, String pipeline, String authHeader)
|
||||
throws IOException {
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, authHeader, null, REVIEWS_INDEX_NAME);
|
||||
createPivotReviewsTransform(transformId, transformIndex, query, pipeline, null, null, authHeader, null, REVIEWS_INDEX_NAME);
|
||||
}
|
||||
|
||||
protected void updateTransform(String transformId, String update) throws IOException {
|
||||
final Request updateTransformRequest = createRequestWithSecondaryAuth(
|
||||
"POST",
|
||||
getTransformEndpoint() + transformId + "/_update",
|
||||
null,
|
||||
null
|
||||
);
|
||||
updateTransformRequest.setJsonEntity(update);
|
||||
|
||||
client().performRequest(updateTransformRequest);
|
||||
}
|
||||
|
||||
protected void startTransform(String transformId) throws IOException {
|
||||
|
@ -639,7 +650,7 @@ public abstract class TransformRestTestCase extends ESRestTestCase {
|
|||
"indices": [
|
||||
{
|
||||
"names": [ %s ],
|
||||
"privileges": [ "create_index", "delete_index", "read", "write", "view_index_metadata" ]
|
||||
"privileges": [ "create_index", "delete_index", "read", "write", "view_index_metadata", "manage" ]
|
||||
}
|
||||
]
|
||||
}""", indicesStr));
|
||||
|
|
|
@ -148,7 +148,7 @@ public class TransformNoRemoteClusterClientNodeIT extends TransformSingleNodeTes
|
|||
private static TransformConfig randomConfig(String transformId, String sourceIndex) {
|
||||
return new TransformConfig.Builder().setId(transformId)
|
||||
.setSource(new SourceConfig(sourceIndex))
|
||||
.setDest(new DestConfig("my-dest-index", null))
|
||||
.setDest(new DestConfig("my-dest-index", null, null))
|
||||
.setPivotConfig(PivotConfigTests.randomPivotConfig())
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -154,7 +154,7 @@ public class TransformNoTransformNodeIT extends TransformSingleNodeTestCase {
|
|||
private static TransformConfig randomConfig(String transformId) {
|
||||
return new TransformConfig.Builder().setId(transformId)
|
||||
.setSource(new SourceConfig("my-index"))
|
||||
.setDest(new DestConfig("my-dest-index", null))
|
||||
.setDest(new DestConfig("my-dest-index", null, null))
|
||||
.setPivotConfig(PivotConfigTests.randomPivotConfig())
|
||||
.build();
|
||||
}
|
||||
|
|
|
@ -135,7 +135,7 @@ public class TransformProgressIT extends TransformSingleNodeTestCase {
|
|||
boolean missingBucket = userWithMissingBuckets > 0;
|
||||
createReviewsIndex(userWithMissingBuckets);
|
||||
SourceConfig sourceConfig = new SourceConfig(REVIEWS_INDEX_NAME);
|
||||
DestConfig destConfig = new DestConfig("unnecessary", null);
|
||||
DestConfig destConfig = new DestConfig("unnecessary", null, null);
|
||||
GroupConfig histgramGroupConfig = new GroupConfig(
|
||||
Collections.emptyMap(),
|
||||
Collections.singletonMap("every_50", new HistogramGroupSource("count", null, missingBucket, 50.0))
|
||||
|
|
|
@ -22,6 +22,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
|
|||
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
|
||||
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
|
||||
import org.elasticsearch.xpack.core.security.support.Exceptions;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.NullRetentionPolicyConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
||||
|
@ -119,6 +120,17 @@ final class TransformPrivilegeChecker {
|
|||
&& config.getRetentionPolicyConfig() instanceof NullRetentionPolicyConfig == false) {
|
||||
destPrivileges.add("delete");
|
||||
}
|
||||
if (config.getDestination().getAliases() != null && config.getDestination().getAliases().isEmpty() == false) {
|
||||
destPrivileges.add("manage");
|
||||
|
||||
RoleDescriptor.IndicesPrivileges destAliasPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(config.getDestination().getAliases().stream().map(DestAlias::getAlias).toList())
|
||||
.privileges("manage")
|
||||
.allowRestrictedIndices(true)
|
||||
.build();
|
||||
indicesPrivileges.add(destAliasPrivileges);
|
||||
}
|
||||
|
||||
RoleDescriptor.IndicesPrivileges destIndexPrivileges = RoleDescriptor.IndicesPrivileges.builder()
|
||||
.indices(destIndex)
|
||||
.privileges(destPrivileges)
|
||||
|
|
|
@ -29,14 +29,13 @@ import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState;
|
|||
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -118,6 +117,7 @@ public class TransformUpdater {
|
|||
Settings settings,
|
||||
Client client,
|
||||
TransformConfigManager transformConfigManager,
|
||||
TransformAuditor auditor,
|
||||
final TransformConfig config,
|
||||
final TransformConfigUpdate update,
|
||||
final SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
|
||||
|
@ -179,6 +179,7 @@ public class TransformUpdater {
|
|||
updateTransformConfiguration(
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
indexNameExpressionResolver,
|
||||
updatedConfig,
|
||||
destIndexMappings,
|
||||
|
@ -293,6 +294,7 @@ public class TransformUpdater {
|
|||
private static void updateTransformConfiguration(
|
||||
Client client,
|
||||
TransformConfigManager transformConfigManager,
|
||||
TransformAuditor auditor,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransformConfig config,
|
||||
Map<String, String> mappings,
|
||||
|
@ -328,11 +330,9 @@ public class TransformUpdater {
|
|||
);
|
||||
|
||||
// <1> Create destination index if necessary
|
||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(
|
||||
clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
config.getDestination().getIndex()
|
||||
);
|
||||
final String destinationIndex = config.getDestination().getIndex();
|
||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destinationIndex);
|
||||
|
||||
String[] src = indexNameExpressionResolver.concreteIndexNames(
|
||||
clusterState,
|
||||
IndicesOptions.lenientExpandOpen(),
|
||||
|
@ -345,26 +345,19 @@ public class TransformUpdater {
|
|||
// we allow source indices to disappear. If the source and destination indices do not exist, don't do anything
|
||||
// the transform will just have to dynamically create the destination index without special mapping.
|
||||
&& src.length > 0) {
|
||||
createDestinationIndex(client, config, mappings, createDestinationListener);
|
||||
TransformIndex.createDestinationIndex(
|
||||
client,
|
||||
auditor,
|
||||
indexNameExpressionResolver,
|
||||
clusterState,
|
||||
config,
|
||||
mappings,
|
||||
createDestinationListener
|
||||
);
|
||||
} else {
|
||||
createDestinationListener.onResponse(null);
|
||||
}
|
||||
}
|
||||
|
||||
private static void createDestinationIndex(
|
||||
Client client,
|
||||
TransformConfig config,
|
||||
Map<String, String> mappings,
|
||||
ActionListener<Boolean> listener
|
||||
) {
|
||||
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
|
||||
mappings,
|
||||
config.getId(),
|
||||
Clock.systemUTC()
|
||||
);
|
||||
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
|
||||
}
|
||||
|
||||
private TransformUpdater() {}
|
||||
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.elasticsearch.xpack.core.transform.TransformField;
|
|||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction;
|
||||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Request;
|
||||
import org.elasticsearch.xpack.core.transform.action.PreviewTransformAction.Response;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SyncConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
|
@ -145,6 +146,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|||
config.getSource(),
|
||||
config.getDestination().getPipeline(),
|
||||
config.getDestination().getIndex(),
|
||||
config.getDestination().getAliases(),
|
||||
config.getSyncConfig(),
|
||||
listener
|
||||
),
|
||||
|
@ -199,6 +201,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<Requ
|
|||
SourceConfig source,
|
||||
String pipeline,
|
||||
String dest,
|
||||
List<DestAlias> aliases,
|
||||
SyncConfig syncConfig,
|
||||
ActionListener<Response> listener
|
||||
) {
|
||||
|
|
|
@ -123,6 +123,7 @@ public class TransportResetTransformAction extends AcknowledgedTransportMasterNo
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
transformConfigAndVersion.v1(),
|
||||
TransformConfigUpdate.EMPTY,
|
||||
transformConfigAndVersion.v2(),
|
||||
|
|
|
@ -14,9 +14,7 @@ import org.elasticsearch.ElasticsearchSecurityException;
|
|||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
|
@ -39,7 +37,6 @@ import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
|
|||
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.AuthorizationState;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
|
||||
|
@ -51,8 +48,6 @@ import org.elasticsearch.xpack.transform.persistence.TransformIndex;
|
|||
import org.elasticsearch.xpack.transform.transforms.TransformNodes;
|
||||
import org.elasticsearch.xpack.transform.transforms.TransformTask;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.Map;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
|
@ -183,7 +178,22 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
|
||||
// <3> If the destination index exists, start the task, otherwise deduce our mappings for the destination index and create it
|
||||
ActionListener<ValidateTransformAction.Response> validationListener = ActionListener.wrap(validationResponse -> {
|
||||
createDestinationIndex(state, transformConfigHolder.get(), validationResponse.getDestIndexMappings(), createOrGetIndexListener);
|
||||
if (Boolean.TRUE.equals(transformConfigHolder.get().getSettings().getUnattended())) {
|
||||
logger.debug(
|
||||
() -> format("[%s] Skip dest index creation as this is an unattended transform", transformConfigHolder.get().getId())
|
||||
);
|
||||
createOrGetIndexListener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
TransformIndex.createDestinationIndex(
|
||||
client,
|
||||
auditor,
|
||||
indexNameExpressionResolver,
|
||||
state,
|
||||
transformConfigHolder.get(),
|
||||
validationResponse.getDestIndexMappings(),
|
||||
createOrGetIndexListener
|
||||
);
|
||||
}, e -> {
|
||||
if (Boolean.TRUE.equals(transformConfigHolder.get().getSettings().getUnattended())) {
|
||||
logger.debug(
|
||||
|
@ -261,60 +271,6 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
|
|||
transformConfigManager.getTransformConfiguration(request.getId(), getTransformListener);
|
||||
}
|
||||
|
||||
private void createDestinationIndex(
|
||||
ClusterState state,
|
||||
TransformConfig config,
|
||||
Map<String, String> destIndexMappings,
|
||||
ActionListener<Boolean> listener
|
||||
) {
|
||||
if (Boolean.TRUE.equals(config.getSettings().getUnattended())) {
|
||||
logger.debug(() -> format("[%s] Skip dest index creation as this is an unattended transform", config.getId()));
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
|
||||
final String destinationIndex = config.getDestination().getIndex();
|
||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), destinationIndex);
|
||||
|
||||
if (dest.length == 0) {
|
||||
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
|
||||
destIndexMappings,
|
||||
config.getId(),
|
||||
Clock.systemUTC()
|
||||
);
|
||||
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, ActionListener.wrap(r -> {
|
||||
String message = Boolean.FALSE.equals(config.getSettings().getDeduceMappings())
|
||||
? "Created destination index [" + destinationIndex + "]."
|
||||
: "Created destination index [" + destinationIndex + "] with deduced mappings.";
|
||||
auditor.info(config.getId(), message);
|
||||
listener.onResponse(r);
|
||||
}, listener::onFailure));
|
||||
} else {
|
||||
auditor.info(config.getId(), "Using existing destination index [" + destinationIndex + "].");
|
||||
ClientHelper.executeAsyncWithOrigin(
|
||||
client.threadPool().getThreadContext(),
|
||||
ClientHelper.TRANSFORM_ORIGIN,
|
||||
client.admin().indices().prepareStats(dest).clear().setDocs(true).request(),
|
||||
ActionListener.<IndicesStatsResponse>wrap(r -> {
|
||||
long docTotal = r.getTotal().docs.getCount();
|
||||
if (docTotal > 0L) {
|
||||
auditor.warning(
|
||||
config.getId(),
|
||||
"Non-empty destination index [" + destinationIndex + "]. " + "Contains [" + docTotal + "] total documents."
|
||||
);
|
||||
}
|
||||
listener.onResponse(true);
|
||||
}, e -> {
|
||||
String msg = "Unable to determine destination index stats, error: " + e.getMessage();
|
||||
logger.warn(msg, e);
|
||||
auditor.warning(config.getId(), msg);
|
||||
listener.onResponse(true);
|
||||
}),
|
||||
client.admin().indices()::stats
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockException checkBlock(StartTransformAction.Request request, ClusterState state) {
|
||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||
|
|
|
@ -133,6 +133,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
configAndVersion.v1(),
|
||||
update,
|
||||
configAndVersion.v2(),
|
||||
|
|
|
@ -158,6 +158,7 @@ public class TransportUpgradeTransformsAction extends TransportMasterNodeAction<
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
config,
|
||||
update,
|
||||
configAndVersion.v2(),
|
||||
|
|
|
@ -12,20 +12,28 @@ import org.apache.logging.log4j.Logger;
|
|||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetadata;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.xpack.core.ClientHelper;
|
||||
import org.elasticsearch.xpack.core.transform.TransformField;
|
||||
import org.elasticsearch.xpack.core.transform.TransformMessages;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
|
||||
import java.time.Clock;
|
||||
import java.util.HashMap;
|
||||
|
@ -33,6 +41,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static java.util.stream.Collectors.joining;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.TRANSFORM_ORIGIN;
|
||||
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
|
||||
|
@ -92,12 +101,82 @@ public final class TransformIndex {
|
|||
|
||||
public static void createDestinationIndex(
|
||||
Client client,
|
||||
TransformConfig transformConfig,
|
||||
TransformAuditor auditor,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
ClusterState clusterState,
|
||||
TransformConfig config,
|
||||
Map<String, String> destIndexMappings,
|
||||
ActionListener<Boolean> listener
|
||||
) {
|
||||
final String destinationIndex = config.getDestination().getIndex();
|
||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destinationIndex);
|
||||
|
||||
// <3> Final listener
|
||||
ActionListener<Boolean> setUpDestinationAliasesListener = ActionListener.wrap(r -> {
|
||||
String message = "Set up aliases ["
|
||||
+ config.getDestination().getAliases().stream().map(DestAlias::getAlias).collect(joining(", "))
|
||||
+ "] for destination index ["
|
||||
+ destinationIndex
|
||||
+ "].";
|
||||
auditor.info(config.getId(), message);
|
||||
|
||||
listener.onResponse(r);
|
||||
}, listener::onFailure);
|
||||
|
||||
// <2> Set up destination index aliases, regardless whether the destination index was created by the transform or by the user
|
||||
ActionListener<Boolean> createDestinationIndexListener = ActionListener.wrap(createdDestinationIndex -> {
|
||||
if (createdDestinationIndex) {
|
||||
String message = Boolean.FALSE.equals(config.getSettings().getDeduceMappings())
|
||||
? "Created destination index [" + destinationIndex + "]."
|
||||
: "Created destination index [" + destinationIndex + "] with deduced mappings.";
|
||||
auditor.info(config.getId(), message);
|
||||
}
|
||||
setUpDestinationAliases(client, config, setUpDestinationAliasesListener);
|
||||
}, listener::onFailure);
|
||||
|
||||
if (dest.length == 0) {
|
||||
TransformDestIndexSettings generatedDestIndexSettings = createTransformDestIndexSettings(
|
||||
destIndexMappings,
|
||||
config.getId(),
|
||||
Clock.systemUTC()
|
||||
);
|
||||
|
||||
// <1> Create destination index
|
||||
createDestinationIndex(client, config, generatedDestIndexSettings, createDestinationIndexListener);
|
||||
} else {
|
||||
auditor.info(config.getId(), "Using existing destination index [" + destinationIndex + "].");
|
||||
// <1'> Audit existing destination index' stats
|
||||
ClientHelper.executeAsyncWithOrigin(
|
||||
client.threadPool().getThreadContext(),
|
||||
ClientHelper.TRANSFORM_ORIGIN,
|
||||
client.admin().indices().prepareStats(dest).clear().setDocs(true).request(),
|
||||
ActionListener.<IndicesStatsResponse>wrap(r -> {
|
||||
long docTotal = r.getTotal().docs.getCount();
|
||||
if (docTotal > 0L) {
|
||||
auditor.warning(
|
||||
config.getId(),
|
||||
"Non-empty destination index [" + destinationIndex + "]. " + "Contains [" + docTotal + "] total documents."
|
||||
);
|
||||
}
|
||||
createDestinationIndexListener.onResponse(false);
|
||||
}, e -> {
|
||||
String msg = "Unable to determine destination index stats, error: " + e.getMessage();
|
||||
logger.warn(msg, e);
|
||||
auditor.warning(config.getId(), msg);
|
||||
createDestinationIndexListener.onResponse(false);
|
||||
}),
|
||||
client.admin().indices()::stats
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
static void createDestinationIndex(
|
||||
Client client,
|
||||
TransformConfig config,
|
||||
TransformDestIndexSettings destIndexSettings,
|
||||
ActionListener<Boolean> listener
|
||||
) {
|
||||
CreateIndexRequest request = new CreateIndexRequest(transformConfig.getDestination().getIndex());
|
||||
|
||||
CreateIndexRequest request = new CreateIndexRequest(config.getDestination().getIndex());
|
||||
request.settings(destIndexSettings.getSettings());
|
||||
request.mapping(destIndexSettings.getMappings());
|
||||
for (Alias alias : destIndexSettings.getAliases()) {
|
||||
|
@ -105,7 +184,7 @@ public final class TransformIndex {
|
|||
}
|
||||
|
||||
ClientHelper.executeWithHeadersAsync(
|
||||
transformConfig.getHeaders(),
|
||||
config.getHeaders(),
|
||||
TRANSFORM_ORIGIN,
|
||||
client,
|
||||
CreateIndexAction.INSTANCE,
|
||||
|
@ -115,8 +194,47 @@ public final class TransformIndex {
|
|||
}, e -> {
|
||||
String message = TransformMessages.getMessage(
|
||||
TransformMessages.FAILED_TO_CREATE_DESTINATION_INDEX,
|
||||
transformConfig.getDestination().getIndex(),
|
||||
transformConfig.getId()
|
||||
config.getDestination().getIndex(),
|
||||
config.getId()
|
||||
);
|
||||
logger.error(message);
|
||||
listener.onFailure(new RuntimeException(message, e));
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
static void setUpDestinationAliases(Client client, TransformConfig config, ActionListener<Boolean> listener) {
|
||||
// No aliases configured to be set up, just move on
|
||||
if (config.getDestination().getAliases() == null || config.getDestination().getAliases().isEmpty()) {
|
||||
listener.onResponse(true);
|
||||
return;
|
||||
}
|
||||
|
||||
IndicesAliasesRequest request = new IndicesAliasesRequest();
|
||||
for (DestAlias destAlias : config.getDestination().getAliases()) {
|
||||
if (destAlias.isMoveOnCreation()) {
|
||||
request.addAliasAction(IndicesAliasesRequest.AliasActions.remove().alias(destAlias.getAlias()).index("*"));
|
||||
}
|
||||
}
|
||||
for (DestAlias destAlias : config.getDestination().getAliases()) {
|
||||
request.addAliasAction(
|
||||
IndicesAliasesRequest.AliasActions.add().alias(destAlias.getAlias()).index(config.getDestination().getIndex())
|
||||
);
|
||||
}
|
||||
|
||||
ClientHelper.executeWithHeadersAsync(
|
||||
config.getHeaders(),
|
||||
TRANSFORM_ORIGIN,
|
||||
client,
|
||||
IndicesAliasesAction.INSTANCE,
|
||||
request,
|
||||
ActionListener.wrap(aliasesResponse -> {
|
||||
listener.onResponse(true);
|
||||
}, e -> {
|
||||
String message = TransformMessages.getMessage(
|
||||
TransformMessages.FAILED_TO_SET_UP_DESTINATION_ALIASES,
|
||||
config.getDestination().getIndex(),
|
||||
config.getId()
|
||||
);
|
||||
logger.error(message);
|
||||
listener.onFailure(new RuntimeException(message, e));
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
|
|||
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
|
||||
import org.elasticsearch.xpack.core.security.authz.permission.ResourcePrivileges;
|
||||
import org.elasticsearch.xpack.core.security.user.User;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TimeRetentionPolicyConfig;
|
||||
|
@ -76,9 +77,10 @@ public class TransformPrivilegeCheckerTests extends ESTestCase {
|
|||
.addPrivilege("read", false)
|
||||
.addPrivilege("delete", false)
|
||||
.build();
|
||||
private static final String DEST_ALIAS_NAME = "some-dest-alias";
|
||||
private static final TransformConfig TRANSFORM_CONFIG = new TransformConfig.Builder().setId(TRANSFORM_ID)
|
||||
.setSource(new SourceConfig(SOURCE_INDEX_NAME))
|
||||
.setDest(new DestConfig(DEST_INDEX_NAME, null))
|
||||
.setDest(new DestConfig(DEST_INDEX_NAME, null, null))
|
||||
.build();
|
||||
private ThreadPool threadPool;
|
||||
private SecurityContext securityContext;
|
||||
|
@ -282,6 +284,50 @@ public class TransformPrivilegeCheckerTests extends ESTestCase {
|
|||
);
|
||||
}
|
||||
|
||||
public void testCheckPrivileges_CheckDestIndexPrivileges_DestAliasExists() {
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metadata(
|
||||
Metadata.builder()
|
||||
.put(IndexMetadata.builder(DEST_INDEX_NAME).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||
)
|
||||
.build();
|
||||
TransformConfig config = new TransformConfig.Builder(TRANSFORM_CONFIG).setDest(
|
||||
new DestConfig(DEST_INDEX_NAME, List.of(new DestAlias(DEST_ALIAS_NAME, randomBoolean())), null)
|
||||
).build();
|
||||
TransformPrivilegeChecker.checkPrivileges(
|
||||
OPERATION_NAME,
|
||||
SETTINGS,
|
||||
securityContext,
|
||||
indexNameExpressionResolver,
|
||||
clusterState,
|
||||
client,
|
||||
config,
|
||||
true,
|
||||
ActionListener.wrap(aVoid -> {
|
||||
HasPrivilegesRequest request = client.lastHasPrivilegesRequest;
|
||||
assertThat(request.username(), is(equalTo(USER_NAME)));
|
||||
assertThat(request.applicationPrivileges(), is(emptyArray()));
|
||||
assertThat(request.clusterPrivileges(), is(emptyArray()));
|
||||
assertThat(request.indexPrivileges(), is(arrayWithSize(3)));
|
||||
|
||||
RoleDescriptor.IndicesPrivileges sourceIndicesPrivileges = request.indexPrivileges()[0];
|
||||
assertThat(sourceIndicesPrivileges.getIndices(), is(arrayContaining(SOURCE_INDEX_NAME)));
|
||||
assertThat(sourceIndicesPrivileges.getPrivileges(), is(arrayContaining("read", "view_index_metadata")));
|
||||
assertThat(sourceIndicesPrivileges.allowRestrictedIndices(), is(true));
|
||||
|
||||
RoleDescriptor.IndicesPrivileges destIndicesPrivileges = request.indexPrivileges()[1];
|
||||
assertThat(destIndicesPrivileges.getIndices(), is(arrayContaining(DEST_ALIAS_NAME)));
|
||||
assertThat(destIndicesPrivileges.getPrivileges(), is(arrayContaining("manage")));
|
||||
assertThat(destIndicesPrivileges.allowRestrictedIndices(), is(true));
|
||||
|
||||
RoleDescriptor.IndicesPrivileges destAliasesPrivileges = request.indexPrivileges()[2];
|
||||
assertThat(destAliasesPrivileges.getIndices(), is(arrayContaining(DEST_INDEX_NAME)));
|
||||
assertThat(destAliasesPrivileges.getPrivileges(), is(arrayContaining("read", "index", "manage")));
|
||||
assertThat(destAliasesPrivileges.allowRestrictedIndices(), is(true));
|
||||
}, e -> fail(e.getMessage()))
|
||||
);
|
||||
}
|
||||
|
||||
public void testCheckPrivileges_MissingPrivileges() {
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
|
||||
.metadata(
|
||||
|
@ -290,7 +336,7 @@ public class TransformPrivilegeCheckerTests extends ESTestCase {
|
|||
)
|
||||
.build();
|
||||
TransformConfig config = new TransformConfig.Builder(TRANSFORM_CONFIG).setSource(new SourceConfig(SOURCE_INDEX_NAME_2))
|
||||
.setDest(new DestConfig(DEST_INDEX_NAME_2, null))
|
||||
.setDest(new DestConfig(DEST_INDEX_NAME_2, null, null))
|
||||
.build();
|
||||
client.nextHasPrivilegesResponse = new HasPrivilegesResponse(
|
||||
USER_NAME,
|
||||
|
|
|
@ -18,6 +18,7 @@ import org.elasticsearch.action.support.master.AcknowledgedRequest;
|
|||
import org.elasticsearch.client.internal.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.core.Tuple;
|
||||
|
@ -42,6 +43,8 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState;
|
|||
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
|
||||
import org.elasticsearch.xpack.transform.action.TransformUpdater.UpdateResult;
|
||||
import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
|
||||
import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager;
|
||||
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
|
||||
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
|
||||
|
@ -59,6 +62,7 @@ import static org.hamcrest.Matchers.is;
|
|||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TransformUpdaterTests extends ESTestCase {
|
||||
|
||||
|
@ -68,6 +72,8 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
private final SecurityContext johnSecurityContext = newSecurityContextFor(JOHN);
|
||||
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
|
||||
private Client client;
|
||||
private ClusterService clusterService = mock(ClusterService.class);
|
||||
private TransformAuditor auditor = new MockTransformAuditor(clusterService);
|
||||
private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build();
|
||||
|
||||
private static class MyMockClient extends NoOpClient {
|
||||
|
@ -111,6 +117,8 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
client.close();
|
||||
}
|
||||
client = new MyMockClient(getTestName());
|
||||
clusterService = mock(ClusterService.class);
|
||||
auditor = new MockTransformAuditor(clusterService);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -140,6 +148,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
maxCompatibleConfig,
|
||||
update,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -174,6 +183,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
minCompatibleConfig,
|
||||
update,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -245,6 +255,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
oldConfig,
|
||||
update,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -311,6 +322,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
oldConfigForDryRunUpdate,
|
||||
update,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -357,6 +369,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
oldConfig,
|
||||
TransformConfigUpdate.EMPTY,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -398,6 +411,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
oldConfig,
|
||||
TransformConfigUpdate.EMPTY,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
@ -431,6 +445,7 @@ public class TransformUpdaterTests extends ESTestCase {
|
|||
settings,
|
||||
client,
|
||||
transformConfigManager,
|
||||
auditor,
|
||||
oldConfig,
|
||||
TransformConfigUpdate.EMPTY,
|
||||
null, // seqNoPrimaryTermAndIndex
|
||||
|
|
|
@ -8,6 +8,8 @@ package org.elasticsearch.xpack.transform.persistence;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesAction;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
|
||||
|
@ -21,6 +23,10 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xcontent.XContentParser;
|
||||
import org.elasticsearch.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestAlias;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.DestConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
|
||||
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -32,6 +38,7 @@ import java.time.Clock;
|
|||
import java.time.Instant;
|
||||
import java.time.ZoneId;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -40,6 +47,8 @@ import static java.util.Collections.emptyMap;
|
|||
import static java.util.Collections.singletonMap;
|
||||
import static org.elasticsearch.common.xcontent.support.XContentMapValues.extractValue;
|
||||
import static org.hamcrest.Matchers.anEmptyMap;
|
||||
import static org.hamcrest.Matchers.contains;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
|
@ -152,6 +161,60 @@ public class TransformIndexTests extends ESTestCase {
|
|||
assertThat(extractValue("_doc._meta._transform.creation_date_in_millis", map), equalTo(CURRENT_TIME_MILLIS));
|
||||
assertThat(extractValue("_doc._meta.created_by", map), equalTo(CREATED_BY));
|
||||
}
|
||||
assertThat(createIndexRequest.aliases(), is(empty()));
|
||||
}
|
||||
|
||||
public void testSetUpDestinationAliases_NullAliases() {
|
||||
doAnswer(withResponse(null)).when(client).execute(any(), any(), any());
|
||||
|
||||
TransformConfig config = new TransformConfig.Builder().setId("my-id")
|
||||
.setSource(new SourceConfig("my-source"))
|
||||
.setDest(new DestConfig("my-dest", null, null))
|
||||
.build();
|
||||
|
||||
TransformIndex.setUpDestinationAliases(client, config, ActionListener.wrap(Assert::assertTrue, e -> fail(e.getMessage())));
|
||||
|
||||
verifyNoMoreInteractions(client);
|
||||
}
|
||||
|
||||
public void testSetUpDestinationAliases_EmptyAliases() {
|
||||
doAnswer(withResponse(null)).when(client).execute(any(), any(), any());
|
||||
|
||||
TransformConfig config = new TransformConfig.Builder().setId("my-id")
|
||||
.setSource(new SourceConfig("my-source"))
|
||||
.setDest(new DestConfig("my-dest", List.of(), null))
|
||||
.build();
|
||||
|
||||
TransformIndex.setUpDestinationAliases(client, config, ActionListener.wrap(Assert::assertTrue, e -> fail(e.getMessage())));
|
||||
|
||||
verifyNoMoreInteractions(client);
|
||||
}
|
||||
|
||||
public void testSetUpDestinationAliases() {
|
||||
doAnswer(withResponse(null)).when(client).execute(any(), any(), any());
|
||||
|
||||
String destIndex = "my-dest";
|
||||
TransformConfig config = new TransformConfig.Builder().setId("my-id")
|
||||
.setSource(new SourceConfig("my-source"))
|
||||
.setDest(new DestConfig(destIndex, List.of(new DestAlias(".all", false), new DestAlias(".latest", true)), null))
|
||||
.build();
|
||||
|
||||
TransformIndex.setUpDestinationAliases(client, config, ActionListener.wrap(Assert::assertTrue, e -> fail(e.getMessage())));
|
||||
|
||||
ArgumentCaptor<IndicesAliasesRequest> indicesAliasesRequestCaptor = ArgumentCaptor.forClass(IndicesAliasesRequest.class);
|
||||
verify(client).execute(eq(IndicesAliasesAction.INSTANCE), indicesAliasesRequestCaptor.capture(), any());
|
||||
verify(client, atLeastOnce()).threadPool();
|
||||
verifyNoMoreInteractions(client);
|
||||
|
||||
IndicesAliasesRequest indicesAliasesRequest = indicesAliasesRequestCaptor.getValue();
|
||||
assertThat(
|
||||
indicesAliasesRequest.getAliasActions(),
|
||||
contains(
|
||||
IndicesAliasesRequest.AliasActions.remove().alias(".latest").index("*"),
|
||||
IndicesAliasesRequest.AliasActions.add().alias(".all").index(destIndex),
|
||||
IndicesAliasesRequest.AliasActions.add().alias(".latest").index(destIndex)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public void testCreateMappingsFromStringMap() {
|
||||
|
|
|
@ -139,6 +139,33 @@
|
|||
"type": "string",
|
||||
"pattern": "^.*$"
|
||||
},
|
||||
"aliases": {
|
||||
"$id": "#root/dest/aliases",
|
||||
"title": "Aliases",
|
||||
"type": "array",
|
||||
"items": [
|
||||
{
|
||||
"type": "object",
|
||||
"description": "describes a single alias",
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"alias"
|
||||
],
|
||||
"properties": {
|
||||
"alias": {
|
||||
"type": "string",
|
||||
"description": "single alias name"
|
||||
},
|
||||
"move_on_creation": {
|
||||
"type": "boolean",
|
||||
"description": "remove this alias from all the indices it points to and assign it to the dest index only",
|
||||
"default": false
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"pattern": "^.*$"
|
||||
},
|
||||
"pipeline": {
|
||||
"$id": "#root/dest/pipeline",
|
||||
"title": "Pipeline",
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue