Remove skipping task logic from Task Manager (#177244)

Towards: #176585

This PR removes the task skipping logic from TaskManager, PRs for
Alerting and Actions will follow.

## To verify
Rules and actions should be still working as expected.

---------

Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ersin Erdal 2024-03-14 15:52:56 +01:00 committed by GitHub
parent c60227c089
commit 2abe492033
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 86 additions and 983 deletions

View file

@ -137,9 +137,6 @@ uiSettings:
xpack.task_manager.allow_reading_invalid_state: false
xpack.task_manager.request_timeouts.update_by_query: 60000
## TaskManager requeue invalid tasks, supports ZDT
xpack.task_manager.requeue_invalid_tasks.enabled: true
# Reporting feature
xpack.screenshotting.enabled: false
xpack.reporting.queue.pollInterval: 3m

View file

@ -151,7 +151,7 @@ describe('checking migration metadata changes on all registered SO types', () =>
"synthetics-param": "3ebb744e5571de678b1312d5c418c8188002cf5e",
"synthetics-privates-locations": "f53d799d5c9bc8454aaa32c6abc99a899b025d5c",
"tag": "e2544392fe6563e215bb677abc8b01c2601ef2dc",
"task": "04f30bd7bae923f3a53c31ab3b9745a93872fc02",
"task": "d17f2fc0bf6759a070c2221ec2787ad785c680fe",
"telemetry": "7b00bcf1c7b4f6db1192bb7405a6a63e78b699fd",
"threshold-explorer-view": "175306806f9fc8e13fcc1c8953ec4ba89bda1b70",
"ui-metric": "d227284528fd19904e9d972aea0a13716fc5fe24",

View file

@ -59,7 +59,6 @@ export interface ActionExecutorContext {
export interface TaskInfo {
scheduled: Date;
attempts: number;
numSkippedRuns?: number;
}
export interface ExecuteOptions<Source = unknown> {

View file

@ -106,7 +106,6 @@ export class TaskRunnerFactory {
const taskInfo = {
scheduled: taskInstance.runAt,
attempts: taskInstance.attempts,
numSkippedRuns: taskInstance.numSkippedRuns,
};
const actionExecutionId = uuidv4();
const actionTaskExecutorParams = taskInstance.params as ActionTaskExecutorParams;

View file

@ -45,11 +45,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
@ -108,11 +103,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],
@ -174,11 +164,6 @@ describe('config validation', () => {
"request_timeouts": Object {
"update_by_query": 30000,
},
"requeue_invalid_tasks": Object {
"delay": 3000,
"enabled": false,
"max_attempts": 100,
},
"unsafe": Object {
"authenticate_background_task_utilization": true,
"exclude_task_types": Array [],

View file

@ -56,12 +56,6 @@ const eventLoopDelaySchema = schema.object({
}),
});
const requeueInvalidTasksConfig = schema.object({
delay: schema.number({ defaultValue: 3000, min: 0 }),
enabled: schema.boolean({ defaultValue: false }),
max_attempts: schema.number({ defaultValue: 100, min: 1, max: 500 }),
});
const requestTimeoutsConfig = schema.object({
/* The request timeout config for task manager's updateByQuery default:30s, min:10s, max:10m */
update_by_query: schema.number({ defaultValue: 1000 * 30, min: 1000 * 10, max: 1000 * 60 * 10 }),
@ -143,7 +137,6 @@ export const configSchema = schema.object(
defaultValue: 1000,
min: 1,
}),
requeue_invalid_tasks: requeueInvalidTasksConfig,
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
authenticate_background_task_utilization: schema.boolean({ defaultValue: true }),
@ -181,7 +174,6 @@ export const configSchema = schema.object(
}
);
export type RequeueInvalidTasksConfig = TypeOf<typeof requeueInvalidTasksConfig>;
export type TaskManagerConfig = TypeOf<typeof configSchema>;
export type TaskExecutionFailureThreshold = TypeOf<typeof taskExecutionFailureThresholdSchema>;
export type EventLoopDelayConfig = TypeOf<typeof eventLoopDelaySchema>;

View file

@ -79,11 +79,6 @@ describe('EphemeralTaskLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -34,8 +34,6 @@ export {
throwUnrecoverableError,
throwRetryableError,
isEphemeralTaskRejectedDueToCapacityError,
isSkipError,
createSkipError,
createTaskRunError,
TaskErrorSource,
} from './task_running';

View file

@ -74,11 +74,6 @@ describe('managed configuration', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -51,11 +51,6 @@ const config = {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -62,11 +62,6 @@ const config: TaskManagerConfig = {
},
poll_interval: 6000000,
request_capacity: 1000,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
unsafe: {
authenticate_background_task_utilization: true,
exclude_task_types: [],

View file

@ -47,11 +47,6 @@ describe('Configuration Statistics Aggregator', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -52,11 +52,6 @@ describe('createMonitoringStatsStream', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -72,11 +72,6 @@ const pluginInitializerContextParams = {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -77,11 +77,6 @@ describe('TaskPollingLifecycle', () => {
warn_threshold: 5000,
},
worker_utilization_running_average_window: 5,
requeue_invalid_tasks: {
enabled: false,
delay: 3000,
max_attempts: 20,
},
metrics_reset_interval: 3000,
claim_strategy: 'default',
request_timeouts: {

View file

@ -217,7 +217,6 @@ export class TaskPollingLifecycle implements ITaskEventEmitter<TaskLifecycleEven
executionContext: this.executionContext,
usageCounter: this.usageCounter,
eventLoopDelayConfig: { ...this.config.event_loop_delay },
requeueInvalidTasksConfig: this.config.requeue_invalid_tasks,
allowReadingInvalidState: this.config.allow_reading_invalid_state,
});
};

View file

@ -7,6 +7,7 @@
import type { SavedObjectsServiceSetup } from '@kbn/core/server';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { taskModelVersions } from './task_model_versions';
import { taskMappings } from './mappings';
import { getMigrations } from './migrations';
import { TaskManagerConfig } from '../config';
@ -72,5 +73,6 @@ export function setupSavedObjects(
},
} as estypes.QueryDslQueryContainer;
},
modelVersions: taskModelVersions,
});
}

View file

@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { schema } from '@kbn/config-schema';
export const taskSchemaV1 = schema.object({
taskType: schema.string(),
scheduledAt: schema.string(),
startedAt: schema.nullable(schema.string()),
retryAt: schema.nullable(schema.string()),
runAt: schema.string(),
schedule: schema.maybe(
schema.object({
interval: schema.duration(),
})
),
params: schema.string(),
state: schema.string(),
stateVersion: schema.maybe(schema.number()),
traceparent: schema.string(),
user: schema.maybe(schema.string()),
scope: schema.maybe(schema.arrayOf(schema.string())),
ownerId: schema.nullable(schema.string()),
enabled: schema.maybe(schema.boolean()),
timeoutOverride: schema.maybe(schema.string()),
attempts: schema.number(),
status: schema.oneOf([
schema.literal('idle'),
schema.literal('claiming'),
schema.literal('running'),
schema.literal('failed'),
schema.literal('unrecognized'),
schema.literal('dead_letter'),
]),
version: schema.maybe(schema.string()),
});

View file

@ -0,0 +1,23 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SavedObjectsModelVersionMap } from '@kbn/core-saved-objects-server';
import { taskSchemaV1 } from './schemas/task';
export const taskModelVersions: SavedObjectsModelVersionMap = {
'1': {
changes: [
{
type: 'mappings_deprecation',
deprecatedMappings: ['numSkippedRuns', 'interval'],
},
],
schemas: {
create: taskSchemaV1,
},
},
};

View file

@ -55,7 +55,6 @@ export type SuccessfulRunResult = {
*/
state: Record<string, unknown>;
taskRunError?: DecoratedError;
skipAttempts?: number;
shouldValidate?: boolean;
} & (
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
@ -341,11 +340,6 @@ export interface TaskInstance {
*/
enabled?: boolean;
/**
* Indicates the number of skipped executions.
*/
numSkippedRuns?: number;
/*
* Optionally override the timeout defined in the task type for this specific task instance
*/
@ -362,6 +356,10 @@ export interface TaskInstanceWithDeprecatedFields extends TaskInstance {
* An interval in minutes (e.g. '5m'). If specified, this is a recurring task.
* */
interval?: string;
/**
* Indicates the number of skipped executions.
*/
numSkippedRuns?: number;
}
/**
@ -384,6 +382,11 @@ export interface ConcreteTaskInstance extends TaskInstance {
*/
interval?: string;
/**
* @deprecated removed with version 8.14.0
*/
numSkippedRuns?: number;
/**
* The saved object version from the Elasticsearch document.
*/

View file

@ -6,10 +6,8 @@
*/
import {
createSkipError,
createTaskRunError,
getErrorSource,
isSkipError,
isUnrecoverableError,
isUserError,
TaskErrorSource,
@ -36,10 +34,6 @@ describe('Error Types', () => {
expect(isUnrecoverableError(new Error('OMG'))).toBeFalsy();
});
it('createSkipError', () => {
expect(isSkipError(createSkipError(new Error('OMG')))).toBeTruthy();
});
it('createTaskRunError', () => {
expect(isUserError(createTaskRunError(new Error('OMG'), TaskErrorSource.USER))).toBeTruthy();
});

View file

@ -12,7 +12,6 @@ export { TaskErrorSource };
// Unrecoverable
const CODE_UNRECOVERABLE = 'TaskManager/unrecoverable';
const CODE_RETRYABLE = 'TaskManager/retryable';
const CODE_SKIP = 'TaskManager/skip';
const code = Symbol('TaskManagerErrorCode');
const retry = Symbol('TaskManagerErrorRetry');
@ -63,18 +62,6 @@ export function throwRetryableError(error: Error, shouldRetry: Date | boolean) {
throw error;
}
export function isSkipError(error: Error | DecoratedError) {
if (isTaskManagerError(error) && error[code] === CODE_SKIP) {
return true;
}
return false;
}
export function createSkipError(error: Error): DecoratedError {
(error as DecoratedError)[code] = CODE_SKIP;
return error;
}
export function createTaskRunError(
error: Error,
errorSource = TaskErrorSource.FRAMEWORK

View file

@ -29,7 +29,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server';
import moment from 'moment';
import { TaskDefinitionRegistry, TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';
import { createSkipError, throwRetryableError, throwUnrecoverableError } from './errors';
import { throwRetryableError, throwUnrecoverableError } from './errors';
import apm from 'elastic-apm-node';
import { executionContextServiceMock } from '@kbn/core/server/mocks';
import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock';
@ -41,16 +41,10 @@ import {
TASK_MANAGER_TRANSACTION_TYPE_MARK_AS_RUNNING,
} from './task_runner';
import { schema } from '@kbn/config-schema';
import { RequeueInvalidTasksConfig } from '../config';
const baseDelay = 5 * 60 * 1000;
const executionContext = executionContextServiceMock.createSetupContract();
const minutesFromNow = (mins: number): Date => secondsFromNow(mins * 60);
const mockRequeueInvalidTasksConfig = {
enabled: false,
delay: 3000,
max_attempts: 20,
};
let fakeTimer: sinon.SinonFakeTimers;
@ -1390,11 +1384,6 @@ describe('TaskManagerRunner', () => {
},
},
},
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
});
expect(await runner.run()).toEqual({
@ -1459,11 +1448,6 @@ describe('TaskManagerRunner', () => {
},
},
},
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
allowReadingInvalidState: true,
});
@ -2004,552 +1988,6 @@ describe('TaskManagerRunner', () => {
tags: ['task:end', 'foo', 'bar'],
});
});
describe('Skip Tasks', () => {
test('skips task.run when the task has invalid params', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Idle,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
params: { foo: 'bar' },
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async run() {
return { state: { foo: 'bar' } };
},
}),
paramsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBe(
new Date(Date.now()).getTime() + mockRequeueInvalidTasksConfig.delay
);
expect(instance.state).toEqual(mockTaskInstance.state);
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(1);
expect(result).toEqual(
asErr({
error: createSkipError(
new Error('[baz]: expected value of type [string] but got [undefined]')
),
state: {
existingStatePAram: 'foo',
},
})
);
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error: [baz]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has skipped executing the Task (bar/foo) 1 times as it has invalid params.'
);
});
test('skips task.run when the task has invalid indirect params e.g. rule', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: true,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBe(
new Date(Date.now()).getTime() + mockRequeueInvalidTasksConfig.delay
);
expect(instance.state).toEqual(mockTaskInstance.state);
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(1);
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error in its indirect params: [baz]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has skipped executing the Task (bar/foo) 1 times as it has invalid params.'
);
expect(result).toEqual(
asErr({
state: mockTaskInstance.state,
error: createSkipError(
new Error('[baz]: expected value of type [string] but got [undefined]')
),
})
);
});
test('does not skip when disabled (recurring task)', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
attempts: 1,
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
numSkippedRuns: mockRequeueInvalidTasksConfig.max_attempts,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: false,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: { new: 'foo' } };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.runAt.getTime()).toBeGreaterThan(mockTaskInstance.runAt!.getTime()); // reschedule attempt
expect(instance.state).toEqual({ new: 'foo' });
expect(instance.schedule).toEqual(mockTaskInstance.schedule);
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(0);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: { new: 'foo' } }));
});
test('does not skip when disabled (non-recurring task)', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
attempts: 5, // defaultMaxAttempts
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStatePAram: 'foo' },
runAt: new Date(),
numSkippedRuns: 0,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: false,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: { new: 'foo' } };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(), // { foo: 'bar' } is valid
}),
},
},
});
await runner.run();
expect(store.update).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(store.remove).toHaveBeenCalled();
});
test('resets skip attempts on the first successful run', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
const instance = store.update.mock.calls[0][0];
expect(instance.state).toEqual({});
expect(instance.attempts).toBe(0);
expect(instance.numSkippedRuns).toBe(0);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asOk({ state: {} }));
});
test('removes the non-recurring tasks on the first successful run after skipping', async () => {
const cleanupFn = jest.fn();
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {} };
},
cleanup: cleanupFn,
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(cleanupFn).toHaveBeenCalled();
expect(store.remove).toHaveBeenCalledWith('foo');
expect(result).toEqual(asOk({ state: {} }));
});
test('does not resets skip attempts for a recurring task as long as there is an error', async () => {
const taskRunError = createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK);
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
schedule: { interval: '10m' },
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return {
state: {},
taskRunError,
};
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledTimes(1);
expect(store.remove).not.toHaveBeenCalled();
const instance = store.update.mock.calls[0][0];
expect(instance.numSkippedRuns).toBe(mockTaskInstance.numSkippedRuns);
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(
asOk({
state: {},
taskRunError,
})
);
});
test('does not resets skip attempts for a non-recurring task as long as there is an error', async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
attempts: 3,
};
const error = new Error('test');
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, error };
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: 3,
numSkippedRuns: 20,
state: {},
status: TaskStatus.Idle,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(logger.warn).not.toHaveBeenCalled();
expect(result).toEqual(asErr({ state: {}, error }));
});
test("sets non recurring task's status as dead_letter after skip and retry attempts ", async () => {
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20, // max
attempts: 5, // default max
};
const error = new Error('test');
const { runner, store } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
delay: 3000,
enabled: true,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { foo: 'bar' } } };
},
async run() {
return { state: {}, error };
},
}),
indirectParamsSchema: schema.object({
baz: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: mockTaskInstance.attempts, // default max
numSkippedRuns: mockTaskInstance.numSkippedRuns,
state: mockTaskInstance.state,
status: TaskStatus.DeadLetter,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(result).toEqual(asErr({ state: {}, error }));
});
test('stops skipping when the max skip limit is reached', async () => {
const taskRunError = createTaskRunError(new Error('test'), TaskErrorSource.FRAMEWORK);
const mockTaskInstance: Partial<ConcreteTaskInstance> = {
status: TaskStatus.Running,
startedAt: new Date(),
schedule: { interval: '3s' },
enabled: true,
state: { existingStateParam: 'foo' },
runAt: new Date(),
numSkippedRuns: 20,
attempts: 0,
};
const { runner, store, logger } = await readyToRunStageSetup({
instance: mockTaskInstance,
requeueInvalidTasksConfig: {
enabled: true,
delay: 3000,
max_attempts: 20,
},
definitions: {
bar: {
title: 'Bar!',
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'bar' } } };
},
async run() {
return {
state: {},
taskRunError,
};
},
}),
indirectParamsSchema: schema.object({
foo: schema.string(),
}),
},
},
});
const result = await runner.run();
expect(store.update).toHaveBeenCalledWith(
expect.objectContaining({
attempts: 0,
numSkippedRuns: 20,
state: {},
status: TaskStatus.Idle,
}),
{ validate: true }
);
expect(store.remove).not.toHaveBeenCalled();
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(logger.warn).toHaveBeenCalledWith(
'Task (bar/foo) has a validation error in its indirect params: [foo]: expected value of type [string] but got [undefined]'
);
expect(logger.warn).toHaveBeenCalledWith(
'Task Manager has reached the max skip attempts for task bar/foo'
);
expect(result).toEqual(
asOk({
state: {},
taskRunError,
skipAttempts: 20,
})
);
});
});
});
describe('isAdHocTaskAndOutOfAttempts', () => {
@ -2683,7 +2121,6 @@ describe('TaskManagerRunner', () => {
instance?: Partial<ConcreteTaskInstance>;
definitions?: TaskDefinitionRegistry;
onTaskEvent?: jest.Mock<(event: TaskEvent<unknown, unknown>) => void>;
requeueInvalidTasksConfig?: RequeueInvalidTasksConfig;
allowReadingInvalidState?: boolean;
}
@ -2760,7 +2197,6 @@ describe('TaskManagerRunner', () => {
monitor: true,
warn_threshold: 5000,
},
requeueInvalidTasksConfig: opts.requeueInvalidTasksConfig || mockRequeueInvalidTasksConfig,
allowReadingInvalidState: opts.allowReadingInvalidState || false,
});

View file

@ -14,10 +14,9 @@
import apm from 'elastic-apm-node';
import { v4 as uuidv4 } from 'uuid';
import { withSpan } from '@kbn/apm-utils';
import { defaults, flow, identity, isUndefined, omit, random } from 'lodash';
import { defaults, flow, identity, omit, random } from 'lodash';
import { ExecutionContextStart, Logger, SavedObjectsErrorHelpers } from '@kbn/core/server';
import { UsageCounter } from '@kbn/usage-collection-plugin/server';
import moment from 'moment';
import { Middleware } from '../lib/middleware';
import {
asErr,
@ -49,14 +48,13 @@ import {
FailedRunResult,
FailedTaskResult,
isFailedRunResult,
RunContext,
SuccessfulRunResult,
TaskDefinition,
TaskStatus,
} from '../task';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { createSkipError, isRetryableError, isSkipError, isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig, RequeueInvalidTasksConfig } from '../config';
import { isRetryableError, isUnrecoverableError } from './errors';
import type { EventLoopDelayConfig } from '../config';
import { TaskValidator } from '../task_validator';
export const EMPTY_RUN_RESULT: SuccessfulRunResult = { state: {} };
@ -109,7 +107,6 @@ type Opts = {
executionContext: ExecutionContextStart;
usageCounter?: UsageCounter;
eventLoopDelayConfig: EventLoopDelayConfig;
requeueInvalidTasksConfig: RequeueInvalidTasksConfig;
allowReadingInvalidState: boolean;
} & Pick<Middleware, 'beforeRun' | 'beforeMarkRunning'>;
@ -159,7 +156,6 @@ export class TaskManagerRunner implements TaskRunner {
private readonly executionContext: ExecutionContextStart;
private usageCounter?: UsageCounter;
private eventLoopDelayConfig: EventLoopDelayConfig;
private readonly requeueInvalidTasksConfig: RequeueInvalidTasksConfig;
private readonly taskValidator: TaskValidator;
/**
@ -184,7 +180,6 @@ export class TaskManagerRunner implements TaskRunner {
executionContext,
usageCounter,
eventLoopDelayConfig,
requeueInvalidTasksConfig,
allowReadingInvalidState,
}: Opts) {
this.instance = asPending(sanitizeInstance(instance));
@ -199,7 +194,6 @@ export class TaskManagerRunner implements TaskRunner {
this.usageCounter = usageCounter;
this.uuid = uuidv4();
this.eventLoopDelayConfig = eventLoopDelayConfig;
this.requeueInvalidTasksConfig = requeueInvalidTasksConfig;
this.taskValidator = new TaskValidator({
logger: this.logger,
definitions: this.definitions,
@ -370,31 +364,11 @@ export class TaskManagerRunner implements TaskRunner {
description: 'run task',
};
let taskParamsValidation;
if (this.requeueInvalidTasksConfig.enabled) {
taskParamsValidation = this.validateTaskParams(modifiedContext);
if (!taskParamsValidation.error) {
taskParamsValidation = await this.validateIndirectTaskParams(modifiedContext);
}
}
const result = await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);
const hasSkipError = !isUndefined(taskParamsValidation?.error);
let shouldSkip = false;
let shouldKeepSkipAttempts = false;
if (hasSkipError) {
const reachedMaxSkipAttempts = this.hasReachedMaxSkipAttempts(modifiedContext.taskInstance);
shouldSkip = !reachedMaxSkipAttempts;
shouldKeepSkipAttempts = reachedMaxSkipAttempts;
}
const result = shouldSkip
? taskParamsValidation
: await this.executionContext.withContext(ctx, () =>
withSpan({ name: 'run', type: 'task manager' }, () => this.task!.run())
);
const validatedResult = this.validateResult(shouldKeepSkipAttempts, result);
const validatedResult = this.validateResult(result);
const processedResult = await withSpan({ name: 'process result', type: 'task manager' }, () =>
this.processResult(validatedResult, stopTaskTimer())
);
@ -420,23 +394,6 @@ export class TaskManagerRunner implements TaskRunner {
}
}
private validateTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, params, id } = taskInstance;
try {
const paramsSchema = this.definition.paramsSchema;
if (paramsSchema) {
paramsSchema.validate(params);
}
} catch (err) {
this.logger.warn(`Task (${taskType}/${id}) has a validation error: ${err.message}`);
error = createSkipError(err);
}
return { ...(error ? { error } : {}), state };
}
private validateTaskState(taskInstance: ConcreteTaskInstance) {
const { taskType, id } = taskInstance;
try {
@ -449,30 +406,6 @@ export class TaskManagerRunner implements TaskRunner {
}
}
private async validateIndirectTaskParams({ taskInstance }: RunContext) {
let error;
const { state, taskType, id } = taskInstance;
const indirectParamsSchema = this.definition.indirectParamsSchema;
if (this.task?.loadIndirectParams && !!indirectParamsSchema) {
const { data } = await this.task.loadIndirectParams();
if (data) {
try {
if (indirectParamsSchema) {
indirectParamsSchema.validate(data.indirectParams);
}
} catch (err) {
this.logger.warn(
`Task (${taskType}/${id}) has a validation error in its indirect params: ${err.message}`
);
error = createSkipError(err);
}
}
}
return { ...(error ? { error } : {}), state };
}
public async removeTask(): Promise<void> {
await this.bufferedTaskStore.remove(this.id);
if (this.task?.cleanup) {
@ -599,16 +532,12 @@ export class TaskManagerRunner implements TaskRunner {
}
private validateResult(
shouldKeepSkipAttempts: boolean,
result?: SuccessfulRunResult | FailedRunResult | void
): Result<SuccessfulRunResult, FailedRunResult> {
return isFailedRunResult(result)
? asErr({ ...result, error: result.error })
: asOk({
...(result || EMPTY_RUN_RESULT),
...(shouldKeepSkipAttempts
? { skipAttempts: this.requeueInvalidTasksConfig.max_attempts }
: {}),
});
}
@ -646,22 +575,6 @@ export class TaskManagerRunner implements TaskRunner {
): Result<SuccessfulRunResult, FailedTaskResult> => {
const { state, error } = failureResult;
const { schedule, attempts } = this.instance.task;
const { max_attempts: maxSkipAttempts, enabled, delay } = this.requeueInvalidTasksConfig;
let skipAttempts = this.instance.task.numSkippedRuns ?? 0;
if (isSkipError(error) && enabled) {
skipAttempts = skipAttempts + 1;
const { taskType, id } = this.instance.task;
this.logger.warn(
`Task Manager has skipped executing the Task (${taskType}/${id}) ${skipAttempts} times as it has invalid params.`
);
return asOk({
state: this.instance.task.state,
runAt: moment().add(delay, 'millisecond').toDate(),
attempts: 0,
skipAttempts,
});
}
if (this.shouldTryToScheduleRetry() && !isUnrecoverableError(error)) {
// if we're retrying, keep the number of attempts
@ -684,16 +597,11 @@ export class TaskManagerRunner implements TaskRunner {
return asOk({
state,
attempts,
skipAttempts,
...reschedule,
});
}
}
if (skipAttempts >= maxSkipAttempts && enabled) {
return asErr({ status: TaskStatus.DeadLetter });
}
// scheduling a retry isn't possible,mark task as failed
return asErr({ status: TaskStatus.Failed });
};
@ -712,17 +620,8 @@ export class TaskManagerRunner implements TaskRunner {
schedule: reschedule,
state,
attempts = 0,
skipAttempts,
}: SuccessfulRunResult & { attempts: number }) => {
const { startedAt, schedule, numSkippedRuns } = this.instance.task;
const { taskRunError } = unwrap(result);
let requeueInvalidTaskAttempts = skipAttempts || numSkippedRuns || 0;
// Alerting TaskRunner returns SuccessResult even though there is an error
// therefore we use "taskRunError" to be sure that there wasn't any error
if (isUndefined(skipAttempts) && isUndefined(taskRunError)) {
requeueInvalidTaskAttempts = 0;
}
const { startedAt, schedule } = this.instance.task;
return asOk({
runAt:
@ -731,7 +630,6 @@ export class TaskManagerRunner implements TaskRunner {
schedule: reschedule ?? schedule,
attempts,
status: TaskStatus.Idle,
numSkippedRuns: requeueInvalidTaskAttempts,
});
}
),
@ -890,18 +788,6 @@ export class TaskManagerRunner implements TaskRunner {
? this.definition.maxAttempts
: this.defaultMaxAttempts;
}
private hasReachedMaxSkipAttempts(taskInstance: ConcreteTaskInstance) {
const { taskType, id, numSkippedRuns = 0 } = taskInstance;
const { max_attempts: maxAttempts } = this.requeueInvalidTasksConfig;
if (numSkippedRuns >= maxAttempts) {
this.logger.warn(`Task Manager has reached the max skip attempts for task ${taskType}/${id}`);
return true;
}
return false;
}
}
function sanitizeInstance(instance: ConcreteTaskInstance): ConcreteTaskInstance {

View file

@ -21,7 +21,8 @@
"@kbn/apm-utils",
"@kbn/core-saved-objects-common",
"@kbn/core-saved-objects-utils-server",
"@kbn/core-test-helpers-kbn-server"
"@kbn/core-test-helpers-kbn-server",
"@kbn/core-saved-objects-server"
],
"exclude": [
"target/**/*",

View file

@ -347,7 +347,6 @@ export function createTestConfig(name: string, options: CreateTestConfigOptions)
: []),
'--notifications.connectors.default.email=notification-email',
'--xpack.task_manager.allow_reading_invalid_state=false',
'--xpack.task_manager.requeue_invalid_tasks.enabled=true',
'--xpack.actions.queued.max=500',
`--xpack.stack_connectors.enableExperimental=${JSON.stringify(experimentalFeatures)}`,
],

View file

@ -36,9 +36,6 @@ export default async function ({ readConfigFile }: FtrConfigProviderContext) {
'--xpack.task_manager.monitored_aggregated_stats_refresh_rate=5000',
'--xpack.task_manager.ephemeral_tasks.enabled=false',
'--xpack.task_manager.ephemeral_tasks.request_capacity=100',
'--xpack.task_manager.requeue_invalid_tasks.enabled=true',
'--xpack.task_manager.requeue_invalid_tasks.delay=1000',
'--xpack.task_manager.requeue_invalid_tasks.max_attempts=2',
...findTestPluginPaths(path.resolve(__dirname, 'plugins')),
],
},

View file

@ -219,52 +219,6 @@ export class SampleTaskManagerFixturePlugin
},
}),
},
sampleRecurringTaskWithInvalidIndirectParam: {
title: 'Sample Recurring Task that has invalid indirect params',
description: 'A sample task that returns invalid params in loadIndirectParams all the time',
maxAttempts: 1,
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'foo' } } }; // invalid
},
async run() {
return { state: {}, schedule: { interval: '1s' }, hasError: true };
},
}),
indirectParamsSchema: schema.object({
param: schema.string(),
}),
},
sampleOneTimeTaskWithInvalidIndirectParam: {
title: 'Sample One Time Task that has invalid indirect params',
description:
'A sample task that returns invalid params in loadIndirectParams all the time and throws error in the run method',
maxAttempts: 1,
createTaskRunner: () => ({
async loadIndirectParams() {
return { data: { indirectParams: { baz: 'foo' } } }; // invalid
},
async run() {
throwRetryableError(new Error('Retry'), true);
},
}),
indirectParamsSchema: schema.object({
param: schema.string(),
}),
},
sampleTaskWithParamsSchema: {
title: 'Sample Task That has paramsSchema',
description: 'A sample task that has paramsSchema to validate params',
maxAttempts: 1,
paramsSchema: schema.object({
param: schema.string(),
}),
createTaskRunner: () => ({
async run() {
throwRetryableError(new Error('Retry'), true);
},
}),
},
taskToDisable: {
title: 'Task used for testing it being disabled',
description: '',

View file

@ -33,9 +33,6 @@ export default function ({ getService }: FtrProviderContext) {
'timedTask',
'timedTaskWithLimitedConcurrency',
'timedTaskWithSingleConcurrency',
'sampleRecurringTaskWithInvalidIndirectParam',
'sampleOneTimeTaskWithInvalidIndirectParam',
'sampleTaskWithParamsSchema',
'taskToDisable',
];

View file

@ -17,7 +17,6 @@ export default function ({ loadTestFile }: FtrProviderContext) {
loadTestFile(require.resolve('./task_management_scheduled_at'));
loadTestFile(require.resolve('./task_management_removed_types'));
loadTestFile(require.resolve('./check_registered_task_types'));
loadTestFile(require.resolve('./skip'));
loadTestFile(require.resolve('./migrations'));
});

