[Transform] Implement per-transform num_failure_retries setting. (#87361)

This commit is contained in:
Przemysław Witek 2022-06-09 15:22:06 +02:00 committed by GitHub
parent e6f5cd3d9b
commit 8656a29675
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 404 additions and 54 deletions

View file

@ -0,0 +1,6 @@
pr: 87361
summary: "Implement per-transform num_failure_retries setting"
area: Transform
type: enhancement
issues: []

View file

@ -1056,6 +1056,13 @@ adjusted to a lower value. The minimum value is `10` and the maximum is `65,536`
The default value is `500`.
end::transform-settings-max-page-search-size[]
tag::transform-settings-num-failure-retries[]
Defines the number of retries on a recoverable failure before the {transform} task is marked as `failed`.
The minimum value is `0` and the maximum is `100`.
`-1` can be used to denote infinity. In this case, the {transform} never gives up on retrying a recoverable failure.
The default value is the cluster-level setting `num_transform_failure_retries`.
end::transform-settings-num-failure-retries[]
tag::transform-sort[]
Specifies the date field that is used to identify the latest documents.
end::transform-sort[]

View file

@ -37,3 +37,5 @@ retries when it experiences a non-fatal error. Once the number of retries is
exhausted, the {transform} task is marked as `failed`. The default value is `10`
with a valid minimum of `0` and maximum of `100`. If a {transform} is already
running, it has to be restarted to use the changed setting.
The `num_failure_retries` setting can also be specified on an individual {transform} level.
Specifying this setting for each {transform} individually is recommended.

View file

@ -215,6 +215,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
`num_failure_retries`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries]
====
//End settings

View file

@ -157,6 +157,9 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-ded
`max_page_search_size`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-max-page-search-size]
`num_failure_retries`:::
(Optional, integer)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-settings-num-failure-retries]
====
//End settings

View file

