[Transform] add a health section to transform stats (#90760)

adds a health section to the transform stats endpoint and implements reporting assignment, indexing/search and persistence problems, together with a overall health state.
This commit is contained in:
Hendrik Muhs 2022-10-25 09:01:21 +02:00 committed by GitHub
parent d3a781c460
commit 82a71f6ef6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 816 additions and 247 deletions

View file

@ -0,0 +1,5 @@
pr: 90760
summary: Add a health section to transform stats
area: Transform
type: enhancement
issues: []

View file

@ -142,6 +142,53 @@ that the {transform} is failing to keep up.
====
//End checkpointing
//Begin health
`health`::
(object) Health indicator for this {transform}.
+
.Properties of `health`
[%collapsible%open]
====
`status`::
(string) Health status of this transform. Statuses are:
`green`:::
The transform is healthy.
`unknown`:::
The health of the transform could not be determined.
`yellow`:::
The functionality of the transform is in a degraded state and may need remediation
to avoid the health becoming `red`.
`red`:::
The transform is experiencing an outage or is unavailable for use.
`issues`::
(Optional, array) If a non-healthy status is returned, contains a list of issues
of the transform.
+
.Properties of `issues`
[%collapsible%open]
========
`issue`::
(string) A description of the issue.
`details`::
(Optional, string) Details about the issue.
`count`::
(integer) Number of times the issue has occured since it started.
`first_occurrence`::
(Optional, date) The timestamp this issue occured for the first time.
========
//End issues
====
//End health
`id`::
(string)
include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=transform-id]
@ -328,6 +375,9 @@ The API returns the following results:
"time_upper_bound_millis" : 1585344498220
},
"changes_last_detected_at" : 1585344558219
},
"health": {
"status": "green"
}
}
]

View file

@ -0,0 +1,91 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
public class TransformHealth implements Writeable, ToXContentObject {
public static final TransformHealth GREEN = new TransformHealth(HealthStatus.GREEN, null);
public static final TransformHealth UNKNOWN = new TransformHealth(HealthStatus.UNKNOWN, null);
private static final String STATUS = "status";
private static final String ISSUES = "issues";
private final HealthStatus status;
private final List<TransformHealthIssue> issues;
public TransformHealth(HealthStatus status, List<TransformHealthIssue> issues) {
this.status = status;
this.issues = issues;
}
public TransformHealth(StreamInput in) throws IOException {
this.status = in.readEnum(HealthStatus.class);
this.issues = in.readOptionalList(TransformHealthIssue::new);
}
public HealthStatus getStatus() {
return status;
}
public List<TransformHealthIssue> getIssues() {
return issues;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(STATUS, status.xContentValue());
if (issues != null && issues.isEmpty() == false) {
builder.field(ISSUES, issues);
}
return builder.endObject();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
status.writeTo(out);
out.writeOptionalCollection(issues);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
TransformHealth that = (TransformHealth) other;
return this.status.value() == that.status.value() && Objects.equals(this.issues, that.issues);
}
@Override
public int hashCode() {
return Objects.hash(status, issues);
}
public String toString() {
return Strings.toString(this, true, true);
}
}

View file

@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
public class TransformHealthIssue implements Writeable, ToXContentObject {
private static final String ISSUE = "issue";
private static final String DETAILS = "details";
private static final String COUNT = "count";
private static final String FIRST_OCCURRENCE = "first_occurrence";
private static final String FIRST_OCCURRENCE_HUMAN_READABLE = FIRST_OCCURRENCE + "_string";
private final String issue;
private final String details;
private final int count;
private final Instant firstOccurrence;
public TransformHealthIssue(String issue, String details, int count, Instant firstOccurrence) {
this.issue = Objects.requireNonNull(issue);
this.details = details;
if (count < 1) {
throw new IllegalArgumentException("[count] must be at least 1, got: " + count);
}
this.count = count;
this.firstOccurrence = firstOccurrence != null ? firstOccurrence.truncatedTo(ChronoUnit.MILLIS) : null;
}
public TransformHealthIssue(StreamInput in) throws IOException {
this.issue = in.readString();
this.details = in.readOptionalString();
this.count = in.readVInt();
this.firstOccurrence = in.readOptionalInstant();
}
public String getIssue() {
return issue;
}
public String getDetails() {
return details;
}
public int getCount() {
return count;
}
public Instant getFirstOccurrence() {
return firstOccurrence;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ISSUE, issue);
if (Strings.isNullOrEmpty(details) == false) {
builder.field(DETAILS, details);
}
builder.field(COUNT, count);
if (firstOccurrence != null) {
builder.timeField(FIRST_OCCURRENCE, FIRST_OCCURRENCE_HUMAN_READABLE, firstOccurrence.toEpochMilli());
}
return builder.endObject();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(issue);
out.writeOptionalString(details);
out.writeVInt(count);
out.writeOptionalInstant(firstOccurrence);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (other == null || getClass() != other.getClass()) {
return false;
}
TransformHealthIssue that = (TransformHealthIssue) other;
return this.count == that.count
&& Objects.equals(this.issue, that.issue)
&& Objects.equals(this.details, that.details)
&& Objects.equals(this.firstOccurrence, that.firstOccurrence);
}
@Override
public int hashCode() {
return Objects.hash(issue, details, count, firstOccurrence);
}
@Override
public String toString() {
return Strings.toString(this, true, true);
}
}

