Skip tasks with invalid params (#158022)

Resolves: #155766 
Resolves: #159302

With this PR we aim to skip a task that has invalid direct and indirect
params.

In order to do that, 
1- We validate the task params before calling the subtask's run method
and skip if if the task params are invalid.
2- We skip execution of a subtask (rule, action etc) when the run method
of it returns a `SkipError`

Therefore, validations in the run methods needs to be moved to top of
the run method and executed before anything else to return skip if the
data is invalid.

We also added a config to enable/disable the skip feature, and define
the delay duration of task reschedule.

As this may become an infinitive loop, we are supposed to limit the
attempts.
Follow on issue to implement that: #159302

---------

Co-authored-by: Patryk Kopycinski <contact@patrykkopycinski.com>
Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ersin Erdal 2023-07-11 21:47:23 +02:00 committed by GitHub
parent d4e96665ec
commit cb853a1d9a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
54 changed files with 3452 additions and 1620 deletions

View file

@ -15,6 +15,7 @@ import { licenseStateMock } from './lib/license_state.mock';
import { ActionsConfigurationUtilities } from './actions_config';
import { licensingMock } from '@kbn/licensing-plugin/server/mocks';
import { inMemoryMetricsMock } from './monitoring/in_memory_metrics.mock';
import { rawConnectorSchema } from './raw_connector_schema';
const mockTaskManager = taskManagerMock.createSetup();
const inMemoryMetrics = inMemoryMetricsMock.create();
@ -82,17 +83,18 @@ describe('actionTypeRegistry', () => {
});
expect(actionTypeRegistry.has('my-action-type')).toEqual(true);
expect(mockTaskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(mockTaskManager.registerTaskDefinitions.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"actions:my-action-type": Object {
"createTaskRunner": [Function],
"maxAttempts": 3,
"title": "My action type",
expect(mockTaskManager.registerTaskDefinitions.mock.calls[0]).toEqual(
expect.objectContaining([
{
'actions:my-action-type': {
createTaskRunner: expect.any(Function),
maxAttempts: 3,
title: 'My action type',
indirectParamsSchema: rawConnectorSchema,
},
},
},
]
`);
])
);
expect(actionTypeRegistryParams.licensing.featureUsage.register).toHaveBeenCalledWith(
'Connector: My action type',
'gold'

View file

@ -9,6 +9,7 @@ import Boom from '@hapi/boom';
import { i18n } from '@kbn/i18n';
import { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import { rawConnectorSchema } from './raw_connector_schema';
import { ActionType as CommonActionType, areValidFeatures } from '../common';
import { ActionsConfigurationUtilities } from './actions_config';
import { getActionTypeFeatureUsageName, TaskRunnerFactory, ILicenseState } from './lib';
@ -158,6 +159,7 @@ export class ActionTypeRegistry {
title: actionType.name,
maxAttempts,
createTaskRunner: (context: RunContext) => this.taskRunnerFactory.create(context),
indirectParamsSchema: rawConnectorSchema,
},
});
// No need to notify usage on basic action types

View file

@ -12,6 +12,7 @@ const createActionExecutorMock = () => {
initialize: jest.fn(),
execute: jest.fn().mockResolvedValue({ status: 'ok', actionId: '' }),
logCancellation: jest.fn(),
getActionInfoInternal: jest.fn(),
};
return mocked;
};

View file

@ -122,6 +122,7 @@ test('successfully executes', async () => {
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};
@ -264,6 +265,7 @@ test('successfully executes when http_request source is specified', async () =>
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};
@ -415,6 +417,7 @@ test('successfully executes when saved_object source is specified', async () =>
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};
@ -806,9 +809,9 @@ test('successfully executes as a task', async () => {
minimumLicenseRequired: 'basic',
supportedFeatureIds: ['alerting'],
validate: {
config: { schema: schema.object({}) },
secrets: { schema: schema.object({}) },
params: { schema: schema.object({}) },
config: { schema: schema.object({ bar: schema.boolean() }) },
secrets: { schema: schema.object({ baz: schema.boolean() }) },
params: { schema: schema.object({ foo: schema.boolean() }) },
},
executor: jest.fn(),
};
@ -818,6 +821,7 @@ test('successfully executes as a task', async () => {
attributes: {
name: '1',
actionTypeId: 'test',
isMissingSecrets: false,
config: {
bar: true,
},
@ -867,6 +871,9 @@ test('provides empty config when config and / or secrets is empty', async () =>
attributes: {
name: '1',
actionTypeId: 'test',
isMissingSecrets: false,
config: {},
secrets: {},
},
references: [],
};
@ -917,7 +924,7 @@ test('throws an error when config is invalid', async () => {
});
});
test('throws an error when connector is invalid', async () => {
test('returns an error when connector is invalid', async () => {
const actionType: jest.Mocked<ActionType> = {
id: 'test',
name: 'Test',
@ -939,6 +946,8 @@ test('throws an error when connector is invalid', async () => {
attributes: {
name: '1',
actionTypeId: 'test',
isMissingSecrets: false,
secrets: {},
},
references: [],
};
@ -1060,6 +1069,7 @@ test('should not throws an error if actionType is preconfigured', async () => {
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};
@ -1729,6 +1739,67 @@ test('writes usage data to event log for gen ai events', async () => {
});
});
test('does not fetches actionInfo if passed as param', async () => {
const actionType: jest.Mocked<ActionType> = {
id: 'test',
name: 'Test',
minimumLicenseRequired: 'basic',
supportedFeatureIds: ['alerting'],
validate: {
config: { schema: schema.object({ bar: schema.boolean() }) },
secrets: { schema: schema.object({ baz: schema.boolean() }) },
params: { schema: schema.object({ foo: schema.boolean() }) },
},
executor: jest.fn(),
};
const mockAction = {
id: '1',
type: 'action',
attributes: {
name: '1',
actionTypeId: 'test',
config: {
bar: true,
},
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};
const mockActionInfo = {
actionTypeId: mockAction.attributes.actionTypeId,
name: mockAction.attributes.name,
config: mockAction.attributes.config,
secrets: mockAction.attributes.secrets,
actionId: mockAction.id,
rawAction: mockAction.attributes,
};
actionTypeRegistry.get.mockReturnValueOnce(actionType);
await actionExecutor.execute({
...executeParams,
actionInfo: mockActionInfo,
});
expect(encryptedSavedObjectsClient.getDecryptedAsInternalUser).not.toHaveBeenCalled();
expect(actionType.executor).toHaveBeenCalledWith(
expect.objectContaining({
actionId: '1',
config: {
bar: true,
},
secrets: {
baz: true,
},
params: { foo: true },
})
);
});
function setupActionExecutorMock(actionTypeId = 'test') {
const actionType: jest.Mocked<ActionType> = {
id: 'test',
@ -1754,6 +1825,7 @@ function setupActionExecutorMock(actionTypeId = 'test') {
secrets: {
baz: true,
},
isMissingSecrets: false,
},
references: [],
};

View file

@ -28,6 +28,8 @@ import {
InMemoryConnector,
RawAction,
ValidatorServices,
ActionTypeSecrets,
ActionTypeConfig,
} from '../types';
import { EVENT_LOG_ACTIONS } from '../constants/event_log';
import { ActionExecutionSource } from './action_execution_source';
@ -52,6 +54,7 @@ export interface ActionExecutorContext {
export interface TaskInfo {
scheduled: Date;
attempts: number;
numSkippedRuns?: number;
}
export interface ExecuteOptions<Source = unknown> {
@ -62,6 +65,7 @@ export interface ExecuteOptions<Source = unknown> {
params: Record<string, unknown>;
source?: ActionExecutionSource<Source>;
taskInfo?: TaskInfo;
actionInfo?: ActionInfo;
executionId?: string;
consumer?: string;
relatedSavedObjects?: RelatedSavedObjects;
@ -95,6 +99,7 @@ export class ActionExecutor {
source,
isEphemeral,
taskInfo,
actionInfo: actionInfoFromTaskRunner,
executionId,
consumer,
relatedSavedObjects,
@ -112,37 +117,55 @@ export class ActionExecutor {
},
},
async (span) => {
const {
spaces,
getServices,
encryptedSavedObjectsClient,
actionTypeRegistry,
eventLogger,
inMemoryConnectors,
security,
} = this.actionExecutorContext!;
const { spaces, getServices, actionTypeRegistry, eventLogger, security } =
this.actionExecutorContext!;
const services = getServices(request);
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
const actionInfo = await getActionInfoInternal(
this.isESOCanEncrypt,
encryptedSavedObjectsClient,
inMemoryConnectors,
actionId,
namespace.namespace
);
const actionInfo =
actionInfoFromTaskRunner ||
(await this.getActionInfoInternal(actionId, request, namespace.namespace));
const { actionTypeId, name, config, secrets } = actionInfo;
const loggerId = actionTypeId.startsWith('.') ? actionTypeId.substring(1) : actionTypeId;
let { logger } = this.actionExecutorContext!;
logger = logger.get(loggerId);
if (!this.actionInfo || this.actionInfo.actionId !== actionId) {
this.actionInfo = actionInfo;
}
if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
const actionType = actionTypeRegistry.get(actionTypeId);
const configurationUtilities = actionTypeRegistry.getUtils();
let validatedParams;
let validatedConfig;
let validatedSecrets;
try {
const validationResult = validateAction(
{
actionId,
actionType,
params,
config,
secrets,
taskInfo,
},
{ configurationUtilities }
);
validatedParams = validationResult.validatedParams;
validatedConfig = validationResult.validatedConfig;
validatedSecrets = validationResult.validatedSecrets;
} catch (err) {
return err.result;
}
const loggerId = actionTypeId.startsWith('.') ? actionTypeId.substring(1) : actionTypeId;
let { logger } = this.actionExecutorContext!;
logger = logger.get(loggerId);
if (span) {
span.name = `execute_action ${actionTypeId}`;
span.addLabels({
@ -150,11 +173,6 @@ export class ActionExecutor {
});
}
if (!actionTypeRegistry.isActionExecutable(actionId, actionTypeId, { notifyUsage: true })) {
actionTypeRegistry.ensureActionTypeEnabled(actionTypeId);
}
const actionType = actionTypeRegistry.get(actionTypeId);
const actionLabel = `${actionTypeId}:${actionId}: ${name}`;
logger.debug(`executing action ${actionLabel}`);
@ -205,18 +223,6 @@ export class ActionExecutor {
let rawResult: ActionTypeExecutorRawResult<unknown>;
try {
const configurationUtilities = actionTypeRegistry.getUtils();
const { validatedParams, validatedConfig, validatedSecrets } = validateAction(
{
actionId,
actionType,
params,
config,
secrets,
},
{ configurationUtilities }
);
rawResult = await actionType.executor({
actionId,
services,
@ -341,19 +347,12 @@ export class ActionExecutor {
source?: ActionExecutionSource<Source>;
consumer?: string;
}) {
const { spaces, encryptedSavedObjectsClient, inMemoryConnectors, eventLogger } =
this.actionExecutorContext!;
const { spaces, eventLogger } = this.actionExecutorContext!;
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
if (!this.actionInfo || this.actionInfo.actionId !== actionId) {
this.actionInfo = await getActionInfoInternal(
this.isESOCanEncrypt,
encryptedSavedObjectsClient,
inMemoryConnectors,
actionId,
namespace.namespace
);
this.actionInfo = await this.getActionInfoInternal(actionId, request, namespace.namespace);
}
const task = taskInfo
? {
@ -391,59 +390,67 @@ export class ActionExecutor {
eventLogger.logEvent(event);
}
}
interface ActionInfo {
actionTypeId: string;
name: string;
config: unknown;
secrets: unknown;
actionId: string;
isInMemory?: boolean;
}
public async getActionInfoInternal(
actionId: string,
request: KibanaRequest,
namespace: string | undefined
): Promise<ActionInfo> {
const { encryptedSavedObjectsClient, inMemoryConnectors } = this.actionExecutorContext!;
async function getActionInfoInternal(
isESOCanEncrypt: boolean,
encryptedSavedObjectsClient: EncryptedSavedObjectsClient,
inMemoryConnectors: InMemoryConnector[],
actionId: string,
namespace: string | undefined
): Promise<ActionInfo> {
// check to see if it's in memory action first
const inMemoryAction = inMemoryConnectors.find(
(inMemoryConnector) => inMemoryConnector.id === actionId
);
// check to see if it's in memory action first
const inMemoryAction = inMemoryConnectors.find(
(inMemoryConnector) => inMemoryConnector.id === actionId
);
if (inMemoryAction) {
return {
actionTypeId: inMemoryAction.actionTypeId,
name: inMemoryAction.name,
config: inMemoryAction.config,
secrets: inMemoryAction.secrets,
actionId,
isInMemory: true,
rawAction: { ...inMemoryAction, isMissingSecrets: false },
};
}
if (inMemoryAction) {
return {
actionTypeId: inMemoryAction.actionTypeId,
name: inMemoryAction.name,
config: inMemoryAction.config,
secrets: inMemoryAction.secrets,
if (!this.isESOCanEncrypt) {
throw new Error(
`Unable to execute action because the Encrypted Saved Objects plugin is missing encryption key. Please set xpack.encryptedSavedObjects.encryptionKey in the kibana.yml or use the bin/kibana-encryption-keys command.`
);
}
const rawAction = await encryptedSavedObjectsClient.getDecryptedAsInternalUser<RawAction>(
'action',
actionId,
isInMemory: true,
{
namespace: namespace === 'default' ? undefined : namespace,
}
);
const {
attributes: { secrets, actionTypeId, config, name },
} = rawAction;
return {
actionTypeId,
name,
config,
secrets,
actionId,
rawAction: rawAction.attributes,
};
}
}
if (!isESOCanEncrypt) {
throw new Error(
`Unable to execute action because the Encrypted Saved Objects plugin is missing encryption key. Please set xpack.encryptedSavedObjects.encryptionKey in the kibana.yml or use the bin/kibana-encryption-keys command.`
);
}
const {
attributes: { secrets, actionTypeId, config, name },
} = await encryptedSavedObjectsClient.getDecryptedAsInternalUser<RawAction>('action', actionId, {
namespace: namespace === 'default' ? undefined : namespace,
});
return {
actionTypeId,
name,
config,
secrets,
actionId,
};
export interface ActionInfo {
actionTypeId: string;
name: string;
config: ActionTypeConfig;
secrets: ActionTypeSecrets;
actionId: string;
isInMemory?: boolean;
rawAction: RawAction;
}
function actionErrorToMessage(result: ActionTypeExecutorRawResult<unknown>): string {
@ -466,12 +473,13 @@ interface ValidateActionOpts {
actionId: string;
actionType: ActionType;
params: Record<string, unknown>;
config: unknown;
secrets: unknown;
config: Record<string, unknown>;
secrets: Record<string, unknown>;
taskInfo?: TaskInfo;
}
function validateAction(
{ actionId, actionType, params, config, secrets }: ValidateActionOpts,
{ actionId, actionType, params, config, secrets, taskInfo }: ValidateActionOpts,
validatorServices: ValidatorServices
) {
let validatedParams: Record<string, unknown>;
@ -495,7 +503,7 @@ function validateAction(
actionId,
status: 'error',
message: err.message,
retry: false,
retry: !!taskInfo,
});
}
}

File diff suppressed because it is too large Load diff

View file

@ -9,29 +9,32 @@ import { v4 as uuidv4 } from 'uuid';
import { pick } from 'lodash';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/server';
import {
Logger,
CoreKibanaRequest,
IBasePath,
SavedObject,
Headers,
FakeRawRequest,
SavedObjectReference,
Headers,
IBasePath,
ISavedObjectsRepository,
Logger,
SavedObject,
SavedObjectReference,
} from '@kbn/core/server';
import { RunContext } from '@kbn/task-manager-plugin/server';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import {
LoadIndirectParamsResult,
RunContext,
throwRetryableError,
throwUnrecoverableError,
} from '@kbn/task-manager-plugin/server/task_running';
import { ActionExecutorContract } from './action_executor';
} from '@kbn/task-manager-plugin/server';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import { LoadedIndirectParams } from '@kbn/task-manager-plugin/server/task';
import { ActionExecutorContract, ActionInfo } from './action_executor';
import {
ActionTaskParams,
ActionTypeRegistryContract,
SpaceIdToNamespaceFunction,
ActionTypeExecutorResult,
ActionTaskExecutorParams,
ActionTaskParams,
ActionTypeExecutorResult,
ActionTypeRegistryContract,
isPersistedActionTask,
RawAction,
SpaceIdToNamespaceFunction,
} from '../types';
import { ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE } from '../constants/saved_objects';
import {
@ -53,6 +56,16 @@ export interface TaskRunnerContext {
savedObjectsRepository: ISavedObjectsRepository;
}
export interface ActionData extends LoadedIndirectParams<RawAction> {
indirectParams: RawAction;
actionInfo: ActionInfo;
taskParams: TaskParams;
}
export type ActionDataResult<T extends LoadedIndirectParams> = LoadIndirectParamsResult<T>;
type TaskParams = Omit<SavedObject<ActionTaskParams>, 'id' | 'type'>;
export class TaskRunnerFactory {
private isInitialized = false;
private taskRunnerContext?: TaskRunnerContext;
@ -89,14 +102,54 @@ export class TaskRunnerFactory {
const taskInfo = {
scheduled: taskInstance.runAt,
attempts: taskInstance.attempts,
numSkippedRuns: taskInstance.numSkippedRuns,
};
const actionExecutionId = uuidv4();
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
return {
async run() {
const { spaceId } = actionTaskExecutorParams;
let actionData: ActionDataResult<ActionData>;
return {
async loadIndirectParams(): Promise<ActionDataResult<ActionData>> {
try {
const taskParams = await getActionTaskParams(
actionTaskExecutorParams,
encryptedSavedObjectsClient,
spaceIdToNamespace
);
const { spaceId } = actionTaskExecutorParams;
const request = getFakeRequest(taskParams.attributes.apiKey);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
const actionInfo = await actionExecutor.getActionInfoInternal(
taskParams.attributes.actionId,
request,
namespace.namespace
);
actionData = {
data: {
indirectParams: actionInfo.rawAction,
taskParams,
actionInfo,
},
};
return actionData;
} catch (error) {
actionData = { error };
return { error };
}
},
async run() {
if (!actionData) {
actionData = await this.loadIndirectParams();
}
if (actionData.error) {
return throwRetryableError(actionData.error, true);
}
const { spaceId } = actionTaskExecutorParams;
const { taskParams, actionInfo } = actionData.data;
const {
attributes: {
actionId,
@ -108,11 +161,8 @@ export class TaskRunnerFactory {
relatedSavedObjects,
},
references,
} = await getActionTaskParams(
actionTaskExecutorParams,
encryptedSavedObjectsClient,
spaceIdToNamespace
);
} = taskParams;
const path = addSpaceIdToPath('/', spaceId);
const request = getFakeRequest(apiKey);
@ -126,6 +176,7 @@ export class TaskRunnerFactory {
isEphemeral: !isPersistedActionTask(actionTaskExecutorParams),
request,
taskInfo,
actionInfo,
executionId,
consumer,
relatedSavedObjects: validatedRelatedSavedObjects(logger, relatedSavedObjects),
@ -221,16 +272,14 @@ function getFakeRequest(apiKey?: string) {
// Since we're using API keys and accessing elasticsearch can only be done
// via a request, we're faking one with the proper authorization headers.
const fakeRequest = CoreKibanaRequest.from(fakeRawRequest);
return fakeRequest;
return CoreKibanaRequest.from(fakeRawRequest);
}
async function getActionTaskParams(
executorParams: ActionTaskExecutorParams,
encryptedSavedObjectsClient: EncryptedSavedObjectsClient,
spaceIdToNamespace: SpaceIdToNamespaceFunction
): Promise<Omit<SavedObject<ActionTaskParams>, 'id' | 'type'>> {
): Promise<TaskParams> {
const { spaceId } = executorParams;
const namespace = spaceIdToNamespace(spaceId);
if (isPersistedActionTask(executorParams)) {
@ -240,7 +289,6 @@ async function getActionTaskParams(
executorParams.actionTaskParamsId,
{ namespace }
);
const {
attributes: { relatedSavedObjects },
references,

View file

@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { rawConnectorSchema } from './raw_connector_schema';
const action = {
actionTypeId: '12345',
name: 'test-action-name',
isMissingSecrets: true,
config: {
foo: 'bar',
},
secrets: {
pass: 'foo',
},
isPreconfigured: false,
isSystemAction: false,
};
const preconfiguredAction = {
...action,
isPreconfigured: true,
id: '6789',
isDeprecated: false,
};
describe('Raw Connector Schema', () => {
test('valid action', () => {
expect(rawConnectorSchema.validate(action)).toEqual(action);
});
test('valid preconfigured action', () => {
expect(rawConnectorSchema.validate(preconfiguredAction)).toEqual(preconfiguredAction);
});
test('invalid action', () => {
expect(() => rawConnectorSchema.validate({ ...action, foo: 'bar' })).toThrow(
'[foo]: definition for this key is missing'
);
});
test('invalid action with missing params', () => {
const { name, ...actionWithoutName } = action;
expect(() => rawConnectorSchema.validate(actionWithoutName)).toThrow(
'[name]: expected value of type [string] but got [undefined]'
);
});
test('invalid preconfigured action', () => {
expect(() => rawConnectorSchema.validate({ ...preconfiguredAction, foo: '1' })).toThrow(
'[foo]: definition for this key is missing'
);
});
});

View file

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
export const rawConnectorSchema = schema.object({
actionTypeId: schema.string(),
name: schema.string(),
isMissingSecrets: schema.boolean(),
config: schema.recordOf(schema.string(), schema.any()),
secrets: schema.recordOf(schema.string(), schema.any()),
isPreconfigured: schema.maybe(schema.boolean()),
isSystemAction: schema.boolean(),
id: schema.maybe(schema.string()),
isDeprecated: schema.maybe(schema.boolean()),
});

View file

@ -17,7 +17,10 @@ const isFunction = (v: unknown): v is Function => {
const getConnectorErrorMsg = (actionId: string, connector: { id: string; name: string }) =>
`Connector id: ${actionId}. Connector name: ${connector.name}. Connector type: ${connector.id}`;
export const buildExecutor = <Config, Secrets>({
export const buildExecutor = <
Config extends Record<string, unknown>,
Secrets extends Record<string, unknown>
>({
configurationUtilities,
connector,
logger,

View file

@ -60,7 +60,11 @@ export interface ActionsPlugin {
}
// the parameters passed to an action type executor function
export interface ActionTypeExecutorOptions<Config, Secrets, Params> {
export interface ActionTypeExecutorOptions<
Config extends Record<string, unknown>,
Secrets extends Record<string, unknown>,
Params
> {
actionId: string;
services: Services;
config: Config;
@ -89,6 +93,7 @@ export interface InMemoryConnector<
Secrets extends ActionTypeSecrets = ActionTypeSecrets
> extends ActionResult<Config> {
secrets: Secrets;
config: Config;
}
export interface FindActionResult extends ActionResult {
@ -96,7 +101,12 @@ export interface FindActionResult extends ActionResult {
}
// signature of the action type executor function
export type ExecutorType<Config, Secrets, Params, ResultData> = (
export type ExecutorType<
Config extends Record<string, unknown>,
Secrets extends Record<string, unknown>,
Params,
ResultData
> = (
options: ActionTypeExecutorOptions<Config, Secrets, Params>
) => Promise<ActionTypeExecutorResult<ResultData>>;
@ -145,12 +155,12 @@ export interface ActionType<
executor: ExecutorType<Config, Secrets, Params, ExecutorResultData>;
}
export interface RawAction extends SavedObjectAttributes {
export interface RawAction extends Record<string, unknown> {
actionTypeId: string;
name: string;
isMissingSecrets: boolean;
config: SavedObjectAttributes;
secrets: SavedObjectAttributes;
config: Record<string, unknown>;
secrets: Record<string, unknown>;
}
export interface ActionTaskParams extends SavedObjectAttributes {

View file

@ -286,6 +286,7 @@ Object {
isDeprecated: false,
isSystemAction: false,
secrets: {},
config: {},
},
]);
@ -425,6 +426,7 @@ Object {
isDeprecated: false,
isSystemAction: false,
secrets: {},
config: {},
},
{
id: 'anotherServerLog',
@ -434,6 +436,7 @@ Object {
isDeprecated: false,
isSystemAction: false,
secrets: {},
config: {},
},
]);
@ -541,6 +544,7 @@ Object {
isDeprecated: false,
isSystemAction: false,
secrets: {},
config: {},
},
]);

View file

@ -38,6 +38,7 @@
"@kbn/core-saved-objects-utils-server",
"@kbn/core-saved-objects-api-server",
"@kbn/core-elasticsearch-server",
"@kbn/core-http-router-server-internal",
],
"exclude": [
"target/**/*",

View file

@ -170,7 +170,7 @@ export interface Rule<Params extends RuleTypeParams = never> {
actions: RuleAction[];
params: Params;
mapped_params?: MappedParams;
scheduledTaskId?: string;
scheduledTaskId?: string | null;
createdBy: string | null;
updatedBy: string | null;
createdAt: Date;

View file

@ -83,12 +83,15 @@ const action = {
alertGroup: 'aGroup',
};
let runDate: Date;
describe('AlertingEventLogger', () => {
let alertingEventLogger: AlertingEventLogger;
beforeAll(() => {
jest.useFakeTimers();
jest.setSystemTime(new Date(mockNow));
runDate = new Date();
});
beforeEach(() => {
@ -115,28 +118,28 @@ describe('AlertingEventLogger', () => {
describe('start()', () => {
test('should throw error if alertingEventLogger has not been initialized', () => {
expect(() => alertingEventLogger.start()).toThrowErrorMatchingInlineSnapshot(
expect(() => alertingEventLogger.start(runDate)).toThrowErrorMatchingInlineSnapshot(
`"AlertingEventLogger not initialized"`
);
});
test('should throw error if alertingEventLogger rule context is null', () => {
alertingEventLogger.initialize(null as unknown as RuleContextOpts);
expect(() => alertingEventLogger.start()).toThrowErrorMatchingInlineSnapshot(
expect(() => alertingEventLogger.start(runDate)).toThrowErrorMatchingInlineSnapshot(
`"AlertingEventLogger not initialized"`
);
});
test('should throw error if alertingEventLogger rule context is undefined', () => {
alertingEventLogger.initialize(undefined as unknown as RuleContextOpts);
expect(() => alertingEventLogger.start()).toThrowErrorMatchingInlineSnapshot(
expect(() => alertingEventLogger.start(runDate)).toThrowErrorMatchingInlineSnapshot(
`"AlertingEventLogger not initialized"`
);
});
test('should call eventLogger "startTiming" and "logEvent"', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
expect(eventLogger.startTiming).toHaveBeenCalledTimes(1);
expect(eventLogger.logEvent).toHaveBeenCalledTimes(1);
@ -154,7 +157,7 @@ describe('AlertingEventLogger', () => {
mockEventLoggerStartTiming();
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
const event = initializeExecuteRecord(contextWithScheduleDelay);
expect(alertingEventLogger.getEvent()).toEqual({
@ -183,7 +186,7 @@ describe('AlertingEventLogger', () => {
test('should update event with rule name correctly', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setRuleName('my-super-cool-rule');
const event = initializeExecuteRecord(contextWithScheduleDelay);
@ -215,7 +218,7 @@ describe('AlertingEventLogger', () => {
mockEventLoggerStartTiming();
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setRuleName('my-super-cool-rule');
alertingEventLogger.setExecutionSucceeded('success!');
@ -260,7 +263,7 @@ describe('AlertingEventLogger', () => {
mockEventLoggerStartTiming();
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setExecutionFailed('rule failed!', 'something went wrong!');
const event = initializeExecuteRecord(contextWithScheduleDelay);
@ -301,7 +304,7 @@ describe('AlertingEventLogger', () => {
it('should update event maintenance window IDs correctly', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setMaintenanceWindowIds([]);
const event = initializeExecuteRecord(contextWithScheduleDelay);
@ -453,7 +456,7 @@ describe('AlertingEventLogger', () => {
test('should log event if no status or metrics are provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({});
const event = initializeExecuteRecord(contextWithScheduleDelay);
@ -463,7 +466,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: { lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'), status: 'active' },
});
@ -484,7 +487,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is error', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -523,7 +526,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is error and uses "unknown" if no reason is provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -562,7 +565,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is error and does not overwrite existing error message', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -605,7 +608,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is warning', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -640,7 +643,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is warning and uses "unknown" if no reason is provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -675,7 +678,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution status if execution status is warning and uses existing message if no message is provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
status: {
lastExecutionDate: new Date('2022-05-05T15:59:54.480Z'),
@ -712,7 +715,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution metrics if provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
metrics: {
numberOfTriggeredActions: 1,
@ -763,7 +766,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution timings if provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
timings: {
[TaskRunnerTimerSpan.StartTaskRun]: 10,
@ -810,7 +813,7 @@ describe('AlertingEventLogger', () => {
test('should set fields from execution metrics and timings if both provided', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
metrics: {
numberOfTriggeredActions: 1,
@ -879,7 +882,7 @@ describe('AlertingEventLogger', () => {
test('should set fields to 0 execution metrics are provided but undefined', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.done({
metrics: {} as unknown as RuleRunMetrics,
});
@ -919,7 +922,7 @@ describe('AlertingEventLogger', () => {
test('overwrites the message when the final status is error', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setExecutionSucceeded('success message');
expect(alertingEventLogger.getEvent()!.message).toBe('success message');
@ -937,7 +940,7 @@ describe('AlertingEventLogger', () => {
test('does not overwrites the message when there is already a failure message', () => {
alertingEventLogger.initialize(context);
alertingEventLogger.start();
alertingEventLogger.start(runDate);
alertingEventLogger.setExecutionFailed('first failure message', 'failure error message');
expect(alertingEventLogger.getEvent()!.message).toBe('first failure message');

View file

@ -92,12 +92,12 @@ export class AlertingEventLogger {
this.ruleContext = context;
}
public start() {
public start(runDate: Date) {
if (!this.isInitialized || !this.ruleContext) {
throw new Error('AlertingEventLogger not initialized');
}
this.startTime = new Date();
this.startTime = runDate;
const context = {
...this.ruleContext,

View file

@ -0,0 +1,269 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
const executionStatusWarningReason = schema.oneOf([
schema.literal('maxExecutableActions'),
schema.literal('maxAlerts'),
]);
const executionStatusErrorReason = schema.oneOf([
schema.literal('read'),
schema.literal('decrypt'),
schema.literal('execute'),
schema.literal('unknown'),
schema.literal('license'),
schema.literal('timeout'),
schema.literal('disabled'),
schema.literal('validate'),
]);
const rawRuleExecutionStatusSchema = schema.object({
status: schema.oneOf([
schema.literal('ok'),
schema.literal('active'),
schema.literal('error'),
schema.literal('pending'),
schema.literal('unknown'),
schema.literal('warning'),
]),
lastExecutionDate: schema.string(),
lastDuration: schema.maybe(schema.number()),
error: schema.nullable(
schema.object({
reason: executionStatusErrorReason,
message: schema.string(),
})
),
warning: schema.nullable(
schema.object({
reason: executionStatusWarningReason,
message: schema.string(),
})
),
});
const ISOWeekdaysSchema = schema.oneOf([
schema.literal(1),
schema.literal(2),
schema.literal(3),
schema.literal(4),
schema.literal(5),
schema.literal(6),
schema.literal(7),
]);
const rRuleSchema = schema.object({
dtstart: schema.string(),
tzid: schema.string(),
freq: schema.maybe(
schema.oneOf([
schema.literal(0),
schema.literal(1),
schema.literal(2),
schema.literal(3),
schema.literal(4),
schema.literal(5),
schema.literal(6),
])
),
until: schema.maybe(schema.string()),
count: schema.maybe(schema.number()),
interval: schema.maybe(schema.number()),
wkst: schema.maybe(
schema.oneOf([
schema.literal('MO'),
schema.literal('TU'),
schema.literal('WE'),
schema.literal('TH'),
schema.literal('FR'),
schema.literal('SA'),
schema.literal('SU'),
])
),
byweekday: schema.maybe(schema.arrayOf(schema.oneOf([schema.string(), schema.number()]))),
bymonth: schema.maybe(schema.number()),
bysetpos: schema.maybe(schema.number()),
bymonthday: schema.maybe(schema.number()),
byyearday: schema.maybe(schema.number()),
byweekno: schema.maybe(schema.number()),
byhour: schema.maybe(schema.number()),
byminute: schema.maybe(schema.number()),
bysecond: schema.maybe(schema.number()),
});
const outcome = schema.oneOf([
schema.literal('succeeded'),
schema.literal('warning'),
schema.literal('failed'),
]);
const rawRuleLastRunSchema = schema.object({
outcome,
outcomeOrder: schema.maybe(schema.number()),
alertsCount: schema.object({
new: schema.maybe(schema.nullable(schema.number())),
active: schema.maybe(schema.nullable(schema.number())),
recovered: schema.maybe(schema.nullable(schema.number())),
ignored: schema.maybe(schema.nullable(schema.number())),
}),
outcomeMsg: schema.maybe(schema.nullable(schema.arrayOf(schema.string()))),
warning: schema.maybe(
schema.nullable(schema.oneOf([executionStatusErrorReason, executionStatusWarningReason]))
),
});
const rawRuleMonitoringSchema = schema.object({
run: schema.object({
history: schema.arrayOf(
schema.object({
success: schema.boolean(),
timestamp: schema.number(),
duration: schema.maybe(schema.number()),
outcome: schema.maybe(outcome),
})
),
calculated_metrics: schema.object({
p50: schema.maybe(schema.number()),
p95: schema.maybe(schema.number()),
p99: schema.maybe(schema.number()),
success_ratio: schema.number(),
}),
last_run: schema.object({
timestamp: schema.string(),
metrics: schema.object({
duration: schema.maybe(schema.number()),
total_search_duration_ms: schema.maybe(schema.nullable(schema.number())),
total_indexing_duration_ms: schema.maybe(schema.nullable(schema.number())),
total_alerts_detected: schema.maybe(schema.nullable(schema.number())),
total_alerts_created: schema.maybe(schema.nullable(schema.number())),
gap_duration_s: schema.maybe(schema.nullable(schema.number())),
}),
}),
}),
});
const rawRuleAlertsFilterSchema = schema.object({
query: schema.maybe(
schema.object({
kql: schema.string(),
filters: schema.arrayOf(
schema.object({
query: schema.maybe(schema.recordOf(schema.string(), schema.any())),
meta: schema.object({
alias: schema.maybe(schema.nullable(schema.string())),
disabled: schema.maybe(schema.boolean()),
negate: schema.maybe(schema.boolean()),
controlledBy: schema.maybe(schema.string()),
group: schema.maybe(schema.string()),
index: schema.maybe(schema.string()),
isMultiIndex: schema.maybe(schema.boolean()),
type: schema.maybe(schema.string()),
key: schema.maybe(schema.string()),
params: schema.maybe(schema.recordOf(schema.string(), schema.any())), // better type?
value: schema.maybe(schema.string()),
}),
state$: schema.maybe(
schema.object({
store: schema.oneOf([schema.literal('appState'), schema.literal('globalState')]),
})
),
})
),
dsl: schema.maybe(schema.string()),
})
),
timeframe: schema.maybe(
schema.object({
days: schema.arrayOf(ISOWeekdaysSchema),
hours: schema.object({
start: schema.string(),
end: schema.string(),
}),
timezone: schema.string(),
})
),
});
const rawRuleActionSchema = schema.object({
uuid: schema.maybe(schema.string()),
group: schema.string(),
actionRef: schema.string(),
actionTypeId: schema.string(),
params: schema.recordOf(schema.string(), schema.any()),
frequency: schema.maybe(
schema.object({
summary: schema.boolean(),
notifyWhen: schema.oneOf([
schema.literal('onActionGroupChange'),
schema.literal('onActiveAlert'),
schema.literal('onThrottleInterval'),
]),
throttle: schema.nullable(schema.string()),
})
),
alertsFilter: schema.maybe(rawRuleAlertsFilterSchema),
});
export const rawRuleSchema = schema.object({
name: schema.string(),
enabled: schema.boolean(),
consumer: schema.string(),
tags: schema.arrayOf(schema.string()),
alertTypeId: schema.string(),
apiKeyOwner: schema.nullable(schema.string()),
apiKey: schema.nullable(schema.string()),
apiKeyCreatedByUser: schema.maybe(schema.nullable(schema.boolean())),
createdBy: schema.nullable(schema.string()),
updatedBy: schema.nullable(schema.string()),
updatedAt: schema.string(),
createdAt: schema.string(),
muteAll: schema.boolean(),
mutedInstanceIds: schema.arrayOf(schema.string()),
throttle: schema.maybe(schema.nullable(schema.string())),
revision: schema.number(),
running: schema.maybe(schema.nullable(schema.boolean())),
schedule: schema.object({
interval: schema.string(),
}),
legacyId: schema.nullable(schema.string()),
scheduledTaskId: schema.maybe(schema.nullable(schema.string())),
isSnoozedUntil: schema.maybe(schema.nullable(schema.string())),
snoozeSchedule: schema.maybe(
schema.arrayOf(
schema.object({
duration: schema.number(),
rRule: rRuleSchema,
id: schema.maybe(schema.string()),
skipRecurrences: schema.maybe(schema.arrayOf(schema.string())),
})
)
),
meta: schema.maybe(schema.object({ versionApiKeyLastmodified: schema.maybe(schema.string()) })),
actions: schema.arrayOf(rawRuleActionSchema),
executionStatus: rawRuleExecutionStatusSchema,
notifyWhen: schema.maybe(
schema.nullable(
schema.oneOf([
schema.literal('onActionGroupChange'),
schema.literal('onActiveAlert'),
schema.literal('onThrottleInterval'),
])
)
),
monitoring: schema.maybe(rawRuleMonitoringSchema),
lastRun: schema.maybe(schema.nullable(rawRuleLastRunSchema)),
nextRun: schema.maybe(schema.nullable(schema.string())),
mapped_params: schema.maybe(
schema.object({
risk_score: schema.maybe(schema.number()),
severity: schema.maybe(schema.string()),
})
),
params: schema.recordOf(schema.string(), schema.any()),
});

View file

@ -17,6 +17,7 @@ import { inMemoryMetricsMock } from './monitoring/in_memory_metrics.mock';
import { alertsServiceMock } from './alerts_service/alerts_service.mock';
import { schema } from '@kbn/config-schema';
import { RecoveredActionGroupId } from '../common';
import { rawRuleSchema } from './raw_rule_schema';
const logger = loggingSystemMock.create().get();
let mockedLicenseState: jest.Mocked<ILicenseState>;
@ -434,17 +435,17 @@ describe('Create Lifecycle', () => {
const registry = new RuleTypeRegistry(ruleTypeRegistryParams);
registry.register(ruleType);
expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(taskManager.registerTaskDefinitions.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"alerting:test": Object {
"createTaskRunner": [Function],
"timeout": "20m",
"title": "Test",
},
},
]
`);
expect(taskManager.registerTaskDefinitions.mock.calls[0]).toEqual([
{
'alerting:test': {
createTaskRunner: expect.any(Function),
paramsSchema: expect.any(Object),
indirectParamsSchema: rawRuleSchema,
timeout: '20m',
title: 'Test',
},
},
]);
});
test('shallow clones the given rule type', () => {

View file

@ -13,6 +13,7 @@ import { intersection } from 'lodash';
import { Logger } from '@kbn/core/server';
import { LicensingPluginSetup } from '@kbn/licensing-plugin/server';
import { RunContext, TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { rawRuleSchema } from './raw_rule_schema';
import { TaskRunnerFactory } from './task_runner';
import {
RuleType,
@ -290,9 +291,14 @@ export class RuleTypeRegistry {
RecoveryActionGroupId | RecoveredActionGroupId,
AlertData
>(normalizedRuleType, context, this.inMemoryMetrics),
paramsSchema: schema.object({
alertId: schema.string(),
spaceId: schema.string(),
consumer: schema.maybe(schema.string()),
}),
indirectParamsSchema: rawRuleSchema,
},
});
if (this.alertsService && ruleType.alerts) {
this.alertsService.register(ruleType.alerts);
}

View file

@ -23,7 +23,7 @@ import {
IntervalSchedule,
SanitizedRule,
RuleSnoozeSchedule,
RawAlertsFilter,
RawRuleAlertsFilter,
} from '../types';
import { AlertingAuthorization } from '../authorization';
import { AlertingRulesConfig } from '../config';
@ -79,7 +79,7 @@ export type NormalizedAlertActionWithGeneratedValues = Omit<
'uuid' | 'alertsFilter'
> & {
uuid: string;
alertsFilter?: RawAlertsFilter;
alertsFilter?: RawRuleAlertsFilter;
};
export interface RegistryAlertTypeWithAuth extends RegistryRuleType {

View file

@ -17,7 +17,6 @@ import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event
import {
GetSummarizedAlertsFnOpts,
parseDuration,
RawRule,
CombinedSummarizedAlerts,
ThrottledActions,
} from '../types';
@ -84,7 +83,7 @@ export class ExecutionHandler<
private taskRunnerContext: TaskRunnerContext;
private taskInstance: RuleTaskInstance;
private ruleRunMetricsStore: RuleRunMetricsStore;
private apiKey: RawRule['apiKey'];
private apiKey: string | null;
private ruleConsumer: string;
private executionId: string;
private ruleLabel: string;

View file

@ -6,6 +6,7 @@
*/
import { TaskStatus } from '@kbn/task-manager-plugin/server';
import { SavedObject } from '@kbn/core/server';
import {
Rule,
RuleTypeParams,
@ -13,10 +14,12 @@ import {
RuleMonitoring,
RuleLastRunOutcomeOrderMap,
RuleLastRunOutcomes,
SanitizedRule,
} from '../../common';
import { getDefaultMonitoring } from '../lib/monitoring';
import { UntypedNormalizedRuleType } from '../rule_type_registry';
import { EVENT_LOG_ACTIONS } from '../plugin';
import { RawRule } from '../types';
interface GeneratorParams {
[key: string]: string | number | boolean | undefined | object[] | boolean[] | object;
@ -31,17 +34,6 @@ export const DATE_1970_5_MIN = '1969-12-31T23:55:00.000Z';
export const DATE_9999 = '9999-12-31T12:34:56.789Z';
export const MOCK_DURATION = '86400000000000';
export const SAVED_OBJECT = {
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
consumer: 'bar',
enabled: true,
},
references: [],
};
export const RULE_ACTIONS = [
{
actionTypeId: 'action',
@ -216,6 +208,83 @@ export const mockedRuleTypeSavedObject: Rule<RuleTypeParams> = {
revision: 0,
};
export const mockedRawRuleSO: SavedObject<RawRule> = {
id: '1',
type: 'alert',
references: [],
attributes: {
legacyId: '1',
consumer: 'bar',
createdAt: mockDate.toString(),
updatedAt: mockDate.toString(),
throttle: null,
muteAll: false,
notifyWhen: 'onActiveAlert',
enabled: true,
alertTypeId: ruleType.id,
apiKey: 'MTIzOmFiYw==',
apiKeyOwner: 'elastic',
schedule: { interval: '10s' },
name: RULE_NAME,
tags: ['rule-', '-tags'],
createdBy: 'rule-creator',
updatedBy: 'rule-updater',
mutedInstanceIds: [],
params: {
bar: true,
},
actions: [
{
group: 'default',
actionTypeId: 'action',
params: {
foo: true,
},
uuid: '111-111',
actionRef: '1',
},
{
group: RecoveredActionGroup.id,
actionTypeId: 'action',
params: {
isResolved: true,
},
uuid: '222-222',
actionRef: '2',
},
],
executionStatus: {
status: 'unknown',
lastExecutionDate: new Date('2020-08-20T19:23:38Z').toString(),
error: null,
warning: null,
},
monitoring: getDefaultMonitoring('2020-08-20T19:23:38Z'),
revision: 0,
},
};
export const mockedRule: SanitizedRule<typeof mockedRawRuleSO.attributes.params> = {
id: mockedRawRuleSO.id,
...mockedRawRuleSO.attributes,
nextRun: undefined,
createdAt: new Date(mockedRawRuleSO.attributes.createdAt),
updatedAt: new Date(mockedRawRuleSO.attributes.updatedAt),
executionStatus: {
...mockedRawRuleSO.attributes.executionStatus,
lastExecutionDate: new Date(mockedRawRuleSO.attributes.executionStatus.lastExecutionDate),
error: undefined,
warning: undefined,
},
actions: mockedRawRuleSO.attributes.actions.map((action) => {
return {
...action,
id: action.uuid,
};
}),
isSnoozedUntil: undefined,
};
export const mockTaskInstance = () => ({
id: '',
attempts: 0,
@ -283,6 +352,7 @@ export const generateRunnerResult = ({
alertInstances = {},
alertRecoveredInstances = {},
summaryActions = {},
hasError = false,
}: GeneratorParams = {}) => {
return {
monitoring: {
@ -315,6 +385,7 @@ export const generateRunnerResult = ({
...(state && { previousStartedAt: new Date('1970-01-01T00:00:00.000Z') }),
...(state && { summaryActions }),
},
hasError,
};
};

View file

@ -9,14 +9,15 @@ import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/s
import { CoreKibanaRequest } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { getRuleAttributes, getFakeKibanaRequest, loadRule } from './rule_loader';
import { getRuleAttributes, getFakeKibanaRequest, validateRule } from './rule_loader';
import { TaskRunnerContext } from './task_runner_factory';
import { ruleTypeRegistryMock } from '../rule_type_registry.mock';
import { rulesClientMock } from '../rules_client.mock';
import { Rule } from '../types';
import { MONITORING_HISTORY_LIMIT, RuleExecutionStatusErrorReasons } from '../../common';
import { getReasonFromError } from '../lib/error_with_reason';
import { ErrorWithReason, getReasonFromError } from '../lib/error_with_reason';
import { alertingEventLoggerMock } from '../lib/alerting_event_logger/alerting_event_logger.mock';
import { mockedRawRuleSO, mockedRule } from './fixtures';
// create mocks
const rulesClient = rulesClientMock.create();
@ -26,35 +27,57 @@ const encryptedSavedObjects = encryptedSavedObjectsMock.createClient();
const mockBasePathService = { set: jest.fn() };
// assign default parameters/data
const apiKey = 'rule-apikey';
const apiKey = mockedRawRuleSO.attributes.apiKey!;
const ruleId = 'rule-id-1';
const enabled = true;
const spaceId = 'rule-spaceId';
const ruleName = 'rule-name';
const consumer = 'rule-consumer';
const ruleTypeId = 'rule-type-id';
const ruleParams = { paramA: 42 };
const ruleName = mockedRule.name;
const consumer = mockedRule.consumer;
const ruleTypeId = mockedRule.alertTypeId;
const ruleParams = mockedRule.params;
describe('rule_loader', () => {
let context: TaskRunnerContext;
let contextMock: ReturnType<typeof getTaskRunnerContext>;
const paramValidator = schema.object({
paramA: schema.number(),
bar: schema.boolean(),
});
const DefaultLoadRuleParams = {
const getDefaultValidateRuleParams = ({
fakeRequest,
error,
enabled: ruleEnabled = true,
params = mockedRule.params,
}: {
fakeRequest: CoreKibanaRequest<unknown, unknown, unknown>;
error?: ErrorWithReason;
enabled?: boolean;
params?: typeof mockedRule.params;
}) => ({
paramValidator,
ruleId,
spaceId,
ruleTypeRegistry,
alertingEventLogger,
};
ruleData: error
? { error }
: {
data: {
indirectParams: { ...mockedRawRuleSO.attributes, enabled: ruleEnabled },
rule: { ...mockedRule, params },
rulesClient,
version: '1',
fakeRequest,
},
},
});
beforeEach(() => {
jest.resetAllMocks();
encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(
mockGetDecrypted({
...mockedRawRuleSO.attributes,
apiKey,
enabled,
consumer,
@ -68,10 +91,14 @@ describe('rule_loader', () => {
jest.restoreAllMocks();
});
describe('loadRule()', () => {
describe('validateRule()', () => {
describe('succeeds', () => {
test('with API key, a full execution history, and validator', async () => {
const result = await loadRule({ ...DefaultLoadRuleParams, context });
test('validates and returns the results', () => {
const fakeRequest = getFakeKibanaRequest(context, 'default', apiKey);
const result = validateRule({
...getDefaultValidateRuleParams({ fakeRequest }),
context,
});
expect(result.apiKey).toBe(apiKey);
expect(result.validatedParams).toEqual(ruleParams);
@ -79,57 +106,38 @@ describe('rule_loader', () => {
expect(result.rule.alertTypeId).toBe(ruleTypeId);
expect(result.rule.name).toBe(ruleName);
expect(result.rule.params).toBe(ruleParams);
expect(result.rule.monitoring?.run.history.length).toBe(MONITORING_HISTORY_LIMIT - 1);
});
test('without API key, any execution history, or validator', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(
mockGetDecrypted({ enabled, consumer })
);
contextMock = getTaskRunnerContext(ruleParams, 0);
context = contextMock as unknown as TaskRunnerContext;
const result = await loadRule({
...DefaultLoadRuleParams,
context,
paramValidator: undefined,
});
expect(result.apiKey).toBe(undefined);
expect(result.validatedParams).toEqual(ruleParams);
expect(result.fakeRequest.headers.authorization).toBe(undefined);
expect(result.rule.alertTypeId).toBe(ruleTypeId);
expect(result.rule.name).toBe(ruleName);
expect(result.rule.params).toBe(ruleParams);
expect(result.rule.monitoring?.run.history.length).toBe(0);
expect(result.indirectParams).toEqual(mockedRawRuleSO.attributes);
expect(result.version).toBe('1');
expect(result.rulesClient).toBe(rulesClient);
});
});
test('throws when cannot decrypt attributes', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(() => {
throw new Error('eso-error: 42');
});
test('throws when there is decrypt attributes error', () => {
const fakeRequest = getFakeKibanaRequest(context, 'default', apiKey);
let outcome = 'success';
try {
await loadRule({ ...DefaultLoadRuleParams, context });
validateRule({
...getDefaultValidateRuleParams({
fakeRequest,
error: new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, new Error('test')),
}),
context,
});
} catch (err) {
outcome = 'failure';
expect(err.message).toBe('eso-error: 42');
expect(getReasonFromError(err)).toBe(RuleExecutionStatusErrorReasons.Decrypt);
}
expect(outcome).toBe('failure');
});
test('throws when rule is not enabled', async () => {
encryptedSavedObjects.getDecryptedAsInternalUser.mockImplementation(
mockGetDecrypted({ apiKey, enabled: false, consumer })
);
const fakeRequest = getFakeKibanaRequest(context, 'default', apiKey);
let outcome = 'success';
try {
await loadRule({ ...DefaultLoadRuleParams, context });
validateRule({
...getDefaultValidateRuleParams({ fakeRequest, enabled: false }),
context,
});
} catch (err) {
outcome = 'failure';
expect(getReasonFromError(err)).toBe(RuleExecutionStatusErrorReasons.Disabled);
@ -138,13 +146,17 @@ describe('rule_loader', () => {
});
test('throws when rule type is not enabled', async () => {
const fakeRequest = getFakeKibanaRequest(context, 'default', apiKey);
ruleTypeRegistry.ensureRuleTypeEnabled.mockImplementation(() => {
throw new Error('rule-type-not-enabled: 2112');
});
let outcome = 'success';
try {
await loadRule({ ...DefaultLoadRuleParams, context });
validateRule({
...getDefaultValidateRuleParams({ fakeRequest }),
context,
});
} catch (err) {
outcome = 'failure';
expect(err.message).toBe('rule-type-not-enabled: 2112');
@ -154,20 +166,16 @@ describe('rule_loader', () => {
});
test('throws when rule params fail validation', async () => {
const parameterValidator = schema.object({
paramA: schema.string(),
});
const fakeRequest = getFakeKibanaRequest(context, 'default', apiKey);
let outcome = 'success';
try {
await loadRule({
...DefaultLoadRuleParams,
validateRule({
...getDefaultValidateRuleParams({ fakeRequest, params: { bar: 'foo' } }),
context,
paramValidator: parameterValidator,
});
} catch (err) {
outcome = 'failure';
expect(err.message).toMatch('[paramA]: expected value of type [string] but got [number]');
expect(err.message).toMatch('[bar]: expected value of type [boolean] but got [string]');
expect(getReasonFromError(err)).toBe(RuleExecutionStatusErrorReasons.Validate);
}
expect(outcome).toBe('failure');
@ -179,11 +187,14 @@ describe('rule_loader', () => {
contextMock.spaceIdToNamespace.mockReturnValue(undefined);
const result = await getRuleAttributes(context, ruleId, 'default');
expect(result.apiKey).toBe(apiKey);
expect(result.consumer).toBe(consumer);
expect(result.enabled).toBe(true);
expect(result.fakeRequest).toEqual(expect.any(CoreKibanaRequest));
expect(result.rule.alertTypeId).toBe(ruleTypeId);
expect(result.indirectParams).toEqual({
...mockedRawRuleSO.attributes,
apiKey,
enabled,
consumer,
});
expect(result.rulesClient).toBeTruthy();
expect(contextMock.spaceIdToNamespace.mock.calls[0]).toEqual(['default']);
@ -195,13 +206,16 @@ describe('rule_loader', () => {
contextMock.spaceIdToNamespace.mockReturnValue(spaceId);
const result = await getRuleAttributes(context, ruleId, spaceId);
expect(result.apiKey).toBe(apiKey);
expect(result.consumer).toBe(consumer);
expect(result.enabled).toBe(true);
expect(result.fakeRequest).toEqual(expect.any(CoreKibanaRequest));
expect(result.rule.alertTypeId).toBe(ruleTypeId);
expect(result.rulesClient).toBeTruthy();
expect(contextMock.spaceIdToNamespace.mock.calls[0]).toEqual([spaceId]);
expect(result.indirectParams).toEqual({
...mockedRawRuleSO.attributes,
apiKey,
enabled,
consumer,
});
const esoArgs = encryptedSavedObjects.getDecryptedAsInternalUser.mock.calls[0];
expect(esoArgs).toEqual(['alert', ruleId, { namespace: spaceId }]);
@ -231,7 +245,7 @@ describe('rule_loader', () => {
Array [
Object {
"headers": Object {
"authorization": "ApiKey rule-apikey",
"authorization": "ApiKey MTIzOmFiYw==",
},
"path": "/",
},
@ -250,7 +264,7 @@ describe('rule_loader', () => {
Array [
Object {
"headers": Object {
"authorization": "ApiKey rule-apikey",
"authorization": "ApiKey MTIzOmFiYw==",
},
"path": "/",
},

View file

@ -5,9 +5,13 @@
* 2.0.
*/
import { PublicMethodsOf } from '@kbn/utility-types';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/server';
import { CoreKibanaRequest, FakeRawRequest, Headers } from '@kbn/core/server';
import { PublicMethodsOf } from '@kbn/utility-types';
import {
LoadedIndirectParams,
LoadIndirectParamsResult,
} from '@kbn/task-manager-plugin/server/task';
import { TaskRunnerContext } from './task_runner_factory';
import { ErrorWithReason, validateRuleTypeParams } from '../lib';
import {
@ -21,46 +25,56 @@ import {
import { MONITORING_HISTORY_LIMIT, RuleTypeParams } from '../../common';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
export interface LoadRuleParams<Params extends RuleTypeParams> {
export interface RuleData<Params extends RuleTypeParams> extends LoadedIndirectParams<RawRule> {
indirectParams: RawRule;
rule: SanitizedRule<Params>;
version: string | undefined;
fakeRequest: CoreKibanaRequest;
rulesClient: RulesClientApi;
}
export type RuleDataResult<T extends LoadedIndirectParams> = LoadIndirectParamsResult<T>;
export interface ValidatedRuleData<Params extends RuleTypeParams> extends RuleData<Params> {
validatedParams: Params;
apiKey: string | null;
}
interface ValidateRuleParams<Params extends RuleTypeParams> {
alertingEventLogger: PublicMethodsOf<AlertingEventLogger>;
paramValidator?: RuleTypeParamsValidator<Params>;
ruleId: string;
spaceId: string;
context: TaskRunnerContext;
ruleTypeRegistry: RuleTypeRegistry;
alertingEventLogger: PublicMethodsOf<AlertingEventLogger>;
ruleData: RuleDataResult<RuleData<Params>>;
}
export async function loadRule<Params extends RuleTypeParams>(params: LoadRuleParams<Params>) {
const { paramValidator, ruleId, spaceId, context, ruleTypeRegistry, alertingEventLogger } =
params;
let enabled: boolean;
let apiKey: string | null;
let rule: SanitizedRule<Params>;
let fakeRequest: CoreKibanaRequest;
let rulesClient: RulesClientApi;
let version: string | undefined;
try {
const attributes = await getRuleAttributes<Params>(context, ruleId, spaceId);
apiKey = attributes.apiKey;
enabled = attributes.enabled;
rule = attributes.rule;
fakeRequest = attributes.fakeRequest;
rulesClient = attributes.rulesClient;
version = attributes.version;
} catch (err) {
throw new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, err);
export function validateRule<Params extends RuleTypeParams>(
params: ValidateRuleParams<Params>
): ValidatedRuleData<Params> {
if (params.ruleData.error) {
throw params.ruleData.error;
}
const {
ruleData: {
data: { indirectParams, rule, fakeRequest, rulesClient, version },
},
ruleTypeRegistry,
paramValidator,
alertingEventLogger,
} = params;
const { enabled, apiKey } = indirectParams;
if (!enabled) {
throw new ErrorWithReason(
RuleExecutionStatusErrorReasons.Disabled,
new Error(`Rule failed to execute because rule ran after it was disabled.`)
);
}
alertingEventLogger.setRuleName(rule.name);
try {
ruleTypeRegistry.ensureRuleTypeEnabled(rule.alertTypeId);
} catch (err) {
@ -83,6 +97,7 @@ export async function loadRule<Params extends RuleTypeParams>(params: LoadRulePa
return {
rule,
indirectParams,
fakeRequest,
apiKey,
rulesClient,
@ -95,15 +110,7 @@ export async function getRuleAttributes<Params extends RuleTypeParams>(
context: TaskRunnerContext,
ruleId: string,
spaceId: string
): Promise<{
apiKey: string | null;
enabled: boolean;
consumer: string;
rule: SanitizedRule<Params>;
fakeRequest: CoreKibanaRequest;
rulesClient: RulesClientApi;
version?: string;
}> {
): Promise<RuleData<Params>> {
const namespace = context.spaceIdToNamespace(spaceId);
const rawRule = await context.encryptedSavedObjectsClient.getDecryptedAsInternalUser<RawRule>(
@ -126,9 +133,7 @@ export async function getRuleAttributes<Params extends RuleTypeParams>(
return {
rule,
version: rawRule.version,
apiKey: rawRule.attributes.apiKey,
enabled: rawRule.attributes.enabled,
consumer: rawRule.attributes.consumer,
indirectParams: rawRule.attributes,
fakeRequest,
rulesClient,
};

View file

@ -35,7 +35,7 @@ import {
RuleTypeRegistry,
RawRuleLastRun,
} from '../types';
import { asErr, asOk, isOk, map, resolveErr, Result } from '../lib/result_type';
import { asErr, asOk, isErr, isOk, map, resolveErr, Result } from '../lib/result_type';
import { taskInstanceToAlertTaskInstance } from './alert_task_instance';
import { isAlertSavedObjectNotFoundError, isEsUnavailableError } from '../lib/is_alerting_error';
import { partiallyUpdateAlert } from '../saved_objects';
@ -65,7 +65,13 @@ import { IExecutionStatusAndMetrics } from '../lib/rule_execution_status';
import { RuleRunMetricsStore } from '../lib/rule_run_metrics_store';
import { wrapSearchSourceClient } from '../lib/wrap_search_source_client';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
import { loadRule } from './rule_loader';
import {
getRuleAttributes,
RuleData,
RuleDataResult,
ValidatedRuleData,
validateRule,
} from './rule_loader';
import { TaskRunnerTimer, TaskRunnerTimerSpan } from './task_runner_timer';
import { RuleMonitoringService } from '../monitoring/rule_monitoring_service';
import { ILastRun, lastRunFromState, lastRunToRaw } from '../lib/last_run_status';
@ -82,11 +88,36 @@ interface StackTraceLog {
stackTrace?: string;
}
interface TaskRunnerConstructorParams<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
AlertState extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
AlertData extends RuleAlertData
> {
ruleType: NormalizedRuleType<
Params,
ExtractedParams,
RuleState,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>;
taskInstance: ConcreteTaskInstance;
context: TaskRunnerContext;
inMemoryMetrics: InMemoryMetrics;
}
export class TaskRunner<
Params extends RuleTypeParams,
ExtractedParams extends RuleTypeParams,
RuleState extends RuleTypeState,
State extends AlertInstanceState,
AlertState extends AlertInstanceState,
Context extends AlertInstanceContext,
ActionGroupIds extends string,
RecoveryActionGroupId extends string,
@ -100,7 +131,7 @@ export class TaskRunner<
Params,
ExtractedParams,
RuleState,
State,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId,
@ -119,22 +150,24 @@ export class TaskRunner<
private ruleMonitoring: RuleMonitoringService;
private ruleRunning: RunningHandler;
private ruleResult: RuleResultService;
private ruleData?: RuleDataResult<RuleData<Params>>;
private runDate = new Date();
constructor(
ruleType: NormalizedRuleType<
Params,
ExtractedParams,
RuleState,
State,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>,
taskInstance: ConcreteTaskInstance,
context: TaskRunnerContext,
inMemoryMetrics: InMemoryMetrics
) {
constructor({
ruleType,
taskInstance,
context,
inMemoryMetrics,
}: TaskRunnerConstructorParams<
Params,
ExtractedParams,
RuleState,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>) {
this.context = context;
const loggerId = ruleType.id.startsWith('.') ? ruleType.id.substring(1) : ruleType.id;
this.logger = context.logger.get(loggerId);
@ -238,7 +271,6 @@ export class TaskRunner<
private async runRule({
fakeRequest,
rulesClient,
rule,
apiKey,
validatedParams: params,
@ -304,7 +336,7 @@ export class TaskRunner<
// of the LegacyAlertsClient and into the AlertsClient.
let alertsClient: IAlertsClient<
AlertData,
State,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId
@ -314,7 +346,7 @@ export class TaskRunner<
const client =
(await this.context.alertsService?.createAlertsClient<
AlertData,
State,
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId
@ -326,7 +358,7 @@ export class TaskRunner<
alertsClient = client
? client
: new LegacyAlertsClient<State, Context, ActionGroupIds, RecoveryActionGroupId>(
: new LegacyAlertsClient<AlertState, Context, ActionGroupIds, RecoveryActionGroupId>(
alertsClientParams
);
} catch (err) {
@ -334,9 +366,12 @@ export class TaskRunner<
`Error initializing AlertsClient for context ${this.ruleType.alerts?.context}. Using legacy alerts client instead. - ${err.message}`
);
alertsClient = new LegacyAlertsClient<State, Context, ActionGroupIds, RecoveryActionGroupId>(
alertsClientParams
);
alertsClient = new LegacyAlertsClient<
AlertState,
Context,
ActionGroupIds,
RecoveryActionGroupId
>(alertsClientParams);
}
await alertsClient.initializeExecution({
@ -579,7 +614,11 @@ export class TaskRunner<
/**
* Initialize event logger, load and validate the rule
*/
private async prepareToRun() {
private async prepareToRun(): Promise<ValidatedRuleData<Params>> {
if (!this.ruleData) {
this.ruleData = await this.loadIndirectParams();
}
const {
params: { alertId: ruleId, spaceId, consumer },
} = this.taskInstance;
@ -616,25 +655,24 @@ export class TaskRunner<
...(namespace ? { namespace } : {}),
});
this.alertingEventLogger.start();
this.alertingEventLogger.start(this.runDate);
return await loadRule<Params>({
return validateRule({
alertingEventLogger: this.alertingEventLogger,
ruleData: this.ruleData,
paramValidator: this.ruleType.validate.params,
ruleId,
spaceId,
context: this.context,
ruleTypeRegistry: this.ruleTypeRegistry,
alertingEventLogger: this.alertingEventLogger,
});
}
private async processRunResults({
nextRun,
runDate,
stateWithMetrics,
}: {
nextRun: string | null;
runDate: Date;
stateWithMetrics: Result<RuleTaskStateAndMetrics, Error>;
}) {
const {
@ -650,8 +688,8 @@ export class TaskRunner<
IExecutionStatusAndMetrics
>(
stateWithMetrics,
(ruleRunStateWithMetrics) => executionStatusFromState(ruleRunStateWithMetrics, runDate),
(err: ElasticsearchError) => executionStatusFromError(err, runDate)
(ruleRunStateWithMetrics) => executionStatusFromState(ruleRunStateWithMetrics, this.runDate),
(err: ElasticsearchError) => executionStatusFromError(err, this.runDate)
);
// New consolidated statuses for lastRun
@ -704,7 +742,7 @@ export class TaskRunner<
this.ruleMonitoring.addHistory({
duration: executionStatus.lastDuration,
hasError: executionStatus.error != null,
runDate,
runDate: this.runDate,
});
if (!this.cancelled) {
@ -730,6 +768,23 @@ export class TaskRunner<
return { executionStatus, executionMetrics };
}
async loadIndirectParams(): Promise<RuleDataResult<RuleData<Params>>> {
this.runDate = new Date();
return await this.timer.runWithTimer(TaskRunnerTimerSpan.PrepareRule, async () => {
try {
const {
params: { alertId: ruleId, spaceId },
} = this.taskInstance;
const data = await getRuleAttributes<Params>(this.context, ruleId, spaceId);
this.ruleData = { data };
} catch (err) {
const error = new ErrorWithReason(RuleExecutionStatusErrorReasons.Decrypt, err);
this.ruleData = { error };
}
return this.ruleData;
});
}
async run(): Promise<RuleTaskRunResult> {
const {
params: { alertId: ruleId, spaceId },
@ -739,8 +794,10 @@ export class TaskRunner<
} = this.taskInstance;
this.ruleRunning.start(ruleId, this.context.spaceIdToNamespace(spaceId));
const runDate = new Date();
this.logger.debug(`executing rule ${this.ruleType.id}:${ruleId} at ${runDate.toISOString()}`);
this.logger.debug(
`executing rule ${this.ruleType.id}:${ruleId} at ${this.runDate.toISOString()}`
);
if (startedAt) {
// Capture how long it took for the rule to start running after being claimed
@ -750,10 +807,8 @@ export class TaskRunner<
let stateWithMetrics: Result<RuleTaskStateAndMetrics, Error>;
let schedule: Result<IntervalSchedule, Error>;
try {
const preparedResult = await this.timer.runWithTimer(
TaskRunnerTimerSpan.PrepareRule,
async () => this.prepareToRun()
);
const preparedResult = await this.prepareToRun();
this.ruleMonitoring.setMonitoring(preparedResult.rule.monitoring);
(async () => {
@ -772,18 +827,8 @@ export class TaskRunner<
// fetch the rule again to ensure we return the correct schedule as it may have
// changed during the task execution
schedule = asOk(
(
await loadRule<Params>({
paramValidator: this.ruleType.validate.params,
ruleId,
spaceId,
context: this.context,
ruleTypeRegistry: this.ruleTypeRegistry,
alertingEventLogger: this.alertingEventLogger,
})
).rule.schedule
);
const attributes = await getRuleAttributes<Params>(this.context, ruleId, spaceId);
schedule = asOk(attributes.rule.schedule);
} catch (err) {
stateWithMetrics = asErr(err);
schedule = asErr(err);
@ -801,7 +846,6 @@ export class TaskRunner<
async () =>
this.processRunResults({
nextRun,
runDate,
stateWithMetrics,
})
);
@ -873,6 +917,7 @@ export class TaskRunner<
return { interval: retryInterval };
}),
monitoring: this.ruleMonitoring.getMonitoring(),
hasError: isErr(schedule),
};
}

View file

@ -42,13 +42,13 @@ import {
mockedRuleTypeSavedObject,
ruleType,
RULE_NAME,
SAVED_OBJECT,
generateRunnerResult,
RULE_ACTIONS,
generateSavedObjectParams,
mockTaskInstance,
DATE_1970,
DATE_1970_5_MIN,
mockedRawRuleSO,
} from './fixtures';
import { dataPluginMock } from '@kbn/data-plugin/server/mocks';
import { AlertingEventLogger } from '../lib/alerting_event_logger/alerting_event_logger';
@ -240,22 +240,22 @@ describe('Task Runner', () => {
hasReachedAlertLimit: false,
triggeredActionsStatus: 'complete',
});
const taskRunner = new TaskRunner(
ruleTypeWithAlerts,
{
const taskRunner = new TaskRunner({
ruleType: ruleTypeWithAlerts,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
taskRunnerFactoryInitializerParams,
inMemoryMetrics
);
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
await taskRunner.run();
@ -332,24 +332,24 @@ describe('Task Runner', () => {
.spyOn(alertsService, 'getContextInitializationPromise')
.mockResolvedValue({ result: true });
const taskRunner = new TaskRunner(
ruleTypeWithAlerts,
{
const taskRunner = new TaskRunner({
ruleType: ruleTypeWithAlerts,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
{
context: {
...taskRunnerFactoryInitializerParams,
alertsService,
},
inMemoryMetrics
);
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
const runnerResult = await taskRunner.run();
expect(runnerResult).toEqual(generateRunnerResult({ state: true, history: [true] }));
@ -448,18 +448,18 @@ describe('Task Runner', () => {
}
);
const taskRunner = new TaskRunner(
ruleTypeWithAlerts,
mockedTaskInstance,
{
const taskRunner = new TaskRunner({
ruleType: ruleTypeWithAlerts,
taskInstance: mockedTaskInstance,
context: {
...taskRunnerFactoryInitializerParams,
alertsService,
},
inMemoryMetrics
);
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
await taskRunner.run();
expect(ruleType.executor).toHaveBeenCalledTimes(1);
@ -551,22 +551,22 @@ describe('Task Runner', () => {
hasReachedAlertLimit: false,
triggeredActionsStatus: 'complete',
});
const taskRunner = new TaskRunner(
ruleTypeWithAlerts,
{
const taskRunner = new TaskRunner({
ruleType: ruleTypeWithAlerts,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
taskRunnerFactoryInitializerParams,
inMemoryMetrics
);
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
await taskRunner.run();
@ -634,22 +634,22 @@ describe('Task Runner', () => {
hasReachedAlertLimit: false,
triggeredActionsStatus: 'complete',
});
const taskRunner = new TaskRunner(
ruleTypeWithAlerts,
{
const taskRunner = new TaskRunner({
ruleType: ruleTypeWithAlerts,
taskInstance: {
...mockedTaskInstance,
state: {
...mockedTaskInstance.state,
previousStartedAt: new Date(Date.now() - 5 * 60 * 1000).toISOString(),
},
},
{ ...taskRunnerFactoryInitializerParams, alertsService: null },
inMemoryMetrics
);
context: { ...taskRunnerFactoryInitializerParams, alertsService: null },
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(SAVED_OBJECT);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
await taskRunner.run();

View file

@ -49,6 +49,7 @@ import {
generateAlertOpts,
DATE_1970,
generateActionOpts,
mockedRawRuleSO,
} from './fixtures';
import { EVENT_LOG_ACTIONS } from '../plugin';
import { SharePluginStart } from '@kbn/share-plugin/server';
@ -183,16 +184,7 @@ describe('Task Runner Cancel', () => {
);
rulesClient.getAlertFromRaw.mockReturnValue(mockedRuleTypeSavedObject as Rule);
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue({
id: '1',
type: 'alert',
attributes: {
apiKey: Buffer.from('123:abc').toString('base64'),
enabled: true,
consumer: 'bar',
},
references: [],
});
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
taskRunnerFactoryInitializerParams.actionsPlugin.isActionTypeEnabled.mockReturnValue(true);
taskRunnerFactoryInitializerParams.actionsPlugin.isActionExecutable.mockReturnValue(true);
alertingEventLogger.getStartAndDuration.mockImplementation(() => ({ start: new Date() }));
@ -201,12 +193,12 @@ describe('Task Runner Cancel', () => {
});
test('updates rule saved object execution status and writes to event log entry when task is cancelled mid-execution', async () => {
const taskRunner = new TaskRunner(
const taskRunner = new TaskRunner({
ruleType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams,
inMemoryMetrics
);
taskInstance: mockedTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
const promise = taskRunner.run();
@ -297,15 +289,15 @@ describe('Task Runner Cancel', () => {
}
);
// setting cancelAlertsOnRuleTimeout to false here
const taskRunner = new TaskRunner(
const taskRunner = new TaskRunner({
ruleType,
mockedTaskInstance,
{
taskInstance: mockedTaskInstance,
context: {
...taskRunnerFactoryInitializerParams,
cancelAlertsOnRuleTimeout: false,
},
inMemoryMetrics
);
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
const promise = taskRunner.run();
@ -366,12 +358,12 @@ describe('Task Runner Cancel', () => {
}
);
// setting cancelAlertsOnRuleTimeout for ruleType to false here
const taskRunner = new TaskRunner(
updatedRuleType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams,
inMemoryMetrics
);
const taskRunner = new TaskRunner({
ruleType: updatedRuleType,
taskInstance: mockedTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
const promise = taskRunner.run();
@ -428,12 +420,12 @@ describe('Task Runner Cancel', () => {
return { state: {} };
}
);
const taskRunner = new TaskRunner(
const taskRunner = new TaskRunner({
ruleType,
mockedTaskInstance,
taskRunnerFactoryInitializerParams,
inMemoryMetrics
);
taskInstance: mockedTaskInstance,
context: taskRunnerFactoryInitializerParams,
inMemoryMetrics,
});
expect(AlertingEventLogger).toHaveBeenCalledTimes(1);
const promise = taskRunner.run();

View file

@ -118,6 +118,11 @@ export class TaskRunnerFactory {
ActionGroupIds,
RecoveryActionGroupId,
AlertData
>(ruleType, taskInstance, this.taskRunnerContext!, inMemoryMetrics);
>({
ruleType,
taskInstance,
context: this.taskRunnerContext!,
inMemoryMetrics,
});
}
}

View file

@ -32,6 +32,7 @@ export interface RuleTaskRunResult {
state: RuleTaskState;
monitoring: RuleMonitoring | undefined;
schedule: IntervalSchedule | undefined;
hasError: boolean;
}
// This is the state of the alerting task after rule execution, which includes run metrics plus the task state
@ -39,11 +40,6 @@ export type RuleTaskStateAndMetrics = RuleTaskState & {
metrics: RuleRunMetrics;
};
export type RuleRunResult = Pick<RuleTaskRunResult, 'monitoring' | 'schedule'> & {
rulesClient: RulesClientApi;
stateWithMetrics: RuleTaskStateAndMetrics;
};
export interface RunRuleParams<Params extends RuleTypeParams> {
fakeRequest: KibanaRequest;
rulesClient: RulesClientApi;

View file

@ -22,7 +22,8 @@ import {
} from '@kbn/core/server';
import type { PublicMethodsOf } from '@kbn/utility-types';
import { SharePluginStart } from '@kbn/share-plugin/server';
import { Alert, type FieldMap } from '@kbn/alerts-as-data-utils';
import type { FieldMap } from '@kbn/alerts-as-data-utils';
import { Alert } from '@kbn/alerts-as-data-utils';
import { Filter } from '@kbn/es-query';
import { RuleTypeRegistry as OrigruleTypeRegistry } from './rule_type_registry';
import { PluginSetupContract, PluginStartContract } from './plugin';
@ -321,50 +322,10 @@ export type UntypedRuleType = RuleType<
AlertInstanceContext
>;
export interface RawAlertsFilter extends AlertsFilter {
query?: {
kql: string;
filters: Filter[];
dsl: string;
};
timeframe?: AlertsFilterTimeframe;
}
export interface RawRuleAction extends SavedObjectAttributes {
uuid: string;
group: string;
actionRef: string;
actionTypeId: string;
params: RuleActionParams;
frequency?: {
summary: boolean;
notifyWhen: RuleNotifyWhenType;
throttle: string | null;
};
alertsFilter?: RawAlertsFilter;
}
export interface RuleMeta extends SavedObjectAttributes {
versionApiKeyLastmodified?: string;
}
// note that the `error` property is "null-able", as we're doing a partial
// update on the rule when we update this data, but need to ensure we
// delete any previous error if the current status has no error
export interface RawRuleExecutionStatus extends SavedObjectAttributes {
status: RuleExecutionStatuses;
lastExecutionDate: string;
lastDuration?: number;
error: null | {
reason: RuleExecutionStatusErrorReasons;
message: string;
};
warning: null | {
reason: RuleExecutionStatusWarningReasons;
message: string;
};
}
export type PartialRule<Params extends RuleTypeParams = never> = Pick<Rule<Params>, 'id'> &
Partial<Omit<Rule<Params>, 'id'>>;
@ -383,40 +344,6 @@ export type PartialRuleWithLegacyId<Params extends RuleTypeParams = never> = Pic
> &
Partial<Omit<RuleWithLegacyId<Params>, 'id'>>;
export interface RawRule extends SavedObjectAttributes {
enabled: boolean;
name: string;
tags: string[];
alertTypeId: string; // this cannot be renamed since it is in the saved object
consumer: string;
legacyId: string | null;
schedule: IntervalSchedule;
actions: RawRuleAction[];
params: SavedObjectAttributes;
mapped_params?: MappedParams;
scheduledTaskId?: string | null;
createdBy: string | null;
updatedBy: string | null;
createdAt: string;
updatedAt: string;
apiKey: string | null;
apiKeyOwner: string | null;
apiKeyCreatedByUser?: boolean | null;
throttle?: string | null;
notifyWhen?: RuleNotifyWhenType | null;
muteAll: boolean;
mutedInstanceIds: string[];
meta?: RuleMeta;
executionStatus: RawRuleExecutionStatus;
monitoring?: RawRuleMonitoring;
snoozeSchedule?: RuleSnooze; // Remove ? when this parameter is made available in the public API
isSnoozedUntil?: string | null;
lastRun?: RawRuleLastRun | null;
nextRun?: string | null;
revision: number;
running?: boolean | null;
}
export interface AlertingPlugin {
setup: PluginSetupContract;
start: PluginStartContract;
@ -469,3 +396,77 @@ export type PublicRuleResultService = PublicLastRunSetters;
export interface RawRuleLastRun extends SavedObjectAttributes, RuleLastRun {}
export interface RawRuleMonitoring extends SavedObjectAttributes, RuleMonitoring {}
export interface RawRuleAlertsFilter extends AlertsFilter {
query?: {
kql: string;
filters: Filter[];
dsl: string;
};
timeframe?: AlertsFilterTimeframe;
}
export interface RawRuleAction extends SavedObjectAttributes {
uuid: string;
group: string;
actionRef: string;
actionTypeId: string;
params: RuleActionParams;
frequency?: {
summary: boolean;
notifyWhen: RuleNotifyWhenType;
throttle: string | null;
};
alertsFilter?: RawRuleAlertsFilter;
}
// note that the `error` property is "null-able", as we're doing a partial
// update on the rule when we update this data, but need to ensure we
// delete any previous error if the current status has no error
export interface RawRuleExecutionStatus extends SavedObjectAttributes {
status: RuleExecutionStatuses;
lastExecutionDate: string;
lastDuration?: number;
error: null | {
reason: RuleExecutionStatusErrorReasons;
message: string;
};
warning: null | {
reason: RuleExecutionStatusWarningReasons;
message: string;
};
}
export interface RawRule extends SavedObjectAttributes {
enabled: boolean;
name: string;
tags: string[];
alertTypeId: string; // this cannot be renamed since it is in the saved object
consumer: string;
legacyId: string | null;
schedule: IntervalSchedule;
actions: RawRuleAction[];
params: SavedObjectAttributes;
mapped_params?: MappedParams;
scheduledTaskId?: string | null;
createdBy: string | null;
updatedBy: string | null;
createdAt: string;
updatedAt: string;
apiKey: string | null;
apiKeyOwner: string | null;
apiKeyCreatedByUser?: boolean | null;
throttle?: string | null;
notifyWhen?: RuleNotifyWhenType | null;
muteAll: boolean;
mutedInstanceIds: string[];
meta?: RuleMeta;
executionStatus: RawRuleExecutionStatus;
monitoring?: RawRuleMonitoring;
snoozeSchedule?: RuleSnooze; // Remove ? when this parameter is made available in the public API
isSnoozedUntil?: string | null;
lastRun?: RawRuleLastRun | null;
nextRun?: string | null;
revision: number;
running?: boolean | null;
}

View file

@ -40,6 +40,11 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
@ -93,6 +98,11 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
@ -149,6 +159,11 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],

View file

@ -51,6 +51,12 @@ const eventLoopDelaySchema = schema.object({
}),
});
const requeueInvalidTasksConfig = schema.object({
enabled: schema.boolean({ defaultValue: false }),
delay: schema.number({ defaultValue: 3000, min: 0 }),
max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }),
});
export const configSchema = schema.object(
{
/* The maximum number of times a task will be attempted before being abandoned as failed */
@ -137,6 +143,7 @@ export const configSchema = schema.object(
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
}),
requeue_invalid_tasks: requeueInvalidTasksConfig,
allow_reading_invalid_state: schema.boolean({ defaultValue: true }),
},
{
@ -152,6 +159,7 @@ export const configSchema = schema.object(
}
);
export type RequeueInvalidTasksConfig = TypeOf<typeof requeueInvalidTasksConfig>;
export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;

View file

@ -79,6 +79,11 @@ describe('EphemeralTaskLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
...config,
},
elasticsearchAndSOAvailability$,

View file

@ -19,6 +19,7 @@ export type {
TaskRunCreatorFunction,
RunContext,
IntervalSchedule,
LoadIndirectParamsResult,
} from './task';
export { TaskStatus } from './task';
@ -29,7 +30,10 @@ export { asInterval } from './lib/intervals';
export {
isUnrecoverableError,
throwUnrecoverableError,
throwRetryableError,
isEphemeralTaskRejectedDueToCapacityError,
isSkipError,
createSkipError,
} from './task_running';
export type { RunNowResult, BulkUpdateTaskResult } from './task_scheduling';
export { getOldestIdleActionTask } from './queries/oldest_idle_action_task';

View file

@ -74,6 +74,11 @@ describe('managed configuration', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
});
logger = context.logger.get('taskManager');

View file

@ -47,6 +47,11 @@ describe('Configuration Statistics Aggregator', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
};
const managedConfig = {

View file

@ -51,6 +51,11 @@ describe('createMonitoringStatsStream', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
};
it('returns the initial config used to configure Task Manager', async () => {

View file

@ -72,6 +72,11 @@ const pluginInitializerContextParams = {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
};
describe('TaskManagerPlugin', () => {

View file

@ -77,6 +77,11 @@ describe('TaskPollingLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,

View file

@ -209,6 +209,7 @@ export class TaskPollingLifecycle {
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
requeueInvalidTasksConfig: this.config.requeue_invalid_tasks,
});
};

View file

@ -909,7 +909,7 @@ if (doc['task.runAt'].size()!=0) {
mockInstance({
id: 'aaa',
runAt,
taskType: 'foo',
taskType: 'yawn',
schedule: undefined,
attempts: 0,
status: TaskStatus.Claiming,
@ -982,7 +982,7 @@ if (doc['task.runAt'].size()!=0) {
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'claiming',
taskType: 'foo',
taskType: 'yawn',
user: 'jimbo',
ownerId: taskManagerId,
},
@ -997,7 +997,7 @@ if (doc['task.runAt'].size()!=0) {
mockInstance({
id: 'aaa',
runAt,
taskType: 'foo',
taskType: 'yawn',
schedule: undefined,
attempts: 0,
status: TaskStatus.Claiming,
@ -1010,7 +1010,7 @@ if (doc['task.runAt'].size()!=0) {
mockInstance({
id: 'bbb',
runAt,
taskType: 'bar',
taskType: 'yawn',
schedule: { interval: '5m' },
attempts: 2,
status: TaskStatus.Claiming,
@ -1083,7 +1083,7 @@ if (doc['task.runAt'].size()!=0) {
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'claiming',
taskType: 'foo',
taskType: 'yawn',
user: 'jimbo',
ownerId: taskManagerId,
},
@ -1096,7 +1096,7 @@ if (doc['task.runAt'].size()!=0) {
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'claiming',
taskType: 'bar',
taskType: 'yawn',
user: 'dabo',
ownerId: taskManagerId,
},

View file

@ -5,9 +5,9 @@
* 2.0.
*/
import { schema, TypeOf, ObjectType } from '@kbn/config-schema';
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
import { ObjectType, schema, TypeOf } from '@kbn/config-schema';
import { isErr, tryAsResult } from './lib/result_type';
import { Interval, isInterval, parseIntervalAsMillisecond } from './lib/intervals';
/*
* Type definitions and validations for tasks.
@ -47,6 +47,7 @@ export type SuccessfulRunResult = {
* recurring task). See the RunContext type definition for more details.
*/
state: Record<string, unknown>;
hasError?: boolean;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
{
@ -83,18 +84,40 @@ export const isFailedRunResult = (result: unknown): result is FailedRunResult =>
!!((result as FailedRunResult)?.error ?? false);
export interface FailedTaskResult {
status: TaskStatus.Failed;
status: TaskStatus.Failed | TaskStatus.DeadLetter;
}
type IndirectParamsType = Record<string, unknown>;
export interface LoadedIndirectParams<
IndirectParams extends IndirectParamsType = IndirectParamsType
> {
[key: string]: unknown;
indirectParams: IndirectParams;
}
export type LoadIndirectParamsResult<T extends LoadedIndirectParams = LoadedIndirectParams> =
| {
data: T;
error?: never;
}
| {
data?: never;
error: Error;
};
export type LoadIndirectParamsFunction = () => Promise<LoadIndirectParamsResult>;
export type RunFunction = () => Promise<RunResult | undefined | void>;
export type CancelFunction = () => Promise<RunResult | undefined | void>;
export interface CancellableTask {
export interface CancellableTask<T = never> {
loadIndirectParams?: LoadIndirectParamsFunction;
run: RunFunction;
cancel?: CancelFunction;
cleanup?: () => Promise<void>;
}
export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask;
export type TaskRunCreatorFunction = (
context: RunContext
) => CancellableTask<RunContext['taskInstance']>;
export const taskDefinitionSchema = schema.object(
{
@ -147,6 +170,10 @@ export const taskDefinitionSchema = schema.object(
})
)
),
paramsSchema: schema.maybe(schema.any()),
// schema of the data fetched by the task runner (in loadIndirectParams) e.g. rule, action etc.
indirectParamsSchema: schema.maybe(schema.any()),
},
{
validate({ timeout }) {
@ -161,7 +188,7 @@ export const taskDefinitionSchema = schema.object(
* Defines a task which can be scheduled and run by the Kibana
* task manager.
*/
export type TaskDefinition = TypeOf<typeof taskDefinitionSchema> & {
export type TaskDefinition = Omit<TypeOf<typeof taskDefinitionSchema>, 'paramsSchema'> & {
/**
* Creates an object that has a run function which performs the task's work,
* and an optional cancel function which cancels the task.
@ -174,6 +201,8 @@ export type TaskDefinition = TypeOf<typeof taskDefinitionSchema> & {
up: (state: Record<string, unknown>) => Record<string, unknown>;
}
>;
paramsSchema?: ObjectType;
indirectParamsSchema?: ObjectType;
};
export enum TaskStatus {
@ -182,6 +211,7 @@ export enum TaskStatus {
Running = 'running',
Failed = 'failed',
Unrecognized = 'unrecognized',
DeadLetter = 'dead_letter',
}
export enum TaskLifecycleResult {
@ -291,6 +321,11 @@ export interface TaskInstance {
* Indicates whether the task is currently enabled. Disabled tasks will not be claimed.
*/
enabled?: boolean;
/**
* Indicates the number of skipped executions.
*/
numSkippedRuns?: number;
}
/**

View file

@ -5,7 +5,12 @@
* 2.0.
*/
import { isUnrecoverableError, throwUnrecoverableError } from './errors';
import {
createSkipError,
isSkipError,
isUnrecoverableError,
throwUnrecoverableError,
} from './errors';
describe('Error Types', () => {
describe('Unrecoverable error', () => {
@ -26,5 +31,9 @@ describe('Error Types', () => {
it('idnentifies normal errors', () => {
expect(isUnrecoverableError(new Error('OMG'))).toBeFalsy();
});
it('createSkipError', () => {
expect(isSkipError(createSkipError(new Error('OMG')))).toBeTruthy();
});
});
});

View file

@ -9,6 +9,7 @@ import { EphemeralTask } from '../task';
// Unrecoverable
const CODE_UNRECOVERABLE = 'TaskManager/unrecoverable';
const CODE_RETRYABLE = 'TaskManager/retryable';
const CODE_SKIP = 'TaskManager/skip';
const code = Symbol('TaskManagerErrorCode');
const retry = Symbol('TaskManagerErrorRetry');
@ -51,10 +52,26 @@ export function isRetryableError(error: Error | DecoratedError) {
return null;
}
export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
export function createRetryableError(error: Error, shouldRetry: Date | boolean): DecoratedError {
(error as DecoratedError)[code] = CODE_RETRYABLE;
(error as DecoratedError)[retry] = shouldRetry;
throw error;
return error;
}
export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
throw createRetryableError(error, shouldRetry);
}
export function isSkipError(error: Error | DecoratedError) {
if (isTaskManagerError(error) && error[code] === CODE_SKIP) {
return true;
}
return false;
}
export function createSkipError(error: Error): DecoratedError {
(error as DecoratedError)[code] = CODE_SKIP;
return error;
}
export function isEphemeralTaskRejectedDueToCapacityError(

View file

@ -22,7 +22,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import moment from 'moment';
import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';
import { throwRetryableError, throwUnrecoverableError } from './errors';
import { createSkipError, throwRetryableError, throwUnrecoverableError } from './errors';
import apm from 'elastic-apm-node';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock';
@ -33,9 +33,16 @@ import {
TASK_MANAGER_TRANSACTION_TYPE,
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { RequeueInvalidTasksConfig } from '../config';
const executionContext = executionContextServiceMock.createSetupContract();
const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);
const mockRequeueInvalidTasksConfig = {
enabled: false,
delay: 3000,
max_attempts: 20,
};
let fakeTimer: sinon.SinonFakeTimers;
@ -1506,6 +1513,531 @@ describe('TaskManagerRunner', () => {
tags: ['task:end', 'foo', 'bar'],
});
});
describe('Skip Tasks', () => {
test('skips task.run when the task has invalid params', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Idle,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
params: { foo: 'bar' },
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { state: { foo: 'bar' } };
},
}),
paramsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBe(
new Date(Date.now()).getTime() + mockRequeueInvalidTasksConfig.delay
);
expect(instance.state).toEqual(mockTaskInstance.state);
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(1);
expect(result).toEqual(
asErr({
error: createSkipError(
new Error('[baz]: expected value of type [string] but got [undefined]')
),
state: {
existingStatePAram: 'foo',
},
})
);
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error: [baz]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has skipped executing the Task (bar/foo) 1 times as it has invalid params.'
);
});
test('skips task.run when the task has invalid indirect params e.g. rule', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: true,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBe(
new Date(Date.now()).getTime() + mockRequeueInvalidTasksConfig.delay
);
expect(instance.state).toEqual(mockTaskInstance.state);
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(1);
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error in its indirect params: [baz]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has skipped executing the Task (bar/foo) 1 times as it has invalid params.'
);
expect(result).toEqual(
asErr({
state: mockTaskInstance.state,
error: createSkipError(
new Error('[baz]: expected value of type [string] but got [undefined]')
),
})
);
});
test('does not skip when disabled (recurring task)', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
attempts: 1,
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
numSkippedRuns: mockRequeueInvalidTasksConfig.max_attempts,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: false,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: { new: 'foo' } };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBeGreaterThan(mockTaskInstance.runAt!.getTime()); // reschedule attempt
expect(instance.state).toEqual({ new: 'foo' });
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(0);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: { new: 'foo' } }));
});
test('does not skip when disabled (non-recurring task)', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
attempts: 5, // defaultMaxAttempts
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
numSkippedRuns: 0,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: false,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: { new: 'foo' } };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
await runner.run();
expect(store.update).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(store.remove).toHaveBeenCalled();
});
test('resets skip attempts on the first successful run', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.state).toEqual({});
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(0);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: {} }));
});
test('removes the non-recurring tasks on the first successful run after skipping', async () => {
const cleanupFn = jest.fn();
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
cleanup: cleanupFn,
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(cleanupFn).toHaveBeenCalled();
expect(store.remove).toHaveBeenCalledWith('foo');
expect(result).toEqual(asOk({ state: {} }));
});
test('does not resets skip attempts for a recurring task as long as there is an error', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, hasError: true };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
expect(store.remove).not.toHaveBeenCalled();
const instance = store.update.mock.calls[0][0];
expect(instance.numSkippedRuns).toBe(mockTaskInstance.numSkippedRuns);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: {}, hasError: true }));
});
test('does not resets skip attempts for a non-recurring task as long as there is an error', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
attempts: 3,
};
const error = new Error('test');
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, error };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: 3,
numSkippedRuns: 20,
state: {},
status: TaskStatus.Idle,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asErr({ state: {}, error }));
});
test("sets non recurring task's status as dead_letter after skip and retry attempts ", async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20, // max
attempts: 5, // default max
};
const error = new Error('test');
const { runner, store } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: true,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, error };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: mockTaskInstance.attempts, // default max
numSkippedRuns: mockTaskInstance.numSkippedRuns,
state: mockTaskInstance.state,
status: TaskStatus.DeadLetter,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(result).toEqual(asErr({ state: {}, error }));
});
test('stops skipping when the max skip limit is reached', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
schedule: { interval: '3s' },
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
attempts: 0,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'bar' } } };
},
async run() {
return { state: {}, hasError: true };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: 0,
numSkippedRuns: 20,
state: {},
status: TaskStatus.Idle,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error in its indirect params: [foo]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has reached the max skip attempts for task bar/foo'
);
expect(result).toEqual(asOk({ state: {}, hasError: true }));
});
});
});
describe('isAdHocTaskAndOutOfAttempts', () => {
@ -1619,6 +2151,7 @@ describe('TaskManagerRunner', () => {
instance?: Partial<ConcreteTaskInstance>;
definitions?: TaskDefinitionRegistry;
onTaskEvent?: jest.Mock<(event: TaskEvent<unknown, unknown>) => void>;
requeueInvalidTasksConfig?: RequeueInvalidTasksConfig;
}
function withAnyTiming(taskRun: TaskRun) {
@ -1694,6 +2227,7 @@ describe('TaskManagerRunner', () => {
monitor: true,
warn_threshold: 5000,
},
requeueInvalidTasksConfig: opts.requeueInvalidTasksConfig || mockRequeueInvalidTasksConfig,
});
if (stage === TaskRunningStage.READY_TO_RUN) {

View file

@ -14,45 +14,47 @@
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
import { identity, defaults, flow, omit } from 'lodash';
import { Logger, SavedObjectsErrorHelpers, ExecutionContextStart } from '@kbn/core/server';
import { defaults, flow, identity, isUndefined, omit } from 'lodash';
import { ExecutionContextStart, Logger, SavedObjectsErrorHelpers } from '@kbn/core/server';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import moment from 'moment';
import { Middleware } from '../lib/middleware';
import {
asOk,
asErr,
mapErr,
asOk,
eitherAsync,
unwrap,
isOk,
mapErr,
mapOk,
Result,
promiseResult,
Result,
unwrap,
} from '../lib/result_type';
import {
TaskRun,
TaskMarkRunning,
asTaskRunEvent,
asTaskMarkRunningEvent,
asTaskRunEvent,
startTaskTimerWithEventLoopMonitoring,
TaskTiming,
TaskMarkRunning,
TaskPersistence,
TaskRun,
TaskTiming,
} from '../task_events';
import { intervalFromDate, maxIntervalFromDate } from '../lib/intervals';
import {
CancelFunction,
CancellableTask,
ConcreteTaskInstance,
isFailedRunResult,
SuccessfulRunResult,
FailedRunResult,
FailedTaskResult,
isFailedRunResult,
SuccessfulRunResult,
TaskDefinition,
TaskStatus,
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { isRetryableError, isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';
import { createSkipError, isRetryableError, isSkipError, isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig, RequeueInvalidTasksConfig } from '../config';
export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
export const TASK_MANAGER_RUN_TRANSACTION_TYPE = 'task-run';
@ -103,6 +105,7 @@ type Opts = {
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
requeueInvalidTasksConfig: RequeueInvalidTasksConfig;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
export enum TaskRunResult {
@ -151,6 +154,7 @@ export class TaskManagerRunner implements TaskRunner {
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private readonly requeueInvalidTasksConfig: RequeueInvalidTasksConfig;
/**
* Creates an instance of TaskManagerRunner.
@ -174,6 +178,7 @@ export class TaskManagerRunner implements TaskRunner {
executionContext,
usageCounter,
eventLoopDelayConfig,
requeueInvalidTasksConfig,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
this.definitions = definitions;
@ -187,6 +192,7 @@ export class TaskManagerRunner implements TaskRunner {
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.requeueInvalidTasksConfig = requeueInvalidTasksConfig;
}
/**
@ -305,15 +311,28 @@ export class TaskManagerRunner implements TaskRunner {
try {
this.task = this.definition.createTaskRunner(modifiedContext);
const ctx = {
type: 'task manager',
name: `run ${this.instance.task.taskType}`,
id: this.instance.task.id,
description: 'run task',
};
const result = await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);
let taskParamsValidation;
if (this.requeueInvalidTasksConfig.enabled) {
taskParamsValidation = this.validateTaskParams();
if (!taskParamsValidation.error) {
taskParamsValidation = await this.validateIndirectTaskParams();
}
}
const result = taskParamsValidation?.error
? taskParamsValidation
: await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);
const validatedResult = this.validateResult(result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(validatedResult, stopTaskTimer())
@ -340,6 +359,61 @@ export class TaskManagerRunner implements TaskRunner {
}
}
private validateTaskParams() {
let error;
const { state, taskType, params, id, numSkippedRuns = 0 } = this.instance.task;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
try {
const paramsSchema = this.definition.paramsSchema;
if (paramsSchema) {
paramsSchema.validate(params);
}
} catch (err) {
this.logger.warn(`Task (${taskType}/${id}) has a validation error: ${err.message}`);
if (numSkippedRuns < maxAttempts) {
error = createSkipError(err);
} else {
this.logger.warn(
`Task Manager has reached the max skip attempts for task ${taskType}/${id}`
);
}
}
return { ...(error ? { error } : {}), state };
}
private async validateIndirectTaskParams() {
let error;
const { state, taskType, id, numSkippedRuns = 0 } = this.instance.task;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
const indirectParamsSchema = this.definition.indirectParamsSchema;
if (this.task?.loadIndirectParams && !!indirectParamsSchema) {
const { data } = await this.task.loadIndirectParams();
if (data) {
try {
if (indirectParamsSchema) {
indirectParamsSchema.validate(data.indirectParams);
}
} catch (err) {
this.logger.warn(
`Task (${taskType}/${id}) has a validation error in its indirect params: ${err.message}`
);
if (numSkippedRuns < maxAttempts) {
error = createSkipError(err);
} else {
this.logger.warn(
`Task Manager has reached the max skip attempts for task ${taskType}/${id}`
);
}
}
}
}
return { ...(error ? { error } : {}), state };
}
public async removeTask(): Promise<void> {
await this.bufferedTaskStore.remove(this.id);
if (this.task?.cleanup) {
@ -510,9 +584,26 @@ export class TaskManagerRunner implements TaskRunner {
failureResult: FailedRunResult
): Result<SuccessfulRunResult, FailedTaskResult> => {
const { state, error } = failureResult;
const { schedule, attempts } = this.instance.task;
const { max_attempts: maxSkipAttempts, enabled, delay } = this.requeueInvalidTasksConfig;
let skipAttempts = this.instance.task.numSkippedRuns ?? 0;
if (isSkipError(error) && enabled) {
skipAttempts = skipAttempts + 1;
const { taskType, id } = this.instance.task;
this.logger.warn(
`Task Manager has skipped executing the Task (${taskType}/${id}) ${skipAttempts} times as it has invalid params.`
);
return asOk({
state: this.instance.task.state,
runAt: moment().add(delay, 'millisecond').toDate(),
attempts: 0,
skipAttempts,
});
}
if (this.shouldTryToScheduleRetry() && !isUnrecoverableError(error)) {
// if we're retrying, keep the number of attempts
const { schedule, attempts } = this.instance.task;
const reschedule = failureResult.runAt
? { runAt: failureResult.runAt }
@ -532,10 +623,16 @@ export class TaskManagerRunner implements TaskRunner {
return asOk({
state,
attempts,
skipAttempts,
...reschedule,
});
}
}
if (skipAttempts >= maxSkipAttempts && enabled) {
return asErr({ status: TaskStatus.DeadLetter });
}
// scheduling a retry isn't possible,mark task as failed
return asErr({ status: TaskStatus.Failed });
};
@ -549,8 +646,23 @@ export class TaskManagerRunner implements TaskRunner {
mapErr(this.rescheduleFailedRun),
// if retrying is possible (new runAt) or this is an recurring task - reschedule
mapOk(
({ runAt, schedule: reschedule, state, attempts = 0 }: Partial<ConcreteTaskInstance>) => {
const { startedAt, schedule } = this.instance.task;
({
runAt,
schedule: reschedule,
state,
attempts = 0,
skipAttempts,
}: SuccessfulRunResult & { attempts: number; skipAttempts: number }) => {
const { startedAt, schedule, numSkippedRuns } = this.instance.task;
const { hasError } = unwrap(result);
let requeueInvalidTaskAttempts = skipAttempts || numSkippedRuns || 0;
// Alerting TaskRunner returns SuccessResult even though there is an error
// therefore we use "hasError" to be sure that there wasn't any error
if (isUndefined(skipAttempts) && !hasError) {
requeueInvalidTaskAttempts = 0;
}
return asOk({
runAt:
runAt || intervalFromDate(startedAt!, reschedule?.interval ?? schedule?.interval)!,
@ -558,6 +670,7 @@ export class TaskManagerRunner implements TaskRunner {
schedule: reschedule ?? schedule,
attempts,
status: TaskStatus.Idle,
numSkippedRuns: requeueInvalidTaskAttempts,
});
}
),
@ -619,6 +732,7 @@ export class TaskManagerRunner implements TaskRunner {
taskTiming: TaskTiming
): Promise<Result<SuccessfulRunResult, FailedRunResult>> {
const { task } = this.instance;
await eitherAsync(
result,
async ({ runAt, schedule }: SuccessfulRunResult) => {

View file

@ -871,7 +871,7 @@ describe('TaskStore', () => {
describe('getLifecycle', () => {
test('returns the task status if the task exists ', async () => {
expect.assertions(5);
expect.assertions(6);
return Promise.all(
Object.values(TaskStatus).map(async (status) => {
const task = {

View file

@ -73,6 +73,9 @@ export interface TaskRegisterDefinition {
up: (state: Record<string, unknown>) => Record<string, unknown>;
}
>;
paramsSchema?: ObjectType;
indirectParamsSchema?: ObjectType;
}
/**

View file

@ -36,6 +36,9 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',
'--xpack.task_manager.requeue_invalid_tasks.enabled=true',
'--xpack.task_manager.requeue_invalid_tasks.delay=1000',
'--xpack.task_manager.requeue_invalid_tasks.max_attempts=2',
...findTestPluginPaths(path.resolve(__dirname, 'plugins')),
],
},

View file

@ -17,6 +17,7 @@ import {
EphemeralTask,
} from '@kbn/task-manager-plugin/server';
import { DEFAULT_MAX_WORKERS } from '@kbn/task-manager-plugin/server/config';
import { schema } from '@kbn/config-schema';
import { initRoutes } from './init_routes';
// this plugin's dependendencies
@ -154,6 +155,52 @@ export class SampleTaskManagerFixturePlugin
},
}),
},
sampleRecurringTaskWithInvalidIndirectParam: {
title: 'Sample Recurring Task that has invalid indirect params',
description: 'A sample task that returns invalid params in loadIndirectParams all the time',
maxAttempts: 1,
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'foo' } } }; // invalid
},
async run() {
return { state: {}, schedule: { interval: '1s' }, hasError: true };
},
}),
indirectParamsSchema: schema.object({
param: schema.string(),
}),
},
sampleOneTimeTaskWithInvalidIndirectParam: {
title: 'Sample One Time Task that has invalid indirect params',
description:
'A sample task that returns invalid params in loadIndirectParams all the time and throws error in the run method',
maxAttempts: 1,
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'foo' } } }; // invalid
},
async run() {
throwRetryableError(new Error('Retry'), true);
},
}),
indirectParamsSchema: schema.object({
param: schema.string(),
}),
},
sampleTaskWithParamsSchema: {
title: 'Sample Task That has paramsSchema',
description: 'A sample task that has paramsSchema to validate params',
maxAttempts: 1,
paramsSchema: schema.object({
param: schema.string(),
}),
createTaskRunner: () => ({
async run() {
throwRetryableError(new Error('Retry'), true);
},
}),
},
});
const taskWithTiming = {

View file

@ -31,6 +31,9 @@ export default function ({ getService }: FtrProviderContext) {
'timedTask',
'timedTaskWithLimitedConcurrency',
'timedTaskWithSingleConcurrency',
'sampleRecurringTaskWithInvalidIndirectParam',
'sampleOneTimeTaskWithInvalidIndirectParam',
'sampleTaskWithParamsSchema',
];
// This test is meant to fail when any change is made in task manager registered types.

View file

@ -15,6 +15,7 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./task_management_scheduled_at'));
loadTestFile(require.resolve('./task_management_removed_types'));
loadTestFile(require.resolve('./check_registered_task_types'));
loadTestFile(require.resolve('./skip'));
loadTestFile(require.resolve('./migrations'));
});

View file

@ -0,0 +1,131 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SerializedConcreteTaskInstance, TaskStatus } from '@kbn/task-manager-plugin/server/task';
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const log = getService('log');
const retry = getService('retry');
describe('Skip invalid tasks', () => {
function currentTask(task: string): Promise<SerializedConcreteTaskInstance> {
return supertest
.get(`/api/sample_tasks/task/${task}`)
.send({ task })
.expect((response) => {
expect(response.status).to.eql(200);
expect(typeof JSON.parse(response.text).id).to.eql(`string`);
})
.then((response) => response.body);
}
after(async () => {
// clean up after last test
return await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
});
it('Skips recurring tasks that has invalid indirect param', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleRecurringTaskWithInvalidIndirectParam',
params: {},
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
let lastRunAt: string;
await retry.try(async () => {
const task = await currentTask(createdTask.id);
lastRunAt = task.runAt;
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
expect(task.attempts).to.eql(0);
expect(task.retryAt).to.eql(null);
// skip attempts remains as it is
expect(task.numSkippedRuns).to.eql(2);
// keeps rescheduling after skips
expect(new Date(task.runAt).getTime()).to.greaterThan(new Date(lastRunAt).getTime());
});
});
it('Skips non-recurring tasks that have invalid indirect params and sets status as "dead_letter" after 1 reschedule attempt', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleOneTimeTaskWithInvalidIndirectParam',
params: {},
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// reschedules 1 more time and set the status as 'dead_letter'
expect(task.attempts).to.eql(1);
expect(task.status).to.eql(TaskStatus.DeadLetter);
expect(task.numSkippedRuns).to.eql(2);
});
});
it('Skips the tasks with invalid params and sets status as "dead_letter" after 1 reschedule attempt', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleTaskWithParamsSchema',
params: { foo: 'bar' }, // invalid params
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// reschedules 1 more time and set the status as 'dead_letter' as the task throws an error
expect(task.attempts).to.eql(1);
expect(task.status).to.eql(TaskStatus.DeadLetter);
expect(task.numSkippedRuns).to.eql(2);
});
});
});
}