[7.x][Transform] add transform upgrade endpoint (#77566) (#79097)

* [Transform] add transform upgrade endpoint (#77566)

Add an _upgrade endpoint to bulk upgrade transforms. _upgrade rewrites all transforms and its
artifacts into the latest format to the latest storage(index). If all transforms are upgraded old
indices and outdated documents get deleted. Using the dry_run option it is possible to check if
upgrades are necessary without applying changes.

* fix merge conflicts

* 7.x requires a different license check
This commit is contained in:
Hendrik Muhs 2021-10-14 10:39:39 +02:00 committed by GitHub
parent 5416582f57
commit 7673778034
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 2358 additions and 295 deletions

View file

@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.transform;
import org.elasticsearch.client.Validatable;
import java.util.Objects;
public class UpgradeTransformsRequest implements Validatable {
private Boolean dryRun;
public UpgradeTransformsRequest() {}
public Boolean isDryRun() {
return dryRun;
}
/**
* Whether to only check for an upgrade without taking action
*
* @param dryRun {@code true} will only check for upgrades
*/
public void setDryRun(boolean dryRun) {
this.dryRun = dryRun;
}
@Override
public int hashCode() {
return Objects.hash(dryRun);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
UpgradeTransformsRequest other = (UpgradeTransformsRequest) obj;
return Objects.equals(dryRun, other.dryRun);
}
}

View file

@ -0,0 +1,87 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.transform;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.XContentParser;
import java.util.Objects;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
public class UpgradeTransformsResponse {
public static final ParseField NO_ACTION = new ParseField("no_action");
public static final ParseField UPDATED = new ParseField("updated");
public static final ParseField NEEDS_UPDATE = new ParseField("needs_update");
private static final ConstructingObjectParser<UpgradeTransformsResponse, Void> PARSER = new ConstructingObjectParser<>(
"upgrade_transform",
true,
args -> {
long updated = args[0] == null ? 0L : (Long) args[0];
long noAction = args[1] == null ? 0L : (Long) args[1];
long needsUpdate = args[2] == null ? 0L : (Long) args[2];
return new UpgradeTransformsResponse(updated, noAction, needsUpdate);
}
);
static {
PARSER.declareLong(optionalConstructorArg(), UPDATED);
PARSER.declareLong(optionalConstructorArg(), NO_ACTION);
PARSER.declareLong(optionalConstructorArg(), NEEDS_UPDATE);
}
public static UpgradeTransformsResponse fromXContent(final XContentParser parser) {
return UpgradeTransformsResponse.PARSER.apply(parser, null);
}
private final long updated;
private final long noAction;
private final long needsUpdate;
public UpgradeTransformsResponse(long updated, long noAction, long needsUpdate) {
this.updated = updated;
this.noAction = noAction;
this.needsUpdate = needsUpdate;
}
@Override
public int hashCode() {
return Objects.hash(updated, noAction, needsUpdate);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
final UpgradeTransformsResponse that = (UpgradeTransformsResponse) other;
return this.updated == that.updated && this.noAction == that.noAction && this.needsUpdate == that.needsUpdate;
}
public long getUpdated() {
return updated;
}
public long getNoAction() {
return noAction;
}
public long getNeedsUpdate() {
return needsUpdate;
}
}

View file

@ -0,0 +1,61 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.transform;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.test.AbstractXContentTestCase.xContentTester;
public class UpgradeTransformsResponseTests extends ESTestCase {
public void testXContentParser() throws IOException {
xContentTester(
this::createParser,
UpgradeTransformsResponseTests::createTestInstance,
UpgradeTransformsResponseTests::toXContent,
UpgradeTransformsResponse::fromXContent
).assertToXContentEquivalence(false).supportsUnknownFields(false).test();
}
private static UpgradeTransformsResponse createTestInstance() {
return new UpgradeTransformsResponse(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}
private static void toXContent(UpgradeTransformsResponse response, XContentBuilder builder) throws IOException {
builder.startObject();
if (response.getUpdated() != 0) {
builder.field("updated", response.getUpdated());
}
if (response.getNoAction() != 0) {
builder.field("no_action", response.getNoAction());
}
if (response.getNeedsUpdate() != 0) {
builder.field("needs_update", response.getNeedsUpdate());
}
builder.endObject();
}
@Override
protected NamedXContentRegistry xContentRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
List<NamedXContentRegistry.Entry> namedXContents = searchModule.getNamedXContents();
namedXContents.addAll(new TransformNamedXContentProvider().getNamedXContentParsers());
return new NamedXContentRegistry(namedXContents);
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.client.transform.hlrc;
import org.elasticsearch.client.AbstractResponseTestCase;
import org.elasticsearch.client.transform.UpgradeTransformsResponse;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response;
import java.io.IOException;
public class UpgradeTransformsResponseTests extends AbstractResponseTestCase<
Response,
org.elasticsearch.client.transform.UpgradeTransformsResponse> {
public static Response randomUpgradeResponse() {
return new Response(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}
@Override
protected Response createServerTestInstance(XContentType xContentType) {
return randomUpgradeResponse();
}
@Override
protected UpgradeTransformsResponse doParseToClientInstance(XContentParser parser) throws IOException {
return org.elasticsearch.client.transform.UpgradeTransformsResponse.fromXContent(parser);
}
@Override
protected void assertInstances(Response serverTestInstance, UpgradeTransformsResponse clientInstance) {
assertEquals(serverTestInstance.getNeedsUpdate(), clientInstance.getNeedsUpdate());
assertEquals(serverTestInstance.getNoAction(), clientInstance.getNoAction());
assertEquals(serverTestInstance.getUpdated(), clientInstance.getUpdated());
}
}

View file

@ -0,0 +1,57 @@
[role="xpack"]
[testenv="basic"]
[[upgrade-transforms]]
= Upgrade {transform} API
[subs="attributes"]
++++
<titleabbrev>Upgrade {transform}</titleabbrev>
++++
Upgrades all {transform}s.
[[upgrade-transforms-request]]
== {api-request-title}
`POST _transform/_upgrade`
[[upgrade-transforms-prereqs]]
== {api-prereq-title}
Requires the following privileges:
* cluster: `manage_transform` (the `transform_admin` built-in role grants this
privilege)
* source indices: `read`, `view_index_metadata`
* destination index: `read`, `index`.
[[upgrade-transforms-desc]]
== {api-description-title}
This API upgrades all existing {transform}s.
[[upgrade-transforms-query-parms]]
== {api-query-parms-title}
`dry_run`::
(Optional, Boolean) When `true`, only checks for updates but does not execute them.
[[upgrade-transforms-example]]
== {api-examples-title}
[source,console]
--------------------------------------------------
POST _transform/_upgrade
--------------------------------------------------
// TEST[setup:simple_kibana_continuous_pivot]
When all {transform}s are upgraded, you receive a summary:
[source,console-result]
----
{
"no_action": 1
}
----
// TESTRESPONSE[s/"no_action" : 1/"no_action" : $body.no_action/]

View file

@ -0,0 +1,31 @@
{
"transform.upgrade_transforms":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/upgrade-transforms.html",
"description":"Upgrades all transforms."
},
"stability":"stable",
"visibility":"public",
"headers":{
"accept":["application/json"],
"content_type":["application/json"]
},
"url":{
"paths":[
{
"path":"/_transform/_upgrade",
"methods":[
"POST"
]
}
]
},
"params":{
"dry_run":{
"type":"boolean",
"required":false,
"description":"Whether to only check for updates but don't execute"
}
}
}
}

View file

@ -44,6 +44,7 @@ public final class TransformField {
public static final ParseField DELAY = new ParseField("delay");
// TODO: Rename to "defer_data_validation" or similar to emphasize that not all validation is deferred
public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation");
public static final ParseField DRY_RUN = new ParseField("dry_run");
public static final ParseField RETENTION_POLICY = new ParseField("retention_policy");
public static final ParseField MAX_AGE = new ParseField("max_age");

View file

@ -55,6 +55,9 @@ public class TransformMessages {
public static final String TRANSFORM_FAILED_TO_CREATE_COMPOSITE_AGGREGATION =
"Failed to create composite aggregation from {0} function";
public static final String TRANSFORM_CONFIGURATION_INVALID = "Transform configuration [{0}] has invalid elements: [{1}]";
public static final String TRANSFORM_CONFIGURATION_DEPRECATED = "Transform configuration is at version [{0}]. Use [{1}] or ["
+ TransformField.REST_BASE_PATH_TRANSFORMS
+ "_upgrade] to update.";
public static final String UNABLE_TO_GATHER_FIELD_MAPPINGS = "Failed to gather field mappings for index [{0}]";
public static final String TRANSFORM_UPDATE_CANNOT_CHANGE_SYNC_METHOD =
"Cannot change the current sync configuration of transform [{0}] from [{1}] to [{2}]";

View file

@ -0,0 +1,162 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
public class UpgradeTransformsAction extends ActionType<UpgradeTransformsAction.Response> {
public static final UpgradeTransformsAction INSTANCE = new UpgradeTransformsAction();
public static final String NAME = "cluster:admin/transform/upgrade";
private UpgradeTransformsAction() {
super(NAME, UpgradeTransformsAction.Response::new);
}
public static class Request extends MasterNodeRequest<Request> {
private final boolean dryRun;
public Request(StreamInput in) throws IOException {
super(in);
this.dryRun = in.readBoolean();
}
public Request(boolean dryRun) {
super();
this.dryRun = dryRun;
}
@Override
public ActionRequestValidationException validate() {
return null;
}
public boolean isDryRun() {
return dryRun;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(dryRun);
}
@Override
public int hashCode() {
return Objects.hash(dryRun);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
Request other = (Request) obj;
return this.dryRun == other.dryRun;
}
}
public static class Response extends ActionResponse implements Writeable, ToXContentObject {
private final long updated;
private final long noAction;
private final long needsUpdate;
public Response(StreamInput in) throws IOException {
updated = in.readVLong();
noAction = in.readVLong();
needsUpdate = in.readVLong();
}
public Response(long updated, long noAction, long needsUpdate) {
if (updated < 0 || noAction < 0 || needsUpdate < 0) {
throw new IllegalArgumentException("response counters must be > 0");
}
this.updated = updated;
this.noAction = noAction;
this.needsUpdate = needsUpdate;
}
public long getUpdated() {
return updated;
}
public long getNoAction() {
return noAction;
}
public long getNeedsUpdate() {
return needsUpdate;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(updated);
out.writeVLong(noAction);
out.writeVLong(needsUpdate);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
if (updated != 0L) {
builder.field("updated", updated);
}
if (noAction != 0L) {
builder.field("no_action", noAction);
}
if (needsUpdate != 0L) {
builder.field("needs_update", needsUpdate);
}
builder.endObject();
return builder;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Response other = (Response) obj;
return this.updated == other.updated
&& this.noAction == other.noAction
&& this.needsUpdate == other.needsUpdate;
}
@Override
public int hashCode() {
return Objects.hash(updated, noAction, needsUpdate);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}
}

View file

@ -52,6 +52,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
*/
public class TransformConfig extends AbstractDiffable<TransformConfig> implements Writeable, ToXContentObject {
public static final Version CONFIG_VERSION_LAST_CHANGED = Version.V_7_15_0;
public static final String NAME = "data_frame_transform_config";
public static final ParseField HEADERS = new ParseField("headers");
/** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */
@ -562,7 +563,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
// quick check if a rewrite is required, if none found just return the original
// a failing quick check, does not mean a rewrite is necessary
if (transformConfig.getVersion() != null
&& transformConfig.getVersion().onOrAfter(Version.V_7_15_0)
&& transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_CHANGED)
&& (transformConfig.getPivotConfig() == null || transformConfig.getPivotConfig().getMaxPageSearchSize() == null)) {
return transformConfig;
}
@ -611,7 +612,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
}
// 3. set align_checkpoints to false for transforms < 7.15 to keep BWC
if (builder.getVersion() != null && builder.getVersion().before(Version.V_7_15_0)) {
if (builder.getVersion() != null && builder.getVersion().before(CONFIG_VERSION_LAST_CHANGED)) {
builder.setSettings(
new SettingsConfig(
builder.getSettings().getMaxPageSearchSize(),

View file

@ -9,13 +9,13 @@ package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.TransformMessages;
@ -34,6 +34,8 @@ public class TransformConfigUpdate implements Writeable {
public static final String NAME = "data_frame_transform_config_update";
public static TransformConfigUpdate EMPTY = new TransformConfigUpdate(null, null, null, null, null, null, null);
private static final ConstructingObjectParser<TransformConfigUpdate, String> PARSER = new ConstructingObjectParser<>(
NAME,
false,
@ -207,7 +209,11 @@ public class TransformConfigUpdate implements Writeable {
return PARSER.apply(parser, null);
}
public boolean isNoop(TransformConfig config) {
public boolean isEmpty() {
return this.equals(EMPTY);
}
boolean isNoop(TransformConfig config) {
return isNullOrEqual(source, config.getSource())
&& isNullOrEqual(dest, config.getDestination())
&& isNullOrEqual(frequency, config.getFrequency())

View file

@ -34,10 +34,11 @@ public final class TransformInternalIndexConstants {
public static final Version INDEX_VERSION_LAST_CHANGED = Version.V_7_13_0;
public static final String INDEX_VERSION = "007";
public static final String INDEX_PATTERN = TRANSFORM_PREFIX + "internal-";
public static final String INDEX_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "internal-";
public static final String LATEST_INDEX_VERSIONED_NAME = INDEX_PATTERN + INDEX_VERSION;
public static final String LATEST_INDEX_NAME = LATEST_INDEX_VERSIONED_NAME;
public static final String INDEX_NAME_PATTERN = INDEX_PATTERN + "*";
public static final String INDEX_NAME_PATTERN_DEPRECATED = TRANSFORM_PREFIX_DEPRECATED + "internal-*";
public static final String INDEX_NAME_PATTERN_DEPRECATED = INDEX_PATTERN_DEPRECATED + "*";
// audit index
// gh #49730: upped version of audit index to 000002

View file

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Request;
public class UpgradeTransformsActionRequestTests extends AbstractWireSerializingTestCase<Request> {
@Override
protected Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected Request createTestInstance() {
return new Request(randomBoolean());
}
}

View file

@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.action;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response;
public class UpgradeTransformsActionResponseTests extends AbstractWireSerializingTransformTestCase<Response> {
public static Response randomUpgradeResponse() {
return new Response(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}
@Override
protected Reader<Response> instanceReader() {
return Response::new;
}
@Override
protected Response createTestInstance() {
return randomUpgradeResponse();
}
}

View file

@ -11,8 +11,8 @@ import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.AbstractWireSerializingTransformTestCase;
@ -56,7 +56,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
public void testIsNoop() {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformConfig config = randomTransformConfig();
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null);
TransformConfigUpdate update = TransformConfigUpdate.EMPTY;
assertTrue("null update is not noop", update.isNoop(config));
update = new TransformConfigUpdate(
config.getSource(),

View file

@ -93,6 +93,7 @@ public class Constants {
"cluster:admin/transform/start",
"cluster:admin/transform/stop",
"cluster:admin/transform/update",
"cluster:admin/transform/upgrade",
"cluster:admin/transform/validate",
// "cluster:admin/voting_config/add_exclusions",
// "cluster:admin/voting_config/clear_exclusions",

View file

@ -0,0 +1,50 @@
setup:
- do:
indices.create:
index: airline-data
body:
mappings:
properties:
time:
type: date
airline:
type: keyword
responsetime:
type: float
event_rate:
type: integer
- do:
transform.put_transform:
transform_id: "upgrading-airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data",
"frequency": "60s",
"sync": {
"time": {
"field": "time",
"delay": "90m"
}
}
}
---
"Test upgrade transform":
- do:
transform.upgrade_transforms:
dry_run: false
# upgrade does not do anything on a fresh install, so we can only test that nothing breaks
- match: { no_action: 1 }
---
"Test upgrade transform dry run":
- do:
transform.upgrade_transforms:
dry_run: true
# upgrade does not do anything on a fresh install, so we can only test that nothing breaks
- match: { no_action: 1 }

View file

@ -15,10 +15,11 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.transform.TransformMessages;
@ -34,8 +35,11 @@ import org.junit.Before;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
@ -235,7 +239,9 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
3L,
tuple(
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
Arrays.asList(transformConfig1, transformConfig2, transformConfig3))),
Arrays.asList(transformConfig1, transformConfig2, transformConfig3)
)
),
null,
null
);
@ -247,7 +253,9 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
3L,
tuple(
Arrays.asList("transform1_expand", "transform2_expand", "transform3_expand"),
Arrays.asList(transformConfig1, transformConfig2, transformConfig3))),
Arrays.asList(transformConfig1, transformConfig2, transformConfig3)
)
),
null,
null
);
@ -293,6 +301,132 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
}
);
// add a duplicate in an old index
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001";
String docId = TransformConfig.documentId(transformConfig2.getId());
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformConfig2.getId());
client().admin()
.indices()
.create(
new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN)
)
.actionGet();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest request = new IndexRequest(oldIndex).source(source)
.id(docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(request).actionGet();
}
// check that transformConfig2 gets returned, not the one from the old index or both
assertAsync(
listener -> transformConfigManager.expandTransformIds(
"transform1_expand,transform2_expand",
PageParams.defaultParams(),
true,
listener
),
tuple(2L, tuple(Arrays.asList("transform1_expand", "transform2_expand"), Arrays.asList(transformConfig1, transformConfig2))),
null,
null
);
}
public void testGetAllTransformIdsAndGetAllOutdatedTransformIds() throws Exception {
long numberOfTransformsToGenerate = 100L;
Set<String> transformIds = new HashSet<>();
for (long i = 0; i < numberOfTransformsToGenerate; ++i) {
String id = "transform_" + i;
transformIds.add(id);
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(id);
assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfig, listener), true, null, null);
}
assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null);
// test recursive retrieval
assertAsync(
listener -> transformConfigManager.expandAllTransformIds(false, 10, listener),
tuple(Long.valueOf(numberOfTransformsToGenerate), transformIds),
null,
null
);
assertAsync(
listener -> transformConfigManager.getAllOutdatedTransformIds(listener),
tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.<String>emptySet()),
null,
null
);
assertAsync(
listener -> transformConfigManager.expandAllTransformIds(true, 10, listener),
tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.<String>emptySet()),
null,
null
);
// add a duplicate in an old index
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001";
String transformId = "transform_42";
String docId = TransformConfig.documentId(transformId);
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig(transformId);
client().admin()
.indices()
.create(
new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN)
)
.actionGet();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest request = new IndexRequest(oldIndex).source(source)
.id(docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(request).actionGet();
}
assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null);
assertAsync(
listener -> transformConfigManager.getAllOutdatedTransformIds(listener),
tuple(Long.valueOf(numberOfTransformsToGenerate), Collections.<String>emptySet()),
null,
null
);
// add another old one, but not with an existing id
transformId = "transform_oldindex";
docId = TransformConfig.documentId(transformId);
transformConfig = TransformConfigTests.randomTransformConfig(transformId);
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest request = new IndexRequest(oldIndex).source(source)
.id(docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(request).actionGet();
}
transformIds.add(transformId);
assertAsync(listener -> transformConfigManager.getAllTransformIds(listener), transformIds, null, null);
assertAsync(
listener -> transformConfigManager.getAllOutdatedTransformIds(listener),
tuple(Long.valueOf(numberOfTransformsToGenerate + 1), Collections.singleton(transformId)),
null,
null
);
assertAsync(
listener -> transformConfigManager.expandAllTransformIds(true, 10, listener),
tuple(Long.valueOf(numberOfTransformsToGenerate + 1), Collections.singleton(transformId)),
null,
null
);
}
public void testStoredDoc() throws InterruptedException {
@ -303,7 +437,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
assertAsync(listener -> transformConfigManager.putOrUpdateTransformStoredDoc(storedDocs, null, listener), firstIndex, null, null);
assertAsync(
listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener),
tuple(storedDocs, firstIndex),
null,
null
@ -318,7 +452,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
null
);
assertAsync(
listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener),
tuple(updated, secondIndex),
null,
null
@ -357,14 +491,16 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
}
public void testDeleteOldTransformConfigurations() throws Exception {
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "1";
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001";
String transformId = "transform_test_delete_old_configurations";
String docId = TransformConfig.documentId(transformId);
TransformConfig transformConfig = TransformConfigTests.randomTransformConfig("transform_test_delete_old_configurations");
client().admin()
.indices()
.create(new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN))
.create(
new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN)
)
.actionGet();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@ -394,14 +530,16 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
}
public void testDeleteOldTransformStoredDocuments() throws Exception {
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "1";
String oldIndex = TransformInternalIndexConstants.INDEX_PATTERN + "001";
String transformId = "transform_test_delete_old_stored_documents";
String docId = TransformStoredDoc.documentId(transformId);
TransformStoredDoc transformStoredDoc = TransformStoredDocTests.randomTransformStoredDoc(transformId);
client().admin()
.indices()
.create(new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN))
.create(
new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN)
)
.actionGet();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@ -515,7 +653,7 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
// test that the other docs are still there
assertAsync(
listener -> transformConfigManager.getTransformStoredDoc(transformId, listener),
listener -> transformConfigManager.getTransformStoredDoc(transformId, false, listener),
tuple(storedDocs, firstIndex),
null,
null
@ -527,6 +665,63 @@ public class TransformConfigManagerTests extends TransformSingleNodeTestCase {
null,
null
);
}
public void testDeleteOldIndices() throws Exception {
String oldIndex = (randomBoolean()
? TransformInternalIndexConstants.INDEX_PATTERN
: TransformInternalIndexConstants.INDEX_PATTERN_DEPRECATED) + "001";
String transformId = "transform_test_delete_old_indices";
String docId = TransformConfig.documentId(transformId);
TransformConfig transformConfigOld = TransformConfigTests.randomTransformConfig(transformId);
TransformConfig transformConfigNew = TransformConfigTests.randomTransformConfig(transformId);
// create config in old index
client().admin()
.indices()
.create(
new CreateIndexRequest(oldIndex).mapping(MapperService.SINGLE_MAPPING_NAME, mappings())
.origin(ClientHelper.TRANSFORM_ORIGIN)
)
.actionGet();
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder source = transformConfigOld.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest request = new IndexRequest(oldIndex).source(source)
.id(docId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client().index(request).actionGet();
}
// create config in new index
assertAsync(listener -> transformConfigManager.putTransformConfiguration(transformConfigNew, listener), true, null, null);
assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(true));
assertThat(
client().get(new GetRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(),
is(true)
);
// the new/latest one should be returned
assertAsync(listener -> transformConfigManager.getTransformConfiguration(transformId, listener), transformConfigNew, null, null);
// delete old indices
assertAsync(listener -> transformConfigManager.deleteOldIndices(listener), true, null, null);
// the config should still be there
assertAsync(listener -> transformConfigManager.getTransformConfiguration(transformId, listener), transformConfigNew, null, null);
// the old index should not exist anymore
expectThrows(
IndexNotFoundException.class,
() -> assertThat(client().get(new GetRequest(oldIndex).id(docId)).actionGet().isExists(), is(false))
);
// but the latest one should
assertThat(
client().get(new GetRequest(TransformInternalIndexConstants.LATEST_INDEX_NAME).id(docId)).actionGet().isExists(),
is(true)
);
}
}