View file

@ -1,144 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import { SerializedConcreteTaskInstance, TaskStatus } from '@kbn/task-manager-plugin/server/task';
import expect from '@kbn/expect';
import { FtrProviderContext } from '../../ftr_provider_context';
export default function ({ getService }: FtrProviderContext) {
const supertest = getService('supertest');
const log = getService('log');
const retry = getService('retry');
describe('Skip invalid tasks', () => {
function currentTask(task: string): Promise<SerializedConcreteTaskInstance> {
return supertest
.get(`/api/sample_tasks/task/${task}`)
.send({ task })
.expect((response) => {
expect(response.status).to.eql(200);
expect(typeof JSON.parse(response.text).id).to.eql(`string`);
})
.then((response) => response.body);
}
after(async () => {
// clean up after last test
return await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200);
});
it('Skips recurring tasks that has invalid indirect param', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleRecurringTaskWithInvalidIndirectParam',
params: {},
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
let lastRunAt: string;
await retry.try(async () => {
const task = await currentTask(createdTask.id);
lastRunAt = task.runAt;
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
let newLastRun: string;
await retry.try(async () => {
const task = await currentTask(createdTask.id);
expect(task.attempts).to.eql(0);
expect(task.retryAt).to.eql(null);
// skip attempts remains as it is
expect(task.numSkippedRuns).to.eql(2);
// keeps rescheduling after skips
expect(new Date(task.runAt).getTime()).to.greaterThan(new Date(lastRunAt).getTime());
newLastRun = task.runAt;
});
// should keep running the rule after 2 skips and 1 successful run
await retry.try(async () => {
const task = await currentTask(createdTask.id);
expect(task.attempts).to.eql(0);
expect(task.retryAt).to.eql(null);
// skip attempts remains as it is
expect(task.numSkippedRuns).to.eql(2);
// keeps rescheduling after skips
expect(new Date(task.runAt).getTime()).to.greaterThan(new Date(newLastRun).getTime());
});
});
it('Skips non-recurring tasks that have invalid indirect params and sets status as "dead_letter" after 1 reschedule attempt', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleOneTimeTaskWithInvalidIndirectParam',
params: {},
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// reschedules 1 more time and set the status as 'dead_letter'
expect(task.attempts).to.eql(1);
expect(task.status).to.eql(TaskStatus.DeadLetter);
expect(task.numSkippedRuns).to.eql(2);
});
});
it('Skips the tasks with invalid params and sets status as "dead_letter" after 1 reschedule attempt', async () => {
const createdTask = await supertest
.post('/api/sample_tasks/schedule')
.set('kbn-xsrf', 'xxx')
.send({
task: {
taskType: 'sampleTaskWithParamsSchema',
params: { foo: 'bar' }, // invalid params
},
})
.expect(200)
.then((response: { body: SerializedConcreteTaskInstance }) => {
log.debug(`Task Scheduled: ${response.body.id}`);
return response.body;
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// skips 2 times
expect(task.numSkippedRuns).to.eql(2);
});
await retry.try(async () => {
const task = await currentTask(createdTask.id);
// reschedules 1 more time and set the status as 'dead_letter' as the task throws an error
expect(task.attempts).to.eql(1);
expect(task.status).to.eql(TaskStatus.DeadLetter);
expect(task.numSkippedRuns).to.eql(2);
});
});
});
}