Distinguish error types in Task Manager (#170981)

Resolves: #168633

This PR implements the Task Manager part of the [Distinguish Error Types
Research](https://github.com/elastic/kibana/pull/169306).
This commit is contained in:
Ersin Erdal 2023-11-15 19:13:56 +01:00 committed by GitHub
parent 09e23e51c5
commit 2c253bf980
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 60 deletions

View file

@ -355,7 +355,7 @@ export const generateRunnerResult = ({
alertInstances = {},
alertRecoveredInstances = {},
summaryActions = {},
hasError = false,
taskRunError,
}: GeneratorParams = {}) => {
return {
monitoring: {
@ -388,7 +388,7 @@ export const generateRunnerResult = ({
...(state && { previousStartedAt: new Date('1970-01-01T00:00:00.000Z').toISOString() }),
...(state && { summaryActions }),
},
hasError,
taskRunError,
};
};

View file

@ -1826,6 +1826,8 @@ describe('Task Runner', () => {
});
test('recovers gracefully when the RuleType executor throws an exception', async () => {
const taskRunError = new Error(GENERIC_ERROR_MESSAGE);
ruleType.executor.mockImplementation(
async ({
services: executorServices,
@ -1837,7 +1839,7 @@ describe('Task Runner', () => {
string,
RuleAlertData
>) => {
throw new Error(GENERIC_ERROR_MESSAGE);
throw taskRunError;
}
);
@ -1855,7 +1857,7 @@ describe('Task Runner', () => {
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(generateRunnerResult({ successRatio: 0, hasError: true }));
expect(runnerResult).toEqual(generateRunnerResult({ successRatio: 0, taskRunError }));
testAlertingEventLogCalls({
status: 'error',
@ -1877,9 +1879,11 @@ describe('Task Runner', () => {
});
test('recovers gracefully when the Rule Task Runner throws an exception when loading rule to prepare for run', async () => {
const taskRunError = new Error(GENERIC_ERROR_MESSAGE);
// used in loadIndirectParams() which is called to load rule data
rulesClient.getAlertFromRaw.mockImplementation(() => {
throw new Error(GENERIC_ERROR_MESSAGE);
throw taskRunError;
});
const taskRunner = new TaskRunner({
@ -1895,7 +1899,7 @@ describe('Task Runner', () => {
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(generateRunnerResult({ successRatio: 0, hasError: true }));
expect(runnerResult).toEqual(generateRunnerResult({ successRatio: 0, taskRunError }));
testAlertingEventLogCalls({
setRuleName: false,
@ -1908,8 +1912,10 @@ describe('Task Runner', () => {
});
test('recovers gracefully when the Runner of a legacy Alert task which has no schedule throws an exception when fetching attributes', async () => {
rulesClient.get.mockImplementation(() => {
throw new Error(GENERIC_ERROR_MESSAGE);
const taskRunError = new TypeError(GENERIC_ERROR_MESSAGE);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockImplementation(() => {
throw taskRunError;
});
// legacy alerts used to run by returning a new `runAt` instead of using a schedule
@ -1919,18 +1925,15 @@ describe('Task Runner', () => {
const taskRunner = new TaskRunner({
ruleType,
taskInstance: legacyTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalled();
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce(mockedRawRuleSO);
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(
generateRunnerResult({ successRatio: 0, interval: '5m', hasError: true })
generateRunnerResult({ successRatio: 0, interval: '5m', taskRunError })
);
expect(mockUsageCounter.incrementCounter).not.toHaveBeenCalled();
});
@ -1977,10 +1980,6 @@ describe('Task Runner', () => {
});
test('avoids rescheduling a failed Alert Task Runner when it throws due to failing to fetch the alert', async () => {
rulesClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('alert', '1');
});
const taskRunner = new TaskRunner({
ruleType,
taskInstance: {
@ -1990,7 +1989,6 @@ describe('Task Runner', () => {
spaceId: 'foo',
},
},
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
@ -2021,10 +2019,6 @@ describe('Task Runner', () => {
});
test('reschedules for next schedule interval if es connectivity error encountered and schedule interval is less than connectivity retry', async () => {
rulesClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundEsUnavailableError('alert', '1');
});
const taskRunner = new TaskRunner({
ruleType,
taskInstance: mockedTaskInstance,
@ -2067,10 +2061,6 @@ describe('Task Runner', () => {
});
test('correctly logs warning when Alert Task Runner throws due to failing to fetch the alert in a space', async () => {
rulesClient.get.mockImplementation(() => {
throw SavedObjectsErrorHelpers.createGenericNotFoundError('alert', '1');
});
const taskRunner = new TaskRunner({
ruleType,
taskInstance: {
@ -2607,6 +2597,7 @@ describe('Task Runner', () => {
});
test('successfully stores failure runs', async () => {
const taskRunError = new Error(GENERIC_ERROR_MESSAGE);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: mockedTaskInstance,
@ -2629,16 +2620,17 @@ describe('Task Runner', () => {
string,
RuleAlertData
>) => {
throw new Error(GENERIC_ERROR_MESSAGE);
throw taskRunError;
}
);
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(
generateRunnerResult({ successRatio: 0, success: false, hasError: true })
generateRunnerResult({ successRatio: 0, success: false, taskRunError })
);
});
test('successfully stores the success ratio', async () => {
const taskRunError = new Error(GENERIC_ERROR_MESSAGE);
const taskRunner = new TaskRunner({
ruleType,
taskInstance: mockedTaskInstance,
@ -2665,7 +2657,7 @@ describe('Task Runner', () => {
string,
RuleAlertData
>) => {
throw new Error(GENERIC_ERROR_MESSAGE);
throw taskRunError;
}
);
const runnerResult = await taskRunner.run();
@ -2674,7 +2666,7 @@ describe('Task Runner', () => {
generateRunnerResult({
successRatio: 0.75,
history: [true, true, true, false],
hasError: true,
taskRunError,
})
);
});

View file

@ -10,7 +10,12 @@ import { omit, some } from 'lodash';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import { v4 as uuidv4 } from 'uuid';
import { Logger } from '@kbn/core/server';
import { ConcreteTaskInstance, throwUnrecoverableError } from '@kbn/task-manager-plugin/server';
import {
ConcreteTaskInstance,
throwUnrecoverableError,
createTaskRunError,
TaskErrorSource,
} from '@kbn/task-manager-plugin/server';
import { nanosToMillis } from '@kbn/event-log-plugin/server';
import { DEFAULT_NAMESPACE_STRING } from '@kbn/core-saved-objects-utils-server';
import { ExecutionHandler, RunResult } from './execution_handler';
@ -937,7 +942,9 @@ export class TaskRunner<
return { interval: retryInterval };
}),
monitoring: this.ruleMonitoring.getMonitoring(),
hasError: isErr(schedule),
...(isErr(schedule)
? { taskRunError: createTaskRunError(schedule.error, TaskErrorSource.FRAMEWORK) }
: {}),
};
}

View file

@ -6,7 +6,7 @@
*/
import { KibanaRequest, Logger } from '@kbn/core/server';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { ConcreteTaskInstance, DecoratedError } from '@kbn/task-manager-plugin/server';
import { PublicMethodsOf } from '@kbn/utility-types';
import { ActionsClient } from '@kbn/actions-plugin/server/actions_client';
import { IAlertsClient } from '../alerts_client/types';
@ -33,7 +33,7 @@ export interface RuleTaskRunResult {
state: RuleTaskState;
monitoring: RuleMonitoring | undefined;
schedule: IntervalSchedule | undefined;
hasError: boolean;
taskRunError?: DecoratedError;
}
// This is the state of the alerting task after rule execution, which includes run metrics plus the task state

View file

@ -36,7 +36,12 @@ export {
isEphemeralTaskRejectedDueToCapacityError,
isSkipError,
createSkipError,
createTaskRunError,
TaskErrorSource,
} from './task_running';
export type { DecoratedError } from './task_running';
export type { RunNowResult, BulkUpdateTaskResult } from './task_scheduling';
export { getOldestIdleActionTask } from './queries/oldest_idle_action_task';
export {

View file

@ -13,6 +13,7 @@ import { Observable, Subject } from 'rxjs';
import { Option, none } from 'fp-ts/lib/Option';
import { Logger } from '@kbn/core/server';
import { TaskErrorSource } from '../task_running';
import { Result, asOk, asErr } from '../lib/result_type';
type WorkFn<H> = () => Promise<H>;
@ -128,10 +129,12 @@ function asPollingError<T>(err: string | Error, type: PollingErrorType, data: Op
/**
 * Error raised by the polling machinery. Besides the message it carries a
 * `PollingErrorType`, an optional payload, and an error source that is always
 * `TaskErrorSource.FRAMEWORK`.
 */
export class PollingError<T> extends Error {
// Category of the polling failure, as passed to the constructor.
public readonly type: PollingErrorType;
// Optional payload associated with the failure (`Option<T>` from fp-ts).
public readonly data: Option<T>;
// Error source; the constructor hard-codes this to FRAMEWORK.
public readonly source: TaskErrorSource;
constructor(message: string, type: PollingErrorType, data: Option<T>) {
super(message);
// Re-point the prototype at the concrete class after calling the Error constructor.
// NOTE(review): presumably needed so `instanceof PollingError` works when the
// build targets ES5 — confirm against the compiler target.
Object.setPrototypeOf(this, new.target.prototype);
this.type = type;
this.data = data;
this.source = TaskErrorSource.FRAMEWORK;
}
}

View file

@ -8,6 +8,7 @@
import { ObjectType, schema, TypeOf } from '@kbn/config-schema';
import { isErr, tryAsResult } from './lib/result_type';
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
import { DecoratedError } from './task_running';
/*
* Type definitions and validations for tasks.
@ -47,7 +48,7 @@ export type SuccessfulRunResult = {
* recurring task). See the RunContext type definition for more details.
*/
state: Record<string, unknown>;
hasError?: boolean;
taskRunError?: DecoratedError;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
{
@ -75,7 +76,7 @@ export type FailedRunResult = SuccessfulRunResult & {
* If specified, indicates that the task failed to accomplish its work. This is
* logged out as a warning, and the task will be reattempted after a delay.
*/
error: Error;
error: DecoratedError;
};
export type RunResult = FailedRunResult | SuccessfulRunResult;

View file

@ -12,7 +12,7 @@ import { ConcreteTaskInstance } from './task';
import { Result, Err } from './lib/result_type';
import { ClaimAndFillPoolResult } from './lib/fill_pool';
import { PollingError } from './polling';
import { TaskRunResult } from './task_running';
import { DecoratedError, TaskRunResult } from './task_running';
import { EphemeralTaskInstanceRequest } from './ephemeral_task_lifecycle';
import type { EventLoopDelayConfig } from './config';
import { TaskManagerMetrics } from './metrics/task_metrics_collector';
@ -75,7 +75,7 @@ export interface RanTask {
isExpired: boolean;
}
export type ErroredTask = RanTask & {
error: Error;
error: DecoratedError;
};
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;

View file

@ -13,10 +13,17 @@ const CODE_SKIP = 'TaskManager/skip';
const code = Symbol('TaskManagerErrorCode');
const retry = Symbol('TaskManagerErrorRetry');
const source = Symbol('TaskManagerErrorSource');
/**
 * Origin attributed to a task error. The value is stored on an error under the
 * `source` symbol by `throwUnrecoverableError` and `createTaskRunError`, both of
 * which default to `FRAMEWORK`.
 */
export enum TaskErrorSource {
// NOTE(review): presumably failures raised by the platform/framework itself — confirm.
FRAMEWORK = 'framework',
// NOTE(review): presumably failures caused by user input or configuration — confirm.
USER = 'user',
}
/**
 * An `Error` that may carry Task Manager metadata under this module's symbol keys.
 * All three properties are optional, so a plain `Error` satisfies this type.
 */
export interface DecoratedError extends Error {
/** Error classification code, e.g. `CODE_UNRECOVERABLE` or `CODE_RETRYABLE`. */
[code]?: string;
/** Retry hint set by `throwRetryableError` from its `shouldRetry` argument. */
[retry]?: Date | boolean;
/** Error source set by `throwUnrecoverableError` and `createTaskRunError`. */
[source]?: TaskErrorSource;
}
export class EphemeralTaskRejectedDueToCapacityError extends Error {
@ -40,8 +47,9 @@ export function isUnrecoverableError(error: Error | DecoratedError) {
return isTaskManagerError(error) && error[code] === CODE_UNRECOVERABLE;
}
export function throwUnrecoverableError(error: Error) {
export function throwUnrecoverableError(error: Error, errorSource = TaskErrorSource.FRAMEWORK) {
(error as DecoratedError)[code] = CODE_UNRECOVERABLE;
(error as DecoratedError)[source] = errorSource;
throw error;
}
@ -52,14 +60,10 @@ export function isRetryableError(error: Error | DecoratedError) {
return null;
}
export function createRetryableError(error: Error, shouldRetry: Date | boolean): DecoratedError {
export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
(error as DecoratedError)[code] = CODE_RETRYABLE;
(error as DecoratedError)[retry] = shouldRetry;
return error;
}
export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
throw createRetryableError(error, shouldRetry);
throw error;
}
export function isSkipError(error: Error | DecoratedError) {
@ -74,6 +78,14 @@ export function createSkipError(error: Error): DecoratedError {
return error;
}
/**
 * Tags an error with the source it should be attributed to.
 *
 * The incoming error object is mutated in place (the source is written under
 * the `source` symbol) and that same object is returned; no copy is made.
 *
 * @param error - The error to tag.
 * @param errorSource - Source to record; defaults to `TaskErrorSource.FRAMEWORK`.
 * @returns The same `error` instance, typed as a `DecoratedError`.
 */
export function createTaskRunError(
  error: Error,
  errorSource = TaskErrorSource.FRAMEWORK
): DecoratedError {
  // Every extra member of DecoratedError is optional, so a plain Error is assignable.
  const decorated: DecoratedError = error;
  decorated[source] = errorSource;
  return decorated;
}
export function isEphemeralTaskRejectedDueToCapacityError(
error: Error | EphemeralTaskRejectedDueToCapacityError
) {

View file

@ -9,7 +9,13 @@ import _ from 'lodash';
import sinon from 'sinon';
import { secondsFromNow } from '../lib/intervals';
import { asOk, asErr } from '../lib/result_type';
import { TaskManagerRunner, TaskRunningStage, TaskRunResult } from '.';
import {
createTaskRunError,
TaskErrorSource,
TaskManagerRunner,
TaskRunningStage,
TaskRunResult,
} from '.';
import {
TaskEvent,
asTaskRunEvent,
@ -1393,7 +1399,7 @@ describe('TaskManagerRunner', () => {
);
});
test('emits TaskEvent when a recurring task returns a success result with hasError=true', async () => {
test('emits TaskEvent when a recurring task returns a success result with taskRunError', async () => {
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
const onTaskEvent = jest.fn();
@ -1408,7 +1414,11 @@ describe('TaskManagerRunner', () => {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { runAt, state: {}, hasError: true };
return {
runAt,
state: {},
taskRunError: createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK),
};
},
}),
},
@ -1433,7 +1443,7 @@ describe('TaskManagerRunner', () => {
);
});
test('emits TaskEvent when a recurring task returns a success result with hasError=true but completes after timeout', async () => {
test('emits TaskEvent when a recurring task returns a success result with taskRunError but completes after timeout', async () => {
fakeTimer = sinon.useFakeTimers(new Date(2023, 1, 1, 0, 0, 0, 0).valueOf());
const id = _.random(1, 20).toString();
const runAt = minutesFromNow(_.random(5));
@ -1450,7 +1460,11 @@ describe('TaskManagerRunner', () => {
timeout: `1s`,
createTaskRunner: () => ({
async run() {
return { runAt, state: {}, hasError: true };
return {
runAt,
state: {},
taskRunError: createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK),
};
},
}),
},
@ -2061,6 +2075,8 @@ describe('TaskManagerRunner', () => {
});
test('does not resets skip attempts for a recurring task as long as there is an error', async () => {
const taskRunError = createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK);
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
@ -2086,7 +2102,10 @@ describe('TaskManagerRunner', () => {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, hasError: true };
return {
state: {},
taskRunError,
};
},
}),
indirectParamsSchema: schema.object({
@ -2103,7 +2122,12 @@ describe('TaskManagerRunner', () => {
const instance = store.update.mock.calls[0][0];
expect(instance.numSkippedRuns).toBe(mockTaskInstance.numSkippedRuns);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: {}, hasError: true }));
expect(result).toEqual(
asOk({
state: {},
taskRunError,
})
);
});
test('does not resets skip attempts for a non-recurring task as long as there is an error', async () => {
@ -2212,6 +2236,8 @@ describe('TaskManagerRunner', () => {
});
test('stops skipping when the max skip limit is reached', async () => {
const taskRunError = createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK);
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
@ -2238,7 +2264,10 @@ describe('TaskManagerRunner', () => {
return { data: { indirectParams: { baz: 'bar' } } };
},
async run() {
return { state: {}, hasError: true };
return {
state: {},
taskRunError,
};
},
}),
indirectParamsSchema: schema.object({
@ -2267,7 +2296,12 @@ describe('TaskManagerRunner', () => {
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has reached the max skip attempts for task bar/foo'
);
expect(result).toEqual(asOk({ state: {}, hasError: true }));
expect(result).toEqual(
asOk({
state: {},
taskRunError,
})
);
});
});
});

View file

@ -664,12 +664,12 @@ export class TaskManagerRunner implements TaskRunner {
skipAttempts,
}: SuccessfulRunResult & { attempts: number; skipAttempts: number }) => {
const { startedAt, schedule, numSkippedRuns } = this.instance.task;
const { hasError } = unwrap(result);
const { taskRunError } = unwrap(result);
let requeueInvalidTaskAttempts = skipAttempts || numSkippedRuns || 0;
// Alerting TaskRunner returns SuccessResult even though there is an error
// therefore we use "hasError" to be sure that there wasn't any error
if (isUndefined(skipAttempts) && !hasError) {
// therefore we use "taskRunError" to be sure that there wasn't any error
if (isUndefined(skipAttempts) && taskRunError === undefined) {
requeueInvalidTaskAttempts = 0;
}
@ -747,7 +747,7 @@ export class TaskManagerRunner implements TaskRunner {
await eitherAsync(
result,
async ({ runAt, schedule, hasError }: SuccessfulRunResult) => {
async ({ runAt, schedule, taskRunError }: SuccessfulRunResult) => {
const processedResult = {
task,
persistence:
@ -757,10 +757,11 @@ export class TaskManagerRunner implements TaskRunner {
: this.processResultWhenDone()),
};
// Alerting task runner returns SuccessfulRunResult with hasError=true
// Alerting task runner returns SuccessfulRunResult with taskRunError
// when the alerting task fails, so we check for this condition in order
// to emit the correct task run event for metrics collection
const taskRunEvent = hasError
// taskRunError contains the "source" (TaskErrorSource) data
const taskRunEvent = !!taskRunError
? asTaskRunEvent(
this.id,
asErr({
@ -804,7 +805,6 @@ export class TaskManagerRunner implements TaskRunner {
}
);
}
return result;
}