Remove tasks with cleanup logic instead of marking them as failed (#152841)

Part of https://github.com/elastic/kibana/issues/79977 (step 1 and 3).

In this PR, I'm making Task Manager remove tasks instead of updating
them with `status: failed` whenever a task is out of attempts. I've also
added an optional `cleanup` hook to the task runner that can be defined
if additional cleanup is necessary whenever a task has been deleted
(e.g. deleting `action_task_params`).

## To verify an ad-hoc task that always fails

1. With this PR codebase, modify an action to always throw an error
2. Create an alerting rule that will invoke the action once
3. See the action fail three times
4. Observe the task SO is deleted (search by task type / action type)
alongside the action_task_params SO

## To verify Kibana crashing on the last ad-hoc task attempt

1. With this PR codebase, modify an action to always throw an error
(similar to the scenario above) but also add a delay of 10s before the error
is thrown (`await new Promise((resolve) => setTimeout(resolve, 10000));`)
and a log message before the delay begins
2. Create an alerting rule that will invoke the action once
3. See the action fail twice
4. On the third run, crash Kibana while the action is waiting for the
10s delay; this will cause the action to still be marked as running
while it no longer is
5. Restart Kibana
6. Wait 5-10m until the task's retryAt is overdue
7. Observe the task getting deleted and the action_task_params getting
deleted

## To verify recurring tasks that continuously fail

1. With this PR codebase, modify a rule type to always throw an error
when it runs
2. Create an alerting rule of that type (with a short interval)
3. Observe the rule continuously running and not getting deleted by
the changes in this PR

Flaky test runner:
https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/2036

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Mike Côté 2023-03-27 11:42:33 -04:00 committed by GitHub
parent dec52ef09d
commit 676aec76fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 493 additions and 545 deletions

View file

@ -152,8 +152,7 @@ export class ActionTypeRegistry {
[`actions:${actionType.id}`]: {
title: actionType.name,
maxAttempts,
createTaskRunner: (context: RunContext) =>
this.taskRunnerFactory.create(context, maxAttempts),
createTaskRunner: (context: RunContext) => this.taskRunnerFactory.create(context),
},
});
// No need to notify usage on basic action types

View file

