[7.16] [Transform] handle pit index not found error (#81368) (#81491)

Do not fail the transform if pit search fails with index not found as a result of an index that got deleted via ILM, 
if that index is part of a search that selects indices using a wildcard, e.g. logs-*. If pit search fails, the search 
is retried using search without a pit context. The 2nd search might fail if the source targets an explicit index. 
In addition the usage of the pit API can not be disabled by transform.

fixes elastic#81252
relates elastic#81256
This commit is contained in:
Hendrik Muhs 2021-12-08 09:44:56 +01:00 committed by GitHub
parent 5e0003fe66
commit 969e5655ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 372 additions and 40 deletions

View file

@ -26,6 +26,7 @@ public class SettingsConfig implements ToXContentObject {
private static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
private static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
private static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
private static final ParseField USE_PIT = new ParseField("use_point_in_time");
private static final int DEFAULT_MAX_PAGE_SEARCH_SIZE = -1;
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
@ -35,15 +36,19 @@ public class SettingsConfig implements ToXContentObject {
// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;
// use an integer as we need to code 4 states: true, false, null (unchanged), default (defined server side)
private static final int DEFAULT_USE_PIT = -1;
private final Integer maxPageSearchSize;
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer alignCheckpoints;
private final Integer usePit;
private static final ConstructingObjectParser<SettingsConfig, Void> PARSER = new ConstructingObjectParser<>(
"settings_config",
true,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4])
);
static {
@ -63,17 +68,25 @@ public class SettingsConfig implements ToXContentObject {
ALIGN_CHECKPOINTS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
PARSER.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0,
USE_PIT,
ValueType.BOOLEAN_OR_NULL
);
}
public static SettingsConfig fromXContent(final XContentParser parser) {
return PARSER.apply(parser, null);
}
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) {
SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints, Integer usePit) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
}
@Override
@ -107,6 +120,13 @@ public class SettingsConfig implements ToXContentObject {
builder.field(ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false);
}
}
if (usePit != null) {
if (usePit.equals(DEFAULT_USE_PIT)) {
builder.field(USE_PIT.getPreferredName(), (Boolean) null);
} else {
builder.field(USE_PIT.getPreferredName(), usePit > 0 ? true : false);
}
}
builder.endObject();
return builder;
}
@ -127,6 +147,10 @@ public class SettingsConfig implements ToXContentObject {
return alignCheckpoints != null ? alignCheckpoints > 0 : null;
}
public Boolean getUsePit() {
return usePit != null ? usePit > 0 : null;
}
@Override
public boolean equals(Object other) {
if (other == this) {
@ -140,12 +164,13 @@ public class SettingsConfig implements ToXContentObject {
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints);
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit);
}
@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
public static Builder builder() {
@ -157,6 +182,7 @@ public class SettingsConfig implements ToXContentObject {
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer alignCheckpoints;
private Integer usePit;
/**
* Sets the paging maximum paging maxPageSearchSize that transform can use when
@ -215,8 +241,23 @@ public class SettingsConfig implements ToXContentObject {
return this;
}
/**
* Whether the point in time API should be used for search.
* Point in time is a more resource friendly way to query. It is used by default. In case of problems
* you can disable the point in time API usage with this setting.
*
* An explicit `null` resets to default.
*
* @param usePit true if the point in time API should be used.
* @return the {@link Builder} with usePit set.
*/
public Builder setUsePit(Boolean usePit) {
this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0;
return this;
}
public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
}
}

View file