View file

@ -10,9 +10,9 @@ package org.elasticsearch.xpack.transform;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
@ -38,8 +38,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.NamedXContentRegistry.Entry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.indices.AssociatedIndexDescriptor;
@ -58,6 +56,8 @@ import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.NamedXContentRegistry.Entry;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.action.SetResetModeActionRequest;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
@ -73,6 +73,7 @@ import org.elasticsearch.xpack.core.transform.action.SetResetModeAction;
import org.elasticsearch.xpack.core.transform.action.StartTransformAction;
import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.action.compat.DeleteTransformActionDeprecated;
import org.elasticsearch.xpack.core.transform.action.compat.GetTransformActionDeprecated;
@ -92,6 +93,7 @@ import org.elasticsearch.xpack.transform.action.TransportSetTransformResetModeAc
import org.elasticsearch.xpack.transform.action.TransportStartTransformAction;
import org.elasticsearch.xpack.transform.action.TransportStopTransformAction;
import org.elasticsearch.xpack.transform.action.TransportUpdateTransformAction;
import org.elasticsearch.xpack.transform.action.TransportUpgradeTransformsAction;
import org.elasticsearch.xpack.transform.action.TransportValidateTransformAction;
import org.elasticsearch.xpack.transform.action.compat.TransportDeleteTransformActionDeprecated;
import org.elasticsearch.xpack.transform.action.compat.TransportGetTransformActionDeprecated;
@ -115,6 +117,7 @@ import org.elasticsearch.xpack.transform.rest.action.RestPutTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestStartTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestStopTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestUpdateTransformAction;
import org.elasticsearch.xpack.transform.rest.action.RestUpgradeTransformsAction;
import org.elasticsearch.xpack.transform.rest.action.compat.RestDeleteTransformActionDeprecated;
import org.elasticsearch.xpack.transform.rest.action.compat.RestGetTransformActionDeprecated;
import org.elasticsearch.xpack.transform.rest.action.compat.RestGetTransformStatsActionDeprecated;
@ -183,11 +186,9 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
/**
* Setting whether transform (the coordinator task) can run on this node.
*/
private static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting(
"node.transform",
settings ->
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
Boolean.toString(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)),
private static final Setting<Boolean> TRANSFORM_ENABLED_NODE = Setting.boolSetting("node.transform", settings ->
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
Boolean.toString(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings)),
Property.Deprecated,
Property.NodeScope
);
@ -202,8 +203,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
@Override
public boolean isEnabledByDefault(final Settings settings) {
return super.isEnabledByDefault(settings) &&
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings));
// Don't use DiscoveryNode#isDataNode(Settings) here, as it is called before all plugins are initialized
(DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) || DataTier.isExplicitDataTier(settings));
}
};
@ -250,6 +251,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
new RestPreviewTransformAction(),
new RestUpdateTransformAction(),
new RestCatTransformAction(),
new RestUpgradeTransformsAction(),
// deprecated endpoints, to be removed for 8.0.0
new RestPutTransformActionDeprecated(),
@ -276,6 +278,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
new ActionHandler<>(UpdateTransformAction.INSTANCE, TransportUpdateTransformAction.class),
new ActionHandler<>(SetResetModeAction.INSTANCE, TransportSetTransformResetModeAction.class),
new ActionHandler<>(ValidateTransformAction.INSTANCE, TransportValidateTransformAction.class),
new ActionHandler<>(UpgradeTransformsAction.INSTANCE, TransportUpgradeTransformsAction.class),
// deprecated actions, to be removed for 8.0.0
new ActionHandler<>(PutTransformActionDeprecated.INSTANCE, TransportPutTransformActionDeprecated.class),
@ -295,13 +298,7 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
return emptyList();
}
FixedExecutorBuilder indexing = new FixedExecutorBuilder(
settings,
TASK_THREAD_POOL_NAME,
4,
4,
"transform.task_thread_pool"
);
FixedExecutorBuilder indexing = new FixedExecutorBuilder(settings, TASK_THREAD_POOL_NAME, 4, 4, "transform.task_thread_pool");
return Collections.singletonList(indexing);
}
@ -326,8 +323,13 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
TransformConfigManager configManager = new IndexBasedTransformConfigManager(client, xContentRegistry);
TransformAuditor auditor = new TransformAuditor(client, clusterService.getNodeName(), clusterService);
TransformCheckpointService checkpointService =
new TransformCheckpointService(Clock.systemUTC(), settings, clusterService, configManager, auditor);
TransformCheckpointService checkpointService = new TransformCheckpointService(
Clock.systemUTC(),
settings,
clusterService,
configManager,
auditor
);
SchedulerEngine scheduler = new SchedulerEngine(settings, Clock.systemUTC());
transformServices.set(new TransformServices(configManager, checkpointService, auditor, scheduler));
@ -341,8 +343,10 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
try {
// Template upgraders are only ever called on the master nodes, so we can use the current node version as the compatibility
// version here because we can be sure that this node, if elected master, will be compatible with itself.
templates.put(TransformInternalIndexConstants.AUDIT_INDEX,
TransformInternalIndex.getAuditIndexTemplateMetadata(Version.CURRENT));
templates.put(
TransformInternalIndexConstants.AUDIT_INDEX,
TransformInternalIndex.getAuditIndexTemplateMetadata(Version.CURRENT)
);
} catch (IOException e) {
logger.warn("Error creating transform audit index", e);
}
@ -426,7 +430,8 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
}
}
@Override public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
@Override
public Collection<AssociatedIndexDescriptor> getAssociatedIndexDescriptors() {
return Collections.singletonList(new AssociatedIndexDescriptor(AUDIT_INDEX_PATTERN, "Audit index"));
}
@ -438,9 +443,10 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
) {
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> unsetResetModeListener = ActionListener.wrap(
success -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(
resetSuccess -> finalListener.onResponse(success),
resetFailure -> {
success -> client.execute(
SetResetModeAction.INSTANCE,
SetResetModeActionRequest.disabled(true),
ActionListener.wrap(resetSuccess -> finalListener.onResponse(success), resetFailure -> {
logger.error("failed to disable reset mode after otherwise successful transform reset", resetFailure);
finalListener.onFailure(
new ElasticsearchStatusException(
@ -451,13 +457,11 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
);
})
),
failure -> client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(false), ActionListener.wrap(
resetSuccess -> finalListener.onFailure(failure),
resetFailure -> {
logger.error(
TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset"),
resetFailure
);
failure -> client.execute(
SetResetModeAction.INSTANCE,
SetResetModeActionRequest.disabled(false),
ActionListener.wrap(resetSuccess -> finalListener.onFailure(failure), resetFailure -> {
logger.error(TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset"), resetFailure);
Exception ex = new ElasticsearchException(
TransformMessages.getMessage(FAILED_TO_UNSET_RESET_MODE, "a failed feature reset")
);
@ -468,13 +472,10 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
)
);
ActionListener<ListTasksResponse> afterWaitingForTasks = ActionListener.wrap(
listTasksResponse -> {
listTasksResponse.rethrowFailures("Waiting for transform indexing tasks");
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
},
unsetResetModeListener::onFailure
);
ActionListener<ListTasksResponse> afterWaitingForTasks = ActionListener.wrap(listTasksResponse -> {
listTasksResponse.rethrowFailures("Waiting for transform indexing tasks");
SystemIndexPlugin.super.cleanUpFeature(clusterService, client, unsetResetModeListener);
}, unsetResetModeListener::onFailure);
ActionListener<StopTransformAction.Response> afterStoppingTransforms = ActionListener.wrap(stopTransformsResponse -> {
if (stopTransformsResponse.isAcknowledged()
@ -485,20 +486,17 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
.prepareListTasks()
.setActions(TransformField.TASK_NAME)
.setWaitForCompletion(true)
.execute(ActionListener.wrap(
listTransformTasks -> {
listTransformTasks.rethrowFailures("Waiting for transform tasks");
client.admin()
.cluster()
.prepareListTasks()
.setActions("indices:data/write/bulk")
.setDetailed(true)
.setWaitForCompletion(true)
.setDescriptions("*" + TRANSFORM_PREFIX + "*", "*" + TRANSFORM_PREFIX_DEPRECATED + "*")
.execute(afterWaitingForTasks);
},
unsetResetModeListener::onFailure
));
.execute(ActionListener.wrap(listTransformTasks -> {
listTransformTasks.rethrowFailures("Waiting for transform tasks");
client.admin()
.cluster()
.prepareListTasks()
.setActions("indices:data/write/bulk")
.setDetailed(true)
.setWaitForCompletion(true)
.setDescriptions("*" + TRANSFORM_PREFIX + "*", "*" + TRANSFORM_PREFIX_DEPRECATED + "*")
.execute(afterWaitingForTasks);
}, unsetResetModeListener::onFailure));
} else {
String errMsg = "Failed to reset Transform: "
+ (stopTransformsResponse.isAcknowledged() ? "" : "not acknowledged ")
@ -508,25 +506,23 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
+ (stopTransformsResponse.getTaskFailures().isEmpty()
? ""
: "task failures: " + stopTransformsResponse.getTaskFailures());
unsetResetModeListener.onResponse(ResetFeatureStateResponse.ResetFeatureStateStatus.failure(this.getFeatureName(),
new ElasticsearchException(errMsg)));
unsetResetModeListener.onResponse(
ResetFeatureStateResponse.ResetFeatureStateStatus.failure(this.getFeatureName(), new ElasticsearchException(errMsg))
);
}
}, unsetResetModeListener::onFailure);
ActionListener<AcknowledgedResponse> afterResetModeSet = ActionListener.wrap(
response -> {
StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request(
Metadata.ALL,
true,
true,
null,
true,
false
);
client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms);
},
finalListener::onFailure
);
ActionListener<AcknowledgedResponse> afterResetModeSet = ActionListener.wrap(response -> {
StopTransformAction.Request stopTransformsRequest = new StopTransformAction.Request(
Metadata.ALL,
true,
true,
null,
true,
false
);
client.execute(StopTransformAction.INSTANCE, stopTransformsRequest, afterStoppingTransforms);
}, finalListener::onFailure);
client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet);
}

View file

@ -0,0 +1,337 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import java.time.Clock;
import java.util.Map;
/**
* With {@link TransformUpdater} transforms can be updated or upgraded to the latest version
*
* This implementation is shared between _update and _upgrade
*/
public class TransformUpdater {
private static final Logger logger = LogManager.getLogger(TransformUpdater.class);
public static final class UpdateResult {
// the status of the update
public enum Status {
NONE, // all checks passed, no action taken
UPDATED, // updated
NEEDS_UPDATE // special dry run status
}
// the new config after the update
private final TransformConfig config;
// the action taken for the upgrade
private final Status status;
UpdateResult(final TransformConfig config, final Status status) {
this.config = config;
this.status = status;
}
public Status getStatus() {
return status;
}
public TransformConfig getConfig() {
return config;
}
}
/**
* Update a single transform given a config and update
*
* In addition to applying update to the config, old versions of {@link TransformConfig}, {@link TransformStoredDoc} and
* {@link TransformCheckpoint} are rewritten into the latest format and written back using {@link TransformConfigManager}
*
* @param securityContext the security context
* @param indexNameExpressionResolver index name expression resolver
* @param clusterState the current cluster state
* @param settings settings
* @param client a client
* @param transformConfigManager the transform configuration manager
* @param config the old configuration to update
* @param update the update to apply to the configuration
* @param seqNoPrimaryTermAndIndex sequence id and primary term of the configuration
* @param deferValidation whether to defer some validation checks
* @param dryRun whether to actually write the configuration back or whether to just check for updates
* @param checkAccess whether to run access checks
* @param listener the listener called containing the result of the update
*/
public static void updateTransform(
XPackLicenseState licenseState,
SecurityContext securityContext,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterState clusterState,
Settings settings,
Client client,
TransformConfigManager transformConfigManager,
final TransformConfig config,
final TransformConfigUpdate update,
final SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
final boolean deferValidation,
final boolean dryRun,
final boolean checkAccess,
ActionListener<UpdateResult> listener
) {
// rewrite config into a new format if necessary
TransformConfig rewrittenConfig = TransformConfig.rewriteForUpdate(config);
TransformConfig updatedConfig = update != null ? update.apply(rewrittenConfig) : rewrittenConfig;
// <5> Update checkpoints
ActionListener<Long> updateStateListener = ActionListener.wrap(lastCheckpoint -> {
// config was updated, but the transform has no state or checkpoint
if (lastCheckpoint == null || lastCheckpoint == -1) {
listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED));
return;
}
updateTransformCheckpoint(
config.getId(),
lastCheckpoint,
transformConfigManager,
ActionListener.wrap(
r -> listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.UPDATED)),
listener::onFailure
)
);
}, listener::onFailure);
// <4> Update State document
ActionListener<Void> updateTransformListener = ActionListener.wrap(
r -> updateTransformStateAndGetLastCheckpoint(config.getId(), transformConfigManager, updateStateListener),
listener::onFailure
);
// <3> Update the transform
ActionListener<Map<String, String>> validateTransformListener = ActionListener.wrap(destIndexMappings -> {
// If it is a noop or dry run don't write the doc
// skip when:
// - config is in the latest index
// - rewrite did not change the config
// - update is not making any changes
if (config.getVersion() != null
&& config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED)
&& updatedConfig.equals(config)) {
listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NONE));
return;
}
if (dryRun) {
listener.onResponse(new UpdateResult(updatedConfig, UpdateResult.Status.NEEDS_UPDATE));
return;
}
updateTransformConfiguration(
client,
transformConfigManager,
indexNameExpressionResolver,
updatedConfig,
destIndexMappings,
seqNoPrimaryTermAndIndex,
clusterState,
ActionListener.wrap(r -> updateTransformListener.onResponse(null), listener::onFailure)
);
}, listener::onFailure);
// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
aVoid -> { validateTransform(updatedConfig, client, deferValidation, validateTransformListener); },
listener::onFailure
);
// <1> Early check to verify that the user can create the destination index and can read from the source
if (checkAccess && licenseState.isSecurityEnabled() && deferValidation == false) {
TransformPrivilegeChecker.checkPrivileges(
"update",
securityContext,
indexNameExpressionResolver,
clusterState,
client,
updatedConfig,
true,
checkPrivilegesListener
);
} else { // No security enabled, just move on
checkPrivilegesListener.onResponse(null);
}
}
private static void validateTransform(
TransformConfig config,
Client client,
boolean deferValidation,
ActionListener<Map<String, String>> listener
) {
client.execute(
ValidateTransformAction.INSTANCE,
new ValidateTransformAction.Request(config, deferValidation),
ActionListener.wrap(response -> listener.onResponse(response.getDestIndexMappings()), listener::onFailure)
);
}
private static void updateTransformStateAndGetLastCheckpoint(
String transformId,
TransformConfigManager transformConfigManager,
ActionListener<Long> listener
) {
transformConfigManager.getTransformStoredDoc(transformId, true, ActionListener.wrap(currentState -> {
if (currentState == null) {
// no state found
listener.onResponse(-1L);
return;
}
long lastCheckpoint = currentState.v1().getTransformState().getCheckpoint();
if (currentState.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
listener.onResponse(lastCheckpoint);
return;
}
transformConfigManager.putOrUpdateTransformStoredDoc(
currentState.v1(),
currentState.v2(),
ActionListener.wrap(r -> { listener.onResponse(lastCheckpoint); }, e -> {
if (org.elasticsearch.ExceptionsHelper.unwrapCause(e) instanceof VersionConflictEngineException) {
// if a version conflict occurs a new state has been written between us reading and writing.
// this is a benign case, as it means the transform is running and the latest state has been written by it
logger.trace("[{}] could not update transform state during update due to running transform", transformId);
listener.onResponse(lastCheckpoint);
} else {
logger.warn("[{}] failed to persist transform state during update.", transformId);
listener.onFailure(e);
}
})
);
}, listener::onFailure));
}
private static void updateTransformCheckpoint(
String transformId,
long lastCheckpoint,
TransformConfigManager transformConfigManager,
ActionListener<Boolean> listener
) {
transformConfigManager.getTransformCheckpointForUpdate(transformId, lastCheckpoint, ActionListener.wrap(checkpointAndVersion -> {
if (checkpointAndVersion == null
|| checkpointAndVersion.v2().getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
listener.onResponse(true);
return;
}
transformConfigManager.putTransformCheckpoint(checkpointAndVersion.v1(), listener);
}, listener::onFailure));
}
private static void updateTransformConfiguration(
Client client,
TransformConfigManager transformConfigManager,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformConfig config,
Map<String, String> mappings,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
ActionListener<Void> listener
) {
// <3> Return to the listener
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> {
transformConfigManager.deleteOldTransformConfigurations(config.getId(), ActionListener.wrap(r -> {
logger.trace("[{}] successfully deleted old transform configurations", config.getId());
listener.onResponse(null);
}, e -> {
logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", config.getId()), e);
listener.onResponse(null);
}));
},
// If we failed to INDEX AND we created the destination index, the destination index will still be around
// This is a similar behavior to _start
listener::onFailure
);
// <2> Update our transform
ActionListener<Boolean> createDestinationListener = ActionListener.wrap(
createDestResponse -> transformConfigManager.updateTransformConfiguration(
config,
seqNoPrimaryTermAndIndex,
putTransformConfigurationListener
),
listener::onFailure
);
// <1> Create destination index if necessary
String[] dest = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex()
);
String[] src = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.lenientExpandOpen(),
true,
config.getSource().getIndex()
);
// If we are running, we should verify that the destination index exists and create it if it does not
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, config.getId()) != null && dest.length == 0
// Verify we have source indices. The user could defer_validations and if the task is already running
// we allow source indices to disappear. If the source and destination indices do not exist, don't do anything
// the transform will just have to dynamically create the destination index without special mapping.
&& src.length > 0) {
createDestinationIndex(client, config, mappings, createDestinationListener);
} else {
createDestinationListener.onResponse(null);
}
}
private static void createDestinationIndex(
Client client,
TransformConfig config,
Map<String, String> mappings,
ActionListener<Boolean> listener
) {
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
mappings,
config.getId(),
Clock.systemUTC()
);
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
}
private TransformUpdater() {}
}

