Remove task to cleanup action_task_params of failed tasks (#151873)

Part of https://github.com/elastic/kibana/issues/79977 (step 2).
Resolves https://github.com/elastic/kibana/issues/79977.

In this PR, I'm removing the recurring task defined by the actions
plugin that removes unused `action_task_params` SOs. With the
https://github.com/elastic/kibana/pull/152841 PR, tasks will no longer
get marked as failed and we have a migration script (`excludeOnUpgrade`)
that removes all tasks and action_task_params that are leftover during
the migration
https://github.com/elastic/kibana/blob/main/x-pack/plugins/actions/server/saved_objects/index.ts#L81-L94.

~~NOTE: I will hold off merging this PR until
https://github.com/elastic/kibana/pull/152841 is merged.~~ (merged)

## To verify

Not much to test here, but on a Kibana from `main` there will be this
task type running in the background and moving to this PR will cause the
task to get deleted because it is part of the `REMOVED_TYPES` array in
Task Manager.

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2023-03-29 13:22:08 -04:00 committed by GitHub
parent 733a6c2244
commit a199cfc774
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 4 additions and 1053 deletions

View file

@ -542,12 +542,6 @@ describe('create()', () => {
proxyOnlyHosts: undefined,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration('60s'),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
ssl: {
verificationMode: 'full',
proxyVerificationMode: 'full',

View file

@ -5,7 +5,6 @@
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { ByteSizeValue } from '@kbn/config-schema';
import { ActionsConfig } from './config';
import {
@ -30,12 +29,6 @@ const defaultActionsConfig: ActionsConfig = {
rejectUnauthorized: true, // legacy
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration(60000),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
ssl: {
proxyVerificationMode: 'full',
verificationMode: 'full',

View file

@ -1,138 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SavedObjectsFindResult, ISavedObjectsSerializer } from '@kbn/core/server';
import { loggingSystemMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { spacesMock } from '@kbn/spaces-plugin/server/mocks';
import { CleanupTasksOpts, cleanupTasks } from './cleanup_tasks';
import { TaskInstance } from '@kbn/task-manager-plugin/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
describe('cleanupTasks', () => {
const logger = loggingSystemMock.create().get();
const esClient = elasticsearchServiceMock.createElasticsearchClient();
const spaces = spacesMock.createStart();
const savedObjectsSerializer = {
generateRawId: jest
.fn()
.mockImplementation((namespace: string | undefined, type: string, id: string) => {
const namespacePrefix = namespace ? `${namespace}:` : '';
return `${namespacePrefix}${type}:${id}`;
}),
} as unknown as ISavedObjectsSerializer;
const cleanupTasksOpts: CleanupTasksOpts = {
logger,
esClient,
spaces,
savedObjectsSerializer,
kibanaIndex: '.kibana',
taskManagerIndex: '.kibana_task_manager',
tasks: [],
};
const taskSO: SavedObjectsFindResult<TaskInstance> = {
id: '123',
type: 'task',
references: [],
score: 0,
attributes: {
id: '123',
taskType: 'foo',
scheduledAt: new Date(),
state: {},
runAt: new Date(),
startedAt: new Date(),
retryAt: new Date(),
ownerId: '234',
params: { spaceId: undefined, actionTaskParamsId: '123' },
schedule: { interval: '5m' },
},
};
beforeEach(() => {
esClient.bulk.mockReset();
});
it('should skip cleanup when there are no tasks to cleanup', async () => {
const result = await cleanupTasks(cleanupTasksOpts);
expect(result).toEqual({
success: true,
successCount: 0,
failureCount: 0,
});
expect(esClient.bulk).not.toHaveBeenCalled();
});
it('should delete action_task_params and task objects', async () => {
esClient.bulk.mockResponse({
items: [],
errors: false,
took: 1,
} as unknown as estypes.BulkResponse);
const result = await cleanupTasks({
...cleanupTasksOpts,
tasks: [taskSO],
});
expect(esClient.bulk).toHaveBeenCalledWith(
{
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }],
},
{ meta: true }
);
expect(esClient.bulk).toHaveBeenCalledWith(
{
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }],
},
{ meta: true }
);
expect(result).toEqual({
success: true,
successCount: 1,
failureCount: 0,
});
});
it('should not delete the task if the action_task_params failed to delete', async () => {
esClient.bulk.mockResponse({
items: [
{
delete: {
_index: cleanupTasksOpts.kibanaIndex,
_id: 'action_task_params:123',
status: 500,
result: 'Failure',
error: true,
},
},
],
errors: true,
took: 1,
} as unknown as estypes.BulkResponse);
const result = await cleanupTasks({
...cleanupTasksOpts,
tasks: [taskSO],
});
expect(esClient.bulk).toHaveBeenCalledWith(
{
body: [{ delete: { _index: cleanupTasksOpts.kibanaIndex, _id: 'action_task_params:123' } }],
},
{ meta: true }
);
expect(esClient.bulk).not.toHaveBeenCalledWith(
{
body: [{ delete: { _index: cleanupTasksOpts.taskManagerIndex, _id: 'task:123' } }],
},
{ meta: true }
);
expect(result).toEqual({
success: false,
successCount: 0,
failureCount: 1,
});
});
});

View file

@ -1,109 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import {
Logger,
ElasticsearchClient,
SavedObjectsFindResult,
ISavedObjectsSerializer,
} from '@kbn/core/server';
import { TaskInstance } from '@kbn/task-manager-plugin/server';
import { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import {
bulkDelete,
extractBulkResponseDeleteFailures,
getRawActionTaskParamsIdFromTask,
} from './lib';
export interface CleanupTasksOpts {
logger: Logger;
esClient: ElasticsearchClient;
tasks: Array<SavedObjectsFindResult<TaskInstance>>;
spaces?: SpacesPluginStart;
savedObjectsSerializer: ISavedObjectsSerializer;
kibanaIndex: string;
taskManagerIndex: string;
}
export interface CleanupTasksResult {
success: boolean;
successCount: number;
failureCount: number;
}
/**
* Cleanup tasks
*
* This function receives action execution tasks that are in a failed state, removes
* the linked "action_task_params" object first and then if successful, the task manager's task.
*/
export async function cleanupTasks({
logger,
esClient,
tasks,
spaces,
savedObjectsSerializer,
kibanaIndex,
taskManagerIndex,
}: CleanupTasksOpts): Promise<CleanupTasksResult> {
const deserializedTasks = tasks.map((task) => ({
...task,
attributes: {
...task.attributes,
params:
typeof task.attributes.params === 'string'
? JSON.parse(task.attributes.params)
: task.attributes.params || {},
},
}));
// Remove accumulated action task params
const actionTaskParamIdsToDelete = deserializedTasks.map((task) =>
getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer })
);
const actionTaskParamBulkDeleteResult = await bulkDelete(
esClient,
kibanaIndex,
actionTaskParamIdsToDelete
);
const failedActionTaskParams = actionTaskParamBulkDeleteResult
? extractBulkResponseDeleteFailures(actionTaskParamBulkDeleteResult)
: [];
if (failedActionTaskParams?.length) {
logger.debug(
`Failed to delete the following action_task_params [${JSON.stringify(
failedActionTaskParams
)}]`
);
}
// Remove accumulated tasks
const taskIdsToDelete = deserializedTasks
.map((task) => {
const rawId = getRawActionTaskParamsIdFromTask({ task, spaces, savedObjectsSerializer });
// Avoid removing tasks that failed to remove linked objects
if (failedActionTaskParams?.find((item) => item._id === rawId)) {
return null;
}
const rawTaskId = savedObjectsSerializer.generateRawId(undefined, 'task', task.id);
return rawTaskId;
})
.filter((id) => !!id) as string[];
const taskBulkDeleteResult = await bulkDelete(esClient, taskManagerIndex, taskIdsToDelete);
const failedTasks = taskBulkDeleteResult
? extractBulkResponseDeleteFailures(taskBulkDeleteResult)
: [];
if (failedTasks?.length) {
logger.debug(`Failed to delete the following tasks [${JSON.stringify(failedTasks)}]`);
}
return {
success: failedActionTaskParams?.length === 0 && failedTasks.length === 0,
successCount: tasks.length - failedActionTaskParams.length - failedTasks.length,
failureCount: failedActionTaskParams.length + failedTasks.length,
};
}

View file

@ -1,9 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export const TASK_TYPE = 'cleanup_failed_action_executions';
export const TASK_ID = `Actions-${TASK_TYPE}`;

View file

@ -1,55 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
import { ActionsConfig } from '../config';
import { ensureScheduled } from './ensure_scheduled';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { loggingSystemMock } from '@kbn/core/server/mocks';
describe('ensureScheduled', () => {
const logger = loggingSystemMock.create().get();
const taskManager = taskManagerMock.createStart();
const config: ActionsConfig['cleanupFailedExecutionsTask'] = {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
};
beforeEach(() => jest.resetAllMocks());
it(`should call task manager's ensureScheduled function with proper params`, async () => {
await ensureScheduled(taskManager, logger, config);
expect(taskManager.ensureScheduled).toHaveBeenCalledTimes(1);
expect(taskManager.ensureScheduled.mock.calls[0]).toMatchInlineSnapshot(`
Array [
Object {
"id": "Actions-cleanup_failed_action_executions",
"params": Object {},
"schedule": Object {
"interval": "5m",
},
"state": Object {
"runs": 0,
"total_cleaned_up": 0,
},
"taskType": "cleanup_failed_action_executions",
},
]
`);
});
it('should log an error and not throw when ensureScheduled function throws', async () => {
taskManager.ensureScheduled.mockRejectedValue(new Error('Fail'));
await ensureScheduled(taskManager, logger, config);
expect(logger.error).toHaveBeenCalledWith(
'Error scheduling Actions-cleanup_failed_action_executions, received Fail'
);
});
});

View file

@ -1,34 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger } from '@kbn/core/server';
import { TaskManagerStartContract, asInterval } from '@kbn/task-manager-plugin/server';
import { TASK_ID, TASK_TYPE } from './constants';
import { ActionsConfig } from '../config';
export async function ensureScheduled(
taskManager: TaskManagerStartContract,
logger: Logger,
{ cleanupInterval }: ActionsConfig['cleanupFailedExecutionsTask']
) {
try {
await taskManager.ensureScheduled({
id: TASK_ID,
taskType: TASK_TYPE,
schedule: {
interval: asInterval(cleanupInterval.asMilliseconds()),
},
state: {
runs: 0,
total_cleaned_up: 0,
},
params: {},
});
} catch (e) {
logger.error(`Error scheduling ${TASK_ID}, received ${e.message}`);
}
}

View file

@ -1,168 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { ActionsConfig } from '../config';
import { ActionsPluginsStart } from '../plugin';
import { spacesMock } from '@kbn/spaces-plugin/server/mocks';
import { toElasticsearchQuery } from '@kbn/es-query';
import {
loggingSystemMock,
savedObjectsRepositoryMock,
savedObjectsServiceMock,
elasticsearchServiceMock,
} from '@kbn/core/server/mocks';
import { actionTypeRegistryMock } from '../action_type_registry.mock';
import { FindAndCleanupTasksOpts, findAndCleanupTasks } from './find_and_cleanup_tasks';
jest.mock('./cleanup_tasks', () => ({
cleanupTasks: jest.fn(),
}));
describe('findAndCleanupTasks', () => {
const logger = loggingSystemMock.create().get();
const actionTypeRegistry = actionTypeRegistryMock.create();
const savedObjectsRepository = savedObjectsRepositoryMock.create();
const esStart = elasticsearchServiceMock.createStart();
const spaces = spacesMock.createStart();
const soService = savedObjectsServiceMock.createStartContract();
const coreStartServices = Promise.resolve([
{
savedObjects: {
...soService,
createInternalRepository: () => savedObjectsRepository,
},
elasticsearch: esStart,
},
{
spaces,
},
{},
]) as unknown as Promise<[CoreStart, ActionsPluginsStart, unknown]>;
const config: ActionsConfig['cleanupFailedExecutionsTask'] = {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
};
const findAndCleanupTasksOpts: FindAndCleanupTasksOpts = {
logger,
actionTypeRegistry,
coreStartServices,
config,
kibanaIndex: '.kibana',
taskManagerIndex: '.kibana_task_manager',
};
beforeEach(() => {
actionTypeRegistry.list.mockReturnValue([
{
id: 'my-action-type',
name: 'My action type',
enabled: true,
enabledInConfig: true,
enabledInLicense: true,
minimumLicenseRequired: 'basic',
supportedFeatureIds: ['alerting'],
},
]);
jest.requireMock('./cleanup_tasks').cleanupTasks.mockResolvedValue({
success: true,
successCount: 0,
failureCount: 0,
});
savedObjectsRepository.find.mockResolvedValue({
total: 0,
page: 1,
per_page: 10,
saved_objects: [],
});
});
it('should call the find function with proper parameters', async () => {
await findAndCleanupTasks(findAndCleanupTasksOpts);
expect(savedObjectsRepository.find).toHaveBeenCalledWith({
type: 'task',
filter: expect.any(Object),
page: 1,
perPage: config.pageSize,
sortField: 'runAt',
sortOrder: 'asc',
});
expect(toElasticsearchQuery(savedObjectsRepository.find.mock.calls[0][0].filter))
.toMatchInlineSnapshot(`
Object {
"bool": Object {
"filter": Array [
Object {
"bool": Object {
"minimum_should_match": 1,
"should": Array [
Object {
"match": Object {
"task.attributes.status": "failed",
},
},
],
},
},
Object {
"bool": Object {
"minimum_should_match": 1,
"should": Array [
Object {
"match": Object {
"task.attributes.taskType": "actions:my-action-type",
},
},
],
},
},
],
},
}
`);
});
it('should call the cleanupTasks function with proper parameters', async () => {
await findAndCleanupTasks(findAndCleanupTasksOpts);
expect(jest.requireMock('./cleanup_tasks').cleanupTasks).toHaveBeenCalledWith({
logger: findAndCleanupTasksOpts.logger,
esClient: esStart.client.asInternalUser,
spaces,
kibanaIndex: findAndCleanupTasksOpts.kibanaIndex,
taskManagerIndex: findAndCleanupTasksOpts.taskManagerIndex,
savedObjectsSerializer: soService.createSerializer(),
tasks: [],
});
});
it('should return the cleanup result', async () => {
const result = await findAndCleanupTasks(findAndCleanupTasksOpts);
expect(result).toEqual({
success: true,
successCount: 0,
failureCount: 0,
remaining: 0,
});
});
it('should log a message before cleaning up tasks', async () => {
await findAndCleanupTasks(findAndCleanupTasksOpts);
expect(logger.debug).toHaveBeenCalledWith('Removing 0 of 0 failed execution task(s)');
});
it('should log a message after cleaning up tasks', async () => {
await findAndCleanupTasks(findAndCleanupTasksOpts);
expect(logger.debug).toHaveBeenCalledWith(
'Finished cleanup of failed executions. [success=0, failures=0]'
);
});
});

View file

@ -1,80 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { nodeBuilder } from '@kbn/es-query';
import { Logger, CoreStart } from '@kbn/core/server';
import { TaskInstance } from '@kbn/task-manager-plugin/server';
import { ActionsConfig } from '../config';
import { ActionsPluginsStart } from '../plugin';
import { ActionTypeRegistryContract } from '../types';
import { cleanupTasks, CleanupTasksResult } from './cleanup_tasks';
export interface FindAndCleanupTasksOpts {
logger: Logger;
actionTypeRegistry: ActionTypeRegistryContract;
coreStartServices: Promise<[CoreStart, ActionsPluginsStart, unknown]>;
config: ActionsConfig['cleanupFailedExecutionsTask'];
kibanaIndex: string;
taskManagerIndex: string;
}
export interface FindAndCleanupTasksResult extends CleanupTasksResult {
remaining: number;
}
export async function findAndCleanupTasks({
logger,
actionTypeRegistry,
coreStartServices,
config,
kibanaIndex,
taskManagerIndex,
}: FindAndCleanupTasksOpts): Promise<FindAndCleanupTasksResult> {
logger.debug('Starting cleanup of failed executions');
const [{ savedObjects, elasticsearch }, { spaces }] = await coreStartServices;
const esClient = elasticsearch.client.asInternalUser;
const savedObjectsClient = savedObjects.createInternalRepository(['task']);
const savedObjectsSerializer = savedObjects.createSerializer();
const result = await savedObjectsClient.find<TaskInstance>({
type: 'task',
filter: nodeBuilder.and([
nodeBuilder.is('task.attributes.status', 'failed'),
nodeBuilder.or(
actionTypeRegistry
.list()
.map((actionType) =>
nodeBuilder.is('task.attributes.taskType', `actions:${actionType.id}`)
)
),
]),
page: 1,
perPage: config.pageSize,
sortField: 'runAt',
sortOrder: 'asc',
});
logger.debug(
`Removing ${result.saved_objects.length} of ${result.total} failed execution task(s)`
);
const cleanupResult = await cleanupTasks({
logger,
esClient,
spaces,
kibanaIndex,
taskManagerIndex,
savedObjectsSerializer,
tasks: result.saved_objects,
});
logger.debug(
`Finished cleanup of failed executions. [success=${cleanupResult.successCount}, failures=${cleanupResult.failureCount}]`
);
return {
...cleanupResult,
remaining: result.total - cleanupResult.successCount,
};
}

View file

@ -1,9 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { ensureScheduled as ensureCleanupFailedExecutionsTaskScheduled } from './ensure_scheduled';
export { registerTaskDefinition as registerCleanupFailedExecutionsTaskDefinition } from './register_task_definition';

View file

@ -1,29 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { ElasticsearchClient } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { TransportResult } from '@elastic/elasticsearch';
export async function bulkDelete(
esClient: ElasticsearchClient,
index: string,
ids: string[]
): Promise<TransportResult<estypes.BulkResponse, unknown> | undefined> {
if (ids.length === 0) {
return;
}
return await esClient.bulk(
{
body: ids.map((id) => ({
delete: { _index: index, _id: id },
})),
},
{ meta: true }
);
}

View file

@ -1,30 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { TransportResult } from '@elastic/elasticsearch';
type ResponseFailures = Array<Pick<estypes.BulkResponseItem, '_id' | 'status' | 'result'>>;
export function extractBulkResponseDeleteFailures(
response: TransportResult<estypes.BulkResponse, unknown>
): ResponseFailures {
const result: ResponseFailures = [];
for (const item of response.body.items) {
if (!item.delete || !item.delete.error) {
continue;
}
result.push({
_id: item.delete._id,
status: item.delete.status,
result: item.delete.result,
});
}
return result;
}

View file

@ -1,27 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsFindResult, ISavedObjectsSerializer } from '@kbn/core/server';
import { TaskInstance } from '@kbn/task-manager-plugin/server';
import { SpacesPluginStart } from '@kbn/spaces-plugin/server';
import { spaceIdToNamespace } from '../../lib';
interface GetRawActionTaskParamsIdFromTaskOpts {
task: SavedObjectsFindResult<TaskInstance>;
spaces?: SpacesPluginStart;
savedObjectsSerializer: ISavedObjectsSerializer;
}
export function getRawActionTaskParamsIdFromTask({
task,
spaces,
savedObjectsSerializer,
}: GetRawActionTaskParamsIdFromTaskOpts) {
const { spaceId, actionTaskParamsId } = task.attributes.params;
const namespace = spaceIdToNamespace(spaces, spaceId);
return savedObjectsSerializer.generateRawId(namespace, 'action_task_params', actionTaskParamsId);
}