@ -31,6 +31,7 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1),
randomBoolean() ? null : randomIntBetween(-1, 1)
);
}
@ -74,6 +75,7 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
config = fromString("{\"dates_as_epoch_millis\" : null}");
assertFalse(config.getDatesAsEpochMillis());
@ -83,6 +85,7 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
config = fromString("{\"align_checkpoints\" : null}");
assertFalse(config.getAlignCheckpoints());
@ -92,6 +95,10 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertNull(settingsAsMap.getOrDefault("align_checkpoints", "not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
config = fromString("{\"use_point_in_time\" : null}");
assertFalse(config.getUsePit());
}
public void testExplicitNullOnWriteBuilder() throws IOException {
@ -104,6 +111,7 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
assertThat(settingsAsMap.getOrDefault("docs_per_second", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
SettingsConfig emptyConfig = new SettingsConfig.Builder().build();
assertNull(emptyConfig.getMaxPageSearchSize());
@ -121,6 +129,7 @@ public class SettingsConfigTests extends AbstractXContentTestCase<SettingsConfig
assertNull(settingsAsMap.getOrDefault("docs_per_second", "not_set"));
assertThat(settingsAsMap.getOrDefault("dates_as_epoch_millis", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("align_checkpoints", "not_set"), equalTo("not_set"));
assertThat(settingsAsMap.getOrDefault("use_point_in_time", "not_set"), equalTo("not_set"));
config = new SettingsConfig.Builder().setDatesAsEpochMillis(null).build();
// returns false, however it's `null` as in "use default", checked next

View file

@ -24,6 +24,7 @@ public class SettingsConfigTests extends AbstractResponseTestCase<
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
);
}
@ -36,6 +37,7 @@ public class SettingsConfigTests extends AbstractResponseTestCase<
assertEquals(serverTestInstance.getDocsPerSecond(), clientInstance.getDocsPerSecond());
assertEquals(serverTestInstance.getDatesAsEpochMillis(), clientInstance.getDatesAsEpochMillis());
assertEquals(serverTestInstance.getAlignCheckpoints(), clientInstance.getAlignCheckpoints());
assertEquals(serverTestInstance.getUsePit(), clientInstance.getUsePit());
}
@Override

View file

@ -39,6 +39,7 @@ public final class TransformField {
public static final ParseField DOCS_PER_SECOND = new ParseField("docs_per_second");
public static final ParseField DATES_AS_EPOCH_MILLIS = new ParseField("dates_as_epoch_millis");
public static final ParseField ALIGN_CHECKPOINTS = new ParseField("align_checkpoints");
public static final ParseField USE_PIT = new ParseField("use_point_in_time");
public static final ParseField FIELD = new ParseField("field");
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME = new ParseField("time");

View file

@ -38,12 +38,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private static final float DEFAULT_DOCS_PER_SECOND = -1F;
private static final int DEFAULT_DATES_AS_EPOCH_MILLIS = -1;
private static final int DEFAULT_ALIGN_CHECKPOINTS = -1;
private static final int DEFAULT_USE_PIT = -1;
private static ConstructingObjectParser<SettingsConfig, Void> createParser(boolean lenient) {
ConstructingObjectParser<SettingsConfig, Void> parser = new ConstructingObjectParser<>(
"transform_config_settings",
lenient,
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3])
args -> new SettingsConfig((Integer) args[0], (Float) args[1], (Integer) args[2], (Integer) args[3], (Integer) args[4])
);
parser.declareIntOrNull(optionalConstructorArg(), DEFAULT_MAX_PAGE_SEARCH_SIZE, TransformField.MAX_PAGE_SEARCH_SIZE);
parser.declareFloatOrNull(optionalConstructorArg(), DEFAULT_DOCS_PER_SECOND, TransformField.DOCS_PER_SECOND);
@ -61,6 +62,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
TransformField.ALIGN_CHECKPOINTS,
ValueType.BOOLEAN_OR_NULL
);
// this boolean requires 4 possible values: true, false, not_specified, default, therefore using a custom parser
parser.declareField(
optionalConstructorArg(),
p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? DEFAULT_USE_PIT : p.booleanValue() ? 1 : 0,
TransformField.USE_PIT,
ValueType.BOOLEAN_OR_NULL
);
return parser;
}
@ -68,25 +76,40 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private final Float docsPerSecond;
private final Integer datesAsEpochMillis;
private final Integer alignCheckpoints;
private final Integer usePit;
public SettingsConfig() {
this(null, null, (Integer) null, (Integer) null);
this(null, null, (Integer) null, (Integer) null, (Integer) null);
}
public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Boolean datesAsEpochMillis, Boolean alignCheckpoints) {
public SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Boolean datesAsEpochMillis,
Boolean alignCheckpoints,
Boolean usePit
) {
this(
maxPageSearchSize,
docsPerSecond,
datesAsEpochMillis == null ? null : datesAsEpochMillis ? 1 : 0,
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0
alignCheckpoints == null ? null : alignCheckpoints ? 1 : 0,
usePit == null ? null : usePit ? 1 : 0
);
}
public SettingsConfig(Integer maxPageSearchSize, Float docsPerSecond, Integer datesAsEpochMillis, Integer alignCheckpoints) {
public SettingsConfig(
Integer maxPageSearchSize,
Float docsPerSecond,
Integer datesAsEpochMillis,
Integer alignCheckpoints,
Integer usePit
) {
this.maxPageSearchSize = maxPageSearchSize;
this.docsPerSecond = docsPerSecond;
this.datesAsEpochMillis = datesAsEpochMillis;
this.alignCheckpoints = alignCheckpoints;
this.usePit = usePit;
}
public SettingsConfig(final StreamInput in) throws IOException {
@ -102,6 +125,11 @@ public class SettingsConfig implements Writeable, ToXContentObject {
} else {
this.alignCheckpoints = DEFAULT_ALIGN_CHECKPOINTS;
}
if (in.getVersion().onOrAfter(Version.V_7_16_1)) {
this.usePit = in.readOptionalInt();
} else {
this.usePit = DEFAULT_USE_PIT;
}
}
public Integer getMaxPageSearchSize() {
@ -128,6 +156,14 @@ public class SettingsConfig implements Writeable, ToXContentObject {
return alignCheckpoints;
}
public Boolean getUsePit() {
return usePit != null ? (usePit > 0) || (usePit == DEFAULT_USE_PIT) : null;
}
public Integer getUsePitForUpdate() {
return usePit;
}
public ActionRequestValidationException validate(ActionRequestValidationException validationException) {
if (maxPageSearchSize != null && (maxPageSearchSize < 10 || maxPageSearchSize > MultiBucketConsumerService.DEFAULT_MAX_BUCKETS)) {
validationException = addValidationError(
@ -154,6 +190,9 @@ public class SettingsConfig implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_7_15_0)) {
out.writeOptionalInt(alignCheckpoints);
}
if (out.getVersion().onOrAfter(Version.V_7_16_1)) {
out.writeOptionalInt(usePit);
}
}
@Override
@ -172,6 +211,9 @@ public class SettingsConfig implements Writeable, ToXContentObject {
if (alignCheckpoints != null && (alignCheckpoints.equals(DEFAULT_ALIGN_CHECKPOINTS) == false)) {
builder.field(TransformField.ALIGN_CHECKPOINTS.getPreferredName(), alignCheckpoints > 0 ? true : false);
}
if (usePit != null && (usePit.equals(DEFAULT_USE_PIT) == false)) {
builder.field(TransformField.USE_PIT.getPreferredName(), usePit > 0 ? true : false);
}
builder.endObject();
return builder;
}
@ -189,12 +231,13 @@ public class SettingsConfig implements Writeable, ToXContentObject {
return Objects.equals(maxPageSearchSize, that.maxPageSearchSize)
&& Objects.equals(docsPerSecond, that.docsPerSecond)
&& Objects.equals(datesAsEpochMillis, that.datesAsEpochMillis)
&& Objects.equals(alignCheckpoints, that.alignCheckpoints);
&& Objects.equals(alignCheckpoints, that.alignCheckpoints)
&& Objects.equals(usePit, that.usePit);
}
@Override
public int hashCode() {
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return Objects.hash(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
@Override
@ -211,6 +254,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
private Float docsPerSecond;
private Integer datesAsEpochMillis;
private Integer alignCheckpoints;
private Integer usePit;
/**
* Default builder
@ -227,6 +271,7 @@ public class SettingsConfig implements Writeable, ToXContentObject {
this.docsPerSecond = base.docsPerSecond;
this.datesAsEpochMillis = base.datesAsEpochMillis;
this.alignCheckpoints = base.alignCheckpoints;
this.usePit = base.usePit;
}
/**
@ -286,6 +331,21 @@ public class SettingsConfig implements Writeable, ToXContentObject {
return this;
}
/**
* Whether the point in time API should be used for search.
* Point in time is a more resource friendly way to query. It is used per default. In case of problems
* you can disable the point in time API usage with this setting.
*
* An explicit `null` resets to default.
*
* @param usePit true if the point in time API should be used.
* @return the {@link Builder} with usePit set.
*/
public Builder setUsePit(Boolean usePit) {
this.usePit = usePit == null ? DEFAULT_USE_PIT : usePit ? 1 : 0;
return this;
}
/**
* Update settings according to given settings config.
*
@ -313,12 +373,15 @@ public class SettingsConfig implements Writeable, ToXContentObject {
? null
: update.getAlignCheckpointsForUpdate();
}
if (update.getUsePitForUpdate() != null) {
this.usePit = update.getUsePitForUpdate().equals(DEFAULT_USE_PIT) ? null : update.getUsePitForUpdate();
}
return this;
}
public SettingsConfig build() {
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints);
return new SettingsConfig(maxPageSearchSize, docsPerSecond, datesAsEpochMillis, alignCheckpoints, usePit);
}
}
}

View file

@ -52,7 +52,12 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
*/
public class TransformConfig extends AbstractDiffable<TransformConfig> implements Writeable, ToXContentObject {
public static final Version CONFIG_VERSION_LAST_CHANGED = Version.V_7_15_0;
/**
* Version of the last time the config defaults have been changed.
* Whenever defaults change, we must re-write the config on update in a way it
* does not change behavior.
*/
public static final Version CONFIG_VERSION_LAST_DEFAULTS_CHANGED = Version.V_7_15_0;
public static final String NAME = "data_frame_transform_config";
public static final ParseField HEADERS = new ParseField("headers");
/** Version in which {@code FieldCapabilitiesRequest.runtime_fields} field was introduced. */
@ -591,7 +596,7 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
// quick check if a rewrite is required, if none found just return the original
// a failing quick check, does not mean a rewrite is necessary
if (transformConfig.getVersion() != null
&& transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_CHANGED)
&& transformConfig.getVersion().onOrAfter(CONFIG_VERSION_LAST_DEFAULTS_CHANGED)
&& (transformConfig.getPivotConfig() == null || transformConfig.getPivotConfig().getMaxPageSearchSize() == null)) {
return transformConfig;
}
@ -622,7 +627,8 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
maxPageSearchSize,
builder.getSettings().getDocsPerSecond(),
builder.getSettings().getDatesAsEpochMillis(),
builder.getSettings().getAlignCheckpoints()
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit()
)
);
}
@ -634,19 +640,21 @@ public class TransformConfig extends AbstractDiffable<TransformConfig> implement
builder.getSettings().getMaxPageSearchSize(),
builder.getSettings().getDocsPerSecond(),
true,
builder.getSettings().getAlignCheckpoints()
builder.getSettings().getAlignCheckpoints(),
builder.getSettings().getUsePit()
)
);
}
// 3. set align_checkpoints to false for transforms < 7.15 to keep BWC
if (builder.getVersion() != null && builder.getVersion().before(CONFIG_VERSION_LAST_CHANGED)) {
if (builder.getVersion() != null && builder.getVersion().before(Version.V_7_15_0)) {
builder.setSettings(
new SettingsConfig(
builder.getSettings().getMaxPageSearchSize(),
builder.getSettings().getDocsPerSecond(),
builder.getSettings().getDatesAsEpochMillis(),
false
false,
builder.getSettings().getUsePit()
)
);
}

View file

@ -34,12 +34,19 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
randomBoolean() ? null : randomIntBetween(10, 10_000),
randomBoolean() ? null : randomFloat(),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1),
randomBoolean() ? null : randomIntBetween(0, 1)
);
}
public static SettingsConfig randomNonEmptySettingsConfig() {
return new SettingsConfig(randomIntBetween(10, 10_000), randomFloat(), randomIntBetween(0, 1), randomIntBetween(0, 1));
return new SettingsConfig(
randomIntBetween(10, 10_000),
randomFloat(),
randomIntBetween(0, 1),
randomIntBetween(0, 1),
randomIntBetween(0, 1)
);
}
@Before
@ -82,6 +89,9 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
assertThat(fromString("{\"align_checkpoints\" : null}").getAlignCheckpointsForUpdate(), equalTo(-1));
assertNull(fromString("{}").getAlignCheckpointsForUpdate());
assertThat(fromString("{\"use_point_in_time\" : null}").getUsePitForUpdate(), equalTo(-1));
assertNull(fromString("{}").getUsePitForUpdate());
}
public void testUpdateUsingBuilder() throws IOException {
@ -89,7 +99,8 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
"{\"max_page_search_size\" : 10000, "
+ "\"docs_per_second\" :42, "
+ "\"dates_as_epoch_millis\": true, "
+ "\"align_checkpoints\": false}"
+ "\"align_checkpoints\": false,"
+ "\"use_point_in_time\": false}"
);
SettingsConfig.Builder builder = new SettingsConfig.Builder(config);
@ -99,25 +110,29 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1));
assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0));
assertThat(builder.build().getUsePitForUpdate(), equalTo(0));
builder.update(fromString("{\"max_page_search_size\" : null}"));
assertNull(builder.build().getMaxPageSearchSize());
assertThat(builder.build().getDocsPerSecond(), equalTo(42F));
assertThat(builder.build().getDatesAsEpochMillisForUpdate(), equalTo(1));
assertThat(builder.build().getAlignCheckpointsForUpdate(), equalTo(0));
assertThat(builder.build().getUsePitForUpdate(), equalTo(0));
builder.update(
fromString(
"{\"max_page_search_size\" : 77, "
+ "\"docs_per_second\" :null, "
+ "\"dates_as_epoch_millis\": null, "
+ "\"align_checkpoints\": null}"
+ "\"align_checkpoints\": null,"
+ "\"use_point_in_time\": null}"
)
);
assertThat(builder.build().getMaxPageSearchSize(), equalTo(77));
assertNull(builder.build().getDocsPerSecond());
assertNull(builder.build().getDatesAsEpochMillisForUpdate());
assertNull(builder.build().getAlignCheckpointsForUpdate());
assertNull(builder.build().getUsePitForUpdate());
}
public void testOmmitDefaultsOnWriteParser() throws IOException {
@ -151,6 +166,12 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
config = fromString("{\"use_point_in_time\" : null}");
assertThat(config.getUsePitForUpdate(), equalTo(-1));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
public void testOmmitDefaultsOnWriteBuilder() throws IOException {
@ -184,6 +205,12 @@ public class SettingsConfigTests extends AbstractSerializingTransformTestCase<Se
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
config = new SettingsConfig.Builder().setUsePit(null).build();
assertThat(config.getUsePitForUpdate(), equalTo(-1));
settingsAsMap = xContentToMap(config);
assertTrue(settingsAsMap.isEmpty());
}
private Map<String, Object> xContentToMap(ToXContent xcontent) throws IOException {

View file

@ -114,7 +114,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
TimeValue frequency = TimeValue.timeValueSeconds(10);
SyncConfig syncConfig = new TimeSyncConfig("time_field", TimeValue.timeValueSeconds(30));
String newDescription = "new description";
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true);
SettingsConfig settings = new SettingsConfig(4_000, 4_000.400F, true, true, true);
Map<String, Object> newMetadata = randomMetadata();
RetentionPolicyConfig retentionPolicyConfig = new TimeRetentionPolicyConfig("time_field", new TimeValue(60_000));
update = new TransformConfigUpdate(
@ -169,7 +169,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(4_000, null, (Boolean) null, null),
new SettingsConfig(4_000, null, (Boolean) null, null, null),
null,
null
);
@ -188,7 +188,7 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
null,
null,
null,
new SettingsConfig(null, 43.244F, (Boolean) null, null),
new SettingsConfig(null, 43.244F, (Boolean) null, null, null),
null,
null
);
@ -199,14 +199,32 @@ public class TransformConfigUpdateTests extends AbstractWireSerializingTransform
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
// now reset to default using the magic -1
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, null, (Boolean) null, null), null, null);
update = new TransformConfigUpdate(
null,
null,
null,
null,
null,
new SettingsConfig(-1, null, (Boolean) null, null, null),
null,
null
);
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertThat(updatedConfig.getSettings().getDocsPerSecond(), equalTo(43.244F));
assertThat(updatedConfig.getSettings().getDatesAsEpochMillis(), equalTo(config.getSettings().getDatesAsEpochMillis()));
assertThat(updatedConfig.getSettings().getAlignCheckpoints(), equalTo(config.getSettings().getAlignCheckpoints()));
update = new TransformConfigUpdate(null, null, null, null, null, new SettingsConfig(-1, -1F, (Boolean) null, null), null, null);
update = new TransformConfigUpdate(
null,
null,
null,
null,
null,
new SettingsConfig(-1, -1F, (Boolean) null, null, null),
null,
null
);
updatedConfig = update.apply(updatedConfig);
assertNull(updatedConfig.getSettings().getMaxPageSearchSize());
assertNull(updatedConfig.getSettings().getDocsPerSecond());

