[Actions] Treat failures as successes for Task Manager (#109655)

* Support retry with email as an example

* Fix tests

* Add logic to treat as failure if there is a retry

* Handle retry better

* Make this optional

* Tweaks

* Remove unnecessary code

* Fix existing tests

* Add some unit tests

* Add test

* Add doc note

* More docs

* PR feedback

* Update docs/management/action-types.asciidoc

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update docs/management/action-types.asciidoc

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update docs/management/action-types.asciidoc

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update docs/management/action-types.asciidoc

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

* Update docs/management/action-types.asciidoc

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>

Co-authored-by: gchaps <33642766+gchaps@users.noreply.github.com>
Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Chris Roberson 2021-09-09 12:51:39 -04:00 committed by GitHub
parent b546762668
commit b9e6f935c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 324 additions and 27 deletions

View file

@ -135,5 +135,14 @@ image::images/connectors-with-missing-secrets.png[Connectors with missing secret
For out-of-the-box and standardized connectors, you can <<preconfigured-connector-example, preconfigure connectors>>
before {kib} starts.
[float]
[[montoring-connectors]]
=== Monitoring connectors
The <<task-manager-health-monitoring,Task Manager health API>> helps you understand the performance of all tasks in your environment.
However, if connectors fail to execute, they will report as successful to Task Manager. The failure stats will not
accurately depict the performance of connectors.
For more information on connector successes and failures, refer to the <<event-log-index,Event log index>>.
include::connectors/index.asciidoc[]

View file

@ -111,6 +111,7 @@ a| Runtime
| This section tracks excution performance of Task Manager, tracking task _drift_, worker _load_, and execution stats broken down by type, including duration and execution results.
a| Capacity Estimation
| This section provides a rough estimate about the sufficiency of its capacity. As the name suggests, these are estimates based on historical data and should not be used as predictions. Use these estimations when following the Task Manager <<task-manager-scaling-guidance>>.
@ -123,6 +124,14 @@ The root `status` indicates the `status` of the system overall.
The Runtime `status` indicates whether task executions have exceeded any of the <<task-manager-configuring-health-monitoring,configured health thresholds>>. An `OK` status means none of the threshold have been exceeded. A `Warning` status means that at least one warning threshold has been exceeded. An `Error` status means that at least one error threshold has been exceeded.
[IMPORTANT]
==============================================
Some tasks (such as <<action-types,connectors>>) will incorrectly report their status as successful even if the task failed.
The runtime and workload block will return data about success and failures and will not take this into consideration.
To get a better sense of action failures, please refer to the <<event-log-index,Event log index>> for more accurate context into failures and successes.
==============================================
The Capacity Estimation `status` indicates the sufficiency of the observed capacity. An `OK` status means capacity is sufficient. A `Warning` status means that capacity is sufficient for the scheduled recurring tasks, but non-recurring tasks often cause the cluster to exceed capacity. An `Error` status means that there is insufficient capacity across all types of tasks.
By monitoring the `status` of the system overall, and the `status` of specific task types of interest, you can evaluate the health of the {kib} Task Management system.

View file

@ -134,7 +134,8 @@ export class ActionTypeRegistry {
// Don't retry other kinds of errors
return false;
},
createTaskRunner: (context: RunContext) => this.taskRunnerFactory.create(context),
createTaskRunner: (context: RunContext) =>
this.taskRunnerFactory.create(context, actionType.maxAttempts),
},
});
// No need to notify usage on basic action types

View file