@ -12,14 +12,22 @@ import { TaskRunnerFactory } from './task_runner_factory';
import { actionTypeRegistryMock } from '../action_type_registry.mock';
import { actionExecutorMock } from './action_executor.mock';
import { encryptedSavedObjectsMock } from '@kbn/encrypted-saved-objects-plugin/server/mocks';
import { savedObjectsClientMock, loggingSystemMock, httpServiceMock } from '@kbn/core/server/mocks';
import {
savedObjectsClientMock,
loggingSystemMock,
httpServiceMock,
savedObjectsRepositoryMock,
} from '@kbn/core/server/mocks';
import { eventLoggerMock } from '@kbn/event-log-plugin/server/mocks';
import { ActionTypeDisabledError } from './errors';
import { actionsClientMock } from '../mocks';
import { inMemoryMetricsMock } from '../monitoring/in_memory_metrics.mock';
import { IN_MEMORY_METRICS } from '../monitoring';
import { pick } from 'lodash';
import { isRetryableError } from '@kbn/task-manager-plugin/server/task_running';
import {
isRetryableError,
isUnrecoverableError,
} from '@kbn/task-manager-plugin/server/task_running';
const executeParamsFields = [
'actionId',
@ -86,15 +94,12 @@ const taskRunnerFactoryInitializerParams = {
logger: loggingSystemMock.create().get(),
encryptedSavedObjectsClient: mockedEncryptedSavedObjectsClient,
basePathService: httpServiceMock.createBasePath(),
getUnsecuredSavedObjectsClient: jest.fn().mockReturnValue(services.savedObjectsClient),
savedObjectsRepository: savedObjectsRepositoryMock.create(),
};
beforeEach(() => {
jest.resetAllMocks();
actionExecutorInitializerParams.getServices.mockReturnValue(services);
taskRunnerFactoryInitializerParams.getUnsecuredSavedObjectsClient.mockReturnValue(
services.savedObjectsClient
);
});
test(`throws an error if factory isn't initialized`, () => {
@ -410,36 +415,18 @@ test('executes the task by calling the executor with proper parameters when noti
);
});
test('cleans up action_task_params object', async () => {
test('cleans up action_task_params object through the cleanup runner method', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});
mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
await taskRunner.cleanup();
await taskRunner.run();
expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3', {
refresh: false,
});
expect(taskRunnerFactoryInitializerParams.savedObjectsRepository.delete).toHaveBeenCalledWith(
'action_task_params',
'3',
{ refresh: false }
);
});
test('task runner should implement CancellableTask cancel method with logging warning message', async () => {
@ -474,37 +461,22 @@ test('task runner should implement CancellableTask cancel method with logging wa
);
});
test('runs successfully when cleanup fails and logs the error', async () => {
test('cleanup runs successfully when action_task_params cleanup fails and logs the error', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: mockedTaskInstance,
});
mockedActionExecutor.execute.mockResolvedValueOnce({ status: 'ok', actionId: '2' });
spaceIdToNamespace.mockReturnValueOnce('namespace-test');
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
services.savedObjectsClient.delete.mockRejectedValueOnce(new Error('Fail'));
taskRunnerFactoryInitializerParams.savedObjectsRepository.delete.mockRejectedValueOnce(
new Error('Fail')
);
await taskRunner.run();
await taskRunner.cleanup();
expect(services.savedObjectsClient.delete).toHaveBeenCalledWith('action_task_params', '3', {
refresh: false,
});
expect(taskRunnerFactoryInitializerParams.savedObjectsRepository.delete).toHaveBeenCalledWith(
'action_task_params',
'3',
{ refresh: false }
);
expect(taskRunnerFactoryInitializerParams.logger.error).toHaveBeenCalledWith(
'Failed to cleanup action_task_params object [id="3"]: Fail'
);
@ -814,15 +786,12 @@ test(`doesn't use API key when not provided`, async () => {
});
test(`throws an error when license doesn't support the action type`, async () => {
const taskRunner = taskRunnerFactory.create(
{
taskInstance: {
...mockedTaskInstance,
attempts: 1,
},
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
attempts: 1,
},
2
);
});
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
@ -849,7 +818,7 @@ test(`throws an error when license doesn't support the action type`, async () =>
await taskRunner.run();
throw new Error('Should have thrown');
} catch (e) {
expect(isRetryableError(e)).toEqual(true);
expect(isUnrecoverableError(e)).toEqual(true);
}
});
@ -895,56 +864,11 @@ test(`will throw an error with retry: false if the task is not retryable`, async
expect(err).toBeDefined();
expect(isRetryableError(err)).toEqual(false);
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will not retry: Error message`
`Action '2' failed: Error message`
);
});
test(`treats errors as successes if the task is not retryable`, async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
attempts: 1,
},
});
mockedEncryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValueOnce({
id: '3',
type: 'action_task_params',
attributes: {
actionId: '2',
params: { baz: true },
executionId: '123abc',
apiKey: Buffer.from('123:abc').toString('base64'),
},
references: [
{
id: '2',
name: 'actionRef',
type: 'action',
},
],
});
mockedActionExecutor.execute.mockResolvedValueOnce({
status: 'error',
actionId: '2',
message: 'Error message',
data: { foo: true },
retry: false,
});
let err;
try {
await taskRunner.run();
} catch (e) {
err = e;
}
expect(err).toBeUndefined();
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will not retry: Error message`
);
});
test('will throw a retry error if the error is thrown instead of returned', async () => {
test('will rethrow the error if the error is thrown instead of returned', async () => {
const taskRunner = taskRunnerFactory.create({
taskInstance: {
...mockedTaskInstance,
@ -969,7 +893,8 @@ test('will throw a retry error if the error is thrown instead of returned', asyn
},
],
});
mockedActionExecutor.execute.mockRejectedValueOnce({});
const thrownError = new Error('Fail');
mockedActionExecutor.execute.mockRejectedValueOnce(thrownError);
let err;
try {
@ -978,10 +903,10 @@ test('will throw a retry error if the error is thrown instead of returned', asyn
err = e;
}
expect(err).toBeDefined();
expect(isRetryableError(err)).toEqual(true);
expect(taskRunnerFactoryInitializerParams.logger.error as jest.Mock).toHaveBeenCalledWith(
`Action '2' failed and will retry: undefined`
`Action '2' failed: Fail`
);
expect(thrownError).toEqual(err);
});
test('increments monitoring metrics after execution', async () => {

View file

@ -10,18 +10,20 @@ import { pick } from 'lodash';
import { addSpaceIdToPath } from '@kbn/spaces-plugin/server';
import {
Logger,
SavedObjectsClientContract,
KibanaRequest,
CoreKibanaRequest,
IBasePath,
SavedObject,
Headers,
FakeRawRequest,
SavedObjectReference,
ISavedObjectsRepository,
} from '@kbn/core/server';
import { RunContext } from '@kbn/task-manager-plugin/server';
import { EncryptedSavedObjectsClient } from '@kbn/encrypted-saved-objects-plugin/server';
import { throwRetryableError } from '@kbn/task-manager-plugin/server/task_running';
import {
throwRetryableError,
throwUnrecoverableError,
} from '@kbn/task-manager-plugin/server/task_running';
import { ActionExecutorContract } from './action_executor';
import {
ActionTaskParams,
@ -40,6 +42,7 @@ import {
import { RelatedSavedObjects, validatedRelatedSavedObjects } from './related_saved_objects';
import { injectSavedObjectReferences } from './action_task_params_utils';
import { InMemoryMetrics, IN_MEMORY_METRICS } from '../monitoring';
import { ActionTypeDisabledError } from './errors';
export interface TaskRunnerContext {
logger: Logger;
@ -47,7 +50,7 @@ export interface TaskRunnerContext {
encryptedSavedObjectsClient: EncryptedSavedObjectsClient;
spaceIdToNamespace: SpaceIdToNamespaceFunction;
basePathService: IBasePath;
getUnsecuredSavedObjectsClient: (request: KibanaRequest) => SavedObjectsClientContract;
savedObjectsRepository: ISavedObjectsRepository;
}
export class TaskRunnerFactory {
@ -69,7 +72,7 @@ export class TaskRunnerFactory {
this.taskRunnerContext = taskRunnerContext;
}
public create({ taskInstance }: RunContext, maxAttempts: number = 1) {
public create({ taskInstance }: RunContext) {
if (!this.isInitialized) {
throw new Error('TaskRunnerFactory not initialized');
}
@ -80,7 +83,7 @@ export class TaskRunnerFactory {
encryptedSavedObjectsClient,
spaceIdToNamespace,
basePathService,
getUnsecuredSavedObjectsClient,
savedObjectsRepository,
} = this.taskRunnerContext!;
const taskInfo = {
@ -88,10 +91,10 @@ export class TaskRunnerFactory {
attempts: taskInstance.attempts,
};
const actionExecutionId = uuidv4();
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
return {
async run() {
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
const { spaceId } = actionTaskExecutorParams;
const {
@ -115,12 +118,6 @@ export class TaskRunnerFactory {
const request = getFakeRequest(apiKey);
basePathService.set(request, path);
// TM will treat a task as a failure if `attempts >= maxAttempts`
// so we need to handle that here to avoid TM persisting the failed task
const isRetryableBasedOnAttempts = taskInfo.attempts < maxAttempts;
const willRetryMessage = `and will retry`;
const willNotRetryMessage = `and will not retry`;
let executorResult: ActionTypeExecutorResult<unknown> | undefined;
try {
executorResult = await actionExecutor.execute({
@ -136,65 +133,28 @@ export class TaskRunnerFactory {
...getSource(references, source),
});
} catch (e) {
logger.error(
`Action '${actionId}' failed ${
isRetryableBasedOnAttempts ? willRetryMessage : willNotRetryMessage
}: ${e.message}`
);
if (isRetryableBasedOnAttempts) {
// To retry, we will throw a Task Manager RetryableError
throw throwRetryableError(new Error(e.message), true);
logger.error(`Action '${actionId}' failed: ${e.message}`);
if (e instanceof ActionTypeDisabledError) {
// We'll stop re-trying due to action being forbidden
throwUnrecoverableError(e);
}
throw e;
}
inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_EXECUTIONS);
if (
executorResult &&
executorResult?.status === 'error' &&
executorResult?.retry !== undefined &&
isRetryableBasedOnAttempts
) {
if (executorResult.status === 'error') {
inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_FAILURES);
logger.error(
`Action '${actionId}' failed ${
!!executorResult.retry ? willRetryMessage : willNotRetryMessage
}: ${executorResult.message}`
);
// When the return status is `error`, we will throw a Task Manager RetryableError
logger.error(`Action '${actionId}' failed: ${executorResult.message}`);
// Task manager error handler only kicks in when an error thrown (at this time)
// So what we have to do is throw when the return status is `error`.
throw throwRetryableError(
new Error(executorResult.message),
executorResult.retry as boolean | Date
);
} else if (executorResult && executorResult?.status === 'error') {
inMemoryMetrics.increment(IN_MEMORY_METRICS.ACTION_FAILURES);
logger.error(
`Action '${actionId}' failed ${willNotRetryMessage}: ${executorResult.message}`
);
}
// Cleanup action_task_params object now that we're done with it
if (isPersistedActionTask(actionTaskExecutorParams)) {
try {
// If the request has reached this far we can assume the user is allowed to run clean up
// We would idealy secure every operation but in order to support clean up of legacy alerts
// we allow this operation in an unsecured manner
// Once support for legacy alert RBAC is dropped, this can be secured
await getUnsecuredSavedObjectsClient(request).delete(
ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
actionTaskExecutorParams.actionTaskParamsId,
{ refresh: false }
);
} catch (e) {
// Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic)
logger.error(
`Failed to cleanup ${ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE} object [id="${actionTaskExecutorParams.actionTaskParamsId}"]: ${e.message}`
);
}
}
},
cancel: async () => {
// Write event log entry
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;
const { spaceId } = actionTaskExecutorParams;
const {
@ -227,6 +187,23 @@ export class TaskRunnerFactory {
);
return { state: {} };
},
cleanup: async () => {
// Cleanup action_task_params object now that we're done with it
if (isPersistedActionTask(actionTaskExecutorParams)) {
try {
await savedObjectsRepository.delete(
ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
actionTaskExecutorParams.actionTaskParamsId,
{ refresh: false, namespace: spaceIdToNamespace(actionTaskExecutorParams.spaceId) }
);
} catch (e) {
// Log error only, we shouldn't fail the task because of an error here (if ever there's retry logic)
logger.error(
`Failed to cleanup ${ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE} object [id="${actionTaskExecutorParams.actionTaskParamsId}"]: ${e.message}`
);
}
}
},
};
}
}

View file

@ -524,8 +524,9 @@ export class ActionsPlugin implements Plugin<PluginSetupContract, PluginStartCon
encryptedSavedObjectsClient,
basePathService: core.http.basePath,
spaceIdToNamespace: (spaceId?: string) => spaceIdToNamespace(plugins.spaces, spaceId),
getUnsecuredSavedObjectsClient: (request: KibanaRequest) =>
this.getUnsecuredSavedObjectsClient(core.savedObjects, request),
savedObjectsRepository: core.savedObjects.createInternalRepository([
ACTION_TASK_PARAMS_SAVED_OBJECT_TYPE,
]),
});
this.eventLogService!.isEsContextReady().then(() => {

View file

@ -20,12 +20,12 @@ export const tryToRemoveTasks = async ({
}) => {
const taskIdsFailedToBeDeleted: string[] = [];
const taskIdsSuccessfullyDeleted: string[] = [];
return await withSpan({ name: 'taskManager.bulkRemoveIfExist', type: 'rules' }, async () => {
return await withSpan({ name: 'taskManager.bulkRemove', type: 'rules' }, async () => {
if (taskIdsToDelete.length > 0) {
try {
const resultFromDeletingTasks = await taskManager.bulkRemoveIfExist(taskIdsToDelete);
const resultFromDeletingTasks = await taskManager.bulkRemove(taskIdsToDelete);
resultFromDeletingTasks?.statuses.forEach((status) => {
if (status.success) {
if (status.success || status.error?.statusCode === 404) {
taskIdsSuccessfullyDeleted.push(status.id);
} else {
taskIdsFailedToBeDeleted.push(status.id);
@ -49,7 +49,7 @@ export const tryToRemoveTasks = async ({
logger.error(
`Failure to delete schedules for underlying tasks: ${taskIdsToDelete.join(
', '
)}. TaskManager bulkRemoveIfExist failed with Error: ${error.message}`
)}. TaskManager bulkRemove failed with Error: ${error.message}`
);
}
}

View file

@ -141,8 +141,8 @@ describe('bulkDelete', () => {
enabledRule1,
enabledRule2,
]);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1']);
expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1']);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith(
{ apiKeys: ['MTIzOmFiYw=='] },
@ -205,8 +205,8 @@ describe('bulkDelete', () => {
const result = await rulesClient.bulkDeleteRules({ ids: ['id1', 'id2'] });
expect(unsecuredSavedObjectsClient.bulkDelete).toHaveBeenCalledTimes(4);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1']);
expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1']);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith(
{ apiKeys: ['MTIzOmFiYw=='] },
@ -263,8 +263,8 @@ describe('bulkDelete', () => {
const result = await rulesClient.bulkDeleteRules({ ids: ['id1', 'id2'] });
expect(unsecuredSavedObjectsClient.bulkDelete).toHaveBeenCalledTimes(2);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['id1', 'id2']);
expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemove).toHaveBeenCalledWith(['id1', 'id2']);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledTimes(1);
expect(bulkMarkApiKeysForInvalidation).toHaveBeenCalledWith(
{ apiKeys: ['MTIzOmFiYw==', 'MzIxOmFiYw=='] },
@ -321,7 +321,7 @@ describe('bulkDelete', () => {
{ id: 'id2', type: 'alert', success: true },
],
});
taskManager.bulkRemoveIfExist.mockImplementation(async () => ({
taskManager.bulkRemove.mockImplementation(async () => ({
statuses: [
{
id: 'id1',
@ -349,7 +349,7 @@ describe('bulkDelete', () => {
{ id: 'id2', type: 'alert', success: true },
],
});
taskManager.bulkRemoveIfExist.mockImplementation(() => {
taskManager.bulkRemove.mockImplementation(() => {
throw new Error('UPS');
});
@ -357,7 +357,7 @@ describe('bulkDelete', () => {
expect(logger.error).toBeCalledTimes(1);
expect(logger.error).toBeCalledWith(
'Failure to delete schedules for underlying tasks: id1, id2. TaskManager bulkRemoveIfExist failed with Error: UPS'
'Failure to delete schedules for underlying tasks: id1, id2. TaskManager bulkRemove failed with Error: UPS'
);
});
@ -369,7 +369,7 @@ describe('bulkDelete', () => {
{ id: 'id2', type: 'alert', success: true },
],
});
taskManager.bulkRemoveIfExist.mockImplementation(async () => ({
taskManager.bulkRemove.mockImplementation(async () => ({
statuses: [
{
id: 'id1',

View file

@ -399,7 +399,7 @@ describe('bulkDisableRules', () => {
],
});
taskManager.bulkRemoveIfExist.mockResolvedValue({
taskManager.bulkRemove.mockResolvedValue({
statuses: [
{ id: 'id1', type: 'alert', success: true },
{ id: 'id2', type: 'alert', success: false },
@ -408,8 +408,8 @@ describe('bulkDisableRules', () => {
await rulesClient.bulkDisableRules({ filter: 'fake_filter' });
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemoveIfExist).toHaveBeenCalledWith(['taskId1', 'taskId2']);
expect(taskManager.bulkRemove).toHaveBeenCalledTimes(1);
expect(taskManager.bulkRemove).toHaveBeenCalledWith(['taskId1', 'taskId2']);
expect(logger.debug).toBeCalledTimes(1);
expect(logger.debug).toBeCalledWith(
@ -477,7 +477,7 @@ describe('bulkDisableRules', () => {
);
});
test('should not throw an error if taskManager.bulkRemoveIfExist throw an error', async () => {
test('should not throw an error if taskManager.bulkRemove throw an error', async () => {
unsecuredSavedObjectsClient.bulkCreate.mockResolvedValue({
saved_objects: [
{
@ -490,15 +490,15 @@ describe('bulkDisableRules', () => {
],
});
taskManager.bulkRemoveIfExist.mockImplementation(() => {
throw new Error('Something happend during bulkRemoveIfExist');
taskManager.bulkRemove.mockImplementation(() => {
throw new Error('Something happend during bulkRemove');
});
await rulesClient.bulkDisableRules({ filter: 'fake_filter' });
expect(logger.error).toBeCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
'Failure to delete schedules for underlying tasks: taskId1. TaskManager bulkRemoveIfExist failed with Error: Something happend during bulkRemoveIfExist'
'Failure to delete schedules for underlying tasks: taskId1. TaskManager bulkRemove failed with Error: Something happend during bulkRemove'
);
});
});

View file

@ -68,7 +68,7 @@ export function getBeforeSetup(
ownerId: null,
enabled: false,
});
taskManager.bulkRemoveIfExist.mockResolvedValue({
taskManager.bulkRemove.mockResolvedValue({
statuses: [{ id: 'taskId', type: 'alert', success: true }],
});
const actionsClient = actionsClientMock.create();

View file

@ -1,43 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { v4 as uuidv4 } from 'uuid';
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { bulkRemoveIfExist } from './bulk_remove_if_exist';
import { taskStoreMock } from '../task_store.mock';
describe('removeIfExists', () => {
const ids = [uuidv4(), uuidv4()];
test('removes the tasks by its IDs', async () => {
const ts = taskStoreMock.create({});
expect(await bulkRemoveIfExist(ts, ids)).toBe(undefined);
expect(ts.bulkRemove).toHaveBeenCalledWith(ids);
});
test('handles 404 errors caused by the task not existing', async () => {
const ts = taskStoreMock.create({});
ts.bulkRemove.mockRejectedValue(
SavedObjectsErrorHelpers.createGenericNotFoundError('task', ids[0])
);
expect(await bulkRemoveIfExist(ts, ids)).toBe(undefined);
expect(ts.bulkRemove).toHaveBeenCalledWith(ids);
});
test('throws if any other error is caused by task removal', async () => {
const ts = taskStoreMock.create({});
const error = SavedObjectsErrorHelpers.createInvalidVersionError(uuidv4());
ts.bulkRemove.mockRejectedValue(error);
expect(bulkRemoveIfExist(ts, ids)).rejects.toBe(error);
expect(ts.bulkRemove).toHaveBeenCalledWith(ids);
});
});

View file

@ -1,26 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import { TaskStore } from '../task_store';
/**
* Removes a task from the store, ignoring a not found error
* Other errors are re-thrown
*
* @param taskStore
* @param taskIds
*/
export async function bulkRemoveIfExist(taskStore: TaskStore, taskIds: string[]) {
try {
return await taskStore.bulkRemove(taskIds);
} catch (err) {
if (!SavedObjectsErrorHelpers.isNotFoundError(err)) {
throw err;
}
}
}

View file

@ -24,12 +24,12 @@ const createStartMock = () => {
get: jest.fn(),
aggregate: jest.fn(),
remove: jest.fn(),
bulkRemove: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),
ephemeralRunNow: jest.fn(),
ensureScheduled: jest.fn(),
removeIfExists: jest.fn(),
bulkRemoveIfExist: jest.fn(),
supportsEphemeralTasks: jest.fn(),
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),

View file

@ -18,12 +18,10 @@ import {
ServiceStatusLevels,
CoreStatus,
} from '@kbn/core/server';
import type { SavedObjectsBulkDeleteResponse } from '@kbn/core/server';
import { TaskPollingLifecycle } from './polling_lifecycle';
import { TaskManagerConfig } from './config';
import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware';
import { removeIfExists } from './lib/remove_if_exists';
import { bulkRemoveIfExist } from './lib/bulk_remove_if_exist';
import { setupSavedObjects } from './saved_objects';
import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary';
import { AggregationOpts, FetchResult, SearchOpts, TaskStore } from './task_store';
@ -61,10 +59,8 @@ export type TaskManagerStartContract = Pick<
| 'bulkDisable'
| 'bulkSchedule'
> &
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove'> & {
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove' | 'bulkRemove'> & {
removeIfExists: TaskStore['remove'];
} & {
bulkRemoveIfExist: (ids: string[]) => Promise<SavedObjectsBulkDeleteResponse | undefined>;
} & {
supportsEphemeralTasks: () => boolean;
getRegisteredTypes: () => string[];
@ -275,7 +271,7 @@ export class TaskManagerPlugin
taskStore.aggregate(opts),
get: (id: string) => taskStore.get(id),
remove: (id: string) => taskStore.remove(id),
bulkRemoveIfExist: (ids: string[]) => bulkRemoveIfExist(taskStore, ids),
bulkRemove: (ids: string[]) => taskStore.bulkRemove(ids),
removeIfExists: (id: string) => removeIfExists(taskStore, id),
schedule: (...args) => taskScheduling.schedule(...args),
bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args),

View file

@ -270,7 +270,18 @@ export class TaskPollingLifecycle {
this.createTaskRunnerForTask,
// place tasks in the Task Pool
async (tasks: TaskRunner[]) => {
const result = await this.pool.run(tasks);
const tasksToRun = [];
const removeTaskPromises = [];
for (const task of tasks) {
if (task.isAdHocTaskAndOutOfAttempts) {
this.logger.debug(`Removing ${task} because the max attempts have been reached.`);
removeTaskPromises.push(task.removeTask());
} else {
tasksToRun.push(task);
}
}
// Wait for all the promises at once to speed up the polling cycle
const [result] = await Promise.all([this.pool.run(tasksToRun), ...removeTaskPromises]);
// Emit the load after fetching tasks, giving us a good metric for evaluating how
// busy Task manager tends to be in this Kibana instance
this.emitEvent(asTaskManagerStatEvent('load', asOk(this.pool.workerLoad)));

View file

@ -138,8 +138,7 @@ if (doc['task.runAt'].size()!=0) {
script: {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
if(ctx._source.task.retryAt != null && ZonedDateTime.parse(ctx._source.task.retryAt).toInstant().toEpochMilli() < params.now) {
ctx._source.task.scheduledAt=ctx._source.task.retryAt;
} else {
ctx._source.task.scheduledAt=ctx._source.task.runAt;
@ -147,9 +146,6 @@ if (doc['task.runAt'].size()!=0) {
ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates)
.map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`)
.join(' ')}
} else {
ctx._source.task.status = "failed";
}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {

View file

@ -146,11 +146,7 @@ export const updateFieldsAndMarkAsFailed = ({
return {
source: `
if (params.claimableTaskTypes.contains(ctx._source.task.taskType)) {
if (ctx._source.task.schedule != null || ctx._source.task.attempts < params.taskMaxAttempts[ctx._source.task.taskType]) {
${setScheduledAtAndMarkAsClaimed}
} else {
ctx._source.task.status = "failed";
}
${setScheduledAtAndMarkAsClaimed}
} else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) {
ctx._source.task.status = "unrecognized";
} else {

View file

@ -91,6 +91,7 @@ export type CancelFunction = () => Promise<RunResult | undefined | void>;
export interface CancellableTask {
run: RunFunction;
cancel?: CancelFunction;
cleanup?: () => Promise<void>;
}
export type TaskRunCreatorFunction = (context: RunContext) => CancellableTask;

View file

@ -409,6 +409,8 @@ describe('TaskPool', () => {
run: mockRun(),
stage: TaskRunningStage.PENDING,
toString: () => `TaskType "shooooo"`,
isAdHocTaskAndOutOfAttempts: false,
removeTask: jest.fn(),
get expiration() {
return new Date();
},

View file

@ -181,6 +181,14 @@ export class EphemeralTaskManagerRunner implements TaskRunner {
return this.expiration < new Date();
}
/**
* Returns true whenever the task is ad hoc and has run out of attempts. When true before
* running a task, the task should be deleted instead of run.
*/
public get isAdHocTaskAndOutOfAttempts() {
return false;
}
public get isEphemeral() {
return true;
}
@ -252,6 +260,11 @@ export class EphemeralTaskManagerRunner implements TaskRunner {
}
}
/**
* Used by the non-ephemeral task runner
*/
public async removeTask(): Promise<void> {}
/**
* Noop for Ephemeral tasks
*

View file

@ -896,9 +896,8 @@ describe('TaskManagerRunner', () => {
await runner.run();
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
expect(store.remove).toHaveBeenCalled();
expect(store.update).not.toHaveBeenCalled();
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
@ -1101,11 +1100,8 @@ describe('TaskManagerRunner', () => {
await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
expect(store.remove).toHaveBeenCalled();
expect(store.update).not.toHaveBeenCalled();
});
test('bypasses getRetry function (returning false) on error of a recurring task', async () => {
@ -1170,13 +1166,8 @@ describe('TaskManagerRunner', () => {
await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.attempts).toEqual(3);
expect(instance.status).toEqual('failed');
expect(instance.retryAt!).toBeNull();
expect(instance.runAt.getTime()).toBeLessThanOrEqual(Date.now());
expect(instance.enabled).not.toBeDefined();
expect(store.remove).toHaveBeenCalled();
expect(store.update).not.toHaveBeenCalled();
});
test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => {
@ -1403,9 +1394,8 @@ describe('TaskManagerRunner', () => {
await runner.run();
const instance = store.update.mock.calls[0][0];
expect(instance.status).toBe('failed');
expect(instance.enabled).not.toBeDefined();
expect(store.remove).toHaveBeenCalled();
expect(store.update).not.toHaveBeenCalled();
expect(onTaskEvent).toHaveBeenCalledWith(
withAnyTiming(
@ -1509,6 +1499,113 @@ describe('TaskManagerRunner', () => {
});
});
describe('isAdHocTaskAndOutOfAttempts', () => {
  it(`should return false if the task doesn't have a schedule`, async () => {
    const { runner } = await pendingStageSetup({
      instance: {
        id: 'foo',
        taskType: 'testbar',
      },
    });
    expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(false);
  });

  // The instances below have no `schedule`, so they are ad hoc (one-time)
  // tasks — the getter can only ever return true for ad hoc tasks.
  it(`should return false if the ad hoc task still has attempts remaining`, async () => {
    const { runner } = await pendingStageSetup({
      instance: {
        id: 'foo',
        taskType: 'testbar',
        // assumes the effective max attempts for 'testbar' is 5 — TODO confirm
        attempts: 4,
      },
    });
    expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(false);
  });

  it(`should return true if the ad hoc task is out of attempts`, async () => {
    const { runner } = await pendingStageSetup({
      instance: {
        id: 'foo',
        taskType: 'testbar',
        attempts: 5,
      },
    });
    expect(runner.isAdHocTaskAndOutOfAttempts).toEqual(true);
  });
});
// Covers TaskManagerRunner.removeTask(): SO deletion plus the optional
// per-task-type cleanup() hook and its error handling.
describe('removeTask()', () => {
it(`should remove the task saved-object`, async () => {
const { runner, store } = await readyToRunStageSetup({
instance: {
id: 'foo',
taskType: 'testbar',
},
});
await runner.run();
await runner.removeTask();
expect(store.remove).toHaveBeenCalledWith('foo');
});
it(`should call the task cleanup function if defined`, async () => {
const cleanupFn = jest.fn();
const { runner } = await readyToRunStageSetup({
instance: {
id: 'foo',
taskType: 'testbar2',
},
definitions: {
testbar2: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { state: {} };
},
cancel: jest.fn(),
cleanup: cleanupFn,
}),
},
},
});
// removeTask() is invoked internally by run() once this one-time task
// completes, at which point this.task is defined and cleanup() fires.
await runner.run();
expect(cleanupFn).toHaveBeenCalledTimes(1);
});
it(`doesn't throw an error if the cleanup function throws an error`, async () => {
const cleanupFn = jest.fn().mockRejectedValue(new Error('Fail'));
const { runner, logger } = await readyToRunStageSetup({
instance: {
id: 'foo',
taskType: 'testbar2',
},
definitions: {
testbar2: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { state: {} };
},
cancel: jest.fn(),
cleanup: cleanupFn,
}),
},
},
});
// removeTask() is invoked internally by run(); a rejected cleanup() must be
// swallowed and logged rather than propagated to the caller.
await runner.run();
expect(cleanupFn).toHaveBeenCalledTimes(1);
expect(logger.error).toHaveBeenCalledWith(
`Error encountered when running onTaskRemoved() hook for testbar2 "foo": Fail`
);
});
});
interface TestOpts {
instance?: Partial<ConcreteTaskInstance>;
definitions?: TaskDefinitionRegistry;

View file

@ -73,6 +73,8 @@ export interface TaskRunner {
isEphemeral?: boolean;
toString: () => string;
isSameTask: (executionId: string) => boolean;
isAdHocTaskAndOutOfAttempts: boolean;
removeTask: () => Promise<void>;
}
export enum TaskRunningStage {
@ -258,6 +260,14 @@ export class TaskManagerRunner implements TaskRunner {
return this.expiration < new Date();
}
/**
 * True when the task is ad hoc (has no `schedule`) and its attempt count has
 * reached the type's max attempts. Callers check this before running a task;
 * when true, the task should be deleted instead of run.
 */
public get isAdHocTaskAndOutOfAttempts() {
  const { schedule, attempts } = this.instance.task;
  return !schedule && attempts >= this.getMaxAttempts();
}
/**
* Returns a log-friendly representation of this task.
*/
@ -330,6 +340,19 @@ export class TaskManagerRunner implements TaskRunner {
}
}
/**
 * Deletes the task saved object, then invokes the task definition's optional
 * `cleanup()` hook so task types can remove related objects (per the PR
 * description, e.g. `action_task_params` — confirm per task type). A failing
 * cleanup hook is logged and swallowed so it never prevents task removal.
 */
public async removeTask(): Promise<void> {
await this.bufferedTaskStore.remove(this.id);
if (this.task?.cleanup) {
try {
await this.task.cleanup();
} catch (e) {
// NOTE(review): message refers to an onTaskRemoved() hook but the hook is
// named cleanup() — confirm which name is intended (tests pin this string).
this.logger.error(
`Error encountered when running onTaskRemoved() hook for ${this}: ${e.message}`
);
}
}
}
/**
* Attempts to claim exclusive rights to run the task. If the attempt fails
* with a 409 (http conflict), we assume another Kibana instance beat us to the punch.
@ -474,8 +497,7 @@ export class TaskManagerRunner implements TaskRunner {
return false;
}
const maxAttempts = this.definition.maxAttempts || this.defaultMaxAttempts;
return this.instance.task.attempts < maxAttempts;
return this.instance.task.attempts < this.getMaxAttempts();
}
private rescheduleFailedRun = (
@ -536,7 +558,17 @@ export class TaskManagerRunner implements TaskRunner {
unwrap
)(result);
if (!this.isExpired) {
if (this.isExpired) {
this.usageCounter?.incrementCounter({
counterName: `taskManagerUpdateSkippedDueToTaskExpiration`,
counterType: 'taskManagerTaskRunner',
incrementBy: 1,
});
} else if (fieldUpdates.status === TaskStatus.Failed) {
// Delete the SO instead so it doesn't remain in the index forever
this.instance = asRan(this.instance.task);
await this.removeTask();
} else {
this.instance = asRan(
await this.bufferedTaskStore.update(
defaults(
@ -551,12 +583,6 @@ export class TaskManagerRunner implements TaskRunner {
)
)
);
} else {
this.usageCounter?.incrementCounter({
counterName: `taskManagerUpdateSkippedDueToTaskExpiration`,
counterType: 'taskManagerTaskRunner',
incrementBy: 1,
});
}
return fieldUpdates.status === TaskStatus.Failed
@ -569,8 +595,8 @@ export class TaskManagerRunner implements TaskRunner {
private async processResultWhenDone(): Promise<TaskRunResult> {
// not a recurring task: clean up by removing the task instance from store
try {
await this.bufferedTaskStore.remove(this.id);
this.instance = asRan(this.instance.task);
await this.removeTask();
} catch (err) {
if (err.statusCode === 404) {
this.logger.warn(`Task cleanup of ${this} failed in processing. Was remove called twice?`);
@ -660,6 +686,12 @@ export class TaskManagerRunner implements TaskRunner {
}
return result;
}
/**
 * Effective max attempts for this task type: the definition's own
 * `maxAttempts` when set, otherwise the task manager default.
 */
private getMaxAttempts() {
  const { maxAttempts } = this.definition;
  return maxAttempts === undefined ? this.defaultMaxAttempts : maxAttempts;
}
}
function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance {

View file

@ -22,7 +22,11 @@ export default function createRunSoonTests({ getService }: FtrProviderContext) {
const objectRemover = new ObjectRemover(supertest);
before(async () => {
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id');
// Load the rules and tasks archives separately. Loading them together was
// flaky: the task could run before its rule was available, fail to load the
// rule, and delete itself. (Root cause not fully understood.)
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules');
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks');
});
afterEach(async () => {
@ -30,7 +34,8 @@ export default function createRunSoonTests({ getService }: FtrProviderContext) {
});
after(async () => {
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id');
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks');
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules');
});
it('should successfully run rule where scheduled task id is different than rule id', async () => {

View file

@ -36,11 +36,16 @@ export default function createScheduledTaskIdTests({ getService }: FtrProviderCo
}
before(async () => {
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id');
// Load the rules and tasks archives separately. Loading them together was
// flaky: the task could run before its rule was available, fail to load the
// rule, and delete itself. (Root cause not fully understood.)
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules');
await esArchiver.load('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks');
});
after(async () => {
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id');
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/tasks');
await esArchiver.unload('x-pack/test/functional/es_archives/rules_scheduled_task_id/rules');
});
it('cannot create rule with same ID as a scheduled task ID used by another rule', async () => {

View file

@ -40,39 +40,6 @@
}
}
{
"type": "doc",
"value": {
"id": "task:329798f0-b0b0-11ea-9510-fdf248d5f2a4",
"index": ".kibana_task_manager_1",
"source": {
"migrationVersion": {
"task": "7.16.0"
},
"task": {
"attempts": 0,
"ownerId": null,
"params": "{\"alertId\":\"74f3e6d7-b7bb-477d-ac28-92ee22728e6e\",\"spaceId\":\"default\"}",
"retryAt": null,
"runAt": "2021-11-05T16:21:52.148Z",
"schedule": {
"interval": "1m"
},
"scheduledAt": "2021-11-05T15:28:42.055Z",
"scope": [
"alerting"
],
"startedAt": null,
"status": "idle",
"taskType": "alerting:example.always-firing"
},
"references": [],
"type": "task",
"updated_at": "2021-11-05T16:21:37.629Z"
}
}
}
{
"type": "doc",
"value": {
@ -114,36 +81,3 @@
}
}
}
{
"type": "doc",
"value": {
"id": "task:46be60d4-ae63-48ed-ab6f-f4d9b4defacf",
"index": ".kibana_task_manager_1",
"source": {
"migrationVersion": {
"task": "7.16.0"
},
"task": {
"attempts": 0,
"ownerId": null,
"params": "{\"alertId\":\"46be60d4-ae63-48ed-ab6f-f4d9b4defacf\",\"spaceId\":\"default\"}",
"retryAt": null,
"runAt": "2021-11-05T16:21:52.148Z",
"schedule": {
"interval": "1m"
},
"scheduledAt": "2021-11-05T15:28:42.055Z",
"scope": [
"alerting"
],
"startedAt": null,
"status": "idle",
"taskType": "sampleTaskRemovedType"
},
"references": [],
"type": "task",
"updated_at": "2021-11-05T16:21:37.629Z"
}
}
}

View file

@ -342,119 +342,3 @@
}
}
}
{
"type": "index",
"value": {
"aliases": {
".kibana_task_manager": {
}
},
"index": ".kibana_task_manager_1",
"mappings": {
"_meta": {
"migrationMappingPropertyHashes": {
"migrationVersion": "4a1746014a75ade3a714e1db5763276f",
"namespace": "2f4316de49999235636386fe51dc06c1",
"namespaces": "2f4316de49999235636386fe51dc06c1",
"originId": "2f4316de49999235636386fe51dc06c1",
"references": "7997cf5a56cc02bdc9c93361bde732b0",
"task": "235412e52d09e7165fac8a67a43ad6b4",
"type": "2f4316de49999235636386fe51dc06c1",
"updated_at": "00da57df13e94e9d98437d13ace4bfe0"
}
},
"dynamic": "strict",
"properties": {
"migrationVersion": {
"dynamic": "true",
"properties": {
"task": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text"
}
}
},
"references": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"type": {
"type": "keyword"
}
},
"type": "nested"
},
"task": {
"properties": {
"attempts": {
"type": "integer"
},
"ownerId": {
"type": "keyword"
},
"params": {
"type": "text"
},
"retryAt": {
"type": "date"
},
"runAt": {
"type": "date"
},
"schedule": {
"properties": {
"interval": {
"type": "keyword"
}
}
},
"scheduledAt": {
"type": "date"
},
"scope": {
"type": "keyword"
},
"startedAt": {
"type": "date"
},
"state": {
"type": "text"
},
"status": {
"type": "keyword"
},
"taskType": {
"type": "keyword"
},
"user": {
"type": "keyword"
}
}
},
"type": {
"type": "keyword"
},
"updated_at": {
"type": "date"
}
}
},
"settings": {
"index": {
"auto_expand_replicas": "0-1",
"number_of_replicas": "0",
"number_of_shards": "1"
}
}
}
}

View file

@ -0,0 +1,65 @@
{
"type": "doc",
"value": {
"id": "task:329798f0-b0b0-11ea-9510-fdf248d5f2a4",
"index": ".kibana_task_manager_1",
"source": {
"migrationVersion": {
"task": "7.16.0"
},
"task": {
"attempts": 0,
"ownerId": null,
"params": "{\"alertId\":\"74f3e6d7-b7bb-477d-ac28-92ee22728e6e\",\"spaceId\":\"default\"}",
"retryAt": null,
"runAt": "2021-11-05T16:21:52.148Z",
"schedule": {
"interval": "1m"
},
"scheduledAt": "2021-11-05T15:28:42.055Z",
"scope": [
"alerting"
],
"startedAt": null,
"status": "idle",
"taskType": "alerting:example.always-firing"
},
"references": [],
"type": "task",
"updated_at": "2021-11-05T16:21:37.629Z"
}
}
}
{
"type": "doc",
"value": {
"id": "task:46be60d4-ae63-48ed-ab6f-f4d9b4defacf",
"index": ".kibana_task_manager_1",
"source": {
"migrationVersion": {
"task": "7.16.0"
},
"task": {
"attempts": 0,
"ownerId": null,
"params": "{\"alertId\":\"46be60d4-ae63-48ed-ab6f-f4d9b4defacf\",\"spaceId\":\"default\"}",
"retryAt": null,
"runAt": "2021-11-05T16:21:52.148Z",
"schedule": {
"interval": "1m"
},
"scheduledAt": "2021-11-05T15:28:42.055Z",
"scope": [
"alerting"
],
"startedAt": null,
"status": "idle",
"taskType": "sampleTaskRemovedType"
},
"references": [],
"type": "task",
"updated_at": "2021-11-05T16:21:37.629Z"
}
}
}

View file

@ -0,0 +1,115 @@
{
"type": "index",
"value": {
"aliases": {
".kibana_task_manager": {
}
},
"index": ".kibana_task_manager_1",
"mappings": {
"_meta": {
"migrationMappingPropertyHashes": {
"migrationVersion": "4a1746014a75ade3a714e1db5763276f",
"namespace": "2f4316de49999235636386fe51dc06c1",
"namespaces": "2f4316de49999235636386fe51dc06c1",
"originId": "2f4316de49999235636386fe51dc06c1",
"references": "7997cf5a56cc02bdc9c93361bde732b0",
"task": "235412e52d09e7165fac8a67a43ad6b4",
"type": "2f4316de49999235636386fe51dc06c1",
"updated_at": "00da57df13e94e9d98437d13ace4bfe0"
}
},
"dynamic": "strict",
"properties": {
"migrationVersion": {
"dynamic": "true",
"properties": {
"task": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text"
}
}
},
"references": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"type": {
"type": "keyword"
}
},
"type": "nested"
},
"task": {
"properties": {
"attempts": {
"type": "integer"
},
"ownerId": {
"type": "keyword"
},
"params": {
"type": "text"
},
"retryAt": {
"type": "date"
},
"runAt": {
"type": "date"
},
"schedule": {
"properties": {
"interval": {
"type": "keyword"
}
}
},
"scheduledAt": {
"type": "date"
},
"scope": {
"type": "keyword"
},
"startedAt": {
"type": "date"
},
"state": {
"type": "text"
},
"status": {
"type": "keyword"
},
"taskType": {
"type": "keyword"
},
"user": {
"type": "keyword"
}
}
},
"type": {
"type": "keyword"
},
"updated_at": {
"type": "date"
}
}
},
"settings": {
"index": {
"auto_expand_replicas": "0-1",
"number_of_replicas": "0",
"number_of_shards": "1"
}
}
}
}

View file

@ -10,7 +10,6 @@ import { random } from 'lodash';
import expect from '@kbn/expect';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json';
import { DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config';
import { ConcreteTaskInstance, BulkUpdateTaskResult } from '@kbn/task-manager-plugin/server';
import { FtrProviderContext } from '../../ftr_provider_context';
@ -18,8 +17,6 @@ const {
task: { properties: taskManagerIndexMapping },
} = TaskManagerMapping;
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
export interface RawDoc {
_id: string;
_source: any;
@ -563,12 +560,12 @@ export default function ({ getService }: FtrProviderContext) {
params: { throwOnMarkAsRunning: true },
});
await delay(DEFAULT_POLL_INTERVAL * 3);
expect(originalTask.attempts).to.eql(0);
// Wait for task manager to attempt running the task a second time
await retry.try(async () => {
const task = await currentTask(originalTask.id);
expect(task.attempts).to.eql(3);
expect(task.status).to.eql('failed');
expect(task.attempts).to.eql(2);
});
});
@ -769,17 +766,15 @@ export default function ({ getService }: FtrProviderContext) {
});
});
it('should mark non-recurring task as failed if task is still running but maxAttempts has been reached', async () => {
const task = await scheduleTask({
it('should delete the task if it is still running but maxAttempts has been reached', async () => {
await scheduleTask({
taskType: 'sampleOneTimeTaskThrowingError',
params: {},
});
await retry.try(async () => {
const [scheduledTask] = (await currentTasks()).docs;
expect(scheduledTask.id).to.eql(task.id);
expect(scheduledTask.status).to.eql('failed');
expect(scheduledTask.attempts).to.eql(3);
const results = (await currentTasks()).docs;
expect(results.length).to.eql(0);
});
});
@ -894,38 +889,6 @@ export default function ({ getService }: FtrProviderContext) {
});
});
it('should allow a failed task to be rerun using runSoon', async () => {
const taskThatFailsBeforeRunNow = await scheduleTask({
taskType: 'singleAttemptSampleTask',
params: {
waitForParams: true,
},
});
// tell the task to fail on its next run
await provideParamsToTasksWaitingForParams(taskThatFailsBeforeRunNow.id, {
failWith: 'error on first run',
});
// wait for task to fail
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, taskThatFailsBeforeRunNow.id).status).to.eql('failed');
});
// run the task again
await runTaskSoon({
id: taskThatFailsBeforeRunNow.id,
});
// runTaskSoon should successfully update the runAt property of the task
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(
Date.parse(getTaskById(tasks, taskThatFailsBeforeRunNow.id).runAt)
).to.be.greaterThan(Date.parse(taskThatFailsBeforeRunNow.runAt));
});
});
// TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
// it('should return the resulting task state when asked to run an ephemeral task now', async () => {
// const ephemeralTask = await runEphemeralTaskNow({