View file

@ -191,6 +191,12 @@
"title": "max page search size",
"type": "integer",
"default": 500
},
"use_point_in_time": {
"$id": "#root/settings/use_point_in_time",
"title": "use_point_in_time",
"type": "boolean",
"default": true
}
}
},

View file

@ -31,6 +31,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
@ -42,6 +43,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.ClientHelper;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
@ -111,6 +113,19 @@ class ClientTransformIndexer extends TransformIndexer {
// TODO: move into context constructor
context.setShouldStopAtCheckpoint(shouldStopAtCheckpoint);
if (transformConfig.getSettings().getUsePit() != null) {
disablePit = transformConfig.getSettings().getUsePit() == false;
}
}
@Override
public void applyNewSettings(SettingsConfig newSettings) {
if (newSettings.getUsePit() != null) {
disablePit = newSettings.getUsePit() == false;
}
super.applyNewSettings(newSettings);
}
@Override
@ -508,6 +523,26 @@ class ClientTransformIndexer extends TransformIndexer {
);
return;
}
if (unwrappedException instanceof IndexNotFoundException && pit != null) {
/*
* gh#81252 pit API search request can fail if indices get deleted (by ILM)
* fall-back to normal search, the pit gets re-created (with an updated set of indices) on the next run
*
* Note: Due to BWC this needs to be kept until CCS support for < 8.1 is dropped
*/
namedPits.remove(name);
searchRequest.source().pointInTimeBuilder(null);
ClientHelper.executeWithHeadersAsync(
transformConfig.getHeaders(),
ClientHelper.TRANSFORM_ORIGIN,
client,
SearchAction.INSTANCE,
searchRequest,
listener
);
return;
}
listener.onFailure(e);
})
);