View file

@ -16,7 +16,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
@ -24,7 +23,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.ingest.IngestService;
@ -40,23 +38,17 @@ import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Request;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction.Response;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.TransformDestIndexSettings;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.Function;
import org.elasticsearch.xpack.transform.transforms.FunctionFactory;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import java.time.Clock;
import java.util.List;
import java.util.Map;
@ -64,6 +56,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
private static final Logger logger = LogManager.getLogger(TransportUpdateTransformAction.class);
private final XPackLicenseState licenseState;
private final Settings settings;
private final Client client;
private final TransformConfigManager transformConfigManager;
private final SecurityContext securityContext;
@ -123,6 +116,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
ThreadPool.Names.SAME
);
this.settings = settings;
this.licenseState = licenseState;
this.client = client;
this.transformConfigManager = transformServices.getConfigManager();
@ -165,91 +159,65 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
// GET transform and attempt to update
// We don't want the update to complete if the config changed between GET and INDEX
transformConfigManager.getTransformConfigurationForUpdate(request.getId(), ActionListener.wrap(configAndVersion -> {
final TransformConfig oldConfig = configAndVersion.v1();
final TransformConfig config = TransformConfig.rewriteForUpdate(oldConfig);
TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
clusterState,
settings,
client,
transformConfigManager,
configAndVersion.v1(),
update,
configAndVersion.v2(),
request.isDeferValidation(),
false, // dryRun
true, // checkAccess
ActionListener.wrap(updateResponse -> {
TransformConfig updatedConfig = updateResponse.getConfig();
auditor.info(updatedConfig.getId(), "Updated transform.");
logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus());
// If it is a noop don't bother even writing the doc, save the cycles, just return here.
// skip when:
// - config is in the latest index
// - rewrite did not change the config
// - update is not making any changes
if (config.getVersion() != null
&& config.getVersion().onOrAfter(TransformInternalIndexConstants.INDEX_VERSION_LAST_CHANGED)
&& config.equals(oldConfig)
&& update.isNoop(config)) {
listener.onResponse(new Response(config));
return;
}
TransformConfig updatedConfig = update.apply(config);
checkTransformConfigAndLogWarnings(updatedConfig);
final ActionListener<Response> updateListener;
if (update.changesSettings(config)) {
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = tasksMetadata.getTask(request.getId());
if (update.changesSettings(configAndVersion.v1())) {
PersistentTasksCustomMetadata tasksMetadata = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(
clusterState
);
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = tasksMetadata.getTask(request.getId());
// to send a request to apply new settings at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
// - the node where transform is executed on is at least 7.8.0 in order to understand the request
if (transformTask != null
&& transformTask.isAssigned()
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {
request.setNodes(transformTask.getExecutorNode());
updateListener = ActionListener.wrap(updateResponse -> {
request.setConfig(updateResponse.getConfig());
super.doExecute(task, request, listener);
}, listener::onFailure);
} else {
updateListener = listener;
}
} else {
updateListener = listener;
}
// to send a request to apply new settings at runtime, several requirements must be met:
// - transform must be running, meaning a task exists
// - transform is not failed (stopped transforms do not have a task)
// - the node where transform is executed on is at least 7.8.0 in order to understand the request
if (transformTask != null
&& transformTask.isAssigned()
&& transformTask.getState() instanceof TransformState
&& ((TransformState) transformTask.getState()).getTaskState() != TransformTaskState.FAILED
&& clusterState.nodes().get(transformTask.getExecutorNode()).getVersion().onOrAfter(Version.V_7_8_0)) {
// <3> Update the transform
ActionListener<ValidateTransformAction.Response> validateTransformListener = ActionListener.wrap(
validationResponse -> {
updateTransform(
request,
updatedConfig,
validationResponse.getDestIndexMappings(),
configAndVersion.v2(),
clusterState,
updateListener);
},
listener::onFailure
request.setNodes(transformTask.getExecutorNode());
request.setConfig(updatedConfig);
super.doExecute(task, request, listener);
return;
}
}
listener.onResponse(new Response(updatedConfig));
}, listener::onFailure)
);
// <2> Validate source and destination indices
ActionListener<Void> checkPrivilegesListener = ActionListener.wrap(
aVoid -> {
client.execute(
ValidateTransformAction.INSTANCE,
new ValidateTransformAction.Request(updatedConfig, request.isDeferValidation()),
validateTransformListener
);
},
listener::onFailure);
// <1> Early check to verify that the user can create the destination index and can read from the source
if (licenseState.isSecurityEnabled() && request.isDeferValidation() == false) {
TransformPrivilegeChecker.checkPrivileges(
"update",
securityContext,
indexNameExpressionResolver,
clusterState,
client,
updatedConfig,
true,
checkPrivilegesListener);
} else { // No security enabled, just move on
checkPrivilegesListener.onResponse(null);
}
}, listener::onFailure));
}
private void checkTransformConfigAndLogWarnings(TransformConfig config) {
final Function function = FunctionFactory.create(config);
List<String> warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig());
for (String warning : warnings) {
logger.warn(new ParameterizedMessage("[{}] {}", config.getId(), warning));
auditor.warning(config.getId(), warning);
}
}
@Override
protected void taskOperation(Request request, TransformTask transformTask, ActionListener<Response> listener) {
// apply the settings
@ -268,77 +236,4 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
return tasks.get(0);
}
private void updateTransform(
Request request,
TransformConfig config,
Map<String, String> mappings,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ClusterState clusterState,
ActionListener<Response> listener
) {
final Function function = FunctionFactory.create(config);
// <3> Return to the listener
ActionListener<Boolean> putTransformConfigurationListener = ActionListener.wrap(putTransformConfigurationResult -> {
auditor.info(config.getId(), "Updated transform.");
List<String> warnings = TransformConfigLinter.getWarnings(function, config.getSource(), config.getSyncConfig());
for (String warning : warnings) {
logger.warn(new ParameterizedMessage("[{}] {}", config.getId(), warning));
auditor.warning(config.getId(), warning);
}
transformConfigManager.deleteOldTransformConfigurations(request.getId(), ActionListener.wrap(r -> {
logger.trace("[{}] successfully deleted old transform configurations", request.getId());
listener.onResponse(new Response(config));
}, e -> {
logger.warn(LoggerMessageFormat.format("[{}] failed deleting old transform configurations.", request.getId()), e);
listener.onResponse(new Response(config));
}));
},
// If we failed to INDEX AND we created the destination index, the destination index will still be around
// This is a similar behavior to _start
listener::onFailure
);
// <2> Update our transform
ActionListener<Boolean> createDestinationListener = ActionListener.wrap(
createDestResponse -> transformConfigManager.updateTransformConfiguration(
config,
seqNoPrimaryTermAndIndex,
putTransformConfigurationListener
),
listener::onFailure
);
// <1> Create destination index if necessary
String[] dest = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.lenientExpandOpen(),
config.getDestination().getIndex()
);
String[] src = indexNameExpressionResolver.concreteIndexNames(
clusterState,
IndicesOptions.lenientExpandOpen(),
true,
config.getSource().getIndex()
);
// If we are running, we should verify that the destination index exists and create it if it does not
if (PersistentTasksCustomMetadata.getTaskWithId(clusterState, request.getId()) != null && dest.length == 0
// Verify we have source indices. The user could defer_validations and if the task is already running
// we allow source indices to disappear. If the source and destination indices do not exist, don't do anything
// the transform will just have to dynamically create the destination index without special mapping.
&& src.length > 0) {
createDestinationIndex(config, mappings, createDestinationListener);
} else {
createDestinationListener.onResponse(null);
}
}
private void createDestinationIndex(TransformConfig config, Map<String, String> mappings, ActionListener<Boolean> listener) {
TransformDestIndexSettings generatedDestIndexSettings = TransformIndex.createTransformDestIndexSettings(
mappings,
config.getId(),
Clock.systemUTC()
);
TransformIndex.createDestinationIndex(client, config, generatedDestIndexSettings, listener);
}
}