View file

@ -14,12 +14,9 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.TransformField;
@ -27,9 +24,6 @@ import java.io.IOException;
import java.util.Locale;
import java.util.Objects;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
/**
* Used as a wrapper for the objects returned from the stats endpoint.
* Objects of this class are expected to be ephemeral.
@ -38,6 +32,7 @@ import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstr
public class TransformStats implements Writeable, ToXContentObject {
public static final String NAME = "data_frame_transform_stats";
public static final ParseField HEALTH_FIELD = new ParseField("health");
public static final ParseField STATE_FIELD = new ParseField("state");
public static final ParseField REASON_FIELD = new ParseField("reason");
public static final ParseField NODE_FIELD = new ParseField("node");
@ -51,39 +46,22 @@ public class TransformStats implements Writeable, ToXContentObject {
private NodeAttributes node;
private final TransformIndexerStats indexerStats;
private final TransformCheckpointingInfo checkpointingInfo;
public static final ConstructingObjectParser<TransformStats, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
a -> new TransformStats(
(String) a[0],
(State) a[1],
(String) a[2],
(NodeAttributes) a[3],
(TransformIndexerStats) a[4],
(TransformCheckpointingInfo) a[5]
)
);
static {
PARSER.declareString(constructorArg(), TransformField.ID);
PARSER.declareField(constructorArg(), p -> TransformStats.State.fromString(p.text()), STATE_FIELD, ObjectParser.ValueType.STRING);
PARSER.declareString(optionalConstructorArg(), REASON_FIELD);
PARSER.declareField(optionalConstructorArg(), NodeAttributes.PARSER::apply, NODE_FIELD, ObjectParser.ValueType.OBJECT);
PARSER.declareObject(constructorArg(), (p, c) -> TransformIndexerStats.fromXContent(p), TransformField.STATS_FIELD);
PARSER.declareObject(constructorArg(), (p, c) -> TransformCheckpointingInfo.fromXContent(p), CHECKPOINTING_INFO_FIELD);
}
public static TransformStats fromXContent(XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}
private final TransformHealth health;
public static TransformStats initialStats(String id) {
return stoppedStats(id, new TransformIndexerStats());
}
public static TransformStats stoppedStats(String id, TransformIndexerStats indexerTransformStats) {
return new TransformStats(id, State.STOPPED, null, null, indexerTransformStats, TransformCheckpointingInfo.EMPTY);
return new TransformStats(
id,
State.STOPPED,
null,
null,
indexerTransformStats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
);
}
public TransformStats(
@ -92,7 +70,8 @@ public class TransformStats implements Writeable, ToXContentObject {
@Nullable String reason,
@Nullable NodeAttributes node,
TransformIndexerStats stats,
TransformCheckpointingInfo checkpointingInfo
TransformCheckpointingInfo checkpointingInfo,
TransformHealth health
) {
this.id = Objects.requireNonNull(id);
this.state = Objects.requireNonNull(state);
@ -100,32 +79,29 @@ public class TransformStats implements Writeable, ToXContentObject {
this.node = node;
this.indexerStats = Objects.requireNonNull(stats);
this.checkpointingInfo = Objects.requireNonNull(checkpointingInfo);
this.health = Objects.requireNonNull(health);
}
public TransformStats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.id = in.readString();
this.state = in.readEnum(State.class);
this.reason = in.readOptionalString();
if (in.readBoolean()) {
this.node = new NodeAttributes(in);
} else {
this.node = null;
}
this.indexerStats = new TransformIndexerStats(in);
this.checkpointingInfo = new TransformCheckpointingInfo(in);
this.id = in.readString();
this.state = in.readEnum(State.class);
this.reason = in.readOptionalString();
if (in.readBoolean()) {
this.node = new NodeAttributes(in);
} else {
// Prior to version 7.4 TransformStats didn't exist, and we have
// to do the best we can of reading from a TransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
this.id = in.readString();
TransformState transformState = new TransformState(in);
this.state = State.fromComponents(transformState.getTaskState(), transformState.getIndexerState());
this.reason = transformState.getReason();
this.node = transformState.getNode();
this.indexerStats = new TransformIndexerStats(in);
this.checkpointingInfo = new TransformCheckpointingInfo(in);
this.node = null;
}
this.indexerStats = new TransformIndexerStats(in);
this.checkpointingInfo = new TransformCheckpointingInfo(in);
if (in.getVersion().onOrAfter(Version.V_8_6_0)) {
if (in.readBoolean()) {
this.health = new TransformHealth(in);
} else {
this.health = null;
}
} else {
this.health = null;
}
}
@ -142,52 +118,39 @@ public class TransformStats implements Writeable, ToXContentObject {
}
builder.field(TransformField.STATS_FIELD.getPreferredName(), indexerStats, params);
builder.field(CHECKPOINTING_INFO_FIELD.getPreferredName(), checkpointingInfo, params);
if (health != null) {
builder.field(HEALTH_FIELD.getPreferredName(), health);
}
builder.endObject();
return builder;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeString(id);
// 7.13 introduced the waiting state, in older version report the state as started
if (out.getVersion().before(Version.V_7_13_0) && state.equals(State.WAITING)) {
out.writeEnum(State.STARTED);
} else {
out.writeEnum(state);
}
out.writeOptionalString(reason);
if (node != null) {
out.writeString(id);
out.writeEnum(state);
out.writeOptionalString(reason);
if (node != null) {
out.writeBoolean(true);
node.writeTo(out);
} else {
out.writeBoolean(false);
}
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_6_0)) {
if (health != null) {
out.writeBoolean(true);
node.writeTo(out);
health.writeTo(out);
} else {
out.writeBoolean(false);
}
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
} else {
// Prior to version 7.4 TransformStats didn't exist, and we have
// to do the best we can of writing to a TransformStoredDoc object
// (which is called DataFrameTransformStateAndStats in 7.2/7.3)
out.writeString(id);
Tuple<TransformTaskState, IndexerState> stateComponents = state.toComponents();
new TransformState(
stateComponents.v1(),
stateComponents.v2(),
checkpointingInfo.getNext().getPosition(),
checkpointingInfo.getLast().getCheckpoint(),
reason,
checkpointingInfo.getNext().getCheckpointProgress(),
node
).writeTo(out);
indexerStats.writeTo(out);
checkpointingInfo.writeTo(out);
}
}
@Override
public int hashCode() {
return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo);
return Objects.hash(id, state, reason, node, indexerStats, checkpointingInfo, health);
}
@Override
@ -207,7 +170,8 @@ public class TransformStats implements Writeable, ToXContentObject {
&& Objects.equals(this.reason, that.reason)
&& Objects.equals(this.node, that.node)
&& Objects.equals(this.indexerStats, that.indexerStats)
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo);
&& Objects.equals(this.checkpointingInfo, that.checkpointingInfo)
&& Objects.equals(this.health, that.health);
}
public String getId() {

View file

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.time.Instant;
public class TransformHealthIssueTests extends AbstractWireSerializingTestCase<TransformHealthIssue> {
public static TransformHealthIssue randomTransformHealthIssue() {
return new TransformHealthIssue(
randomAlphaOfLengthBetween(10, 200),
randomBoolean() ? randomAlphaOfLengthBetween(10, 200) : null,
randomIntBetween(1, 10),
randomBoolean() ? null : Instant.ofEpochSecond(randomLongBetween(1, 100000), randomLongBetween(-999_999_999, 999_999_999))
);
}
@Override
protected Writeable.Reader<TransformHealthIssue> instanceReader() {
return TransformHealthIssue::new;
}
@Override
protected TransformHealthIssue createTestInstance() {
return randomTransformHealthIssue();
}
}

View file

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
public class TransformHealthTests extends AbstractWireSerializingTestCase<TransformHealth> {
public static TransformHealth randomTransformHealth() {
return new TransformHealth(
randomFrom(HealthStatus.values()),
randomBoolean() ? null : randomList(1, 10, TransformHealthIssueTests::randomTransformHealthIssue)
);
}
@Override
protected Writeable.Reader<TransformHealth> instanceReader() {
return TransformHealth::new;
}
@Override
protected TransformHealth createTestInstance() {
return randomTransformHealth();
}
}

View file

@ -7,21 +7,10 @@
package org.elasticsearch.xpack.core.transform.transforms;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable.Reader;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.io.IOException;
import java.util.function.Predicate;
import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.STARTED;
import static org.elasticsearch.xpack.core.transform.transforms.TransformStats.State.WAITING;
import static org.hamcrest.Matchers.equalTo;
public class TransformStatsTests extends AbstractXContentSerializingTestCase<TransformStats> {
public class TransformStatsTests extends AbstractWireSerializingTestCase<TransformStats> {
public static TransformStats randomTransformStats() {
return new TransformStats(
@ -30,15 +19,11 @@ public class TransformStatsTests extends AbstractXContentSerializingTestCase<Tra
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
TransformIndexerStatsTests.randomStats(),
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo()
TransformCheckpointingInfoTests.randomTransformCheckpointingInfo(),
TransformHealthTests.randomTransformHealth()
);
}
@Override
protected TransformStats doParseInstance(XContentParser parser) throws IOException {
return TransformStats.fromXContent(parser);
}
@Override
protected TransformStats createTestInstance() {
return randomTransformStats();
@ -48,106 +33,4 @@ public class TransformStatsTests extends AbstractXContentSerializingTestCase<Tra
protected Reader<TransformStats> instanceReader() {
return TransformStats::new;
}
@Override
protected boolean supportsUnknownFields() {
return true;
}
@Override
protected String[] getShuffleFieldsExceptions() {
return new String[] { "position" };
}
@Override
protected Predicate<String> getRandomFieldsExcludeFilter() {
return field -> field.isEmpty() == false;
}
public void testBwcWith73() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 0.0, 0.0, 0.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_3_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_3_0);
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld, equalTo(stats));
}
}
}
}
public void testBwcWith76() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
STARTED,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 15.0, 16.0, 17.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_6_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_6_0);
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld, equalTo(stats));
}
}
}
}
public void testBwcWith712() throws IOException {
for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) {
TransformStats stats = new TransformStats(
"bwc-id",
WAITING,
randomBoolean() ? null : randomAlphaOfLength(100),
randomBoolean() ? null : NodeAttributeTests.randomNodeAttributes(),
new TransformIndexerStats(1, 2, 3, 0, 5, 6, 7, 0, 0, 10, 11, 0, 13, 14, 15.0, 16.0, 17.0),
new TransformCheckpointingInfo(
new TransformCheckpointStats(0, null, null, 10, 100),
new TransformCheckpointStats(0, null, null, 100, 1000),
// changesLastDetectedAt aren't serialized back
100,
null,
null
)
);
try (BytesStreamOutput output = new BytesStreamOutput()) {
output.setVersion(Version.V_7_12_0);
stats.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
in.setVersion(Version.V_7_13_0);
TransformStats statsFromOld = new TransformStats(in);
assertThat(statsFromOld.getState(), equalTo(STARTED));
}
}
}
}
}

View file

@ -34,12 +34,14 @@ import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction.Req
import org.elasticsearch.xpack.core.transform.action.GetTransformStatsAction.Response;
import org.elasticsearch.xpack.core.transform.transforms.NodeAttributes;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealth;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService;
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
import org.elasticsearch.xpack.transform.transforms.TransformHealthChecker;
import org.elasticsearch.xpack.transform.transforms.TransformNodeAssignments;
import org.elasticsearch.xpack.transform.transforms.TransformNodes;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
@ -238,7 +240,8 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
reason,
null,
task.getStats(),
checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo
checkpointingInfo == null ? TransformCheckpointingInfo.EMPTY : checkpointingInfo,
TransformHealthChecker.checkTransform(task)
);
}
@ -341,7 +344,8 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
assignment.getExplanation(),
null,
stat.getTransformStats(),
checkpointingInfo
checkpointingInfo,
TransformHealthChecker.checkUnassignedTransform(stat.getId(), clusterState)
)
);
} else {
@ -352,7 +356,8 @@ public class TransportGetTransformStatsAction extends TransportTasksAction<Trans
null,
null,
stat.getTransformStats(),
checkpointingInfo
checkpointingInfo,
TransformHealth.GREEN
)
);
}

View file

@ -16,7 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
class TransformContext {
public class TransformContext {
public interface Listener {
void shutdown();
@ -28,12 +28,16 @@ class TransformContext {
private final AtomicReference<TransformTaskState> taskState;
private final AtomicReference<String> stateReason;
private volatile Instant stateFailureTime;
private final Listener taskListener;
private volatile int numFailureRetries = Transform.DEFAULT_FAILURE_RETRIES;
private final AtomicInteger failureCount;
// Keeps track of the last failure that occurred, used for throttling logs and audit
private final AtomicReference<String> lastFailure = new AtomicReference<>();
private final AtomicReference<Throwable> lastFailure = new AtomicReference<>();
private volatile Instant lastFailureStartTime;
private final AtomicInteger statePersistenceFailureCount = new AtomicInteger();
private final AtomicReference<Throwable> lastStatePersistenceFailure = new AtomicReference<>();
private volatile Instant lastStatePersistenceFailureStartTime;
private volatile Instant changesLastDetectedAt;
private volatile Instant lastSearchTime;
private volatile boolean shouldStopAtCheckpoint = false;
@ -43,7 +47,7 @@ class TransformContext {
// Note: Each indexer run creates a new future checkpoint which becomes the current checkpoint only after the indexer run finished
private final AtomicLong currentCheckpoint;
TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
public TransformContext(TransformTaskState taskState, String stateReason, long currentCheckpoint, Listener taskListener) {
this.taskState = new AtomicReference<>(taskState);
this.stateReason = new AtomicReference<>(stateReason);
this.currentCheckpoint = new AtomicLong(currentCheckpoint);
@ -67,12 +71,15 @@ class TransformContext {
void setTaskStateToFailed(String reason) {
taskState.set(TransformTaskState.FAILED);
stateReason.set(reason);
stateFailureTime = Instant.now();
}
void resetReasonAndFailureCounter() {
stateReason.set(null);
failureCount.set(0);
lastFailure.set(null);
stateFailureTime = null;
lastFailureStartTime = null;
taskListener.failureCountChanged();
}
@ -80,6 +87,10 @@ class TransformContext {
return stateReason.get();
}
Instant getStateFailureTime() {
return stateFailureTime;
}
void setCheckpoint(long newValue) {
currentCheckpoint.set(newValue);
}
@ -104,17 +115,25 @@ class TransformContext {
return failureCount.get();
}
int incrementAndGetFailureCount(String failure) {
int incrementAndGetFailureCount(Throwable failure) {
int newFailureCount = failureCount.incrementAndGet();
lastFailure.set(failure);
if (newFailureCount == 1) {
lastFailureStartTime = Instant.now();
}
taskListener.failureCountChanged();
return newFailureCount;
}
String getLastFailure() {
Throwable getLastFailure() {
return lastFailure.get();
}
Instant getLastFailureStartTime() {
return lastFailureStartTime;
}
void setChangesLastDetectedAt(Instant time) {
changesLastDetectedAt = time;
}
@ -149,14 +168,29 @@ class TransformContext {
void resetStatePersistenceFailureCount() {
statePersistenceFailureCount.set(0);
lastStatePersistenceFailure.set(null);
lastStatePersistenceFailureStartTime = null;
}
int getStatePersistenceFailureCount() {
return statePersistenceFailureCount.get();
}
int incrementAndGetStatePersistenceFailureCount() {
return statePersistenceFailureCount.incrementAndGet();
Throwable getLastStatePersistenceFailure() {
return lastStatePersistenceFailure.get();
}
int incrementAndGetStatePersistenceFailureCount(Throwable failure) {
lastStatePersistenceFailure.set(failure);
int newFailureCount = statePersistenceFailureCount.incrementAndGet();
if (newFailureCount == 1) {
lastStatePersistenceFailureStartTime = Instant.now();
}
return newFailureCount;
}
Instant getLastStatePersistenceFailureStartTime() {
return lastStatePersistenceFailureStartTime;
}
void shutdown() {

View file

@ -94,7 +94,7 @@ class TransformFailureHandler {
// counter for search/index gets reset after a successful bulk index request
int numFailureRetries = getNumFailureRetries(settingsConfig);
final int failureCount = context.incrementAndGetStatePersistenceFailureCount();
final int failureCount = context.incrementAndGetStatePersistenceFailureCount(e);
if (numFailureRetries != -1 && failureCount > numFailureRetries) {
fail(
@ -226,9 +226,11 @@ class TransformFailureHandler {
*/
private void retry(Throwable unwrappedException, String message, boolean unattended, int numFailureRetries) {
// group failures to decide whether to report it below
final String thisFailureClass = unwrappedException.getClass().toString();
final String lastFailureClass = context.getLastFailure();
final int failureCount = context.incrementAndGetFailureCount(thisFailureClass);
final boolean repeatedFailure = context.getLastFailure() == null
? false
: unwrappedException.getClass().equals(context.getLastFailure().getClass());
final int failureCount = context.incrementAndGetFailureCount(unwrappedException);
if (unattended == false && numFailureRetries != -1 && failureCount > numFailureRetries) {
fail("task encountered more than " + numFailureRetries + " failures; latest failure: " + message);
@ -238,9 +240,7 @@ class TransformFailureHandler {
// Since our schedule fires again very quickly after failures it is possible to run into the same failure numerous
// times in a row, very quickly. We do not want to spam the audit log with repeated failures, so only record the first one
// and if the number of retries is about to exceed
if (thisFailureClass.equals(lastFailureClass) == false
|| failureCount % LOG_FAILURE_EVERY == 0
|| failureCount == numFailureRetries) {
if (repeatedFailure == false || failureCount % LOG_FAILURE_EVERY == 0 || failureCount == numFailureRetries) {
String retryMessage = format(
"Transform encountered an exception: [%s]; Will automatically retry [%d/%d]",
message,

View file

@ -0,0 +1,102 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.transforms;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata.Assignment;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealth;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealthIssue;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import java.util.ArrayList;
import java.util.List;
/**
* Check the health of a transform.
*/
public final class TransformHealthChecker {
// simple boundary to decide when to report a red status vs. a yellow status after consecutive retries
static int RED_STATUS_FAILURE_COUNT_BOUNDARY = 5;
public static TransformHealth checkUnassignedTransform(String transformId, ClusterState clusterState) {
final Assignment assignment = TransformNodes.getAssignment(transformId, clusterState);
return new TransformHealth(
HealthStatus.RED,
List.of(new TransformHealthIssue("Failed to assign transform to a node", assignment.getExplanation(), 1, null))
);
}
public static TransformHealth checkTransform(TransformTask transformTask) {
// quick check
if (TransformTaskState.FAILED.equals(transformTask.getState().getTaskState()) == false
&& transformTask.getContext().getFailureCount() == 0
&& transformTask.getContext().getStatePersistenceFailureCount() == 0) {
return TransformHealth.GREEN;
}
final TransformContext transformContext = transformTask.getContext();
List<TransformHealthIssue> issues = new ArrayList<>();
HealthStatus maxStatus = HealthStatus.GREEN;
if (TransformTaskState.FAILED.equals(transformTask.getState().getTaskState())) {
maxStatus = HealthStatus.RED;
issues.add(
new TransformHealthIssue(
"Transform task state is [failed]",
transformTask.getState().getReason(),
1,
transformContext.getStateFailureTime()
)
);
}
if (transformContext.getFailureCount() != 0) {
final Throwable lastFailure = transformContext.getLastFailure();
final String lastFailureMessage = lastFailure instanceof ElasticsearchException elasticsearchException
? elasticsearchException.getDetailedMessage()
: lastFailure.getMessage();
if (HealthStatus.RED.equals(maxStatus) == false) {
maxStatus = transformContext.getFailureCount() > RED_STATUS_FAILURE_COUNT_BOUNDARY ? HealthStatus.RED : HealthStatus.YELLOW;
}
issues.add(
new TransformHealthIssue(
"Transform indexer failed",
lastFailureMessage,
transformContext.getFailureCount(),
transformContext.getLastFailureStartTime()
)
);
}
if (transformContext.getStatePersistenceFailureCount() != 0) {
if (HealthStatus.RED.equals(maxStatus) == false) {
maxStatus = transformContext.getStatePersistenceFailureCount() > RED_STATUS_FAILURE_COUNT_BOUNDARY
? HealthStatus.RED
: HealthStatus.YELLOW;
}
issues.add(
new TransformHealthIssue(
"Task encountered failures updating internal state",
transformContext.getLastStatePersistenceFailure().getMessage(),
transformContext.getStatePersistenceFailureCount(),
transformContext.getLastStatePersistenceFailureStartTime()
)
);
}
return new TransformHealth(maxStatus, issues);
}
private TransformHealthChecker() {}
}

View file

@ -549,10 +549,6 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
return threadPool;
}
TransformTaskState getTaskState() {
return context.getTaskState();
}
public static PersistentTask<?> getTransformTask(String transformId, ClusterState clusterState) {
Collection<PersistentTask<?>> transformTasks = findTransformTasks(t -> t.getId().equals(transformId), clusterState);
if (transformTasks.isEmpty()) {
@ -588,6 +584,11 @@ public class TransformTask extends AllocatedPersistentTask implements TransformS
return findTransformTasks(taskMatcher, clusterState);
}
// used for {@link TransformHealthChecker}
public TransformContext getContext() {
return context;
}
private static Collection<PersistentTask<?>> findTransformTasks(Predicate<PersistentTask<?>> predicate, ClusterState clusterState) {
PersistentTasksCustomMetadata pTasksMeta = PersistentTasksCustomMetadata.getPersistentTasksCustomMetadata(clusterState);
if (pTasksMeta == null) {

View file

@ -6,18 +6,23 @@
*/
package org.elasticsearch.xpack.transform.action;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealth;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealthIssue;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStatsTests;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.transform.transforms.TransformContext;
import org.elasticsearch.xpack.transform.transforms.TransformTask;
import java.time.Instant;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
@ -52,11 +57,21 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.STOPPED,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN))
);
reason = "foo";
@ -65,17 +80,32 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.STOPPED,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.STOPPED, reason, null, stats, info, TransformHealth.GREEN))
);
}
public void testDeriveStatsFailed() {
String transformId = "transform-with-stats";
String reason = null;
TransformHealth expectedHealth = new TransformHealth(
HealthStatus.RED,
List.of(new TransformHealthIssue("Transform task state is [failed]", null, 1, null))
);
TransformIndexerStats stats = TransformIndexerStatsTests.randomStats();
TransformState failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, failedState, stats);
@ -89,24 +119,48 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.FAILED,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
expectedHealth
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth))
);
reason = "the task is failed";
expectedHealth = new TransformHealth(
HealthStatus.RED,
List.of(new TransformHealthIssue("Transform task state is [failed]", reason, 1, null))
);
failedState = new TransformState(TransformTaskState.FAILED, IndexerState.STOPPED, null, 0, reason, null, null, true);
withIdStateAndStats(transformId, failedState, stats);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.FAILED,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
expectedHealth
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.FAILED, reason, null, stats, info, expectedHealth))
);
}
@ -142,7 +196,8 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
"transform is set to stop at the next checkpoint",
null,
stats,
TransformCheckpointingInfo.EMPTY
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
@ -155,7 +210,8 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
"transform is set to stop at the next checkpoint",
null,
stats,
info
info,
TransformHealth.GREEN
)
)
);
@ -166,11 +222,21 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.STOPPING,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.STOPPING, reason, null, stats, info, TransformHealth.GREEN))
);
// Stop at next checkpoint is false.
@ -179,11 +245,21 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
assertThat(
TransportGetTransformStatsAction.deriveStats(task, null),
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, TransformCheckpointingInfo.EMPTY))
equalTo(
new TransformStats(
transformId,
TransformStats.State.INDEXING,
reason,
null,
stats,
TransformCheckpointingInfo.EMPTY,
TransformHealth.GREEN
)
)
);
assertThat(
TransportGetTransformStatsAction.deriveStats(task, info),
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info))
equalTo(new TransformStats(transformId, TransformStats.State.INDEXING, reason, null, stats, info, TransformHealth.GREEN))
);
}
@ -191,6 +267,7 @@ public class TransportGetTransformStatsActionTests extends ESTestCase {
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(state);
when(task.getStats()).thenReturn(stats);
when(task.getContext()).thenReturn(new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class)));
}
}

