mirror of
https://github.com/elastic/kibana.git
synced 2025-04-23 01:13:23 -04:00
[RAM] adds bulkUpdateSchedules method to Task Manager API (#132637)
Addresses: https://github.com/elastic/kibana/issues/124850

## Summary

- Adds a new Task Manager API method, `bulkUpdateSchedules`.
- Calls `taskManager.bulkUpdateSchedules` from `rulesClient.bulkEdit` to update the underlying tasks when the updated rules have a `scheduledTaskId` property.
- Enables the remaining operations for `rulesClient.bulkEdit` (set `schedule`, `notifyWhen`, `throttle`).

#### bulkUpdateSchedules

Using `bulkUpdateSchedules` you can instruct Task Manager to update the interval of tasks that are in `idle` status. When the interval is updated, a new `runAt` is computed and the task is updated with that value.

```js
export class Plugin {
  constructor() {}

  public setup(core: CoreSetup, plugins: { taskManager }) {}

  public async start(core: CoreStart, plugins: { taskManager }) {
    try {
      const bulkUpdateResults = await taskManager.bulkUpdateSchedules(
        ['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'],
        { interval: '10m' },
      );
      // If no error is thrown, bulkUpdateSchedules has completed successfully.
      // Updates of individual tasks can still fail, for example due to an OCC 409 conflict.
    } catch (err) {
      // If an error is caught, the whole request failed and no tasks were updated.
    }
  }
}
```

### in follow-up PRs
- Use `taskManager.bulkUpdateSchedules` in `rulesClient.update` (https://github.com/elastic/kibana/pull/134027)
- Functional test for bulkEdit (https://github.com/elastic/kibana/pull/133635)

### Checklist
- [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios

### Release note
Adds a new Task Manager method, `bulkUpdateSchedules`, that allows bulk updates of scheduled tasks. Adds 3 new operations to `rulesClient.bulkEdit`: set `schedule`, `notifyWhen`, and `throttle`.
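For illustration, a minimal sketch of a `rulesClient.bulkEdit` call that uses the new `set schedule` operation. The operation shape comes from the unit tests in this PR; the surrounding setup and the filter value are hypothetical.

```js
// Assumes a rulesClient instance is already in scope (e.g. inside an alerting route handler).
// The KQL filter below is a made-up example.
const { rules, errors, total } = await rulesClient.bulkEdit({
  filter: 'alert.attributes.tags: "managed"',
  operations: [{ field: 'schedule', operation: 'set', value: { interval: '10m' } }],
});
// Rules that have a scheduledTaskId also get their underlying task schedules
// updated via taskManager.bulkUpdateSchedules.
```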
This commit is contained in:
parent
d32d9f571c
commit
6e0086df00
12 changed files with 548 additions and 38 deletions
@@ -8,7 +8,7 @@
```ts
import { schema } from '@kbn/config-schema';
import { IRouter } from '@kbn/core/server';

import { ILicenseState, RuleTypeDisabledError } from '../lib';
import { ILicenseState, RuleTypeDisabledError, validateDurationSchema } from '../lib';
import { verifyAccessAndContext, rewriteRule, handleDisabledApiKeysError } from './lib';
import { AlertingRequestHandlerContext, INTERNAL_BASE_ALERTING_API_PATH } from '../types';
```

@@ -34,6 +34,27 @@ const operationsSchema = schema.arrayOf(
```ts
      field: schema.literal('actions'),
      value: schema.arrayOf(ruleActionSchema),
    }),
    schema.object({
      operation: schema.literal('set'),
      field: schema.literal('schedule'),
      value: schema.object({ interval: schema.string({ validate: validateDurationSchema }) }),
    }),
    schema.object({
      operation: schema.literal('set'),
      field: schema.literal('throttle'),
      value: schema.nullable(schema.string()),
    }),
    schema.object({
      operation: schema.literal('set'),
      field: schema.literal('notifyWhen'),
      value: schema.nullable(
        schema.oneOf([
          schema.literal('onActionGroupChange'),
          schema.literal('onActiveAlert'),
          schema.literal('onThrottleInterval'),
        ])
      ),
    }),
  ]),
  { minSize: 1 }
);
```
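A request body accepted by the extended operations schema above could look like the following sketch. The `ids` envelope mirrors the functional tests later in this commit; the rule id is made up.

```js
// Sketch of a bulk edit request body that validates against the extended operationsSchema.
const body = {
  ids: ['97c2c4e7-d850-11ec-bf95-895ffd19f959'],
  operations: [
    { operation: 'set', field: 'schedule', value: { interval: '10m' } },
    { operation: 'set', field: 'notifyWhen', value: 'onThrottleInterval' },
    { operation: 'set', field: 'throttle', value: null },
  ],
};
```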
@@ -211,7 +211,10 @@ export interface FindOptions extends IndexType {
```ts
  filter?: string;
}

export type BulkEditFields = keyof Pick<Rule, 'actions' | 'tags'>;
export type BulkEditFields = keyof Pick<
  Rule,
  'actions' | 'tags' | 'schedule' | 'throttle' | 'notifyWhen'
>;

export type BulkEditOperation =
  | {
```

@@ -223,25 +226,23 @@ export type BulkEditOperation =
```ts
      operation: 'add' | 'set';
      field: Extract<BulkEditFields, 'actions'>;
      value: NormalizedAlertAction[];
    }
  | {
      operation: 'set';
      field: Extract<BulkEditFields, 'schedule'>;
      value: Rule['schedule'];
    }
  | {
      operation: 'set';
      field: Extract<BulkEditFields, 'throttle'>;
      value: Rule['throttle'];
    }
  | {
      operation: 'set';
      field: Extract<BulkEditFields, 'notifyWhen'>;
      value: Rule['notifyWhen'];
    };

// schedule, throttle, notifyWhen is commented out before https://github.com/elastic/kibana/issues/124850 will be implemented
// | {
//     operation: 'set';
//     field: Extract<BulkEditFields, 'schedule'>;
//     value: Rule['schedule'];
//   }
// | {
//     operation: 'set';
//     field: Extract<BulkEditFields, 'throttle'>;
//     value: Rule['throttle'];
//   }
// | {
//     operation: 'set';
//     field: Extract<BulkEditFields, 'notifyWhen'>;
//     value: Rule['notifyWhen'];
//   };

type RuleParamsModifier<Params extends RuleTypeParams> = (params: Params) => Promise<Params>;

export interface BulkEditOptionsFilter<Params extends RuleTypeParams> {
```
@@ -1494,6 +1495,36 @@ export class RulesClient {
```ts
      );
    });

    // update schedules only if schedule operation is present
    const scheduleOperation = options.operations.find(
      (
        operation
      ): operation is Extract<BulkEditOperation, { field: Extract<BulkEditFields, 'schedule'> }> =>
        operation.field === 'schedule'
    );

    if (scheduleOperation?.value) {
      const taskIds = updatedRules.reduce<string[]>((acc, rule) => {
        if (rule.scheduledTaskId) {
          acc.push(rule.scheduledTaskId);
        }
        return acc;
      }, []);

      try {
        await this.taskManager.bulkUpdateSchedules(taskIds, scheduleOperation.value);
        this.logger.debug(
          `Successfully updated schedules for underlying tasks: ${taskIds.join(', ')}`
        );
      } catch (error) {
        this.logger.error(
          `Failure to update schedules for underlying tasks: ${taskIds.join(
            ', '
          )}. TaskManager bulkUpdateSchedules failed with Error: ${error.message}`
        );
      }
    }

    return { rules: updatedRules, errors, total };
  }
```
@@ -899,4 +899,81 @@ describe('bulkEdit()', () => {
```ts
      );
    });
  });

  describe('task manager', () => {
    test('should call task manager method bulkUpdateSchedules if operation set new schedules', async () => {
      unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({
        saved_objects: [
          {
            id: '1',
            type: 'alert',
            attributes: {
              enabled: true,
              tags: ['foo'],
              alertTypeId: 'myType',
              schedule: { interval: '1m' },
              consumer: 'myApp',
              scheduledTaskId: 'task-123',
              params: { index: ['test-index-*'] },
              throttle: null,
              notifyWhen: null,
              actions: [],
            },
            references: [],
            version: '123',
          },
        ],
      });

      await rulesClient.bulkEdit({
        operations: [
          {
            field: 'schedule',
            operation: 'set',
            value: { interval: '10m' },
          },
        ],
      });

      expect(taskManager.bulkUpdateSchedules).toHaveBeenCalledWith(['task-123'], {
        interval: '10m',
      });
    });

    test('should not call task manager method bulkUpdateSchedules if operation is not set schedule', async () => {
      unsecuredSavedObjectsClient.bulkUpdate.mockResolvedValue({
        saved_objects: [
          {
            id: '1',
            type: 'alert',
            attributes: {
              enabled: true,
              tags: ['foo'],
              alertTypeId: 'myType',
              schedule: { interval: '1m' },
              consumer: 'myApp',
              params: { index: ['test-index-*'] },
              throttle: null,
              notifyWhen: null,
              actions: [],
            },
            references: [],
            version: '123',
          },
        ],
      });

      await rulesClient.bulkEdit({
        operations: [
          {
            field: 'tags',
            operation: 'set',
            value: ['test-tag'],
          },
        ],
      });

      expect(taskManager.bulkUpdateSchedules).not.toHaveBeenCalled();
    });
  });
});
```
@@ -328,6 +328,9 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's
```ts
  runNow: (taskId: string) => {
    // ...
  },
  bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => {
    // ...
  },
  ensureScheduled: (taskInstance: TaskInstanceWithId, options?: any) => {
    // ...
  },
```
@@ -415,6 +418,38 @@ export class Plugin {
```js
}
```

#### bulkUpdateSchedules

Using `bulkUpdateSchedules` you can instruct Task Manager to update the interval of tasks that are in `idle` status
(for tasks in `running` status, `schedule` and `runAt` will be recalculated after the task run finishes).
When the interval is updated, a new `runAt` is computed and the task is updated with that value, using the formula

```
newRunAt = oldRunAt - oldInterval + newInterval
```
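For instance, a small sketch of that arithmetic; the clamp to the current time mirrors the `Math.max(Date.now(), ...)` used in the implementation later in this commit:

```js
// A task was going to run 2 hours from now on a '3h' interval; the interval changes to '5h'.
const oldRunAt = Date.now() + 2 * 60 * 60 * 1000; // 2h from now
const oldIntervalMs = 3 * 60 * 60 * 1000; // '3h'
const newIntervalMs = 5 * 60 * 60 * 1000; // '5h'

// newRunAt = oldRunAt - oldInterval + newInterval, never earlier than now
const newRunAt = new Date(Math.max(Date.now(), oldRunAt - oldIntervalMs + newIntervalMs));
// => roughly 4 hours from now
```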
Example:
```js
export class Plugin {
  constructor() {}

  public setup(core: CoreSetup, plugins: { taskManager }) {}

  public async start(core: CoreStart, plugins: { taskManager }) {
    try {
      const bulkUpdateResults = await taskManager.bulkUpdateSchedules(
        ['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'],
        { interval: '10m' },
      );
      // If no error is thrown, bulkUpdateSchedules has completed successfully.
      // Updates of individual tasks can still fail, for example due to an OCC 409 conflict.
    } catch (err) {
      // If an error is caught, the whole request failed and no tasks were updated.
    }
  }
}
```

#### more options

More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time.
@@ -30,7 +30,7 @@ export {
```ts
  throwUnrecoverableError,
  isEphemeralTaskRejectedDueToCapacityError,
} from './task_running';
export type { RunNowResult } from './task_scheduling';
export type { RunNowResult, BulkUpdateSchedulesResult } from './task_scheduling';
export { getOldestIdleActionTask } from './queries/oldest_idle_action_task';
export {
  IdleTaskWithExpiredRunAt,
```
@@ -27,6 +27,7 @@ const createStartMock = () => {
```ts
    ensureScheduled: jest.fn(),
    removeIfExists: jest.fn(),
    supportsEphemeralTasks: jest.fn(),
    bulkUpdateSchedules: jest.fn(),
  };
  return mock;
};
```
@@ -48,7 +48,7 @@ export interface TaskManagerSetupContract {
```ts
export type TaskManagerStartContract = Pick<
  TaskScheduling,
  'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled'
  'schedule' | 'runNow' | 'ephemeralRunNow' | 'ensureScheduled' | 'bulkUpdateSchedules'
> &
  Pick<TaskStore, 'fetch' | 'get' | 'remove'> & {
    removeIfExists: TaskStore['remove'];
```

@@ -238,6 +238,7 @@ export class TaskManagerPlugin
```ts
      schedule: (...args) => taskScheduling.schedule(...args),
      ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
      runNow: (...args) => taskScheduling.runNow(...args),
      bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
      ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
      supportsEphemeralTasks: () => this.config.ephemeral_tasks.enabled,
    };
```
@@ -7,6 +7,7 @@
```ts
import { Subject } from 'rxjs';
import { none, some } from 'fp-ts/lib/Option';
import moment from 'moment';

import {
  asTaskMarkRunningEvent,
```

@@ -27,6 +28,7 @@ import { TaskRunResult } from './task_running';
```ts
import { mockLogger } from './test_utils';
import { TaskTypeDictionary } from './task_type_dictionary';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { mustBeAllOf } from './queries/query_clauses';

jest.mock('uuid', () => ({
  v4: () => 'v4uuid',
```

@@ -134,6 +136,139 @@ describe('TaskScheduling', () => {
```ts
    });
  });

  describe('bulkUpdateSchedules', () => {
    const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
    beforeEach(() => {
      mockTaskStore.bulkUpdate.mockImplementation(() =>
        Promise.resolve([{ tag: 'ok', value: mockTask() }])
      );
    });

    test('should search for tasks by ids and idle status', async () => {
      mockTaskStore.fetch.mockResolvedValue({ docs: [] });
      const taskScheduling = new TaskScheduling(taskSchedulingOpts);

      await taskScheduling.bulkUpdateSchedules([id], { interval: '1h' });

      expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
      expect(mockTaskStore.fetch).toHaveBeenCalledWith({
        query: mustBeAllOf(
          {
            terms: {
              _id: [`task:${id}`],
            },
          },
          {
            term: {
              'task.status': 'idle',
            },
          }
        ),
        size: 100,
      });
    });

    test('should split search on chunks when input ids array too large', async () => {
      mockTaskStore.fetch.mockResolvedValue({ docs: [] });
      const taskScheduling = new TaskScheduling(taskSchedulingOpts);

      await taskScheduling.bulkUpdateSchedules(Array.from({ length: 1250 }), { interval: '1h' });

      expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
    });

    test('should transform response into correct format', async () => {
      const successfulTask = mockTask({ id: 'task-1', schedule: { interval: '1h' } });
      const failedTask = mockTask({ id: 'task-2', schedule: { interval: '1h' } });
      mockTaskStore.bulkUpdate.mockImplementation(() =>
        Promise.resolve([
          { tag: 'ok', value: successfulTask },
          { tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
        ])
      );
      mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });

      const taskScheduling = new TaskScheduling(taskSchedulingOpts);
      const result = await taskScheduling.bulkUpdateSchedules([successfulTask.id, failedTask.id], {
        interval: '1h',
      });

      expect(result).toEqual({
        tasks: [successfulTask],
        errors: [{ task: failedTask, error: new Error('fail') }],
      });
    });

    test('should not update task if new interval is equal to previous', async () => {
      const task = mockTask({ id, schedule: { interval: '3h' } });

      mockTaskStore.fetch.mockResolvedValue({ docs: [task] });

      const taskScheduling = new TaskScheduling(taskSchedulingOpts);
      await taskScheduling.bulkUpdateSchedules([id], { interval: '3h' });

      const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];

      expect(bulkUpdatePayload).toHaveLength(0);
    });

    test('should postpone task run if new interval is greater than previous', async () => {
      // task set to be run in 2 hrs from now
      const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds());
      const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs });

      mockTaskStore.fetch.mockResolvedValue({ docs: [task] });

      const taskScheduling = new TaskScheduling(taskSchedulingOpts);
      await taskScheduling.bulkUpdateSchedules([id], { interval: '5h' });

      const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];

      expect(bulkUpdatePayload).toHaveLength(1);
      expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '5h' });
      // if tasks updated with schedule interval of '5h' and previous interval was 3h, task will be scheduled to run in 2 hours later
      expect(bulkUpdatePayload[0].runAt.getTime() - runInTwoHrs.getTime()).toBe(
        moment.duration(2, 'hours').asMilliseconds()
      );
    });

    test('should set task run sooner if new interval is lesser than previous', async () => {
      // task set to be run in one 2hrs from now
      const runInTwoHrs = new Date(Date.now() + moment.duration(2, 'hours').asMilliseconds());
      const task = mockTask({ id, schedule: { interval: '3h' }, runAt: runInTwoHrs });

      mockTaskStore.fetch.mockResolvedValue({ docs: [task] });

      const taskScheduling = new TaskScheduling(taskSchedulingOpts);
      await taskScheduling.bulkUpdateSchedules([id], { interval: '2h' });

      const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];

      expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '2h' });
      // if tasks updated with schedule interval of '2h' and previous interval was 3h, task will be scheduled to run in 1 hour sooner
      expect(runInTwoHrs.getTime() - bulkUpdatePayload[0].runAt.getTime()).toBe(
        moment.duration(1, 'hour').asMilliseconds()
      );
    });

    test('should set task run to now if time that passed from last run is greater than new interval', async () => {
      // task set to be run in one 1hr from now. With interval of '2h', it means last run happened 1 hour ago
      const runInOneHr = new Date(Date.now() + moment.duration(1, 'hour').asMilliseconds());
      const task = mockTask({ id, schedule: { interval: '2h' }, runAt: runInOneHr });

      mockTaskStore.fetch.mockResolvedValue({ docs: [task] });

      const taskScheduling = new TaskScheduling(taskSchedulingOpts);
      await taskScheduling.bulkUpdateSchedules([id], { interval: '30m' });

      const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];

      expect(bulkUpdatePayload[0]).toHaveProperty('schedule', { interval: '30m' });

      // if time that passed from last task run is greater than new interval, task should be set to run at now time
      expect(bulkUpdatePayload[0].runAt.getTime()).toBeLessThanOrEqual(Date.now());
    });
  });
  describe('runNow', () => {
    test('resolves when the task run succeeds', () => {
      const events$ = new Subject<TaskLifecycleEvent>();
```
@@ -6,15 +6,16 @@
```ts
 */

import { filter, take } from 'rxjs/operators';

import pMap from 'p-map';
import { pipe } from 'fp-ts/lib/pipeable';
import { Option, map as mapOptional, getOrElse, isSome } from 'fp-ts/lib/Option';

import uuid from 'uuid';
import { pick } from 'lodash';
import { pick, chunk } from 'lodash';
import { merge, Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { mustBeAllOf } from './queries/query_clauses';
import { asOk, either, map, mapErr, promiseResult, isErr } from './lib/result_type';
import {
  isTaskRunEvent,
```

@@ -28,6 +29,7 @@ import {
```ts
  TaskClaimErrorType,
} from './task_events';
import { Middleware } from './lib/middleware';
import { parseIntervalAsMillisecond } from './lib/intervals';
import {
  ConcreteTaskInstance,
  TaskInstanceWithId,
```

@@ -36,6 +38,7 @@ import {
```ts
  TaskLifecycleResult,
  TaskStatus,
  EphemeralTask,
  IntervalSchedule,
} from './task';
import { TaskStore } from './task_store';
import { ensureDeprecatedFieldsAreCorrected } from './lib/correct_deprecated_fields';
```

@@ -56,6 +59,20 @@ export interface TaskSchedulingOpts {
```ts
  taskManagerId: string;
}

/**
 * return type of TaskScheduling.bulkUpdateSchedules method
 */
export interface BulkUpdateSchedulesResult {
  /**
   * list of successfully updated tasks
   */
  tasks: ConcreteTaskInstance[];

  /**
   * list of failed tasks and errors caused failure
   */
  errors: Array<{ task: ConcreteTaskInstance; error: Error }>;
}
export interface RunNowResult {
  id: ConcreteTaskInstance['id'];
  state?: ConcreteTaskInstance['state'];
```

@@ -111,6 +128,75 @@ export class TaskScheduling {
```ts
    });
  }

  /**
   * Bulk updates schedules for tasks by ids.
   * Only tasks with `idle` status will be updated, as for the tasks which have `running` status,
   * `schedule` and `runAt` will be recalculated after task run finishes
   *
   * @param {string[]} taskIds - list of task ids
   * @param {IntervalSchedule} schedule - new schedule
   * @returns {Promise<BulkUpdateSchedulesResult>}
   */
  public async bulkUpdateSchedules(
    taskIds: string[],
    schedule: IntervalSchedule
  ): Promise<BulkUpdateSchedulesResult> {
    const tasks = await pMap(
      chunk(taskIds, 100),
      async (taskIdsChunk) =>
        this.store.fetch({
          query: mustBeAllOf(
            {
              terms: {
                _id: taskIdsChunk.map((taskId) => `task:${taskId}`),
              },
            },
            {
              term: {
                'task.status': 'idle',
              },
            }
          ),
          size: 100,
        }),
      { concurrency: 10 }
    );

    const updatedTasks = tasks
      .flatMap(({ docs }) => docs)
      .reduce<ConcreteTaskInstance[]>((acc, task) => {
        // if task schedule interval is the same, no need to update it
        if (task.schedule?.interval === schedule.interval) {
          return acc;
        }

        const oldIntervalInMs = parseIntervalAsMillisecond(task.schedule?.interval ?? '0s');

        // computing new runAt using formula:
        // newRunAt = oldRunAt - oldInterval + newInterval
        const newRunAtInMs = Math.max(
          Date.now(),
          task.runAt.getTime() - oldIntervalInMs + parseIntervalAsMillisecond(schedule.interval)
        );

        acc.push({ ...task, schedule, runAt: new Date(newRunAtInMs) });
        return acc;
      }, []);

    return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateSchedulesResult>(
      (acc, task) => {
        if (task.tag === 'ok') {
          acc.tasks.push(task.value);
        } else {
          acc.errors.push({ error: task.error.error, task: task.error.entity });
        }

        return acc;
      },
      { tasks: [], errors: [] }
    );
  }

  /**
   * Run task.
   *
```
@@ -437,7 +437,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) {
```ts
            statusCode: 400,
            error: 'Bad Request',
            message:
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]',
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [delete]\n - [request body.operations.0.operation.2]: expected value to equal [set]\n- [request body.operations.0.1.operation]: types that failed validation:\n - [request body.operations.0.operation.0]: expected value to equal [add]\n - [request body.operations.0.operation.1]: expected value to equal [set]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]',
          });
          expect(response.statusCode).to.eql(400);
          break;
```

@@ -446,21 +446,14 @@ export default function createUpdateTests({ getService }: FtrProviderContext) {
```ts
        }
      });

      it('should handle bulk edit of rules when operation field is invalid', async () => {
        const { body: createdRule } = await supertest
          .post(`${getUrlPrefix(space.id)}/api/alerting/rule`)
          .set('kbn-xsrf', 'foo')
          .send(getTestRuleData({ tags: ['foo'] }))
          .expect(200);
        objectRemover.add(space.id, createdRule.id, 'rule', 'alerting');

      it('should handle bulk edit of rules when operation value type is incorrect', async () => {
        const payload = {
          ids: [createdRule.id],
          filter: '',
          operations: [
            {
              operation: 'add',
              field: 'test',
              value: ['test'],
              field: 'tags',
              value: 'not an array',
            },
          ],
        };
```

@@ -482,7 +475,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) {
```ts
            statusCode: 400,
            error: 'Bad Request',
            message:
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]',
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.value]: could not parse array value from json input\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]',
          });
          expect(response.statusCode).to.eql(400);
          break;
```

@@ -520,7 +513,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) {
```ts
            statusCode: 400,
            error: 'Bad Request',
            message:
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]',
              '[request body.operations.0]: types that failed validation:\n- [request body.operations.0.0.field]: expected value to equal [tags]\n- [request body.operations.0.1.field]: expected value to equal [actions]\n- [request body.operations.0.2.operation]: expected value to equal [set]\n- [request body.operations.0.3.operation]: expected value to equal [set]\n- [request body.operations.0.4.operation]: expected value to equal [set]',
          });
          expect(response.statusCode).to.eql(400);
          break;
```
@@ -111,6 +111,31 @@ export function initRoutes(
```ts
    }
  );

  router.post(
    {
      path: `/api/sample_tasks/bulk_update_schedules`,
      validate: {
        body: schema.object({
          taskIds: schema.arrayOf(schema.string()),
          schedule: schema.object({ interval: schema.string() }),
        }),
      },
    },
    async function (
      context: RequestHandlerContext,
      req: KibanaRequest<any, any, any, any>,
      res: KibanaResponseFactory
    ) {
      const { taskIds, schedule } = req.body;
      try {
        const taskManager = await taskManagerStart;
        return res.ok({ body: await taskManager.bulkUpdateSchedules(taskIds, schedule) });
      } catch (err) {
        return res.ok({ body: { taskIds, error: `${err}` } });
      }
    }
  );

  router.post(
    {
      path: `/api/sample_tasks/ephemeral_run_now`,
```
@@ -5,12 +5,13 @@
```ts
 * 2.0.
 */

import moment from 'moment';
import { random, times } from 'lodash';
import expect from '@kbn/expect';
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import TaskManagerMapping from '@kbn/task-manager-plugin/server/saved_objects/mappings.json';
import { DEFAULT_MAX_WORKERS, DEFAULT_POLL_INTERVAL } from '@kbn/task-manager-plugin/server/config';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { ConcreteTaskInstance, BulkUpdateSchedulesResult } from '@kbn/task-manager-plugin/server';
import { FtrProviderContext } from '../../ftr_provider_context';

const {
```

@@ -177,6 +178,15 @@ export default function ({ getService }: FtrProviderContext) {
```ts
      .then((response) => response.body);
  }

  function bulkUpdateSchedules(taskIds: string[], schedule: { interval: string }) {
    return supertest
      .post('/api/sample_tasks/bulk_update_schedules')
      .set('kbn-xsrf', 'xxx')
      .send({ taskIds, schedule })
      .expect(200)
      .then((response: { body: BulkUpdateSchedulesResult }) => response.body);
  }

  // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
  // function runEphemeralTaskNow(task: {
  //   taskType: string;
```

@@ -899,6 +909,101 @@ export default function ({ getService }: FtrProviderContext) {
```ts
      });
    });

    it('should bulk update schedules for multiple tasks', async () => {
      const initialTime = Date.now();
      const tasks = await Promise.all([
        scheduleTask({
          taskType: 'sampleTask',
          schedule: { interval: '1h' },
          params: {},
        }),

        scheduleTask({
          taskType: 'sampleTask',
          schedule: { interval: '5m' },
          params: {},
        }),
      ]);

      const taskIds = tasks.map(({ id }) => id);

      await retry.try(async () => {
        // ensure each task has ran at least once and been rescheduled for future run
        for (const task of tasks) {
          const { state } = await currentTask<{ count: number }>(task.id);
          expect(state.count).to.be(1);
        }

        // first task to be scheduled in 1h
        expect(Date.parse((await currentTask(tasks[0].id)).runAt) - initialTime).to.be.greaterThan(
          moment.duration(1, 'hour').asMilliseconds()
        );

        // second task to be scheduled in 5m
        expect(Date.parse((await currentTask(tasks[1].id)).runAt) - initialTime).to.be.greaterThan(
          moment.duration(5, 'minutes').asMilliseconds()
        );
      });

      await retry.try(async () => {
        const updates = await bulkUpdateSchedules(taskIds, { interval: '3h' });

        expect(updates.tasks.length).to.be(2);
        expect(updates.errors.length).to.be(0);
      });

      await retry.try(async () => {
        const updatedTasks = (await currentTasks()).docs;

        updatedTasks.forEach((task) => {
          expect(task.schedule).to.eql({ interval: '3h' });
          // should be scheduled to run in 3 hours
          expect(Date.parse(task.runAt) - initialTime).to.be.greaterThan(
            moment.duration(3, 'hours').asMilliseconds()
          );
        });
      });
    });

    it('should not bulk update schedules for task in running status', async () => {
      // this task should be in running status for 60s until it will be time outed
      const longRunningTask = await scheduleTask({
        taskType: 'sampleRecurringTaskWhichHangs',
        schedule: { interval: '1h' },
        params: {},
      });

      runTaskNow({ id: longRunningTask.id });

      let scheduledRunAt: string;
      // ensure task is running and store scheduled runAt
      await retry.try(async () => {
        const task = await currentTask(longRunningTask.id);

        expect(task.status).to.be('running');

        scheduledRunAt = task.runAt;
      });

      await retry.try(async () => {
        const updates = await bulkUpdateSchedules([longRunningTask.id], { interval: '3h' });

        // length should be 0, as task in running status won't be updated
        expect(updates.tasks.length).to.be(0);
        expect(updates.errors.length).to.be(0);
      });

      // ensure task wasn't updated
      await retry.try(async () => {
        const task = await currentTask(longRunningTask.id);

        // interval shouldn't be changed
        expect(task.schedule).to.eql({ interval: '1h' });

        // scheduledRunAt shouldn't be changed
        expect(task.runAt).to.eql(scheduledRunAt);
      });
    });
    // TODO: Add this back in with https://github.com/elastic/kibana/issues/106139
    // it('should return the resulting task state when asked to run an ephemeral task now', async () => {
    //   const ephemeralTask = await runEphemeralTaskNow({
```