View file

@ -0,0 +1,250 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.action;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Request;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction.Response;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.action.TransformUpdater.UpdateResult;
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformNodes;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
public class TransportUpgradeTransformsAction extends TransportMasterNodeAction<Request, Response> {
private static final Logger logger = LogManager.getLogger(TransportUpgradeTransformsAction.class);
private final XPackLicenseState licenseState;
private final TransformConfigManager transformConfigManager;
private final SecurityContext securityContext;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Settings settings;
private final Client client;
private final TransformAuditor auditor;
@Inject
public TransportUpgradeTransformsAction(
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
XPackLicenseState licenseState,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
Client client,
Settings settings
) {
this(
UpgradeTransformsAction.NAME,
transportService,
actionFilters,
clusterService,
licenseState,
threadPool,
indexNameExpressionResolver,
transformServices,
client,
settings
);
}
protected TransportUpgradeTransformsAction(
String name,
TransportService transportService,
ActionFilters actionFilters,
ClusterService clusterService,
XPackLicenseState licenseState,
ThreadPool threadPool,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformServices transformServices,
Client client,
Settings settings
) {
super(
name,
transportService,
clusterService,
threadPool,
actionFilters,
Request::new,
indexNameExpressionResolver,
Response::new,
ThreadPool.Names.SAME
);
this.transformConfigManager = transformServices.getConfigManager();
this.settings = settings;
this.licenseState = licenseState;
this.client = client;
this.auditor = transformServices.getAuditor();
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings)
? new SecurityContext(settings, threadPool.getThreadContext())
: null;
}
@Override
protected void masterOperation(Request request, ClusterState state, ActionListener<Response> listener)
throws Exception {
TransformNodes.warnIfNoTransformNodes(state);
// do not allow in mixed clusters
if (state.nodes().getMaxNodeVersion().after(state.nodes().getMinNodeVersion())) {
listener.onFailure(
new ElasticsearchStatusException(
"Cannot upgrade transforms. All nodes must be the same version [{}]",
RestStatus.CONFLICT,
state.nodes().getMaxNodeVersion().toString()
)
);
return;
}
recursiveExpandTransformIdsAndUpgrade(request.isDryRun(), ActionListener.wrap(updatesByStatus -> {
final long updated = updatesByStatus.getOrDefault(UpdateResult.Status.UPDATED, 0L);
final long noAction = updatesByStatus.getOrDefault(UpdateResult.Status.NONE, 0L);
final long needsUpdate = updatesByStatus.getOrDefault(UpdateResult.Status.NEEDS_UPDATE, 0L);
if (request.isDryRun() == false) {
transformConfigManager.deleteOldIndices(ActionListener.wrap(aBool -> {
logger.info("Successfully upgraded all transforms, (updated: [{}], no action [{}])", updated, noAction);
listener.onResponse(new UpgradeTransformsAction.Response(updated, noAction, needsUpdate));
}, listener::onFailure));
} else {
// else: dry run
listener.onResponse(new UpgradeTransformsAction.Response(updated, noAction, needsUpdate));
}
}, listener::onFailure));
}
@Override
protected ClusterBlockException checkBlock(UpgradeTransformsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
private void updateOneTransform(String id, boolean dryRun, ActionListener<UpdateResult> listener) {
final ClusterState clusterState = clusterService.state();
transformConfigManager.getTransformConfigurationForUpdate(id, ActionListener.wrap(configAndVersion -> {
TransformConfigUpdate update = new TransformConfigUpdate(null, null, null, null, null, null, null);
TransformConfig config = configAndVersion.v1();
/*
* keep headers from the original document
*
* TODO: Handle deprecated data_frame_transform roles
*
* The headers store user roles and in case the transform has been created in 7.2-7.4
* contain the old data_frame_transform_* roles
*
* For 9.x we need to take action as data_frame_transform_* will be removed
*
* Hint: {@link AuthenticationContextSerializer} for decoding the header
*/
update.setHeaders(config.getHeaders());
TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
clusterState,
settings,
client,
transformConfigManager,
config,
update,
configAndVersion.v2(),
false, // defer validation
dryRun,
false, // check access,
listener
);
}, listener::onFailure));
}
private void recursiveUpdate(
Deque<String> transformsToUpgrade,
Map<UpdateResult.Status, Long> updatesByStatus,
boolean dryRun,
ActionListener<Void> listener
) {
String next = transformsToUpgrade.pollFirst();
// extra paranoia: return if next is null
if (next == null) {
listener.onResponse(null);
return;
}
updateOneTransform(next, dryRun, ActionListener.wrap(updateResponse -> {
TransformConfig updatedConfig = updateResponse.getConfig();
auditor.info(updatedConfig.getId(), "Updated transform.");
logger.debug("[{}] Updated transform [{}]", updatedConfig.getId(), updateResponse.getStatus());
updatesByStatus.compute(updateResponse.getStatus(), (k, v) -> (v == null) ? 1 : v + 1L);
if (transformsToUpgrade.isEmpty() == false) {
recursiveUpdate(transformsToUpgrade, updatesByStatus, dryRun, listener);
} else {
listener.onResponse(null);
}
}, listener::onFailure));
}
private void recursiveExpandTransformIdsAndUpgrade(boolean dryRun, ActionListener<Map<UpdateResult.Status, Long>> listener) {
transformConfigManager.getAllOutdatedTransformIds(ActionListener.wrap(totalAndIds -> {
// exit quickly if there is nothing to do
if (totalAndIds.v2().isEmpty()) {
listener.onResponse(Collections.singletonMap(UpdateResult.Status.NONE, totalAndIds.v1()));
return;
}
Map<UpdateResult.Status, Long> updatesByStatus = new HashMap<>();
updatesByStatus.put(UpdateResult.Status.NONE, totalAndIds.v1() - totalAndIds.v2().size());
Deque<String> ids = new ArrayDeque<>(totalAndIds.v2());
recursiveUpdate(
ids,
updatesByStatus,
dryRun,
ActionListener.wrap(r -> listener.onResponse(updatesByStatus), listener::onFailure)
);
}, listener::onFailure));
}
}