View file

@ -1112,8 +1112,16 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
// reduce the indexes to query to the ones that have changes
SearchRequest request = new SearchRequest(
// gh#77329 optimization turned off
TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, getNextCheckpoint()).toArray(new String[0])
/*
* gh#77329 optimization turned off, gh#81252 transform can fail if an index gets deleted during searches
*
* Until proper checkpoint searches (seq_id per shard) are possible, we have to query
* - all indices
* - resolve indices at search
*
* TransformCheckpoint.getChangedIndices(TransformCheckpoint.EMPTY, getNextCheckpoint()).toArray(new String[0])
*/
getConfig().getSource().getIndex()
);
request.allowPartialSearchResults(false) // shard failures should fail the request
@ -1155,7 +1163,16 @@ public abstract class TransformIndexer extends AsyncTwoPhaseIndexer<TransformInd
if (filter != null) {
filteredQuery.filter(filter);
}
request.indices(changeCollector.getIndicesToQuery(lastCheckpoint, nextCheckpoint).toArray(new String[0]));
/*
* gh#81252 transform can fail if an index gets deleted during searches
*
* Until proper checkpoint searches (seq_id per shard) are possible, we have to query
* - all indices
* - resolve indices at search time
*
* request.indices(changeCollector.getIndicesToQuery(lastCheckpoint, nextCheckpoint).toArray(new String[0]));
*/
request.indices(getConfig().getSource().getIndex());
} else {
request.indices(getConfig().getSource().getIndex());
}