View file

@ -36,9 +36,9 @@ public class TransformContextTests extends ESTestCase {
public void testFailureCount() {
TransformContext context = new TransformContext(null, null, 0, listener);
assertThat(context.incrementAndGetFailureCount("some_exception"), is(equalTo(1)));
assertThat(context.incrementAndGetFailureCount(new RuntimeException("some_exception")), is(equalTo(1)));
assertThat(context.getFailureCount(), is(equalTo(1)));
assertThat(context.incrementAndGetFailureCount("some_other_exception"), is(equalTo(2)));
assertThat(context.incrementAndGetFailureCount(new IllegalArgumentException("some_other_exception")), is(equalTo(2)));
assertThat(context.getFailureCount(), is(equalTo(2)));
context.resetReasonAndFailureCounter();
assertThat(context.getFailureCount(), is(equalTo(0)));

View file

@ -0,0 +1,114 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
package org.elasticsearch.xpack.transform.transforms;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.health.HealthStatus;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformHealth;
import org.elasticsearch.xpack.core.transform.transforms.TransformState;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import java.time.Instant;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TransformHealthCheckerTests extends ESTestCase {
public void testGreen() {
TransformTask task = mock(TransformTask.class);
TransformContext context = createTestContext();
withIdStateAndContext(task, randomAlphaOfLength(10), context);
assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
}
public void testPersistenceFailure() {
TransformTask task = mock(TransformTask.class);
TransformContext context = createTestContext();
Instant now = Instant.now();
withIdStateAndContext(task, randomAlphaOfLength(10), context);
assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
context.incrementAndGetStatePersistenceFailureCount(new ElasticsearchException("failed to persist"));
TransformHealth health = TransformHealthChecker.checkTransform(task);
assertThat(health.getStatus(), equalTo(HealthStatus.YELLOW));
assertEquals(1, health.getIssues().size());
assertThat(health.getIssues().get(0).getIssue(), equalTo("Task encountered failures updating internal state"));
assertThat(health.getIssues().get(0).getFirstOccurrence(), greaterThanOrEqualTo(now));
assertThat(health.getIssues().get(0).getFirstOccurrence(), lessThan(Instant.MAX));
context.resetStatePersistenceFailureCount();
assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
}
public void testStatusSwitchingAndMultipleFailures() {
TransformTask task = mock(TransformTask.class);
TransformContext context = createTestContext();
Instant now = Instant.now();
withIdStateAndContext(task, randomAlphaOfLength(10), context);
assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
context.incrementAndGetFailureCount(new ElasticsearchException("internal error"));
TransformHealth health = TransformHealthChecker.checkTransform(task);
assertThat(health.getStatus(), equalTo(HealthStatus.YELLOW));
Instant firstOccurrence = health.getIssues().get(0).getFirstOccurrence();
assertThat(firstOccurrence, greaterThanOrEqualTo(now));
assertThat(firstOccurrence, lessThan(Instant.MAX));
for (int i = 1; i < TransformHealthChecker.RED_STATUS_FAILURE_COUNT_BOUNDARY; ++i) {
context.incrementAndGetFailureCount(new ElasticsearchException("internal error"));
assertThat(TransformHealthChecker.checkTransform(task).getStatus(), equalTo(HealthStatus.YELLOW));
assertEquals(1, TransformHealthChecker.checkTransform(task).getIssues().size());
assertThat(health.getIssues().get(0).getFirstOccurrence(), equalTo(firstOccurrence));
}
// turn RED
context.incrementAndGetFailureCount(new ElasticsearchException("internal error"));
assertThat(TransformHealthChecker.checkTransform(task).getStatus(), equalTo(HealthStatus.RED));
assertEquals(1, TransformHealthChecker.checkTransform(task).getIssues().size());
assertThat(health.getIssues().get(0).getFirstOccurrence(), equalTo(firstOccurrence));
// add a persistence error
context.incrementAndGetStatePersistenceFailureCount(new ElasticsearchException("failed to persist"));
assertThat(TransformHealthChecker.checkTransform(task).getStatus(), equalTo(HealthStatus.RED));
assertEquals(2, TransformHealthChecker.checkTransform(task).getIssues().size());
// reset the indexer error
context.resetReasonAndFailureCounter();
assertThat(TransformHealthChecker.checkTransform(task).getStatus(), equalTo(HealthStatus.YELLOW));
assertEquals(1, TransformHealthChecker.checkTransform(task).getIssues().size());
assertThat(
TransformHealthChecker.checkTransform(task).getIssues().get(0).getIssue(),
equalTo("Task encountered failures updating internal state")
);
context.resetStatePersistenceFailureCount();
assertThat(TransformHealthChecker.checkTransform(task), equalTo(TransformHealth.GREEN));
}
private TransformContext createTestContext() {
return new TransformContext(TransformTaskState.STARTED, "", 0, mock(TransformContext.Listener.class));
}
private static void withIdStateAndContext(TransformTask task, String transformId, TransformContext context) {
when(task.getTransformId()).thenReturn(transformId);
when(task.getState()).thenReturn(
new TransformState(TransformTaskState.STARTED, IndexerState.INDEXING, null, 0, "", null, null, false)
);
when(task.getContext()).thenReturn(context);
}
}

View file

@ -0,0 +1,56 @@
{
"definitions": {},
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://raw.githubusercontent.com/elastic/elasticsearch/master/x-pack/plugin/core/src/test/resources/rest-api-spec/schema/transform_health.schema.json",
"description": "schema definition for transform health",
"additionalProperties": false,
"title": "Root",
"type": "object",
"required": [
"status"
],
"properties": {
"status": {
"type": "string",
"enum": [
"green",
"yellow",
"red",
"unknown"
],
"description": "The transform health status"
},
"issues": {
"type": "array",
"description": "reason for the health status",
"items": [
{
"type": "object",
"description": "describes a single issue",
"additionalProperties": false,
"required": [
"issue"
],
"properties": {
"issue": {
"type": "string",
"description": "single issue description"
},
"details": {
"type": "string",
"description": "single issue details"
},
"first_occurrence": {
"type": "integer",
"description": "time the issue appeared for the first time"
},
"count": {
"type": "integer",
"description": "number of times the issue appeared"
}
}
}
]
}
}
}

View file

@ -63,6 +63,9 @@
"reason": {
"type": "string",
"description": "reason if failed"
},
"health": {
"$ref": "file:transform_health.schema.json"
}
}
}
}