View file

@ -1,10 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
export { extractBulkResponseDeleteFailures } from './extract_bulk_response_delete_failures';
export { bulkDelete } from './bulk_delete';
export { getRawActionTaskParamsIdFromTask } from './get_raw_action_task_params_id_from_task';

View file

@ -1,71 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { ActionsConfig } from '../config';
import { ActionsPluginsStart } from '../plugin';
import { registerTaskDefinition } from './register_task_definition';
import { taskManagerMock } from '@kbn/task-manager-plugin/server/mocks';
import { loggingSystemMock, coreMock } from '@kbn/core/server/mocks';
import { actionTypeRegistryMock } from '../action_type_registry.mock';
import { TaskRunnerOpts } from './task_runner';
jest.mock('./task_runner', () => ({ taskRunner: jest.fn() }));
describe('registerTaskDefinition', () => {
const logger = loggingSystemMock.create().get();
const taskManager = taskManagerMock.createSetup();
const actionTypeRegistry = actionTypeRegistryMock.create();
const coreStartServices = coreMock.createSetup().getStartServices() as Promise<
[CoreStart, ActionsPluginsStart, unknown]
>;
const config: ActionsConfig['cleanupFailedExecutionsTask'] = {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
};
const taskRunnerOpts: TaskRunnerOpts = {
logger,
coreStartServices,
actionTypeRegistry,
config,
kibanaIndex: '.kibana',
taskManagerIndex: '.kibana_task_manager',
};
beforeEach(() => {
jest.resetAllMocks();
jest.requireMock('./task_runner').taskRunner.mockReturnValue(jest.fn());
});
it('should call registerTaskDefinitions with proper parameters', () => {
registerTaskDefinition(taskManager, taskRunnerOpts);
expect(taskManager.registerTaskDefinitions).toHaveBeenCalledTimes(1);
expect(taskManager.registerTaskDefinitions.mock.calls).toMatchInlineSnapshot(`
Array [
Array [
Object {
"cleanup_failed_action_executions": Object {
"createTaskRunner": [MockFunction],
"title": "Cleanup failed action executions",
},
},
],
]
`);
});
it('should call taskRunner with proper parameters', () => {
registerTaskDefinition(taskManager, taskRunnerOpts);
const { taskRunner } = jest.requireMock('./task_runner');
expect(taskRunner).toHaveBeenCalledWith(taskRunnerOpts);
});
});