View file

@ -148,7 +148,7 @@ public class TransformUpdaterTests extends ESTestCase {
TransformConfig minCompatibleConfig = TransformConfigTests.randomTransformConfig(
randomAlphaOfLengthBetween(1, 10),
TransformConfig.CONFIG_VERSION_LAST_CHANGED
TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED
);
transformConfigManager.putTransformConfiguration(minCompatibleConfig, ActionListener.wrap(r -> {}, e -> {}));
@ -177,7 +177,7 @@ public class TransformUpdaterTests extends ESTestCase {
);
assertConfiguration(listener -> transformConfigManager.getTransformConfiguration(minCompatibleConfig.getId(), listener), config -> {
assertNotNull(config);
assertEquals(TransformConfig.CONFIG_VERSION_LAST_CHANGED, config.getVersion());
assertEquals(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED, config.getVersion());
});
}
@ -189,7 +189,7 @@ public class TransformUpdaterTests extends ESTestCase {
VersionUtils.randomVersionBetween(
random(),
Version.V_7_2_0,
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED)
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED)
)
);
@ -274,7 +274,7 @@ public class TransformUpdaterTests extends ESTestCase {
VersionUtils.randomVersionBetween(
random(),
Version.V_7_2_0,
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_CHANGED)
VersionUtils.getPreviousVersion(TransformConfig.CONFIG_VERSION_LAST_DEFAULTS_CHANGED)
)
);