@ -250,7 +250,7 @@ public abstract class AbstractObjectParser<Value, Context> {
}
/**
* Declare a double field that parses explicit {@code null}s in the json to a default value.
* Declare an integer field that parses explicit {@code null}s in the json to a default value.
*/
public void declareIntOrNull(BiConsumer<Value, Integer> consumer, int nullValue, ParseField field) {
declareField(

View file

@ -41,6 +41,7 @@ public final class TransformField {
public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField DEDUCE_MAPPINGS = new ParseField("deduce_mappings");
public static final ParseField NUM_FAILURE_RETRIES = new ParseField("num_failure_retries");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");

View file

@ -34,12 +34,15 @@ public class SettingsConfig implements Writeable, ToXContentObject {
public static final ConstructingObjectParser<SettingsConfig, Void> STRICT_PARSER = createParser(false);
public static final ConstructingObjectParser<SettingsConfig, Void> LENIENT_PARSER = createParser(true);
public static final int MAX_NUM_FAILURE_RETRIES = 100;
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;
private static final int DEFAULT_USE_PIT = -1;
private static final int DEFAULT_DEDUCE_MAPPINGS = -1;
private static final int DEFAULT_NUM_FAILURE_RETRIES = -2;
private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
@ -51,7 +54,8 @@ public class SettingsConfig implements Writeable, ToXContentObject {
(Integer) args[2],
(Integer) args[3],
(Integer) args[4],
(Integer) args[5]
(Integer) args[5],
(Integer) args[6]
)
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
@ -84,6 +88,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
TransformField.DEDUCE_MAPPINGS,
ValueType.BOOLEAN_OR_NULL
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_NUM_FAILURE_RETRIES, TransformField.NUM_FAILURE_RETRIES);
return parser;
}
@ -93,9 +98,10 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private final Integer alignCheckpoints;
private final Integer usePit;
private final Integer deduceMappings;
private final Integer numFailureRetries;
public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
this(null, null, (Integer) null, (Integer) null, (Integer) null, (Integer) null, (Integer) null);
}
public SettingsConfig(
@ -104,7 +110,8 @@ public class SettingsConfig implements Writeable, ToXContentObject {
Boolean datesAsEpochMillis,
Boolean alignCheckpoints,
Boolean usePit,
Boolean deduceMappings
Boolean deduceMappings,
Integer numFailureRetries
) {
this(
maxPageSearchSize,
@ -112,17 +119,19 @@ public class SettingsConfig implements Writeable, ToXContentObject {
datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0,
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0,
usePit == null ? null : usePit ? 1 : 0,
deduceMappings == null ? null : deduceMappings ? 1 : 0
deduceMappings == null ? null : deduceMappings ? 1 : 0,
numFailureRetries
);
}
public SettingsConfig(
SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit,
Integer deduceMappings
Integer deduceMappings,
Integer numFailureRetries
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
@ -130,6 +139,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
this.deduceMappings = deduceMappings;
this.numFailureRetries = numFailureRetries;
}
public SettingsConfig(final StreamInput in) throws IOException {
@ -155,6 +165,11 @@ public class SettingsConfig implements Writeable, ToXContentObject {
} else {
deduceMappings = DEFAULT_DEDUCE_MAPPINGS;
}
if (in.getVersion().onOrAfter(Version.V_8_4_0)) {
numFailureRetries = in.readOptionalInt();
} else {
numFailureRetries = null;
}
}
public Integer getMaxPageSearchSize() {
@ -197,6 +212,14 @@ public class SettingsConfig implements Writeable, ToXContentObject {
return deduceMappings;
}
public Integer getNumFailureRetries() {
return numFailureRetries != null ? (numFailureRetries == DEFAULT_NUM_FAILURE_RETRIES ? null : numFailureRetries) : null;
}
public Integer getNumFailureRetriesForUpdate() {
return numFailureRetries;
}
public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
@ -207,7 +230,15 @@ public class SettingsConfig implements Writeable, ToXContentObject {
validationException
);
}
if (numFailureRetries != null && (numFailureRetries < -1 || numFailureRetries > MAX_NUM_FAILURE_RETRIES)) {
validationException = addValidationError(
"settings.num_failure_retries ["
+ numFailureRetries
+ "] is out of range. The minimum value is -1 (infinity) and the maximum is "
+ MAX_NUM_FAILURE_RETRIES,
validationException
);
}
return validationException;
}
@ -229,6 +260,9 @@ public class SettingsConfig implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
out.writeOptionalInt(deduceMappings);
}
if (out.getVersion().onOrAfter(Version.V_8_4_0)) {
out.writeOptionalInt(numFailureRetries);
}
}
@Override
@ -253,6 +287,9 @@ public class SettingsConfig implements Writeable, ToXContentObject {
if (deduceMappings != null && (deduceMappings.equals(DEFAULT_DEDUCE_MAPPINGS) == false)) {
builder.field(TransformField.DEDUCE_MAPPINGS.getPreferredName(), deduceMappings > 0 ? true : false);
}
if (numFailureRetries != null && (numFailureRetries.equals(DEFAULT_NUM_FAILURE_RETRIES) == false)) {
builder.field(TransformField.NUM_FAILURE_RETRIES.getPreferredName(), numFailureRetries);
}
builder.endObject();
return builder;
}
@ -272,12 +309,21 @@ public class SettingsConfig implements Writeable, ToXContentObject {
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit)
&& Objects.equals(deduceMappings, that.deduceMappings);
&& Objects.equals(deduceMappings, that.deduceMappings)
&& Objects.equals(numFailureRetries, that.numFailureRetries);
}
@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
return Objects.hash(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis,
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
);
}
@Override
@ -296,6 +342,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private Integer alignCheckpoints;
private Integer usePit;
private Integer deduceMappings;
private Integer numFailureRetries;
/**
* Default builder
@ -314,6 +361,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
this.alignCheckpoints = base.alignCheckpoints;
this.usePit = base.usePit;
this.deduceMappings = base.deduceMappings;
this.numFailureRetries = base.numFailureRetries;
}
/**
@ -402,6 +450,11 @@ public class SettingsConfig implements Writeable, ToXContentObject {
return this;
}
public Builder setNumFailureRetries(Integer numFailureRetries) {
this.numFailureRetries = numFailureRetries == null ? DEFAULT_NUM_FAILURE_RETRIES : numFailureRetries;
return this;
}
/**
* Update settings according to given settings config.
*
@ -437,12 +490,25 @@ public class SettingsConfig implements Writeable, ToXContentObject {
? null
: update.getDeduceMappingsForUpdate();
}
if (update.getNumFailureRetriesForUpdate() != null) {
this.numFailureRetries = update.getNumFailureRetriesForUpdate().equals(DEFAULT_NUM_FAILURE_RETRIES)
? null
: update.getNumFailureRetriesForUpdate();
}
return this;
}
public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit, deduceMappings);
return new SettingsConfig(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis,
alignCheckpoints,
usePit,
deduceMappings,
numFailureRetries
);
}
}
}

View file

@ -623,7 +623,8 @@ public class TransformConfig implements SimpleDiffable<TransformConfig>, Writeab
builder.getSettings().getDatesAsEpochMillis(),
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}
@ -637,7 +638,8 @@ public class TransformConfig implements SimpleDiffable<TransformConfig>, Writeab
true,
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}
@ -651,7 +653,8 @@ public class TransformConfig implements SimpleDiffable<TransformConfig>, Writeab
builder.getSettings().getDatesAsEpochMillis(),
false,
builder.getSettings().getUsePit(),
builder.getSettings().getDeduceMappings()
builder.getSettings().getDeduceMappings(),
builder.getSettings().getNumFailureRetries()
)
);
}

View file

@ -23,7 +23,10 @@ import org.junit.Before;
import java.io.IOException;
import java.util.Map;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
public class SettingsConfigTests extends AbstractSerializingTransformTestCase<SettingsConfig> {
@ -36,7 +39,8 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(-1, 100)
);
}
@ -47,7 +51,8 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
randomIntBetween(0, 1),
randomIntBetween(0, 1),
randomIntBetween(0, 1),
randomIntBetween(0, 1)
randomIntBetween(0, 1),
randomIntBetween(-1, 100)
);
}
@ -97,31 +102,35 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
assertThat(fromString("{\"deduce_mappings\" : null}").getDeduceMappingsForUpdate(), equalTo(-1));
assertNull(fromString("{}").getDeduceMappingsForUpdate());
assertNull(fromString("{\"num_failure_retries\" : null}").getNumFailureRetries());
assertThat(fromString("{\"num_failure_retries\" : null}").getNumFailureRetriesForUpdate(), equalTo(-2));
assertNull(fromString("{}").getNumFailureRetries());
assertNull(fromString("{}").getNumFailureRetriesForUpdate());
}
public void testUpdateUsingBuilder() throws IOException {
public void testUpdateMaxPageSearchSizeUsingBuilder() throws IOException {
SettingsConfig config = fromString(
"{\"max_page_search_size\" : 10000, "
+ "\"docs_per_second\" :42, "
"{\"max_page_search_size\": 10000, "
+ "\"docs_per_second\": 42, "
+ "\"dates_as_epoch_millis\": true, "
+ "\"align_checkpoints\": false,"
+ "\"use_point_in_time\": false,"
+ "\"deduce_mappings\": false}"
+ "\"deduce_mappings\": false,"
+ "\"num_failure_retries\": 5}"
);
SettingsConfig.Builder builder = new SettingsConfig.Builder(config);
builder.update(fromString("{\"max_page_search_size\" : 100}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, 5))));
assertThat(builder.build().getMaxPageSearchSize(), equalTo(100));
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
builder.update(fromString("{\"max_page_search_size\": 100}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(100, 42F, true, false, false, false, 5))));
assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1));
assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0));
assertThat(builder.build().getUsePitForUpdate(), equalTo(0));
assertThat(builder.build().getDeduceMappingsForUpdate(), equalTo(0));
builder.update(fromString("{\"max_page_search_size\" : null}"));
assertNull(builder.build().getMaxPageSearchSize());
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
builder.update(fromString("{\"max_page_search_size\": null}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(null, 42F, true, false, false, false, 5))));
assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1));
assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0));
assertThat(builder.build().getUsePitForUpdate(), equalTo(0));
@ -129,22 +138,48 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
builder.update(
fromString(
"{\"max_page_search_size\" : 77, "
+ "\"docs_per_second\" :null, "
"{\"max_page_search_size\": 77, "
+ "\"docs_per_second\": null, "
+ "\"dates_as_epoch_millis\": null, "
+ "\"align_checkpoints\": null,"
+ "\"use_point_in_time\": null,"
+ "\"deduce_mappings\": null}"
+ "\"deduce_mappings\": null,"
+ "\"num_failure_retries\": null}"
)
);
assertThat(builder.build().getMaxPageSearchSize(), equalTo(77));
assertNull(builder.build().getDocsPerSecond());
assertThat(builder.build(), is(equalTo(new SettingsConfig(77, null, (Boolean) null, null, null, null, null))));
assertNull(builder.build().getDatesAsEpochMillisForUpdate());
assertNull(builder.build().getAlignCheckpointsForUpdate());
assertNull(builder.build().getUsePitForUpdate());
assertNull(builder.build().getDeduceMappingsForUpdate());
}
public void testUpdateNumFailureRetriesUsingBuilder() throws IOException {
SettingsConfig config = fromString(
"{\"max_page_search_size\": 10000, "
+ "\"docs_per_second\": 42, "
+ "\"dates_as_epoch_millis\": true, "
+ "\"align_checkpoints\": false,"
+ "\"use_point_in_time\": false,"
+ "\"deduce_mappings\": false,"
+ "\"num_failure_retries\": 5}"
);
SettingsConfig.Builder builder = new SettingsConfig.Builder(config);
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, 5))));
builder.update(fromString("{\"num_failure_retries\": 6}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, 6))));
builder.update(fromString("{\"num_failure_retries\": -1}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, -1))));
builder.update(fromString("{\"num_failure_retries\": null}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, null))));
builder.update(fromString("{\"num_failure_retries\": 55}"));
assertThat(builder.build(), is(equalTo(new SettingsConfig(10000, 42F, true, false, false, false, 55))));
}
public void testOmmitDefaultsOnWriteParser() throws IOException {
// test that an explicit null is handled differently than not set
SettingsConfig config = fromString("{\"max_page_search_size\" : null}");
@ -188,6 +223,13 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
config = fromString("{\"num_failure_retries\" : null}");
assertThat(config.getNumFailureRetries(), nullValue());
assertThat(config.getNumFailureRetriesForUpdate(), equalTo(-2));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
public void testOmmitDefaultsOnWriteBuilder() throws IOException {
@ -233,6 +275,89 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
config = new SettingsConfig.Builder().setNumFailureRetries(null).build();
assertThat(config.getNumFailureRetries(), nullValue());
assertThat(config.getNumFailureRetriesForUpdate(), equalTo(-2));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
public void testValidateMaxPageSearchSize() {
SettingsConfig config = new SettingsConfig.Builder().build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setMaxPageSearchSize(null).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.max_page_search_size [-1] is out of range. The minimum value is 10 and the maximum is 65536")
);
config = new SettingsConfig.Builder().setMaxPageSearchSize(-2).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.max_page_search_size [-2] is out of range. The minimum value is 10 and the maximum is 65536")
);
config = new SettingsConfig.Builder().setMaxPageSearchSize(-1).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.max_page_search_size [-1] is out of range. The minimum value is 10 and the maximum is 65536")
);
config = new SettingsConfig.Builder().setMaxPageSearchSize(0).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.max_page_search_size [0] is out of range. The minimum value is 10 and the maximum is 65536")
);
config = new SettingsConfig.Builder().setMaxPageSearchSize(10).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setMaxPageSearchSize(65536).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setMaxPageSearchSize(65537).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.max_page_search_size [65537] is out of range. The minimum value is 10 and the maximum is 65536")
);
}
public void testValidateNumFailureRetries() {
SettingsConfig config = new SettingsConfig.Builder().build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setNumFailureRetries(null).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.num_failure_retries [-2] is out of range. The minimum value is -1 (infinity) and the maximum is 100")
);
config = new SettingsConfig.Builder().setNumFailureRetries(-2).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.num_failure_retries [-2] is out of range. The minimum value is -1 (infinity) and the maximum is 100")
);
config = new SettingsConfig.Builder().setNumFailureRetries(-1).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setNumFailureRetries(0).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setNumFailureRetries(1).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setNumFailureRetries(100).build();
assertThat(config.validate(null), is(nullValue()));
config = new SettingsConfig.Builder().setNumFailureRetries(101).build();
assertThat(
config.validate(null).validationErrors(),
contains("settings.num_failure_retries [101] is out of range. The minimum value is -1 (infinity) and the maximum is 100")
);
}
private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {

View file

@ -115,7 +115,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
TimeValue frequency = TimeValue.timeValueSeconds(10);
SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30));
String newDescription = "new description";
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true, true);
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true, true, 10);
Map<String, Object> newMetadata = randomMetadata();
RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000));
update = new TransformConfigUpdate(
@ -204,7 +204,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(4_000, null, (Boolean) null, null, null, null),
new SettingsConfig(4_000, null, (Boolean) null, null, null, null, null),
null,
null
);
@ -223,7 +223,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(null, 43.244F, (Boolean) null, null, null, null),
new SettingsConfig(null, 43.244F, (Boolean) null, null, null, null, null),
null,
null
);
@ -240,7 +240,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(-1, null, (Boolean) null, null, null, null),
new SettingsConfig(-1, null, (Boolean) null, null, null, null, null),
null,
null
);
@ -256,7 +256,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(-1, -1F, (Boolean) null, null, null, null),
new SettingsConfig(-1, -1F, (Boolean) null, null, null, null, null),
null,
null
);

View file

@ -508,6 +508,42 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test put config with invalid number of failure retries":
- do:
catch: /settings\.num_failure_retries \[-2\] is out of range. The minimum value is -1 \(infinity\) and the maximum is 100/
transform.put_transform:
transform_id: "airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-dest-index" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"settings": {
"num_failure_retries": -2
}
}
- do:
catch: /settings\.num_failure_retries \[101\] is out of range. The minimum value is -1 \(infinity\) and the maximum is 100/
transform.put_transform:
transform_id: "airline-transform"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-dest-index" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"settings": {
"num_failure_retries": 101
}
}
---
"Test creation failures due to duplicate and conflicting field names":
- do:

View file

@ -134,7 +134,7 @@ public class TransformIT extends TransformRestTestCase {
indexName
).setPivotConfig(createPivotConfig(groups, aggs))
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
.setSettings(new SettingsConfig(null, null, null, false, null, null))
.setSettings(new SettingsConfig(null, null, null, false, null, null, null))
.build();
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);
@ -353,7 +353,7 @@ public class TransformIT extends TransformRestTestCase {
).setPivotConfig(createPivotConfig(groups, aggs))
.setSyncConfig(new TimeSyncConfig("timestamp", TimeValue.timeValueSeconds(1)))
// set requests per second and page size low enough to fail the test if update does not succeed,
.setSettings(new SettingsConfig(10, 1F, null, false, null, null))
.setSettings(new SettingsConfig(10, 1F, null, false, null, null, null))
.build();
putTransform(transformId, Strings.toString(config), RequestOptions.DEFAULT);

View file

@ -73,6 +73,7 @@ import org.elasticsearch.xpack.core.transform.action.StopTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpdateTransformAction;
import org.elasticsearch.xpack.core.transform.action.UpgradeTransformsAction;
import org.elasticsearch.xpack.core.transform.action.ValidateTransformAction;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.transform.action.TransportDeleteTransformAction;
import org.elasticsearch.xpack.transform.action.TransportGetCheckpointAction;
import org.elasticsearch.xpack.transform.action.TransportGetCheckpointNodeAction;
@ -131,16 +132,18 @@ public class Transform extends Plugin implements SystemIndexPlugin, PersistentTa
private final Settings settings;
private final SetOnce<TransformServices> transformServices = new SetOnce<>();
public static final int DEFAULT_FAILURE_RETRIES = 10;
public static final Integer DEFAULT_INITIAL_MAX_PAGE_SEARCH_SIZE = Integer.valueOf(500);
public static final TimeValue DEFAULT_TRANSFORM_FREQUENCY = TimeValue.timeValueMillis(60000);
// How many times the transform task can retry on an non-critical failure
public static final int DEFAULT_FAILURE_RETRIES = 10;
// How many times the transform task can retry on a non-critical failure.
// This cluster-level setting is deprecated, the users should be using transform-level setting instead.
// In order to ensure BWC, this cluster-level setting serves as a fallback in case the transform-level setting is not specified.
public static final Setting<Integer> NUM_FAILURE_RETRIES_SETTING = Setting.intSetting(
"xpack.transform.num_transform_failure_retries",
DEFAULT_FAILURE_RETRIES,
0,
100,
SettingsConfig.MAX_NUM_FAILURE_RETRIES,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

View file

@ -55,6 +55,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -947,10 +948,12 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
return;
}
if (context.getAndIncrementFailureCount() > context.getNumFailureRetries()) {
int numFailureRetries = Optional.ofNullable(transformConfig.getSettings().getNumFailureRetries())
.orElse(context.getNumFailureRetries());
if (numFailureRetries != -1 && context.getAndIncrementFailureCount() > numFailureRetries) {
failIndexer(
"task encountered more than "
+ context.getNumFailureRetries()
+ numFailureRetries
+ " failures; latest failure: "
+ ExceptionRootCauseFinder.getDetailedMessage(unwrappedException)
);

View file

@ -309,7 +309,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null),
null,
null,
null,
@ -384,7 +384,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null),
null,
null,
null,
@ -448,7 +448,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null, null, null),
null,
null,
null,
@ -833,6 +833,93 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
assertEquals(0, context.getFailureCount());
}
public void testHandleFailure() {
testHandleFailure(0, 5, 0);
testHandleFailure(5, 0, 5);
testHandleFailure(3, 5, 3);
testHandleFailure(5, 3, 5);
testHandleFailure(0, null, 0);
testHandleFailure(3, null, 3);
testHandleFailure(5, null, 5);
testHandleFailure(7, null, 7);
testHandleFailure(Transform.DEFAULT_FAILURE_RETRIES, null, Transform.DEFAULT_FAILURE_RETRIES);
testHandleFailure(null, 0, 0);
testHandleFailure(null, 3, 3);
testHandleFailure(null, 5, 5);
testHandleFailure(null, 7, 7);
testHandleFailure(null, Transform.DEFAULT_FAILURE_RETRIES, Transform.DEFAULT_FAILURE_RETRIES);
testHandleFailure(null, null, Transform.DEFAULT_FAILURE_RETRIES);
}
private void testHandleFailure(
Integer configNumFailureRetries,
Integer contextNumFailureRetries,
int expectedEffectiveNumFailureRetries
) {
String transformId = randomAlphaOfLength(10);
TransformConfig config = new TransformConfig.Builder().setId(transformId)
.setSource(randomSourceConfig())
.setDest(randomDestConfig())
.setSyncConfig(new TimeSyncConfig("time", TimeSyncConfig.DEFAULT_DELAY))
.setPivotConfig(randomPivotConfig())
.setSettings(new SettingsConfig.Builder().setNumFailureRetries(configNumFailureRetries).build())
.build();
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
Function<SearchRequest, SearchResponse> searchFunction = request -> mock(SearchResponse.class);
Function<BulkRequest, BulkResponse> bulkFunction = request -> mock(BulkResponse.class);
final AtomicBoolean failIndexerCalled = new AtomicBoolean(false);
final AtomicReference<String> failureMessage = new AtomicReference<>();
Consumer<String> failureConsumer = message -> {
failIndexerCalled.compareAndSet(false, true);
failureMessage.compareAndSet(null, message);
};
MockTransformAuditor auditor = MockTransformAuditor.createMockAuditor();
TransformContext.Listener contextListener = mock(TransformContext.Listener.class);
TransformContext context = new TransformContext(TransformTaskState.STARTED, "", 0, contextListener);
if (contextNumFailureRetries != null) {
context.setNumFailureRetries(contextNumFailureRetries);
}
MockedTransformIndexer indexer = createMockIndexer(
config,
state,
searchFunction,
bulkFunction,
null,
failureConsumer,
threadPool,
ThreadPool.Names.GENERIC,
auditor,
context
);
for (int i = 0; i <= expectedEffectiveNumFailureRetries; ++i) {
indexer.handleFailure(new Exception("exception no. " + (i + 1)));
assertFalse(failIndexerCalled.get());
assertThat(failureMessage.get(), is(nullValue()));
assertThat(context.getFailureCount(), is(equalTo(i + 1)));
}
indexer.handleFailure(new Exception("exception no. " + (expectedEffectiveNumFailureRetries + 2)));
assertTrue(failIndexerCalled.get());
assertThat(
failureMessage.get(),
is(
equalTo(
"task encountered more than "
+ expectedEffectiveNumFailureRetries
+ " failures; latest failure: exception no. "
+ (expectedEffectiveNumFailureRetries + 2)
)
)
);
assertThat(context.getFailureCount(), is(equalTo(expectedEffectiveNumFailureRetries + 2)));
auditor.assertAllExpectationsMatched();
}
private MockedTransformIndexer createMockIndexer(
TransformConfig config,
AtomicReference<IndexerState> state,

View file

@ -506,7 +506,7 @@ public class TransformIndexerStateTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null, null),
new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null, null, null),
null,
null,
null,

View file

@ -203,6 +203,11 @@
"title": "deduce mappings",
"type": "boolean",
"default": true
},
"num_failure_retries": {
"$id": "#root/settings/num_failure_retries",
"title": "num failure retries",
"type": "integer"
}
}
},