View file

@ -1,22 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { TaskManagerSetupContract } from '@kbn/task-manager-plugin/server';
import { TASK_TYPE } from './constants';
import { taskRunner, TaskRunnerOpts } from './task_runner';
export function registerTaskDefinition(
taskManager: TaskManagerSetupContract,
taskRunnerOpts: TaskRunnerOpts
) {
taskManager.registerTaskDefinitions({
[TASK_TYPE]: {
title: 'Cleanup failed action executions',
createTaskRunner: taskRunner(taskRunnerOpts),
},
});
}

View file

@ -1,108 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { CoreStart } from '@kbn/core/server';
import { schema } from '@kbn/config-schema';
import { ActionsConfig } from '../config';
import { ActionsPluginsStart } from '../plugin';
import { ConcreteTaskInstance, TaskStatus } from '@kbn/task-manager-plugin/server';
import { loggingSystemMock, coreMock } from '@kbn/core/server/mocks';
import { actionTypeRegistryMock } from '../action_type_registry.mock';
import { taskRunner, TaskRunnerOpts } from './task_runner';
jest.mock('./find_and_cleanup_tasks', () => ({
findAndCleanupTasks: jest.fn(),
}));
describe('taskRunner', () => {
const logger = loggingSystemMock.create().get();
const actionTypeRegistry = actionTypeRegistryMock.create();
const coreStartServices = coreMock.createSetup().getStartServices() as Promise<
[CoreStart, ActionsPluginsStart, unknown]
>;
const config: ActionsConfig['cleanupFailedExecutionsTask'] = {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
};
const taskRunnerOpts: TaskRunnerOpts = {
logger,
coreStartServices,
actionTypeRegistry,
config,
kibanaIndex: '.kibana',
taskManagerIndex: '.kibana_task_manager',
};
const taskInstance: ConcreteTaskInstance = {
id: '123',
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Running,
state: { runs: 0, total_cleaned_up: 0 },
runAt: new Date(),
startedAt: new Date(),
retryAt: new Date(),
ownerId: '234',
taskType: 'foo',
params: {},
};
beforeEach(() => {
jest.resetAllMocks();
jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks.mockResolvedValue({
success: true,
successCount: 1,
failureCount: 1,
remaining: 0,
});
});
describe('run', () => {
it('should call findAndCleanupTasks with proper parameters', async () => {
const runner = taskRunner(taskRunnerOpts)({ taskInstance });
await runner.run();
expect(jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks).toHaveBeenCalledWith(
taskRunnerOpts
);
});
it('should update state to reflect cleanup result', async () => {
const runner = taskRunner(taskRunnerOpts)({ taskInstance });
const { state } = await runner.run();
expect(state).toEqual({
runs: 1,
total_cleaned_up: 1,
});
});
it('should return idle schedule when no remaining tasks to cleanup', async () => {
const runner = taskRunner(taskRunnerOpts)({ taskInstance });
const { schedule } = await runner.run();
expect(schedule).toEqual({
interval: '60m',
});
});
it('should return cleanup schedule when there are some remaining tasks to cleanup', async () => {
jest.requireMock('./find_and_cleanup_tasks').findAndCleanupTasks.mockResolvedValue({
success: true,
successCount: 1,
failureCount: 1,
remaining: 1,
});
const runner = taskRunner(taskRunnerOpts)({ taskInstance });
const { schedule } = await runner.run();
expect(schedule).toEqual({
interval: '5m',
});
});
});
});

