diff --git a/docs/changelog/87361.yaml b/docs/changelog/87361.yaml new file mode 100644 index 000000000000..fcca508ff249 --- /dev/null +++ b/docs/changelog/87361.yaml @@ -0,0 +1,6 @@ +pr: 87361 +summary: "Implement per-transform num_failure_retries setting" +area: Transform +type: enhancement +issues: [] + diff --git a/docs/reference/rest-api/common-parms.asciidoc b/docs/reference/rest-api/common-parms.asciidoc index a7108c5fb03a..5cd3b88993c0 100644 --- a/docs/reference/rest-api/common-parms.asciidoc +++ b/docs/reference/rest-api/common-parms.asciidoc @@ -1056,6 +1056,13 @@ adjusted to a lower value. The minimum value is `10` and the maximum is `65,536` The default value is `500`. end::transform-settings-max-page-search-size[] +tag::transform-settings-num-failure-retries[] +Defines the number of retries on a recoverable failure before the {transform} task is marked as `failed`. +The minimum value is `0` and the maximum is `100`. +`-1` can be used to denote infinity. In this case, the {transform} never gives up on retrying a recoverable failure. +The default value is the cluster-level setting `num_transform_failure_retries`. +end::transform-settings-num-failure-retries[] + tag::transform-sort[] Specifies the date field that is used to identify the latest documents. end::transform-sort[] diff --git a/docs/reference/settings/transform-settings.asciidoc b/docs/reference/settings/transform-settings.asciidoc index 28dfb9c0c3a0..6447902023ce 100644 --- a/docs/reference/settings/transform-settings.asciidoc +++ b/docs/reference/settings/transform-settings.asciidoc @@ -16,14 +16,14 @@ default. `node.roles: [ transform ]`:: (<>) Set `node.roles` to contain `transform` to -identify the node as a _transform node_. If you want to run {transforms}, there +identify the node as a _transform node_. If you want to run {transforms}, there must be at least one {transform} node in your cluster. + If you set `node.roles`, you must explicitly specify all the required roles for the node. To learn more, refer to <>. -+ -IMPORTANT: It is strongly recommended that dedicated {transform} nodes also have -the `remote_cluster_client` role; otherwise, {ccs} fails when used in ++ +IMPORTANT: It is strongly recommended that dedicated {transform} nodes also have +the `remote_cluster_client` role; otherwise, {ccs} fails when used in {transforms}. See <>. `xpack.transform.enabled`:: @@ -37,3 +37,5 @@ retries when it experiences a non-fatal error. Once the number of retries is exhausted, the {transform} task is marked as `failed`. The default value is `10` with a valid minimum of `0` and maximum of `100`. If a {transform} is already running, it has to be restarted to use the changed setting. +The `num_failure_retries` setting can also be specified on an individual {transform} level. +Specifying this setting for each {transform} individually is recommended. diff --git a/docs/reference/transform/apis/put-transform.asciidoc b/docs/reference/transform/apis/put-transform.asciidoc index 19e8f5293e52..06d8a564412e 100644 --- a/docs/reference/transform/apis/put-transform.asciidoc +++ b/docs/reference/transform/apis/put-transform.asciidoc @@ -215,6 +215,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded `max_page_search_size`::: (Optional, integer) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size] +`num_failure_retries`::: +(Optional, integer) +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries] ==== //End settings diff --git a/docs/reference/transform/apis/update-transform.asciidoc b/docs/reference/transform/apis/update-transform.asciidoc index 0e2546958f67..4d560dccf8a9 100644 --- a/docs/reference/transform/apis/update-transform.asciidoc +++ b/docs/reference/transform/apis/update-transform.asciidoc @@ -46,8 +46,8 @@ each checkpoint. of update and runs with those privileges. If you provide <>, those credentials are used instead. -* You must use {kib} or this API to update a {transform}. Directly updating any -{transform} internal, system, or hidden indices is not supported and may cause +* You must use {kib} or this API to update a {transform}. Directly updating any +{transform} internal, system, or hidden indices is not supported and may cause permanent failure. ==== @@ -157,6 +157,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded `max_page_search_size`::: (Optional, integer) include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size] +`num_failure_retries`::: +(Optional, integer) +include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries] ==== //End settings diff --git a/libs/x-content/src/main/java/org/elasticsearch/xcontent/AbstractObjectParser.java b/libs/x-content/src/main/java/org/elasticsearch/xcontent/AbstractObjectParser.java index ac83bd9cd16d..30841ee36d41 100644 --- a/libs/x-content/src/main/java/org/elasticsearch/xcontent/AbstractObjectParser.java +++ b/libs/x-content/src/main/java/org/elasticsearch/xcontent/AbstractObjectParser.java @@ -250,7 +250,7 @@ public abstract class AbstractObjectParser { } /** - * Declare a double field that parses explicit {@code null}s in the json to a default value. + * Declare an integer field that parses explicit {@code null}s in the json to a default value. */ public void declareIntOrNull(BiConsumer consumer, int nullValue, ParseField field) { declareField( diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java index a2baae6c32df..24b147c9e85a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformField.java @@ -41,6 +41,7 @@ public final class TransformField { public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints"); public static final ParseField USE_PIT = new ParseField("use_point_in_time"); public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings"); + public static final ParseField NUM_FAILURE_RETRIES = new ParseField("num_failure_retries"); public static final ParseField FIELD = new ParseField("field"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField TIME = new ParseField("time"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java index fb6b921faca4..3c715f136be7 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfig.java @@ -34,12 +34,15 @@ public class SettingsConfig implements Writeable, ToXContentObject { public static final ConstructingObjectParser STRICT_PARSER = createParser(false); public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + public static final int MAX_NUM_FAILURE_RETRIES = 100; + private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1; private static final float DEFAULT_DOCS_PER_SECOND = -1F; private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1; private static final int DEFAULT_ALIGN_CHECKPOINTS = -1; private static final int DEFAULT_USE_PIT = -1; private static final int DEFAULT_DEDUCE_MAPPINGS = -1; + private static final int DEFAULT_NUM_FAILURE_RETRIES = -2; private static ConstructingObjectParser createParser(boolean lenient) { ConstructingObjectParser parser = new ConstructingObjectParser<>( @@ -51,7 +54,8 @@ public class SettingsConfig implements Writeable, ToXContentObject { (Integer) args[2], (Integer) args[3], (Integer) args[4], - (Integer) args[5] + (Integer) args[5], + (Integer) args[6] ) ); parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE); @@ -84,6 +88,7 @@ public class SettingsConfig implements Writeable, ToXContentObject { TransformField.DEDUCE_MAPPINGS, ValueType.BOOLEAN_OR_NULL ); + parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_NUM_FAILURE_RETRIES, TransformField.NUM_FAILURE_RETRIES); return parser; } @@ -93,9 +98,10 @@ public class SettingsConfig implements Writeable, ToXContentObject { private final Integer alignCheckpoints; private final Integer usePit; private final Integer deduceMappings; + private final Integer numFailureRetries; public SettingsConfig() { - this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null); + this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null); } public SettingsConfig( @@ -104,7 +110,8 @@ public class SettingsConfig implements Writeable, ToXContentObject { Boolean datesAsEpochMillis, Boolean alignCheckpoints, Boolean usePit, - Boolean deduceMappings + Boolean deduceMappings, + Integer numFailureRetries ) { this( maxPageSearchSize, @@ -112,17 +119,19 @@ public class SettingsConfig implements Writeable, ToXContentObject { datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0, alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0, usePit == null ? null : usePit ? 1 : 0, - deduceMappings == null ? null : deduceMappings ? 1 : 0 + deduceMappings == null ? null : deduceMappings ? 1 : 0, + numFailureRetries ); } - public SettingsConfig( + SettingsConfig( Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints, Integer usePit, - Integer deduceMappings + Integer deduceMappings, + Integer numFailureRetries ) { this.maxPageSearchSize = maxPageSearchSize; this.docsPerSecond = docsPerSecond; @@ -130,6 +139,7 @@ public class SettingsConfig implements Writeable, ToXContentObject { this.alignCheckpoints = alignCheckpoints; this.usePit = usePit; this.deduceMappings = deduceMappings; + this.numFailureRetries = numFailureRetries; } public SettingsConfig(final StreamInput in) throws IOException { @@ -155,6 +165,11 @@ public class SettingsConfig implements Writeable, ToXContentObject { } else { deduceMappings = DEFAULT_DEDUCE_MAPPINGS; } + if (in.getVersion().onOrAfter(Version.V_8_4_0)) { + numFailureRetries = in.readOptionalInt(); + } else { + numFailureRetries = null; + } } public Integer getMaxPageSearchSize() { @@ -197,6 +212,14 @@ public class SettingsConfig implements Writeable, ToXContentObject { return deduceMappings; } + public Integer getNumFailureRetries() { + return numFailureRetries != null ? (numFailureRetries == DEFAULT_NUM_FAILURE_RETRIES ? null : numFailureRetries) : null; + } + + public Integer getNumFailureRetriesForUpdate() { + return numFailureRetries; + } + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) { validationException = addValidationError( @@ -207,7 +230,15 @@ public class SettingsConfig implements Writeable, ToXContentObject { validationException ); } - + if (numFailureRetries != null && (numFailureRetries < -1 || numFailureRetries > MAX_NUM_FAILURE_RETRIES)) { + validationException = addValidationError( + "settings.num_failure_retries [" + + numFailureRetries + + "] is out of range. The minimum value is -1 (infinity) and the maximum is " + + MAX_NUM_FAILURE_RETRIES, + validationException + ); + } return validationException; } @@ -229,6 +260,9 @@ public class SettingsConfig implements Writeable, ToXContentObject { if (out.getVersion().onOrAfter(Version.V_8_1_0)) { out.writeOptionalInt(deduceMappings); } + if (out.getVersion().onOrAfter(Version.V_8_4_0)) { + out.writeOptionalInt(numFailureRetries); + } } @Override @@ -253,6 +287,9 @@ public class SettingsConfig implements Writeable, ToXContentObject { if (deduceMappings != null && (deduceMappings.equals(DEFAULT_DEDUCE_MAPPINGS) == false)) { builder.field(TransformField.DEDUCE_MAPPINGS.getPreferredName(), deduceMappings > 0 ? true : false); } + if (numFailureRetries != null && (numFailureRetries.equals(DEFAULT_NUM_FAILURE_RETRIES) == false)) { + builder.field(TransformField.NUM_FAILURE_RETRIES.getPreferredName(), numFailureRetries); + } builder.endObject(); return builder; } @@ -272,12 +309,21 @@ public class SettingsConfig implements Writeable, ToXContentObject { && Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis) && Objects.equals(alignCheckpoints, that.alignCheckpoints) && Objects.equals(usePit, that.usePit) - && Objects.equals(deduceMappings, that.deduceMappings); + && Objects.equals(deduceMappings, that.deduceMappings) + && Objects.equals(numFailureRetries, that.numFailureRetries); } @Override public int hashCode() { - return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings); + return Objects.hash( + maxPageSearchSize, + docsPerSecond, + datesAsEpochMillis, + alignCheckpoints, + usePit, + deduceMappings, + numFailureRetries + ); } @Override @@ -296,6 +342,7 @@ public class SettingsConfig implements Writeable, ToXContentObject { private Integer alignCheckpoints; private Integer usePit; private Integer deduceMappings; + private Integer numFailureRetries; /** * Default builder @@ -314,6 +361,7 @@ public class SettingsConfig implements Writeable, ToXContentObject { this.alignCheckpoints = base.alignCheckpoints; this.usePit = base.usePit; this.deduceMappings = base.deduceMappings; + this.numFailureRetries = base.numFailureRetries; } /** @@ -402,6 +450,11 @@ public class SettingsConfig implements Writeable, ToXContentObject { return this; } + public Builder setNumFailureRetries(Integer numFailureRetries) { + this.numFailureRetries = numFailureRetries == null ? DEFAULT_NUM_FAILURE_RETRIES : numFailureRetries; + return this; + } + /** * Update settings according to given settings config. * @@ -437,12 +490,25 @@ public class SettingsConfig implements Writeable, ToXContentObject { ? null : update.getDeduceMappingsForUpdate(); } + if (update.getNumFailureRetriesForUpdate() != null) { + this.numFailureRetries = update.getNumFailureRetriesForUpdate().equals(DEFAULT_NUM_FAILURE_RETRIES) + ? null + : update.getNumFailureRetriesForUpdate(); + } return this; } public SettingsConfig build() { - return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings); + return new SettingsConfig( + maxPageSearchSize, + docsPerSecond, + datesAsEpochMillis, + alignCheckpoints, + usePit, + deduceMappings, + numFailureRetries + ); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java index 22522b515130..1e5818c053ce 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfig.java @@ -623,7 +623,8 @@ public class TransformConfig implements SimpleDiffable, Writeab builder.getSettings().getDatesAsEpochMillis(), builder.getSettings().getAlignCheckpoints(), builder.getSettings().getUsePit(), - builder.getSettings().getDeduceMappings() + builder.getSettings().getDeduceMappings(), + builder.getSettings().getNumFailureRetries() ) ); } @@ -637,7 +638,8 @@ public class TransformConfig implements SimpleDiffable, Writeab true, builder.getSettings().getAlignCheckpoints(), builder.getSettings().getUsePit(), - builder.getSettings().getDeduceMappings() + builder.getSettings().getDeduceMappings(), + builder.getSettings().getNumFailureRetries() ) ); } @@ -651,7 +653,8 @@ public class TransformConfig implements SimpleDiffable, Writeab builder.getSettings().getDatesAsEpochMillis(), false, builder.getSettings().getUsePit(), - builder.getSettings().getDeduceMappings() + builder.getSettings().getDeduceMappings(), + builder.getSettings().getNumFailureRetries() ) ); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java index f2682a982200..e394fea57db3 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/SettingsConfigTests.java @@ -23,7 +23,10 @@ import org.junit.Before; import java.io.IOException; import java.util.Map; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; public class SettingsConfigTests extends AbstractSerializingTransformTestCase { @@ -36,7 +39,8 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase xContentToMap(ToXContent xcontent) throws IOException { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java index 84e788ce9cfe..f379b4d93b06 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdateTests.java @@ -115,7 +115,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform TimeValue frequency = TimeValue.timeValueSeconds(10); SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30)); String newDescription = "new description"; - SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true, true); + SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true, true, 10); Map newMetadata = randomMetadata(); RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000)); update = new TransformConfigUpdate( @@ -204,7 +204,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform null, null, null, - new SettingsConfig(4_000, null, (Boolean) null, null, null, null), + new SettingsConfig(4_000, null, (Boolean) null, null, null, null, null), null, null ); @@ -223,7 +223,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform null, null, null, - new SettingsConfig(null, 43.244F, (Boolean) null, null, null, null), + new SettingsConfig(null, 43.244F, (Boolean) null, null, null, null, null), null, null ); @@ -240,7 +240,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform null, null, null, - new SettingsConfig(-1, null, (Boolean) null, null, null, null), + new SettingsConfig(-1, null, (Boolean) null, null, null, null, null), null, null ); @@ -256,7 +256,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform null, null, null, - new SettingsConfig(-1, -1F, (Boolean) null, null, null, null), + new SettingsConfig(-1, -1F, (Boolean) null, null, null, null, null), null, null ); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml index 62438e924692..288224f997f0 100644 --- a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/transform/transforms_crud.yml @@ -508,6 +508,42 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } + +--- +"Test put config with invalid number of failure retries": + - do: + catch: /settings\.num_failure_retries \[-2\] is out of range. The minimum value is -1 \(infinity\) and the maximum is 100/ + transform.put_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "settings": { + "num_failure_retries": -2 + } + } + - do: + catch: /settings\.num_failure_retries \[101\] is out of range. The minimum value is -1 \(infinity\) and the maximum is 100/ + transform.put_transform: + transform_id: "airline-transform" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest-index" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + }, + "settings": { + "num_failure_retries": 101 + } + } + --- "Test creation failures due to duplicate and conflicting field names": - do: diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java index 0fa0b0f0fccc..7e9e871a5d02 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformIT.java @@ -134,7 +134,7 @@ public class TransformIT extends TransformRestTestCase { indexName ).setPivotConfig(createPivotConfig(groups, aggs)) .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) - .setSettings(new SettingsConfig(null, null, null, false, null, null)) + .setSettings(new SettingsConfig(null, null, null, false, null, null, null)) .build(); putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); @@ -353,7 +353,7 @@ public class TransformIT extends TransformRestTestCase { ).setPivotConfig(createPivotConfig(groups, aggs)) .setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1))) // set requests per second and page size low enough to fail the test if update does not succeed, - .setSettings(new SettingsConfig(10, 1F, null, false, null, null)) + .setSettings(new SettingsConfig(10, 1F, null, false, null, null, null)) .build(); putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index 3c9b3f359602..adb538d125c8 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -73,6 +73,7 @@ import org.elasticsearch.xpack.core.transform.action.StopTransformAction; import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction; import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction; import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction; +import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.transform.action.TransportDeleteTransformAction; import org.elasticsearch.xpack.transform.action.TransportGetCheckpointAction; import org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction; @@ -131,16 +132,18 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa private final Settings settings; private final SetOnce transformServices = new SetOnce<>(); - public static final int DEFAULT_FAILURE_RETRIES = 10; public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500); public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000); - // How many times the transform task can retry on an non-critical failure + public static final int DEFAULT_FAILURE_RETRIES = 10; + // How many times the transform task can retry on a non-critical failure. + // This cluster-level setting is deprecated, the users should be using transform-level setting instead. + // In order to ensure BWC, this cluster-level setting serves as a fallback in case the transform-level setting is not specified. public static final Setting NUM_FAILURE_RETRIES_SETTING = Setting.intSetting( "xpack.transform.num_transform_failure_retries", DEFAULT_FAILURE_RETRIES, 0, - 100, + SettingsConfig.MAX_NUM_FAILURE_RETRIES, Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index 8efef3210edc..5293acd23c06 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -55,6 +55,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -947,10 +948,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer context.getNumFailureRetries()) { + int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries()) + .orElse(context.getNumFailureRetries()); + if (numFailureRetries != -1 && context.getAndIncrementFailureCount() > numFailureRetries) { failIndexer( "task encountered more than " - + context.getNumFailureRetries() + + numFailureRetries + " failures; latest failure: " + ExceptionRootCauseFinder.getDetailedMessage(unwrappedException) ); diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index cabbcc341bd6..039c9c6ce2c6 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -309,7 +309,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null, null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null), null, null, null, @@ -384,7 +384,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null, null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null), null, null, null, @@ -448,7 +448,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(pageSize, null, (Boolean) null, null, null, null), + new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null), null, null, null, @@ -833,6 +833,93 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase { assertEquals(0, context.getFailureCount()); } + public void testHandleFailure() { + testHandleFailure(0, 5, 0); + testHandleFailure(5, 0, 5); + testHandleFailure(3, 5, 3); + testHandleFailure(5, 3, 5); + testHandleFailure(0, null, 0); + testHandleFailure(3, null, 3); + testHandleFailure(5, null, 5); + testHandleFailure(7, null, 7); + testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES); + testHandleFailure(null, 0, 0); + testHandleFailure(null, 3, 3); + testHandleFailure(null, 5, 5); + testHandleFailure(null, 7, 7); + testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES); + testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES); + } + + private void testHandleFailure( + Integer configNumFailureRetries, + Integer contextNumFailureRetries, + int expectedEffectiveNumFailureRetries + ) { + String transformId = randomAlphaOfLength(10); + TransformConfig config = new TransformConfig.Builder().setId(transformId) + .setSource(randomSourceConfig()) + .setDest(randomDestConfig()) + .setSyncConfig(new TimeSyncConfig("time", TimeSyncConfig.DEFAULT_DELAY)) + .setPivotConfig(randomPivotConfig()) + .setSettings(new SettingsConfig.Builder().setNumFailureRetries(configNumFailureRetries).build()) + .build(); + + AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); + Function searchFunction = request -> mock(SearchResponse.class); + Function bulkFunction = request -> mock(BulkResponse.class); + + final AtomicBoolean failIndexerCalled = new AtomicBoolean(false); + final AtomicReference failureMessage = new AtomicReference<>(); + Consumer failureConsumer = message -> { + failIndexerCalled.compareAndSet(false, true); + failureMessage.compareAndSet(null, message); + }; + + MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor(); + TransformContext.Listener contextListener = mock(TransformContext.Listener.class); + TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener); + if (contextNumFailureRetries != null) { + context.setNumFailureRetries(contextNumFailureRetries); + } + + MockedTransformIndexer indexer = createMockIndexer( + config, + state, + searchFunction, + bulkFunction, + null, + failureConsumer, + threadPool, + ThreadPool.Names.GENERIC, + auditor, + context + ); + + for (int i = 0; i <= expectedEffectiveNumFailureRetries; ++i) { + indexer.handleFailure(new Exception("exception no. " + (i + 1))); + assertFalse(failIndexerCalled.get()); + assertThat(failureMessage.get(), is(nullValue())); + assertThat(context.getFailureCount(), is(equalTo(i + 1))); + } + indexer.handleFailure(new Exception("exception no. " + (expectedEffectiveNumFailureRetries + 2))); + assertTrue(failIndexerCalled.get()); + assertThat( + failureMessage.get(), + is( + equalTo( + "task encountered more than " + + expectedEffectiveNumFailureRetries + + " failures; latest failure: exception no. " + + (expectedEffectiveNumFailureRetries + 2) + ) + ) + ); + assertThat(context.getFailureCount(), is(equalTo(expectedEffectiveNumFailureRetries + 2))); + + auditor.assertAllExpectationsMatched(); + } + private MockedTransformIndexer createMockIndexer( TransformConfig config, AtomicReference state, diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java index 6ad7895026d0..854648e53c0b 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerStateTests.java @@ -506,7 +506,7 @@ public class TransformIndexerStateTests extends ESTestCase { randomPivotConfig(), null, randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000), - new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null, null), + new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null, null, null), null, null, null, diff --git a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_config.schema.json b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_config.schema.json index 18f96b414648..3687f2601c88 100644 --- a/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_config.schema.json +++ b/x-pack/plugin/transform/src/test/resources/rest-api-spec/schema/transform_config.schema.json @@ -203,6 +203,11 @@ "title": "deduce mappings", "type": "boolean", "default": true + }, + "num_failure_retries": { + "$id": "#root/settings/num_failure_retries", + "title": "num failure retries", + "type": "integer" } } },