[Actions] Fixed ad-hoc actions tasks remain as "running" when they timeout by adding cancellation support (#120853)

* [Actions] Fixed ad-hoc actions tasks remain as "running" when they timeout by adding cancellation support

* fixed test

* fixed tests

* fixed test

* removed test data

* fixed typechecks

* fixed typechecks

* fixed typechecks

* fixed tests

* fixed typechecks

* fixed tests

* fixed typechecks

* fixed test

* fixed tests

* fixed tests

* changed unit tests

* fixed tests

* fixed jest tests

* fixed typechecks

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Yuliia Naumenko 2022-01-19 14:09:31 -08:00 committed by GitHub
parent 70c2b8b98e
commit 158a9a53a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 422 additions and 59 deletions

View file

@ -10,4 +10,5 @@ export const EVENT_LOG_ACTIONS = {
execute: 'execute',
executeStart: 'execute-start',
executeViaHttp: 'execute-via-http',
executeTimeout: 'execute-timeout',
};

View file

@ -11,6 +11,7 @@ const createActionExecutorMock = () => {
const mocked: jest.Mocked<ActionExecutorContract> = {
initialize: jest.fn(),
execute: jest.fn().mockResolvedValue({ status: 'ok', actionId: '' }),
logCancellation: jest.fn(),
};
return mocked;
};

View file

@ -115,6 +115,7 @@ test('successfully executes', async () => {
Object {
"event": Object {
"action": "execute-start",
"kind": "action",
},
"kibana": Object {
"saved_objects": Array [
@ -134,6 +135,7 @@ test('successfully executes', async () => {
Object {
"event": Object {
"action": "execute",
"kind": "action",
"outcome": "success",
},
"kibana": Object {
@ -511,6 +513,34 @@ test('logs a warning when alert executor returns invalid status', async () => {
);
});
test('writes to event log for execute timeout', async () => {
setupActionExecutorMock();
await actionExecutor.logCancellation({
actionId: 'action1',
relatedSavedObjects: [],
request: {} as KibanaRequest,
});
expect(eventLogger.logEvent).toHaveBeenCalledTimes(1);
expect(eventLogger.logEvent.mock.calls[0][0]).toMatchObject({
event: {
action: 'execute-timeout',
},
kibana: {
saved_objects: [
{
rel: 'primary',
type: 'action',
id: 'action1',
type_id: 'test',
namespace: 'some-namespace',
},
],
},
message: `action: test:action1: 'action-1' execution cancelled due to timeout - exceeded default timeout of "5m"`,
});
});
test('writes to event log for execute and execute start', async () => {
const executorMock = setupActionExecutorMock();
executorMock.mockResolvedValue({

View file

@ -19,16 +19,17 @@ import {
ActionTypeExecutorResult,
ActionTypeRegistryContract,
GetServicesFunction,
RawAction,
PreConfiguredAction,
RawAction,
} from '../types';
import { EncryptedSavedObjectsClient } from '../../../encrypted_saved_objects/server';
import { SpacesServiceStart } from '../../../spaces/server';
import { EVENT_LOG_ACTIONS } from '../constants/event_log';
import { IEvent, IEventLogger, SAVED_OBJECT_REL_PRIMARY } from '../../../event_log/server';
import { IEventLogger, SAVED_OBJECT_REL_PRIMARY } from '../../../event_log/server';
import { ActionsClient } from '../actions_client';
import { ActionExecutionSource } from './action_execution_source';
import { RelatedSavedObjects } from './related_saved_objects';
import { createActionEventLogRecordObject } from './create_action_event_log_record_object';
// 1,000,000 nanoseconds in 1 millisecond
const Millis2Nanos = 1000 * 1000;
@ -68,6 +69,7 @@ export class ActionExecutor {
private isInitialized = false;
private actionExecutorContext?: ActionExecutorContext;
private readonly isESOCanEncrypt: boolean;
private actionInfo: ActionInfo | undefined;
constructor({ isESOCanEncrypt }: { isESOCanEncrypt: boolean }) {
this.isESOCanEncrypt = isESOCanEncrypt;
@ -124,7 +126,7 @@ export class ActionExecutor {
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
const { actionTypeId, name, config, secrets } = await getActionInfo(
const actionInfo = await getActionInfoInternal(
await getActionsClientWithRequest(request, source),
encryptedSavedObjectsClient,
preconfiguredActions,
@ -132,6 +134,12 @@ export class ActionExecutor {
namespace.namespace
);
const { actionTypeId, name, config, secrets } = actionInfo;
if (!this.actionInfo || this.actionInfo.actionId !== actionId) {
this.actionInfo = actionInfo;
}
if (span) {
span.name = `execute_action ${actionTypeId}`;
span.addLabels({
@ -169,26 +177,25 @@ export class ActionExecutor {
? {
task: {
scheduled: taskInfo.scheduled.toISOString(),
schedule_delay: Millis2Nanos * (Date.now() - taskInfo.scheduled.getTime()),
scheduleDelay: Millis2Nanos * (Date.now() - taskInfo.scheduled.getTime()),
},
}
: {};
const event: IEvent = {
event: { action: EVENT_LOG_ACTIONS.execute },
kibana: {
...task,
saved_objects: [
{
rel: SAVED_OBJECT_REL_PRIMARY,
type: 'action',
id: actionId,
type_id: actionTypeId,
...namespace,
},
],
},
};
const event = createActionEventLogRecordObject({
actionId,
action: EVENT_LOG_ACTIONS.execute,
...namespace,
...task,
savedObjects: [
{
type: 'action',
id: actionId,
typeId: actionTypeId,
relation: SAVED_OBJECT_REL_PRIMARY,
},
],
});
for (const relatedSavedObject of relatedSavedObjects || []) {
event.kibana?.saved_objects?.push({
@ -210,6 +217,7 @@ export class ActionExecutor {
},
message: `action started: ${actionLabel}`,
});
eventLogger.logEvent(startEvent);
let rawResult: ActionTypeExecutorResult<unknown>;
@ -269,22 +277,77 @@ export class ActionExecutor {
}
);
}
}
function actionErrorToMessage(result: ActionTypeExecutorResult<unknown>): string {
let message = result.message || 'unknown error running action';
public async logCancellation<Source = unknown>({
actionId,
request,
relatedSavedObjects,
source,
taskInfo,
}: {
actionId: string;
request: KibanaRequest;
taskInfo?: TaskInfo;
relatedSavedObjects: RelatedSavedObjects;
source?: ActionExecutionSource<Source>;
}) {
const {
spaces,
encryptedSavedObjectsClient,
preconfiguredActions,
eventLogger,
getActionsClientWithRequest,
} = this.actionExecutorContext!;
if (result.serviceMessage) {
message = `${message}: ${result.serviceMessage}`;
const spaceId = spaces && spaces.getSpaceId(request);
const namespace = spaceId && spaceId !== 'default' ? { namespace: spaceId } : {};
if (!this.actionInfo || this.actionInfo.actionId !== actionId) {
this.actionInfo = await getActionInfoInternal(
await getActionsClientWithRequest(request, source),
encryptedSavedObjectsClient,
preconfiguredActions,
actionId,
namespace.namespace
);
}
const task = taskInfo
? {
task: {
scheduled: taskInfo.scheduled.toISOString(),
scheduleDelay: Millis2Nanos * (Date.now() - taskInfo.scheduled.getTime()),
},
}
: {};
// Write event log entry
const event = createActionEventLogRecordObject({
actionId,
action: EVENT_LOG_ACTIONS.executeTimeout,
message: `action: ${this.actionInfo.actionTypeId}:${actionId}: '${
this.actionInfo.name ?? ''
}' execution cancelled due to timeout - exceeded default timeout of "5m"`,
...namespace,
...task,
savedObjects: [
{
type: 'action',
id: actionId,
typeId: this.actionInfo.actionTypeId,
relation: SAVED_OBJECT_REL_PRIMARY,
},
],
});
for (const relatedSavedObject of (relatedSavedObjects || []) as RelatedSavedObjects) {
event.kibana?.saved_objects?.push({
rel: SAVED_OBJECT_REL_PRIMARY,
type: relatedSavedObject.type,
id: relatedSavedObject.id,
type_id: relatedSavedObject.typeId,
namespace: relatedSavedObject.namespace,
});
}
eventLogger.logEvent(event);
}
if (result.retry instanceof Date) {
message = `${message}; retry at ${result.retry.toISOString()}`;
} else if (result.retry) {
message = `${message}; retry: ${JSON.stringify(result.retry)}`;
}
return message;
}
interface ActionInfo {
@ -292,9 +355,10 @@ interface ActionInfo {
name: string;
config: unknown;
secrets: unknown;
actionId: string;
}
async function getActionInfo(
async function getActionInfoInternal(
actionsClient: PublicMethodsOf<ActionsClient>,
encryptedSavedObjectsClient: EncryptedSavedObjectsClient,
preconfiguredActions: PreConfiguredAction[],
@ -311,6 +375,7 @@ async function getActionInfo(
name: pcAction.name,
config: pcAction.config,
secrets: pcAction.secrets,
actionId,
};
}
@ -329,5 +394,22 @@ async function getActionInfo(
name,
config,
secrets,
actionId,
};
}
function actionErrorToMessage(result: ActionTypeExecutorResult<unknown>): string {
let message = result.message || 'unknown error running action';
if (result.serviceMessage) {
message = `${message}: ${result.serviceMessage}`;
}
if (result.retry instanceof Date) {
message = `${message}; retry at ${result.retry.toISOString()}`;
} else if (result.retry) {
message = `${message}; retry: ${JSON.stringify(result.retry)}`;
}
return message;
}

View file

@ -0,0 +1,128 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { createActionEventLogRecordObject } from './create_action_event_log_record_object';
describe('createActionEventLogRecordObject', () => {
test('created action event "execute-start"', async () => {
expect(
createActionEventLogRecordObject({
actionId: '1',
action: 'execute-start',
timestamp: '1970-01-01T00:00:00.000Z',
task: {
scheduled: '1970-01-01T00:00:00.000Z',
scheduleDelay: 0,
},
savedObjects: [
{
id: '1',
type: 'action',
typeId: 'test',
relation: 'primary',
},
],
})
).toStrictEqual({
'@timestamp': '1970-01-01T00:00:00.000Z',
event: {
action: 'execute-start',
kind: 'action',
},
kibana: {
saved_objects: [
{
id: '1',
rel: 'primary',
type: 'action',
type_id: 'test',
},
],
task: {
schedule_delay: 0,
scheduled: '1970-01-01T00:00:00.000Z',
},
},
});
});
test('created action event "execute"', async () => {
expect(
createActionEventLogRecordObject({
actionId: '1',
name: 'test name',
action: 'execute',
message: 'action execution start',
namespace: 'default',
savedObjects: [
{
id: '2',
type: 'action',
typeId: '.email',
relation: 'primary',
},
],
})
).toStrictEqual({
event: {
action: 'execute',
kind: 'action',
},
kibana: {
saved_objects: [
{
id: '2',
namespace: 'default',
rel: 'primary',
type: 'action',
type_id: '.email',
},
],
},
message: 'action execution start',
});
});
test('created action event "execute-timeout"', async () => {
expect(
createActionEventLogRecordObject({
actionId: '1',
action: 'execute-timeout',
task: {
scheduled: '1970-01-01T00:00:00.000Z',
},
savedObjects: [
{
id: '1',
type: 'action',
typeId: 'test',
relation: 'primary',
},
],
})
).toStrictEqual({
event: {
action: 'execute-timeout',
kind: 'action',
},
kibana: {
saved_objects: [
{
id: '1',
rel: 'primary',
type: 'action',
type_id: 'test',
},
],
task: {
schedule_delay: undefined,
scheduled: '1970-01-01T00:00:00.000Z',
},
},
});
});
});

View file

@ -0,0 +1,53 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { IEvent } from '../../../event_log/server';
export type Event = Exclude<IEvent, undefined>;
interface CreateActionEventLogRecordParams {
actionId: string;
action: string;
name?: string;
message?: string;
namespace?: string;
timestamp?: string;
task?: {
scheduled?: string;
scheduleDelay?: number;
};
savedObjects: Array<{
type: string;
id: string;
typeId: string;
relation?: string;
}>;
}
export function createActionEventLogRecordObject(params: CreateActionEventLogRecordParams): Event {
const { action, message, task, namespace } = params;
const event: Event = {
...(params.timestamp ? { '@timestamp': params.timestamp } : {}),
event: {
action,
kind: 'action',
},
kibana: {
saved_objects: params.savedObjects.map((so) => ({
...(so.relation ? { rel: so.relation } : {}),
type: so.type,
id: so.id,
type_id: so.typeId,
...(namespace ? { namespace } : {}),
})),
...(task ? { task: { scheduled: task.scheduled, schedule_delay: task.scheduleDelay } } : {}),
},
...(message ? { message } : {}),
};
return event;
}

View file

@ -22,6 +22,7 @@ const spaceIdToNamespace = jest.fn();
const actionTypeRegistry = actionTypeRegistryMock.create();
const mockedEncryptedSavedObjectsClient = encryptedSavedObjectsMock.createClient();
const mockedActionExecutor = actionExecutorMock.create();
const eventLogger = eventLoggerMock.create();
let fakeTimer: sinon.SinonFakeTimers;
let taskRunnerFactory: TaskRunnerFactory;
@ -62,7 +63,7 @@ const actionExecutorInitializerParams = {
actionTypeRegistry,
getActionsClientWithRequest: jest.fn(async () => actionsClientMock.create()),
encryptedSavedObjectsClient: mockedEncryptedSavedObjectsClient,
eventLogger: eventLoggerMock.create(),
eventLogger,
preconfiguredActions: [],
};
const taskRunnerFactoryInitializerParams = {
@ -236,6 +237,37 @@ test('cleans up action_task_params object', async () => {
expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3');
});
test('task runner should implement CancellableTask cancel method with logging warning message', async () => {
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});
await taskRunner.cancel();
expect(mockedActionExecutor.logCancellation.mock.calls[0][0].actionId).toBe('2');
expect(mockedActionExecutor.logCancellation.mock.calls.length).toBe(1);
expect(taskRunnerFactoryInitializerParams.logger.debug).toHaveBeenCalledWith(
`Cancelling action task for action with id 2 - execution error due to timeout.`
);
});
test('runs successfully when cleanup fails and logs the error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,

View file

@ -93,31 +93,10 @@ export class TaskRunnerFactory {
encryptedSavedObjectsClient,
spaceIdToNamespace
);
const requestHeaders: Record<string, string> = {};
if (apiKey) {
requestHeaders.authorization = `ApiKey ${apiKey}`;
}
const path = addSpaceIdToPath('/', spaceId);
// Since we're using API keys and accessing elasticsearch can only be done
// via a request, we're faking one with the proper authorization headers.
const fakeRequest = KibanaRequest.from({
headers: requestHeaders,
path: '/',
route: { settings: {} },
url: {
href: '/',
},
raw: {
req: {
url: '/',
},
},
} as unknown as Request);
basePathService.set(fakeRequest, path);
const request = getFakeRequest(apiKey);
basePathService.set(request, path);
// Throwing an executor error means we will attempt to retry the task
// TM will treat a task as a failure if `attempts >= maxAttempts`
@ -132,7 +111,7 @@ export class TaskRunnerFactory {
params,
actionId: actionId as string,
isEphemeral: !isPersistedActionTask(actionTaskExecutorParams),
request: fakeRequest,
request,
...getSourceFromReferences(references),
taskInfo,
relatedSavedObjects: validatedRelatedSavedObjects(logger, relatedSavedObjects),
@ -181,7 +160,7 @@ export class TaskRunnerFactory {
// We would idealy secure every operation but in order to support clean up of legacy alerts
// we allow this operation in an unsecured manner
// Once support for legacy alert RBAC is dropped, this can be secured
await getUnsecuredSavedObjectsClient(fakeRequest).delete(
await getUnsecuredSavedObjectsClient(request).delete(
ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
actionTaskExecutorParams.actionTaskParamsId
);
@ -193,10 +172,65 @@ export class TaskRunnerFactory {
}
}
},
cancel: async () => {
// Write event log entry
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
const { spaceId } = actionTaskExecutorParams;
const {
attributes: { actionId, apiKey, relatedSavedObjects },
references,
} = await getActionTaskParams(
actionTaskExecutorParams,
encryptedSavedObjectsClient,
spaceIdToNamespace
);
const request = getFakeRequest(apiKey);
const path = addSpaceIdToPath('/', spaceId);
basePathService.set(request, path);
await actionExecutor.logCancellation({
actionId,
request,
relatedSavedObjects: (relatedSavedObjects || []) as RelatedSavedObjects,
...getSourceFromReferences(references),
});
logger.debug(
`Cancelling action task for action with id ${actionId} - execution error due to timeout.`
);
return { state: {} };
},
};
}
}
function getFakeRequest(apiKey?: string) {
const requestHeaders: Record<string, string> = {};
if (apiKey) {
requestHeaders.authorization = `ApiKey ${apiKey}`;
}
// Since we're using API keys and accessing elasticsearch can only be done
// via a request, we're faking one with the proper authorization headers.
const fakeRequest = KibanaRequest.from({
headers: requestHeaders,
path: '/',
route: { settings: {} },
url: {
href: '/',
},
raw: {
req: {
url: '/',
},
},
} as unknown as Request);
return fakeRequest;
}
async function getActionTaskParams(
executorParams: ActionTaskExecutorParams,
encryptedSavedObjectsClient: EncryptedSavedObjectsClient,

View file

@ -304,6 +304,7 @@ export class EphemeralTaskManagerRunner implements TaskRunner {
public async cancel() {
const { task } = this;
if (task?.cancel) {
// it will cause the task state of "running" to be cleared
this.task = undefined;
return task.cancel();
}

View file

@ -431,6 +431,7 @@ export class TaskManagerRunner implements TaskRunner {
public async cancel() {
const { task } = this;
if (task?.cancel) {
// it will cause the task state of "running" to be cleared
this.task = undefined;
return task.cancel();
}