View file

@ -1,45 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { Logger, CoreStart } from '@kbn/core/server';
import { RunContext, asInterval } from '@kbn/task-manager-plugin/server';
import { ActionsConfig } from '../config';
import { ActionsPluginsStart } from '../plugin';
import { ActionTypeRegistryContract } from '../types';
import { findAndCleanupTasks } from './find_and_cleanup_tasks';
export interface TaskRunnerOpts {
logger: Logger;
actionTypeRegistry: ActionTypeRegistryContract;
coreStartServices: Promise<[CoreStart, ActionsPluginsStart, unknown]>;
config: ActionsConfig['cleanupFailedExecutionsTask'];
kibanaIndex: string;
taskManagerIndex: string;
}
export function taskRunner(opts: TaskRunnerOpts) {
return ({ taskInstance }: RunContext) => {
const { state } = taskInstance;
return {
async run() {
const cleanupResult = await findAndCleanupTasks(opts);
return {
state: {
runs: state.runs + 1,
total_cleaned_up: state.total_cleaned_up + cleanupResult.successCount,
},
schedule: {
interval:
cleanupResult.remaining > 0
? asInterval(opts.config.cleanupInterval.asMilliseconds())
: asInterval(opts.config.idleInterval.asMilliseconds()),
},
};
},
};
};
}