View file

@ -15,6 +15,8 @@ import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@ -28,15 +30,9 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.BoolQueryBuilder;
@ -50,6 +46,12 @@ import org.elasticsearch.index.reindex.ScrollableHitSource;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.core.action.util.ExpandedIdsMatcher;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.transform.TransformField;
@ -63,6 +65,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
@ -94,6 +97,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class IndexBasedTransformConfigManager implements TransformConfigManager {
private static final Logger logger = LogManager.getLogger(IndexBasedTransformConfigManager.class);
private static final int MAX_RESULTS_WINDOW = 10_000;
private final Client client;
private final NamedXContentRegistry xContentRegistry;
@ -249,6 +253,23 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
);
}
@Override
public void deleteOldIndices(ActionListener<Boolean> listener) {
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED,
"-" + TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME
).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
executeAsyncWithOrigin(client, TRANSFORM_ORIGIN, DeleteIndexAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
if (response.isAcknowledged() == false) {
listener.onFailure(new ElasticsearchStatusException("Failed to delete internal indices", RestStatus.INTERNAL_SERVER_ERROR));
return;
}
listener.onResponse(true);
}, listener::onFailure));
}
private void putTransformConfiguration(
TransformConfig transformConfig,
DocWriteRequest.OpType optType,
@ -326,6 +347,55 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
);
}
@Override
public void getTransformCheckpointForUpdate(
String transformId,
long checkpoint,
ActionListener<Tuple<TransformCheckpoint, SeqNoPrimaryTermAndIndex>> checkpointAndVersionListener
) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformCheckpoint.documentId(transformId, checkpoint));
SearchRequest searchRequest = client.prepareSearch(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
)
.setQuery(queryBuilder)
// use sort to get the last
.addSort("_index", SortOrder.DESC)
.setSize(1)
.seqNoAndPrimaryTerm(true)
.setAllowPartialSearchResults(false)
.request();
executeAsyncWithOrigin(
client,
TRANSFORM_ORIGIN,
SearchAction.INSTANCE,
searchRequest,
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
// do not fail, this _must_ be handled by the caller
checkpointAndVersionListener.onResponse(null);
return;
}
SearchHit hit = searchResponse.getHits().getHits()[0];
BytesReference source = searchResponse.getHits().getHits()[0].getSourceRef();
parseCheckpointsLenientlyFromSource(
source,
transformId,
ActionListener.wrap(
parsedCheckpoint -> checkpointAndVersionListener.onResponse(
Tuple.tuple(
parsedCheckpoint,
new SeqNoPrimaryTermAndIndex(hit.getSeqNo(), hit.getPrimaryTerm(), hit.getIndex())
)
),
checkpointAndVersionListener::onFailure
)
);
}, checkpointAndVersionListener::onFailure)
);
}
@Override
public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformConfig.documentId(transformId));
@ -413,6 +483,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
)
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
.addSort("_index", SortOrder.DESC)
.setFrom(pageParams.getFrom())
.setTrackTotalHits(true)
.setSize(pageParams.getSize())
@ -438,8 +509,9 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, stream)
) {
TransformConfig config = TransformConfig.fromXContent(parser, null, true);
ids.add(config.getId());
configs.add(config);
if (ids.add(config.getId())) {
configs.add(config);
}
} catch (IOException e) {
foundConfigsListener.onFailure(new ElasticsearchParseException("failed to parse search hit for ids", e));
return;
@ -459,7 +531,8 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
// in versioned indexes (like transform)
if (requiredMatches.isOnlyExact()) {
foundConfigsListener.onResponse(
new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs)))
);
} else {
foundConfigsListener.onResponse(new Tuple<>(totalHits, Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(configs))));
}
@ -468,6 +541,16 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
);
}
@Override
public void getAllTransformIds(ActionListener<Set<String>> listener) {
expandAllTransformIds(false, MAX_RESULTS_WINDOW, ActionListener.wrap(r -> listener.onResponse(r.v2()), listener::onFailure));
}
@Override
public void getAllOutdatedTransformIds(ActionListener<Tuple<Long, Set<String>>> listener) {
expandAllTransformIds(true, MAX_RESULTS_WINDOW, listener);
}
@Override
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
DeleteByQueryRequest request = createDeleteByQueryRequest();
@ -547,6 +630,7 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
@Override
public void getTransformStoredDoc(
String transformId,
boolean allowNoMatch,
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener
) {
QueryBuilder queryBuilder = QueryBuilders.termQuery("_id", TransformStoredDoc.documentId(transformId));
@ -569,9 +653,15 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
searchRequest,
ActionListener.<SearchResponse>wrap(searchResponse -> {
if (searchResponse.getHits().getHits().length == 0) {
resultListener.onFailure(
new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId))
);
if (allowNoMatch) {
resultListener.onResponse(null);
} else {
resultListener.onFailure(
new ResourceNotFoundException(
TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId)
)
);
}
return;
}
SearchHit searchHit = searchResponse.getHits().getHits()[0];
@ -714,6 +804,87 @@ public class IndexBasedTransformConfigManager implements TransformConfigManager
return QueryBuilders.constantScoreQuery(queryBuilder);
}
/**
* Expand all transform ids
*
* @param filterForOutdated if true, only returns outdated ids (after de-duplication)
* @param maxResultWindow the max result window size (exposed for testing)
* @param listener listener to call containing transform ids
*/
void expandAllTransformIds(boolean filterForOutdated, int maxResultWindow, ActionListener<Tuple<Long, Set<String>>> listener) {
PageParams startPage = new PageParams(0, maxResultWindow);
Set<String> collectedIds = new HashSet<>();
recursiveExpandAllTransformIds(collectedIds, 0, filterForOutdated, maxResultWindow, null, startPage, listener);
}
private void recursiveExpandAllTransformIds(
Set<String> collectedIds,
long total,
boolean filterForOutdated,
int maxResultWindow,
String lastId,
PageParams page,
ActionListener<Tuple<Long, Set<String>>> listener
) {
SearchRequest request = client.prepareSearch(
TransformInternalIndexConstants.INDEX_NAME_PATTERN,
TransformInternalIndexConstants.INDEX_NAME_PATTERN_DEPRECATED
)
.addSort(TransformField.ID.getPreferredName(), SortOrder.ASC)
.addSort("_index", SortOrder.DESC)
.setFrom(page.getFrom())
.setSize(page.getSize())
.setFetchSource(false)
.addDocValueField(TransformField.ID.getPreferredName())
.request();
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
ActionListener.<SearchResponse>wrap(searchResponse -> {
long totalHits = total;
String idOfLastHit = lastId;
for (SearchHit hit : searchResponse.getHits().getHits()) {
String id = hit.field(TransformField.ID.getPreferredName()).getValue();
// paranoia
if (Strings.isNullOrEmpty(id)) {
continue;
}
// only count hits if looking for outdated transforms
if (filterForOutdated && hit.getIndex().equals(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)) {
++totalHits;
} else if (id.equals(idOfLastHit) == false && collectedIds.add(id)) {
++totalHits;
}
idOfLastHit = id;
}
if (searchResponse.getHits().getHits().length == page.getSize()) {
PageParams nextPage = new PageParams(page.getFrom() + page.getSize(), maxResultWindow);
recursiveExpandAllTransformIds(
collectedIds,
totalHits,
filterForOutdated,
maxResultWindow,
idOfLastHit,
nextPage,
listener
);
return;
}
listener.onResponse(new Tuple<>(totalHits, collectedIds));
}, listener::onFailure),
client::search
);
}
private static Tuple<RestStatus, Throwable> getStatusAndReason(final BulkByScrollResponse response) {
RestStatus status = RestStatus.OK;
Throwable reason = new Exception("Unknown error");

View file

@ -19,6 +19,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface TransformConfigManager {
@ -85,14 +86,36 @@ public interface TransformConfigManager {
*/
void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener);
/**
* This deletes all _old_ internal storages(indices) except the most recent one.
*
* CAUTION: Deletes data without checks! Special method for upgrades.
*
* @param listener listener to call on completion
*/
void deleteOldIndices(ActionListener<Boolean> listener);
/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id
*
* @param transformId the transform id
* @param checkpoint the checkpoint
* @param resultListener listener to call after request has been made
* @param checkpointListener listener to call after request has been made
*/
void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> resultListener);
void getTransformCheckpoint(String transformId, long checkpoint, ActionListener<TransformCheckpoint> checkpointListener);
/**
* Get a stored checkpoint, requires the transform id as well as the checkpoint id. This function is only for internal use.
*
* @param transformId the transform id
* @param checkpoint the checkpoint
* @param checkpointAndVersionListener listener to call after inner request has returned
*/
void getTransformCheckpointForUpdate(
String transformId,
long checkpoint,
ActionListener<Tuple<TransformCheckpoint, SeqNoPrimaryTermAndIndex>> checkpointAndVersionListener
);
/**
* Get the transform configuration for a given transform id. This function is only for internal use. For transforms returned via GET
@ -132,6 +155,20 @@ public interface TransformConfigManager {
ActionListener<Tuple<Long, Tuple<List<String>, List<TransformConfig>>>> foundConfigsListener
);
/**
* Get all transform ids
*
* @param listener The listener to call with the collected ids
*/
void getAllTransformIds(ActionListener<Set<String>> listener);
/**
* Get all transform ids that aren't using the latest index.
*
* @param listener The listener to call with total number of transforms and the list of transform ids.
*/
void getAllOutdatedTransformIds(ActionListener<Tuple<Long, Set<String>>> listener);
/**
* This deletes the configuration and all other documents corresponding to the transform id (e.g. checkpoints).
*
@ -146,7 +183,11 @@ public interface TransformConfigManager {
ActionListener<SeqNoPrimaryTermAndIndex> listener
);
void getTransformStoredDoc(String transformId, ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener);
void getTransformStoredDoc(
String transformId,
boolean allowNoMatch,
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener
);
void getTransformStoredDocs(Collection<String> transformIds, ActionListener<List<TransformStoredDoc>> listener);

View file

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.rest.action;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.transform.TransformField;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction;
import java.io.IOException;
import java.util.List;
import static java.util.Collections.singletonList;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpgradeTransformsAction extends BaseRestHandler {
@Override
public List<Route> routes() {
return singletonList(new Route(POST, TransformField.REST_BASE_PATH_TRANSFORMS + "_upgrade"));
}
@Override
public String getName() {
return "transform_upgrade_transforms_action";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
if (restRequest.hasContent()) {
throw new IllegalArgumentException("upgrade does not allow a request body");
}
boolean dryRun = restRequest.paramAsBoolean(TransformField.DRY_RUN.getPreferredName(), false);
return channel -> client.execute(
UpgradeTransformsAction.INSTANCE,
new UpgradeTransformsAction.Request(dryRun),
new RestToXContentListener<>(channel)
);
}
}

View file

@ -295,7 +295,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
ValidationException validationException = config.validate(null);
if (validationException == null) {
indexerBuilder.setTransformConfig(config);
transformServices.getConfigManager().getTransformStoredDoc(transformId, transformStatsActionListener);
transformServices.getConfigManager().getTransformStoredDoc(transformId, false, transformStatsActionListener);
} else {
auditor.error(transformId, validationException.getMessage());
markAsFailed(

View file

@ -0,0 +1,350 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.indices.TestIndexNameExpressionResolver;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xpack.core.XPackSettings;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesRequest;
import org.elasticsearch.xpack.core.security.action.user.HasPrivilegesResponse;
import org.elasticsearch.xpack.core.security.user.User;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigUpdate;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.action.TransformUpdater.UpdateResult;
import org.elasticsearch.xpack.transform.persistence.InMemoryTransformConfigManager;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.mockito.Mockito.mock;
public class TransformUpdaterTests extends ESTestCase {
private static final String USER_NAME = "bob";
private final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, null) {
@Override
public User getUser() {
return new User(USER_NAME);
}
};
private final IndexNameExpressionResolver indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance();
private Client client;
private final Settings settings = Settings.builder().put(XPackSettings.SECURITY_ENABLED.getKey(), true).build();
private final XPackLicenseState licenseState = mock(XPackLicenseState.class);
private static class MyMockClient extends NoOpClient {
MyMockClient(String testName) {
super(testName);
}
@SuppressWarnings("unchecked")
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
if (request instanceof HasPrivilegesRequest) {
listener.onResponse((Response) new HasPrivilegesResponse());
} else if (request instanceof ValidateTransformAction.Request) {
listener.onResponse((Response) new ValidateTransformAction.Response(Collections.emptyMap()));
} else {
super.doExecute(action, request, listener);
}
}
}
@Before
public void setupClient() {
if (client != null) {
client.close();
}
client = new MyMockClient(getTestName());
}
@After
public void tearDownClient() {
client.close();
}
public void testTransformUpdateNoAction() throws InterruptedException {
TransformConfigManager transformConfigManager = new InMemoryTransformConfigManager();
TransformConfig maxCompatibleConfig = TransformConfigTests.randomTransformConfig(
randomAlphaOfLengthBetween(1, 10),
Version.CURRENT
);
transformConfigManager.putTransformConfiguration(maxCompatibleConfig, ActionListener.wrap(r -> {}, e -> {}));
assertConfiguration(
listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener),
config -> {}
);
TransformConfigUpdate update = TransformConfigUpdate.EMPTY;
assertUpdate(
listener -> TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
ClusterState.EMPTY_STATE,
settings,
client,
transformConfigManager,
maxCompatibleConfig,
update,
null, // seqNoPrimaryTermAndIndex
true,
false,
false,
listener
),
updateResult -> {
assertEquals(UpdateResult.Status.NONE, updateResult.getStatus());
assertEquals(maxCompatibleConfig, updateResult.getConfig());
}
);
assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(maxCompatibleConfig.getId(), listener), config -> {
assertNotNull(config);
assertEquals(Version.CURRENT, config.getVersion());
});
TransformConfig minCompatibleConfig = TransformConfigTests.randomTransformConfig(
randomAlphaOfLengthBetween(1, 10),
TransformConfig.CONFIG_VERSION_LAST_CHANGED
);
transformConfigManager.putTransformConfiguration(minCompatibleConfig, ActionListener.wrap(r -> {}, e -> {}));
assertUpdate(
listener -> TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
ClusterState.EMPTY_STATE,
settings,
client,
transformConfigManager,
minCompatibleConfig,
update,
null, // seqNoPrimaryTermAndIndex
true,
false,
false,
listener
),
updateResult -> {
assertEquals(UpdateResult.Status.NONE, updateResult.getStatus());
assertEquals(minCompatibleConfig, updateResult.getConfig());
}
);
assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> {
assertNotNull(config);
assertEquals(TransformConfig.CONFIG_VERSION_LAST_CHANGED, config.getVersion());
});
}
public void testTransformUpdateRewrite() throws InterruptedException {
InMemoryTransformConfigManager transformConfigManager = new InMemoryTransformConfigManager();
TransformConfig oldConfig = TransformConfigTests.randomTransformConfig(
randomAlphaOfLengthBetween(1, 10),
VersionUtils.randomVersionBetween(
random(),
Version.V_7_2_0,
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED)
)
);
transformConfigManager.putOldTransformConfiguration(oldConfig, ActionListener.wrap(r -> {}, e -> {}));
TransformCheckpoint checkpoint = new TransformCheckpoint(
oldConfig.getId(),
0L, // timestamp
42L, // checkpoint
Collections.singletonMap("index_1", new long[] { 1, 2, 3, 4 }), // index checkpoints
0L
);
transformConfigManager.putOldTransformCheckpoint(checkpoint, ActionListener.wrap(r -> {}, e -> {}));
TransformStoredDoc stateDoc = new TransformStoredDoc(
oldConfig.getId(),
new TransformState(
TransformTaskState.STARTED,
IndexerState.INDEXING,
null, // position
42L, // checkpoint
null, // reason
null, // progress
null, // node attributes
false // shouldStopAtNextCheckpoint
),
TransformIndexerStatsTests.randomStats()
);
transformConfigManager.putOrUpdateOldTransformStoredDoc(stateDoc, null, ActionListener.wrap(r -> {}, e -> {}));
assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> {});
TransformConfigUpdate update = TransformConfigUpdate.EMPTY;
assertUpdate(
listener -> TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
ClusterState.EMPTY_STATE,
settings,
client,
transformConfigManager,
oldConfig,
update,
null, // seqNoPrimaryTermAndIndex
true,
false,
false,
listener
),
updateResult -> {
assertEquals(UpdateResult.Status.UPDATED, updateResult.getStatus());
assertNotEquals(oldConfig, updateResult.getConfig());
}
);
assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(oldConfig.getId(), listener), config -> {
assertNotNull(config);
assertEquals(Version.CURRENT, config.getVersion());
});
assertCheckpoint(
listener -> transformConfigManager.getTransformCheckpointForUpdate(oldConfig.getId(), 42L, listener),
checkpointAndVersion -> {
assertEquals(InMemoryTransformConfigManager.CURRENT_INDEX, checkpointAndVersion.v2().getIndex());
assertEquals(42L, checkpointAndVersion.v1().getCheckpoint());
assertEquals(checkpoint.getIndicesCheckpoints(), checkpointAndVersion.v1().getIndicesCheckpoints());
}
);
assertStoredState(
listener -> transformConfigManager.getTransformStoredDoc(oldConfig.getId(), false, listener),
storedDocAndVersion -> {
assertEquals(InMemoryTransformConfigManager.CURRENT_INDEX, storedDocAndVersion.v2().getIndex());
assertEquals(stateDoc.getTransformState(), storedDocAndVersion.v1().getTransformState());
assertEquals(stateDoc.getTransformStats(), storedDocAndVersion.v1().getTransformStats());
}
);
// same as dry run
TransformConfig oldConfigForDryRunUpdate = TransformConfigTests.randomTransformConfig(
randomAlphaOfLengthBetween(1, 10),
VersionUtils.randomVersionBetween(
random(),
Version.V_7_2_0,
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED)
)
);
transformConfigManager.putOldTransformConfiguration(oldConfigForDryRunUpdate, ActionListener.wrap(r -> {}, e -> {}));
assertConfiguration(
listener -> transformConfigManager.getTransformConfiguration(oldConfigForDryRunUpdate.getId(), listener),
config -> {}
);
assertUpdate(
listener -> TransformUpdater.updateTransform(
licenseState,
securityContext,
indexNameExpressionResolver,
ClusterState.EMPTY_STATE,
settings,
client,
transformConfigManager,
oldConfigForDryRunUpdate,
update,
null, // seqNoPrimaryTermAndIndex
true,
true,
false,
listener
),
updateResult -> {
assertEquals(UpdateResult.Status.NEEDS_UPDATE, updateResult.getStatus());
assertNotEquals(oldConfigForDryRunUpdate, updateResult.getConfig());
assertEquals(Version.CURRENT, updateResult.getConfig().getVersion());
}
);
assertConfiguration(
listener -> transformConfigManager.getTransformConfiguration(oldConfigForDryRunUpdate.getId(), listener),
config -> {
assertNotNull(config);
assertEquals(oldConfigForDryRunUpdate, config);
}
);
}
private void assertUpdate(Consumer<ActionListener<UpdateResult>> function, Consumer<UpdateResult> furtherTests)
throws InterruptedException {
assertAsync(function, furtherTests);
}
private void assertConfiguration(Consumer<ActionListener<TransformConfig>> function, Consumer<TransformConfig> furtherTests)
throws InterruptedException {
assertAsync(function, furtherTests);
}
private void assertCheckpoint(
Consumer<ActionListener<Tuple<TransformCheckpoint, SeqNoPrimaryTermAndIndex>>> function,
Consumer<Tuple<TransformCheckpoint, SeqNoPrimaryTermAndIndex>> furtherTests
) throws InterruptedException {
assertAsync(function, furtherTests);
}
private void assertStoredState(
Consumer<ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>>> function,
Consumer<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> furtherTests
) throws InterruptedException {
assertAsync(function, furtherTests);
}
private <T> void assertAsync(Consumer<ActionListener<T>> function, Consumer<T> furtherTests) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean listenerCalled = new AtomicBoolean(false);
LatchedActionListener<T> listener = new LatchedActionListener<>(ActionListener.wrap(r -> {
assertTrue("listener called more than once", listenerCalled.compareAndSet(false, true));
furtherTests.accept(r);
}, e -> { fail("got unexpected exception: " + e); }), latch);
function.accept(listener);
assertTrue("timed out after 20s", latch.await(20, TimeUnit.SECONDS));
}
}

View file

@ -9,8 +9,8 @@ package org.elasticsearch.xpack.transform.persistence;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xpack.core.action.util.PageParams;
import org.elasticsearch.xpack.core.transform.TransformMessages;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
@ -20,7 +20,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -35,12 +35,32 @@ import static java.util.Collections.singletonList;
*/
public class InMemoryTransformConfigManager implements TransformConfigManager {
public static String CURRENT_INDEX = "index-1";
public static String OLD_INDEX = "index-0";
private final Map<String, List<TransformCheckpoint>> checkpoints = new HashMap<>();
private final Map<String, TransformConfig> configs = new HashMap<>();
private final Map<String, TransformStoredDoc> transformStoredDocs = new HashMap<>();
// for mocking updates
private final Map<String, List<TransformCheckpoint>> oldCheckpoints = new HashMap<>();
private final Map<String, TransformConfig> oldConfigs = new HashMap<>();
private final Map<String, TransformStoredDoc> oldTransformStoredDocs = new HashMap<>();
public InMemoryTransformConfigManager() {}
public void putOldTransformCheckpoint(TransformCheckpoint checkpoint, ActionListener<Boolean> listener) {
oldCheckpoints.compute(checkpoint.getTransformId(), (id, listOfCheckpoints) -> {
if (listOfCheckpoints == null) {
listOfCheckpoints = new ArrayList<TransformCheckpoint>();
}
listOfCheckpoints.add(checkpoint);
return listOfCheckpoints;
});
listener.onResponse(true);
}
@Override
public void putTransformCheckpoint(TransformCheckpoint checkpoint, ActionListener<Boolean> listener) {
checkpoints.compute(checkpoint.getTransformId(), (id, listOfCheckpoints) -> {
@ -54,6 +74,11 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
listener.onResponse(true);
}
public void putOldTransformConfiguration(TransformConfig transformConfig, ActionListener<Boolean> listener) {
oldConfigs.put(transformConfig.getId(), transformConfig);
listener.onResponse(true);
}
@Override
public void putTransformConfiguration(TransformConfig transformConfig, ActionListener<Boolean> listener) {
configs.put(transformConfig.getId(), transformConfig);
@ -66,7 +91,6 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<Boolean> listener
) {
// for now we ignore seqNoPrimaryTermAndIndex
configs.put(transformConfig.getId(), transformConfig);
listener.onResponse(true);
@ -74,23 +98,51 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
@Override
public void deleteOldTransformConfigurations(String transformId, ActionListener<Boolean> listener) {
configs.remove(transformId);
oldConfigs.remove(transformId);
listener.onResponse(true);
}
@Override
public void deleteOldTransformStoredDocuments(String transformId, ActionListener<Long> listener) {
listener.onResponse(transformStoredDocs.remove(transformId) == null ? 0L : 1L);
long deletedDocs = oldTransformStoredDocs.remove(transformId) == null ? 0L : 1L;
listener.onResponse(deletedDocs);
}
@Override
public void deleteOldCheckpoints(String transformId, long deleteCheckpointsBelow, long deleteOlderThan, ActionListener<Long> listener) {
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);
int sizeBeforeDelete = checkpointsById.size();
long deletedDocs = 0;
final List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);
if (checkpointsById != null) {
final int sizeBeforeDelete = checkpointsById.size();
checkpointsById.removeIf(cp -> { return cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan; });
deletedDocs += sizeBeforeDelete - checkpointsById.size();
}
listener.onResponse(Long.valueOf(sizeBeforeDelete - checkpointsById.size()));
// old checkpoints
final List<TransformCheckpoint> checkpointsByIdOld = oldCheckpoints.get(transformId);
if (checkpointsByIdOld != null) {
final int sizeBeforeDeleteOldCheckpoints = checkpointsByIdOld.size();
checkpointsByIdOld.removeIf(cp -> cp.getCheckpoint() < deleteCheckpointsBelow && cp.getTimestamp() < deleteOlderThan);
deletedDocs += sizeBeforeDeleteOldCheckpoints - checkpointsByIdOld.size();
}
listener.onResponse(deletedDocs);
}
@Override
public void deleteOldIndices(ActionListener<Boolean> listener) {
if (oldCheckpoints.isEmpty() && oldConfigs.isEmpty() && oldTransformStoredDocs.isEmpty()) {
listener.onResponse(true);
return;
}
// found old documents, emulate index deletion
oldCheckpoints.clear();
oldConfigs.clear();
oldTransformStoredDocs.clear();
listener.onResponse(true);
}
@Override
@ -106,12 +158,56 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
}
}
checkpointsById = oldCheckpoints.get(transformId);
if (checkpointsById != null) {
for (TransformCheckpoint t : checkpointsById) {
if (t.getCheckpoint() == checkpoint) {
resultListener.onResponse(t);
return;
}
}
}
resultListener.onResponse(TransformCheckpoint.EMPTY);
}
@Override
public void getTransformCheckpointForUpdate(
String transformId,
long checkpoint,
ActionListener<Tuple<TransformCheckpoint, SeqNoPrimaryTermAndIndex>> checkpointAndVersionListener
) {
List<TransformCheckpoint> checkpointsById = checkpoints.get(transformId);
if (checkpointsById != null) {
for (TransformCheckpoint t : checkpointsById) {
if (t.getCheckpoint() == checkpoint) {
checkpointAndVersionListener.onResponse(Tuple.tuple(t, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX)));
return;
}
}
}
checkpointsById = oldCheckpoints.get(transformId);
if (checkpointsById != null) {
for (TransformCheckpoint t : checkpointsById) {
if (t.getCheckpoint() == checkpoint) {
checkpointAndVersionListener.onResponse(Tuple.tuple(t, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX)));
return;
}
}
}
checkpointAndVersionListener.onResponse(null);
}
@Override
public void getTransformConfiguration(String transformId, ActionListener<TransformConfig> resultListener) {
TransformConfig config = configs.get(transformId);
if (config == null) {
config = oldConfigs.get(transformId);
}
if (config == null) {
resultListener.onFailure(
new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
@ -127,14 +223,20 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
ActionListener<Tuple<TransformConfig, SeqNoPrimaryTermAndIndex>> configAndVersionListener
) {
TransformConfig config = configs.get(transformId);
if (config == null) {
TransformConfig oldConfig = oldConfigs.get(transformId);
if (config == null && oldConfig == null) {
configAndVersionListener.onFailure(
new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.REST_UNKNOWN_TRANSFORM, transformId))
);
return;
}
configAndVersionListener.onResponse(Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1")));
if (config != null) {
configAndVersionListener.onResponse(Tuple.tuple(config, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX)));
} else {
configAndVersionListener.onResponse(Tuple.tuple(oldConfig, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX)));
}
}
@Override
@ -144,39 +246,72 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
boolean allowNoMatch,
ActionListener<Tuple<Long, Tuple<List<String>, List<TransformConfig>>>> foundConfigsListener
) {
if (Regex.isMatchAllPattern(transformIdsExpression)) {
List<String> ids = new ArrayList<>(configs.keySet());
foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, new ArrayList<>(configs.values()))));
List<String> ids = new ArrayList<>();
List<TransformConfig> configsAsList = new ArrayList<>();
configs.entrySet().forEach(entry -> {
ids.add(entry.getKey());
configsAsList.add(entry.getValue());
});
oldConfigs.entrySet().forEach(entry -> {
if (configs.containsKey(entry.getKey()) == false) {
ids.add(entry.getKey());
configsAsList.add(entry.getValue());
}
});
foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, configsAsList)));
return;
}
if (Regex.isSimpleMatchPattern(transformIdsExpression) == false) {
if (configs.containsKey(transformIdsExpression)) {
foundConfigsListener.onResponse(
new Tuple<>(
1L, Tuple.tuple(singletonList(transformIdsExpression), singletonList(configs.get(transformIdsExpression)))));
new Tuple<>(1L, Tuple.tuple(singletonList(transformIdsExpression), singletonList(configs.get(transformIdsExpression))))
);
} else {
foundConfigsListener.onResponse(new Tuple<>(0L, Tuple.tuple(emptyList(), emptyList())));
}
return;
}
Set<String> ids = new LinkedHashSet<>();
Set<TransformConfig> matchedConfigs = new LinkedHashSet<>();
configs.keySet().forEach(id -> {
if (Regex.simpleMatch(transformIdsExpression, id)) {
ids.add(id);
matchedConfigs.add(configs.get(id));
List<String> ids = new ArrayList<>();
List<TransformConfig> configsAsList = new ArrayList<>();
configs.entrySet().forEach(entry -> {
if (Regex.simpleMatch(transformIdsExpression, entry.getKey())) {
ids.add(entry.getKey());
configsAsList.add(entry.getValue());
}
});
foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(new ArrayList<>(ids), new ArrayList<>(matchedConfigs))));
oldConfigs.entrySet().forEach(entry -> {
if (configs.containsKey(entry.getKey()) == false && Regex.simpleMatch(transformIdsExpression, entry.getKey())) {
ids.add(entry.getKey());
configsAsList.add(entry.getValue());
}
});
foundConfigsListener.onResponse(new Tuple<>((long) ids.size(), Tuple.tuple(ids, configsAsList)));
}
@Override
public void deleteTransform(String transformId, ActionListener<Boolean> listener) {
configs.remove(transformId);
oldConfigs.remove(transformId);
transformStoredDocs.remove(transformId);
oldTransformStoredDocs.remove(transformId);
checkpoints.remove(transformId);
oldCheckpoints.remove(transformId);
}
public void putOrUpdateOldTransformStoredDoc(
TransformStoredDoc storedDoc,
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<SeqNoPrimaryTermAndIndex> listener
) {
// for now we ignore seqNoPrimaryTermAndIndex
oldTransformStoredDocs.put(storedDoc.getId(), storedDoc);
listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX));
}
@Override
@ -185,27 +320,37 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
SeqNoPrimaryTermAndIndex seqNoPrimaryTermAndIndex,
ActionListener<SeqNoPrimaryTermAndIndex> listener
) {
// for now we ignore seqNoPrimaryTermAndIndex
transformStoredDocs.put(storedDoc.getId(), storedDoc);
listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1"));
listener.onResponse(new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX));
}
@Override
public void getTransformStoredDoc(
String transformId,
boolean allowNoMatch,
ActionListener<Tuple<TransformStoredDoc, SeqNoPrimaryTermAndIndex>> resultListener
) {
TransformStoredDoc storedDoc = transformStoredDocs.get(transformId);
if (storedDoc == null) {
resultListener.onFailure(
new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId))
);
if (storedDoc != null) {
resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, CURRENT_INDEX)));
return;
}
resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, "index-1")));
storedDoc = oldTransformStoredDocs.get(transformId);
if (storedDoc != null) {
resultListener.onResponse(Tuple.tuple(storedDoc, new SeqNoPrimaryTermAndIndex(1L, 1L, OLD_INDEX)));
return;
}
if (allowNoMatch) {
resultListener.onResponse(null);
return;
}
resultListener.onFailure(
new ResourceNotFoundException(TransformMessages.getMessage(TransformMessages.UNKNOWN_TRANSFORM_STATS, transformId))
);
}
@Override
@ -215,6 +360,11 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
TransformStoredDoc storedDoc = transformStoredDocs.get(transformId);
if (storedDoc != null) {
docs.add(storedDoc);
} else {
storedDoc = oldTransformStoredDocs.get(transformId);
if (storedDoc != null) {
docs.add(storedDoc);
}
}
}
listener.onResponse(docs);
@ -225,4 +375,18 @@ public class InMemoryTransformConfigManager implements TransformConfigManager {
listener.onResponse(true);
}
@Override
public void getAllTransformIds(ActionListener<Set<String>> listener) {
Set<String> allIds = new HashSet<>(configs.keySet());
allIds.addAll(oldConfigs.keySet());
listener.onResponse(allIds);
}
@Override
public void getAllOutdatedTransformIds(ActionListener<Tuple<Long, Set<String>>> listener) {
Set<String> outdatedIds = new HashSet<>(oldConfigs.keySet());
outdatedIds.removeAll(configs.keySet());
listener.onResponse(new Tuple<>(Long.valueOf(configs.size() + outdatedIds.size()), outdatedIds));
}
}