@ -187,10 +187,12 @@ test('successfully executes as a task', async () => {
const scheduleDelay = 10000; // milliseconds
const scheduled = new Date(Date.now() - scheduleDelay);
const attempts = 1;
await actionExecutor.execute({
...executeParams,
taskInfo: {
scheduled,
attempts,
},
});

View file

@ -44,6 +44,7 @@ export interface ActionExecutorContext {
export interface TaskInfo {
scheduled: Date;
attempts: number;
}
export interface ExecuteOptions<Source = unknown> {
@ -210,6 +211,7 @@ export class ActionExecutor {
config: validatedConfig,
secrets: validatedSecrets,
isEphemeral,
taskInfo,
});
} catch (err) {
rawResult = {

View file

@ -136,6 +136,7 @@ test('executes the task by calling the executor with proper parameters, using gi
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
@ -191,6 +192,7 @@ test('executes the task by calling the executor with proper parameters, using st
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
@ -341,6 +343,7 @@ test('uses API key when provided', async () => {
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
@ -401,6 +404,7 @@ test('uses relatedSavedObjects merged with references when provided', async () =
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
});
@ -451,6 +455,7 @@ test('uses relatedSavedObjects as is when references are empty', async () => {
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
});
@ -499,6 +504,7 @@ test('sanitizes invalid relatedSavedObjects when provided', async () => {
relatedSavedObjects: [],
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
});
@ -538,6 +544,7 @@ test(`doesn't use API key when not provided`, async () => {
}),
taskInfo: {
scheduled: new Date(),
attempts: 0,
},
});
@ -549,9 +556,15 @@ test(`doesn't use API key when not provided`, async () => {
});
test(`throws an error when license doesn't support the action type`, async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});
const taskRunner = taskRunnerFactory.create(
{
taskInstance: {
...mockedTaskInstance,
attempts: 1,
},
},
2
);
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
@ -579,6 +592,138 @@ test(`throws an error when license doesn't support the action type`, async () =>
} catch (e) {
expect(e instanceof ExecutorError).toEqual(true);
expect(e.data).toEqual({});
expect(e.retry).toEqual(false);
expect(e.retry).toEqual(true);
}
});
test(`treats errors as errors if the task is retryable`, async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
attempts: 0,
},
});
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
mockedActionExecutor.execute.mockResolvedValueOnce({
status: 'error',
actionId: '2',
message: 'Error message',
data: { foo: true },
retry: false,
});
let err;
try {
await taskRunner.run();
} catch (e) {
err = e;
}
expect(err).toBeDefined();
expect(err instanceof ExecutorError).toEqual(true);
expect(err.data).toEqual({ foo: true });
expect(err.retry).toEqual(false);
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will not retry: Error message`
);
});
test(`treats errors as successes if the task is not retryable`, async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
attempts: 1,
},
});
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
mockedActionExecutor.execute.mockResolvedValueOnce({
status: 'error',
actionId: '2',
message: 'Error message',
data: { foo: true },
retry: false,
});
let err;
try {
await taskRunner.run();
} catch (e) {
err = e;
}
expect(err).toBeUndefined();
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will not retry: Error message`
);
});
test('treats errors as errors if the error is thrown instead of returned', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
attempts: 0,
},
});
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
mockedActionExecutor.execute.mockRejectedValueOnce({});
let err;
try {
await taskRunner.run();
} catch (e) {
err = e;
}
expect(err).toBeDefined();
expect(err instanceof ExecutorError).toEqual(true);
expect(err.data).toEqual({});
expect(err.retry).toEqual(true);
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will retry: undefined`
);
});

View file

@ -22,7 +22,6 @@ import { ActionExecutorContract } from './action_executor';
import { ExecutorError } from './executor_error';
import { RunContext } from '../../../task_manager/server';
import { EncryptedSavedObjectsClient } from '../../../encrypted_saved_objects/server';
import { ActionTypeDisabledError } from './errors';
import {
ActionTaskParams,
ActionTypeRegistryContract,
@ -62,7 +61,7 @@ export class TaskRunnerFactory {
this.taskRunnerContext = taskRunnerContext;
}
public create({ taskInstance }: RunContext) {
public create({ taskInstance }: RunContext, maxAttempts: number = 1) {
if (!this.isInitialized) {
throw new Error('TaskRunnerFactory not initialized');
}
@ -78,6 +77,7 @@ export class TaskRunnerFactory {
const taskInfo = {
scheduled: taskInstance.runAt,
attempts: taskInstance.attempts,
};
return {
@ -119,7 +119,14 @@ export class TaskRunnerFactory {
basePathService.set(fakeRequest, path);
let executorResult: ActionTypeExecutorResult<unknown>;
// Throwing an executor error means we will attempt to retry the task
// TM will treat a task as a failure if `attempts >= maxAttempts`
// so we need to handle that here to avoid TM persisting the failed task
const isRetryableBasedOnAttempts = taskInfo.attempts < (maxAttempts ?? 1);
const willRetryMessage = `and will retry`;
const willNotRetryMessage = `and will not retry`;
let executorResult: ActionTypeExecutorResult<unknown> | undefined;
try {
executorResult = await actionExecutor.execute({
params,
@ -131,20 +138,39 @@ export class TaskRunnerFactory {
relatedSavedObjects: validatedRelatedSavedObjects(logger, relatedSavedObjects),
});
} catch (e) {
if (e instanceof ActionTypeDisabledError) {
// We'll stop re-trying due to action being forbidden
throw new ExecutorError(e.message, {}, false);
logger.error(
`Action '${actionId}' failed ${
isRetryableBasedOnAttempts ? willRetryMessage : willNotRetryMessage
}: ${e.message}`
);
if (isRetryableBasedOnAttempts) {
// In order for retry to work, we need to indicate to task manager this task
// failed
throw new ExecutorError(e.message, {}, true);
}
throw e;
}
if (executorResult.status === 'error') {
if (
executorResult &&
executorResult?.status === 'error' &&
executorResult?.retry !== undefined &&
isRetryableBasedOnAttempts
) {
logger.error(
`Action '${actionId}' failed ${
!!executorResult.retry ? willRetryMessage : willNotRetryMessage
}: ${executorResult.message}`
);
// Task manager error handler only kicks in when an error thrown (at this time)
// So what we have to do is throw when the return status is `error`.
throw new ExecutorError(
executorResult.message,
executorResult.data,
executorResult.retry == null ? false : executorResult.retry
executorResult.retry as boolean | Date
);
} else if (executorResult && executorResult?.status === 'error') {
logger.error(
`Action '${actionId}' failed ${willNotRetryMessage}: ${executorResult.message}`
);
}

View file

@ -19,6 +19,7 @@ import {
SavedObjectReference,
} from '../../../../src/core/server';
import { ActionTypeExecutorResult } from '../common';
import { TaskInfo } from './lib/action_executor';
export { ActionTypeExecutorResult } from '../common';
export { GetFieldsByIssueTypeResponse as JiraGetFieldsResponse } from './builtin_action_types/jira/types';
export { GetCommonFieldsResponse as ServiceNowGetFieldsResponse } from './builtin_action_types/servicenow/types';
@ -59,6 +60,7 @@ export interface ActionTypeExecutorOptions<Config, Secrets, Params> {
secrets: Secrets;
params: Params;
isEphemeral?: boolean;
taskInfo?: TaskInfo;
}
export interface ActionResult<Config extends ActionTypeConfig = ActionTypeConfig> {

View file

@ -44,6 +44,7 @@ const enabledActionTypes = [
'test.noop',
'test.delayed',
'test.rate-limit',
'test.no-attempts-rate-limit',
'test.throw',
];

View file

@ -37,6 +37,7 @@ export function defineActionTypes(
actions.registerType(getDelayedActionType());
actions.registerType(getFailingActionType());
actions.registerType(getRateLimitedActionType());
actions.registerType(getNoAttemptsRateLimitedActionType());
actions.registerType(getAuthorizationActionType(core));
}
@ -183,6 +184,42 @@ function getRateLimitedActionType() {
return result;
}
function getNoAttemptsRateLimitedActionType() {
const paramsSchema = schema.object({
index: schema.string(),
reference: schema.string(),
retryAt: schema.number(),
});
type ParamsType = TypeOf<typeof paramsSchema>;
const result: ActionType<{}, {}, ParamsType> = {
id: 'test.no-attempts-rate-limit',
name: 'Test: Rate Limit',
minimumLicenseRequired: 'gold',
maxAttempts: 0,
validate: {
params: paramsSchema,
},
async executor({ config, params, services }) {
await services.scopedClusterClient.index({
index: params.index,
refresh: 'wait_for',
body: {
params,
config,
reference: params.reference,
source: 'action:test.rate-limit',
},
});
return {
status: 'error',
retry: new Date(params.retryAt),
actionId: '',
};
},
};
return result;
}
function getAuthorizationActionType(core: CoreSetup<FixtureStartDeps>) {
const paramsSchema = schema.object({
callClusterAuthorizationIndex: schema.string(),

View file

@ -97,21 +97,8 @@ export default function ({ getService }: FtrProviderContext) {
},
})
.expect(204);
await esTestIndexTool.waitForDocs('action:test.failing', reference, 1);
await supertest
.put(
`${getUrlPrefix(
Spaces.space1.id
)}/api/alerts_fixture/Actions-cleanup_failed_action_executions/reschedule_task`
)
.set('kbn-xsrf', 'foo')
.send({
runAt: new Date().toISOString(),
})
.expect(200);
await retry.try(async () => {
const searchResult = await es.search({
index: '.kibana_task_manager',
@ -139,5 +126,81 @@ export default function ({ getService }: FtrProviderContext) {
expect((searchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(0);
});
});
it('should never leaved a failed task, even if max attempts is reached', async () => {
// We have to provide the test.rate-limit the next runAt, for testing purposes
const retryDate = new Date(Date.now() + 1);
const { body: createdAction } = await supertest
.post(`${getUrlPrefix(Spaces.space1.id)}/api/actions/connector`)
.set('kbn-xsrf', 'foo')
.send({
name: 'My action',
connector_type_id: 'test.no-attempts-rate-limit',
config: {},
secrets: {},
})
.expect(200);
objectRemover.add(Spaces.space1.id, createdAction.id, 'action', 'actions');
const reference = `actions-enqueue-2:${Spaces.space1.id}:${createdAction.id}`;
await supertest
.post(
`${getUrlPrefix(Spaces.space1.id)}/api/alerts_fixture/${createdAction.id}/enqueue_action`
)
.set('kbn-xsrf', 'foo')
.send({
params: {
reference,
index: ES_TEST_INDEX_NAME,
retryAt: retryDate.getTime(),
},
})
.expect(204);
await retry.try(async () => {
const runningSearchResult = await es.search({
index: '.kibana_task_manager',
body: {
query: {
bool: {
must: [
{
term: {
'task.taskType': 'actions:test.no-attempts-rate-limit',
},
},
{
term: {
'task.status': 'running',
},
},
],
},
},
},
});
expect((runningSearchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(1);
});
await retry.try(async () => {
const searchResult = await es.search({
index: '.kibana_task_manager',
body: {
query: {
bool: {
must: [
{
term: {
'task.taskType': 'actions:test.no-attempts-rate-limit',
},
},
],
},
},
},
});
expect((searchResult.body.hits.total as estypes.SearchTotalHits).value).to.eql(0);
});
});
});
}