View file

@ -23,12 +23,6 @@ describe('config validation', () => {
"allowedHosts": Array [
"*",
],
"cleanupFailedExecutionsTask": Object {
"cleanupInterval": "PT5M",
"enabled": true,
"idleInterval": "PT1H",
"pageSize": 100,
},
"enabledActionTypes": Array [
"*",
],
@ -63,12 +57,6 @@ describe('config validation', () => {
"allowedHosts": Array [
"*",
],
"cleanupFailedExecutionsTask": Object {
"cleanupInterval": "PT5M",
"enabled": true,
"idleInterval": "PT1H",
"pageSize": 100,
},
"enabledActionTypes": Array [
"*",
],
@ -210,12 +198,6 @@ describe('config validation', () => {
"allowedHosts": Array [
"*",
],
"cleanupFailedExecutionsTask": Object {
"cleanupInterval": "PT5M",
"enabled": true,
"idleInterval": "PT1H",
"pageSize": 100,
},
"enabledActionTypes": Array [
"*",
],

View file

@ -113,12 +113,6 @@ export const configSchema = schema.object({
maxResponseContentLength: schema.byteSize({ defaultValue: '1mb' }),
responseTimeout: schema.duration({ defaultValue: '60s' }),
customHostSettings: schema.maybe(schema.arrayOf(customHostSettingsSchema)),
cleanupFailedExecutionsTask: schema.object({
enabled: schema.boolean({ defaultValue: true }),
cleanupInterval: schema.duration({ defaultValue: '5m' }),
idleInterval: schema.duration({ defaultValue: '1h' }),
pageSize: schema.number({ defaultValue: 100 }),
}),
microsoftGraphApiUrl: schema.maybe(schema.string()),
email: schema.maybe(
schema.object({

View file

@ -14,7 +14,6 @@ import http from 'http';
import https from 'https';
import axios from 'axios';
import { duration as momentDuration } from 'moment';
import { schema } from '@kbn/config-schema';
import getPort from 'get-port';
import { request } from '../lib/axios_utils';
@ -581,12 +580,6 @@ const BaseActionsConfig: ActionsConfig = {
maxResponseContentLength: ByteSizeValue.parse('1mb'),
responseTimeout: momentDuration(1000 * 30),
customHostSettings: undefined,
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
};
function getACUfromConfig(config: Partial<ActionsConfig> = {}): ActionsConfigurationUtilities {

View file

@ -14,7 +14,6 @@ import http from 'http';
import https from 'https';
import axios from 'axios';
import { duration as momentDuration } from 'moment';
import { schema } from '@kbn/config-schema';
import getPort from 'get-port';
import { request } from '../lib/axios_utils';
@ -593,12 +592,6 @@ const BaseActionsConfig: ActionsConfig = {
maxResponseContentLength: ByteSizeValue.parse('1mb'),
responseTimeout: momentDuration(1000 * 30),
customHostSettings: undefined,
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
};
function getACUfromConfig(config: Partial<ActionsConfig> = {}): ActionsConfigurationUtilities {

View file

@ -7,7 +7,7 @@
import { readFileSync as fsReadFileSync } from 'fs';
import { resolve as pathResolve, join as pathJoin } from 'path';
import { schema, ByteSizeValue } from '@kbn/config-schema';
import { ByteSizeValue } from '@kbn/config-schema';
import moment from 'moment';
import { ActionsConfig } from '../config';
@ -73,12 +73,6 @@ describe('custom_host_settings', () => {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration(60000),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
};
test('ensure it copies over the config parts that it does not touch', () => {

View file

@ -6,7 +6,7 @@
*/
import moment from 'moment';
import { schema, ByteSizeValue } from '@kbn/config-schema';
import { ByteSizeValue } from '@kbn/config-schema';
import { PluginInitializerContext, RequestHandlerContext } from '@kbn/core/server';
import { coreMock, httpServerMock } from '@kbn/core/server/mocks';
import { usageCollectionPluginMock } from '@kbn/usage-collection-plugin/server/mocks';
@ -46,12 +46,6 @@ describe('Actions Plugin', () => {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration(60000),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
});
plugin = new ActionsPlugin(context);
coreSetup = coreMock.createSetup();
@ -219,12 +213,6 @@ describe('Actions Plugin', () => {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration('60s'),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
...overrides,
};
}
@ -280,12 +268,6 @@ describe('Actions Plugin', () => {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration(60000),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
});
plugin = new ActionsPlugin(context);
coreSetup = coreMock.createSetup();
@ -354,12 +336,6 @@ describe('Actions Plugin', () => {
rejectUnauthorized: true,
maxResponseContentLength: new ByteSizeValue(1000000),
responseTimeout: moment.duration('60s'),
cleanupFailedExecutionsTask: {
enabled: true,
cleanupInterval: schema.duration().validate('5m'),
idleInterval: schema.duration().validate('1h'),
pageSize: 100,
},
...overrides,
};
}

View file

@ -39,10 +39,6 @@ import {
IEventLogService,
} from '@kbn/event-log-plugin/server';
import { MonitoringCollectionSetup } from '@kbn/monitoring-collection-plugin/server';
import {
ensureCleanupFailedExecutionsTaskScheduled,
registerCleanupFailedExecutionsTaskDefinition,
} from './cleanup_failed_executions';
import { ActionsConfig, getValidatedConfig } from './config';
import { resolveCustomHosts } from './lib/custom_host_settings';
@ -343,18 +339,6 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
usageCounter: this.usageCounter,
});
// Cleanup failed execution task definition
if (this.actionsConfig.cleanupFailedExecutionsTask.enabled) {
registerCleanupFailedExecutionsTaskDefinition(plugins.taskManager, {
actionTypeRegistry,
logger: this.logger,
coreStartServices: core.getStartServices(),
config: this.actionsConfig.cleanupFailedExecutionsTask,
kibanaIndex: this.kibanaIndex,
taskManagerIndex: plugins.taskManager.index,
});
}
return {
registerType: <
Config extends ActionTypeConfig = ActionTypeConfig,
@ -540,15 +524,6 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
});
}
// Cleanup failed execution task
if (this.actionsConfig.cleanupFailedExecutionsTask.enabled) {
ensureCleanupFailedExecutionsTaskScheduled(
plugins.taskManager,
this.logger,
this.actionsConfig.cleanupFailedExecutionsTask
);
}
return {
isActionTypeEnabled: (id, options = { notifyUsage: false }) => {
return this.actionTypeRegistry!.isActionTypeEnabled(id, options);

View file

@ -22,6 +22,8 @@ export const REMOVED_TYPES: string[] = [
'search_sessions_monitor',
'search_sessions_cleanup',
'search_sessions_expire',
'cleanup_failed_action_executions',
];
/**

View file

@ -112,7 +112,6 @@ export default function ({ getService }: FtrProviderContext) {
'apm-source-map-migration-task',
'apm-telemetry-task',
'cases-telemetry-task',
'cleanup_failed_action_executions',
'cloud_security_posture-stats_task',
'dashboard_telemetry',
'endpoint:metadata-check-transforms-task',