View file

@ -22,9 +22,11 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.Client;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchContextId;
@ -36,6 +38,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
@ -51,6 +54,7 @@ import org.elasticsearch.xpack.transform.persistence.IndexBasedTransformConfigMa
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -286,6 +290,92 @@ public class ClientTransformIndexerTests extends ESTestCase {
}
}
public void testDisablePit() throws InterruptedException {
TransformConfig config = TransformConfigTests.randomTransformConfig();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();
try (PitMockClient client = new PitMockClient(getTestName(), true)) {
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
mock(SchedulerEngine.class)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
client,
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
if (pitEnabled) {
assertEquals("the_pit_id+", response.pointInTimeId());
} else {
assertNull(response.pointInTimeId());
}
});
// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());
this.<SearchResponse>assertAsync(listener -> indexer.doNextSearch(0, listener), response -> {
if (pitEnabled) {
assertNull(response.pointInTimeId());
} else {
assertEquals("the_pit_id+", response.pointInTimeId());
}
});
}
}
public void testHandlePitIndexNotFound() throws InterruptedException {
// simulate a deleted index due to ILM
try (PitMockClient client = new PitMockClient(getTestName(), true)) {
ClientTransformIndexer indexer = createTestIndexer(client);
SearchRequest searchRequest = new SearchRequest("deleted-index");
searchRequest.source().pointInTimeBuilder(new PointInTimeBuilder("the_pit_id"));
Tuple<String, SearchRequest> namedSearchRequest = new Tuple<>("test-handle-pit-index-not-found", searchRequest);
this.<SearchResponse>assertAsync(listener -> indexer.doSearch(namedSearchRequest, listener), response -> {
// if the pit got deleted, we know it retried
assertNull(response.pointInTimeId());
});
}
// simulate a deleted index that is essential, search must fail (after a retry without pit)
try (PitMockClient client = new PitMockClient(getTestName(), true)) {
ClientTransformIndexer indexer = createTestIndexer(client);
SearchRequest searchRequest = new SearchRequest("essential-deleted-index");
searchRequest.source().pointInTimeBuilder(new PointInTimeBuilder("the_pit_id"));
Tuple<String, SearchRequest> namedSearchRequest = new Tuple<>("test-handle-pit-index-not-found", searchRequest);
indexer.doSearch(namedSearchRequest, ActionListener.wrap(r -> fail("expected a failure, got response"), e -> {
assertTrue(e instanceof IndexNotFoundException);
assertEquals("no such index [essential-deleted-index]", e.getMessage());
}));
}
}
private static class MockClientTransformIndexer extends ClientTransformIndexer {
MockClientTransformIndexer(
@ -366,6 +456,17 @@ public class ClientTransformIndexerTests extends ESTestCase {
} else if (request instanceof SearchRequest) {
SearchRequest searchRequest = (SearchRequest) request;
// if pit is used and deleted-index is given throw index not found
if (searchRequest.pointInTimeBuilder() != null && Arrays.binarySearch(searchRequest.indices(), "deleted-index") >= 0) {
listener.onFailure(new IndexNotFoundException("deleted-index"));
return;
}
if (Arrays.binarySearch(searchRequest.indices(), "essential-deleted-index") >= 0) {
listener.onFailure(new IndexNotFoundException("essential-deleted-index"));
return;
}
// throw search context missing for the 4th run
if (searchRequest.pointInTimeBuilder() != null
&& "the_pit_id+++".equals(searchRequest.pointInTimeBuilder().getEncodedId())) {
@ -419,6 +520,10 @@ public class ClientTransformIndexerTests extends ESTestCase {
}
private ClientTransformIndexer createTestIndexer() {
return createTestIndexer(null);
}
private ClientTransformIndexer createTestIndexer(Client client) {
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.executor("generic")).thenReturn(mock(ExecutorService.class));
@ -433,9 +538,9 @@ public class ClientTransformIndexerTests extends ESTestCase {
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
mock(Client.class),
client == null ? mock(Client.class) : client,
mock(TransformIndexerStats.class),
mock(TransformConfig.class),
TransformConfigTests.randomTransformConfig(),
null,
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 0L, Collections.emptyMap(), Instant.now().toEpochMilli()),
new TransformCheckpoint("transform", Instant.now().toEpochMilli(), 2L, Collections.emptyMap(), Instant.now().toEpochMilli()),

View file

@ -309,7 +309,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null),
null,
null,
null,
@ -384,7 +384,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null),
null,
null,
null,
@ -448,7 +448,7 @@ public class TransformIndexerFailureHandlingTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(pageSize, null, (Boolean) null, null),
new SettingsConfig(pageSize, null, (Boolean) null, null, null),
null,
null,
null,

View file

@ -506,7 +506,7 @@ public class TransformIndexerStateTests extends ESTestCase {
randomPivotConfig(),
null,
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000),
new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null),
new SettingsConfig(null, Float.valueOf(1.0f), (Boolean) null, (Boolean) null, null),
null,
null,
null,