[Response Ops] [Alerting] Run rule after enabling (#140664)

* Splitting bulk enable and disable and resetting runAt and scheduledAt on enable

* Adding functional test

* Adding functional test

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
This commit is contained in:
Ying Mao 2022-09-19 14:47:27 -04:00 committed by GitHub
parent d8d1b1097b
commit 8089c01a13
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 427 additions and 146 deletions

View file

@ -2142,7 +2142,7 @@ export class RulesClient {
});
} else {
// Task exists so set enabled to true
await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId!], true);
await this.taskManager.bulkEnable([attributes.scheduledTaskId!]);
}
}
@ -2290,7 +2290,7 @@ export class RulesClient {
if (attributes.scheduledTaskId !== id) {
await this.taskManager.removeIfExists(attributes.scheduledTaskId);
} else {
await this.taskManager.bulkEnableDisable([attributes.scheduledTaskId], false);
await this.taskManager.bulkDisable([attributes.scheduledTaskId]);
}
}
}

View file

@ -229,7 +229,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.bulkDisable).toHaveBeenCalledWith(['1']);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
@ -300,7 +300,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.bulkDisable).toHaveBeenCalledWith(['1']);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(eventLogger.logEvent).toHaveBeenCalledTimes(1);
@ -382,7 +382,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.bulkDisable).toHaveBeenCalledWith(['1']);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(eventLogger.logEvent).toHaveBeenCalledTimes(0);
@ -425,7 +425,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['1'], false);
expect(taskManager.bulkDisable).toHaveBeenCalledWith(['1']);
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
@ -441,7 +441,7 @@ describe('disable()', () => {
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
@ -450,7 +450,7 @@ describe('disable()', () => {
await rulesClient.disable({ id: '1' });
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalled();
expect(taskManager.bulkDisable).toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
expect(rulesClientParams.logger.error).toHaveBeenCalledWith(
'disable(): Failed to load API key of alert 1: Fail'
@ -463,12 +463,12 @@ describe('disable()', () => {
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to update"`
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).not.toHaveBeenCalledWith();
});
test('throws when failing to disable task', async () => {
taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to disable task'));
taskManager.bulkDisable.mockRejectedValueOnce(new Error('Failed to disable task'));
await expect(rulesClient.disable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to disable task"`
@ -516,7 +516,7 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkDisable).not.toHaveBeenCalled();
expect(taskManager.removeIfExists).toHaveBeenCalledWith('task-123');
});
@ -563,6 +563,6 @@ describe('disable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkDisable).not.toHaveBeenCalled();
});
});

View file

@ -239,7 +239,7 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test('enables a rule that does not have an apiKey', async () => {
@ -295,7 +295,7 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test(`doesn't update already enabled alerts but ensures task is enabled`, async () => {
@ -311,7 +311,7 @@ describe('enable()', () => {
expect(rulesClientParams.getUserName).not.toHaveBeenCalled();
expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.create).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test('sets API key when createAPIKey returns one', async () => {
@ -361,7 +361,7 @@ describe('enable()', () => {
version: '123',
}
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test('throws an error if API key creation throws', async () => {
@ -373,7 +373,7 @@ describe('enable()', () => {
await expect(
async () => await rulesClient.enable({ id: '1' })
).rejects.toThrowErrorMatchingInlineSnapshot(`"Error creating API key for rule: no"`);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
});
test('falls back when failing to getDecryptedAsInternalUser', async () => {
@ -384,7 +384,7 @@ describe('enable()', () => {
expect(rulesClientParams.logger.error).toHaveBeenCalledWith(
'enable(): Failed to load API key of alert 1: Fail'
);
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test('throws error when failing to load the saved object using SOC', async () => {
@ -397,7 +397,7 @@ describe('enable()', () => {
expect(rulesClientParams.getUserName).not.toHaveBeenCalled();
expect(rulesClientParams.createAPIKey).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).not.toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
});
test('throws when unsecuredSavedObjectsClient update fails', async () => {
@ -413,7 +413,7 @@ describe('enable()', () => {
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
});
test('enables task when scheduledTaskId is defined and task exists', async () => {
@ -423,11 +423,11 @@ describe('enable()', () => {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).toHaveBeenCalledWith(['task-123'], true);
expect(taskManager.bulkEnable).toHaveBeenCalledWith(['task-123']);
});
test('throws error when enabling task fails', async () => {
taskManager.bulkEnableDisable.mockRejectedValueOnce(new Error('Failed to enable task'));
taskManager.bulkEnable.mockRejectedValueOnce(new Error('Failed to enable task'));
await expect(rulesClient.enable({ id: '1' })).rejects.toThrowErrorMatchingInlineSnapshot(
`"Failed to enable task"`
);
@ -459,7 +459,7 @@ describe('enable()', () => {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalledWith({
id: '1',
taskType: `alerting:myType`,
@ -508,7 +508,7 @@ describe('enable()', () => {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalledWith({
id: '1',
taskType: `alerting:myType`,
@ -543,7 +543,7 @@ describe('enable()', () => {
`"Fail to schedule"`
);
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(1);
});
@ -562,7 +562,7 @@ describe('enable()', () => {
namespace: 'default',
});
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
expect(taskManager.schedule).toHaveBeenCalled();
});
@ -601,7 +601,7 @@ describe('enable()', () => {
expect(rulesClientParams.getUserName).toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenCalledTimes(2);
expect(taskManager.schedule).toHaveBeenCalled();
expect(taskManager.bulkEnableDisable).not.toHaveBeenCalled();
expect(taskManager.bulkEnable).not.toHaveBeenCalled();
expect(unsecuredSavedObjectsClient.update).toHaveBeenNthCalledWith(2, 'alert', '1', {
scheduledTaskId: '1',
});

View file

@ -328,7 +328,10 @@ The _Start_ Plugin api allow you to use Task Manager to facilitate your Plugin's
runSoon: (taskId: string) => {
// ...
},
bulkEnableDisable: (taskIds: string[], enabled: boolean) => {
bulkEnable: (taskIds: string[], runSoon: boolean = true) => {
// ...
},
bulkDisable: (taskIds: string[]) => {
// ...
},
bulkUpdateSchedules: (taskIds: string[], schedule: IntervalSchedule) => {
@ -421,8 +424,8 @@ export class Plugin {
}
```
#### bulkEnableDisable
Using `bulkEnableDisable` you can instruct TaskManger to update the `enabled` status of tasks.
#### bulkDisable
Using `bulkDisable` you can instruct TaskManger to disable tasks by setting the `enabled` status of specific tasks to `false`.
Example:
```js
@ -435,11 +438,37 @@ export class Plugin {
public start(core: CoreStart, plugins: { taskManager }) {
try {
const bulkDisableResults = await taskManager.bulkEnableDisable(
const bulkDisableResults = await taskManager.bulkDisable(
['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'],
false,
);
// If no error is thrown, the bulkEnableDisable has completed successfully.
// If no error is thrown, the bulkDisable has completed successfully.
// But some updates of some tasks can be failed, due to OCC 409 conflict for example
} catch(err: Error) {
// if error is caught, means the whole method requested has failed and tasks weren't updated
}
}
}
```
#### bulkEnable
Using `bulkEnable` you can instruct TaskManger to enable tasks by setting the `enabled` status of specific tasks to `true`. Specify the `runSoon` parameter to run the task immediately on enable.
Example:
```js
export class Plugin {
constructor() {
}
public setup(core: CoreSetup, plugins: { taskManager }) {
}
public start(core: CoreStart, plugins: { taskManager }) {
try {
const bulkEnableResults = await taskManager.bulkEnable(
['97c2c4e7-d850-11ec-bf95-895ffd19f959', 'a5ee24d1-dce2-11ec-ab8d-cf74da82133d'],
true,
);
// If no error is thrown, the bulkEnable has completed successfully.
// But some updates of some tasks can be failed, due to OCC 409 conflict for example
} catch(err: Error) {
// if error is caught, means the whole method requested has failed and tasks weren't updated

View file

@ -30,7 +30,8 @@ const createStartMock = () => {
supportsEphemeralTasks: jest.fn(),
bulkUpdateSchedules: jest.fn(),
bulkSchedule: jest.fn(),
bulkEnableDisable: jest.fn(),
bulkDisable: jest.fn(),
bulkEnable: jest.fn(),
};
return mock;
};

View file

@ -53,7 +53,8 @@ export type TaskManagerStartContract = Pick<
| 'ephemeralRunNow'
| 'ensureScheduled'
| 'bulkUpdateSchedules'
| 'bulkEnableDisable'
| 'bulkEnable'
| 'bulkDisable'
| 'bulkSchedule'
> &
Pick<TaskStore, 'fetch' | 'aggregate' | 'get' | 'remove'> & {
@ -252,7 +253,8 @@ export class TaskManagerPlugin
bulkSchedule: (...args) => taskScheduling.bulkSchedule(...args),
ensureScheduled: (...args) => taskScheduling.ensureScheduled(...args),
runSoon: (...args) => taskScheduling.runSoon(...args),
bulkEnableDisable: (...args) => taskScheduling.bulkEnableDisable(...args),
bulkEnable: (...args) => taskScheduling.bulkEnable(...args),
bulkDisable: (...args) => taskScheduling.bulkDisable(...args),
bulkUpdateSchedules: (...args) => taskScheduling.bulkUpdateSchedules(...args),
ephemeralRunNow: (task: EphemeralTask) => taskScheduling.ephemeralRunNow(task),
supportsEphemeralTasks: () =>

View file

@ -9,7 +9,8 @@ import { TaskScheduling } from './task_scheduling';
const createTaskSchedulingMock = () => {
return {
bulkEnableDisable: jest.fn(),
bulkDisable: jest.fn(),
bulkEnable: jest.fn(),
ensureScheduled: jest.fn(),
schedule: jest.fn(),
runSoon: jest.fn(),

View file

@ -5,6 +5,7 @@
* 2.0.
*/
import sinon from 'sinon';
import { Subject } from 'rxjs';
import moment from 'moment';
@ -21,6 +22,7 @@ import { TaskTypeDictionary } from './task_type_dictionary';
import { ephemeralTaskLifecycleMock } from './ephemeral_task_lifecycle.mock';
import { mustBeAllOf } from './queries/query_clauses';
let fakeTimer: sinon.SinonFakeTimers;
jest.mock('uuid', () => ({
v4: () => 'v4uuid',
}));
@ -33,6 +35,11 @@ jest.mock('elastic-apm-node', () => ({
}));
describe('TaskScheduling', () => {
beforeAll(() => {
fakeTimer = sinon.useFakeTimers();
});
afterAll(() => fakeTimer.restore());
const mockTaskStore = taskStoreMock.create({});
const definitions = new TaskTypeDictionary(mockLogger());
const taskSchedulingOpts = {
@ -145,7 +152,7 @@ describe('TaskScheduling', () => {
});
});
describe('bulkEnableDisable', () => {
describe('bulkEnable', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
@ -153,39 +160,11 @@ describe('TaskScheduling', () => {
);
});
test('should search for tasks by ids enabled = true when disabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], false);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': true,
},
},
],
},
},
size: 100,
});
});
test('should search for tasks by ids enabled = false when enabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], true);
await taskScheduling.bulkEnable([id]);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
@ -213,7 +192,144 @@ describe('TaskScheduling', () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable(Array.from({ length: 1250 }), false);
await taskScheduling.bulkEnable(Array.from({ length: 1250 }), false);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
});
test('should transform response into correct format', async () => {
const successfulTask = mockTask({
id: 'task-1',
enabled: false,
schedule: { interval: '1h' },
});
const failedTask = mockTask({ id: 'task-2', enabled: false, schedule: { interval: '1h' } });
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([
{ tag: 'ok', value: successfulTask },
{ tag: 'err', error: { entity: failedTask, error: new Error('fail') } },
])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkEnable([successfulTask.id, failedTask.id]);
expect(result).toEqual({
tasks: [successfulTask],
errors: [{ task: failedTask, error: new Error('fail') }],
});
});
test('should not enable task if it is already enabled', async () => {
const task = mockTask({ id, enabled: true, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id]);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toHaveLength(0);
});
test('should set runAt and scheduledAt if runSoon is true', async () => {
const task = mockTask({
id,
enabled: false,
schedule: { interval: '3h' },
runAt: new Date('1969-09-13T21:33:58.285Z'),
scheduledAt: new Date('1969-09-10T21:33:58.285Z'),
});
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: task }])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id]);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toEqual([
{
...task,
enabled: true,
runAt: new Date('1970-01-01T00:00:00.000Z'),
scheduledAt: new Date('1970-01-01T00:00:00.000Z'),
},
]);
});
test('should not set runAt and scheduledAt if runSoon is false', async () => {
const task = mockTask({
id,
enabled: false,
schedule: { interval: '3h' },
runAt: new Date('1969-09-13T21:33:58.285Z'),
scheduledAt: new Date('1969-09-10T21:33:58.285Z'),
});
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: task }])
);
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnable([id], false);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toEqual([
{
...task,
enabled: true,
},
]);
});
});
describe('bulkDisable', () => {
const id = '01ddff11-e88a-4d13-bc4e-256164e755e2';
beforeEach(() => {
mockTaskStore.bulkUpdate.mockImplementation(() =>
Promise.resolve([{ tag: 'ok', value: mockTask() }])
);
});
test('should search for tasks by ids enabled = true when disabling', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkDisable([id]);
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(1);
expect(mockTaskStore.fetch).toHaveBeenCalledWith({
query: {
bool: {
must: [
{
terms: {
_id: [`task:${id}`],
},
},
{
term: {
'task.enabled': true,
},
},
],
},
},
size: 100,
});
});
test('should split search on chunks when input ids array too large', async () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkDisable(Array.from({ length: 1250 }));
expect(mockTaskStore.fetch).toHaveBeenCalledTimes(13);
});
@ -234,10 +350,7 @@ describe('TaskScheduling', () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [successfulTask, failedTask] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
const result = await taskScheduling.bulkEnableDisable(
[successfulTask.id, failedTask.id],
false
);
const result = await taskScheduling.bulkDisable([successfulTask.id, failedTask.id]);
expect(result).toEqual({
tasks: [successfulTask],
@ -251,20 +364,7 @@ describe('TaskScheduling', () => {
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], false);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];
expect(bulkUpdatePayload).toHaveLength(0);
});
test('should not enable task if it is already enabled', async () => {
const task = mockTask({ id, enabled: true, schedule: { interval: '3h' } });
mockTaskStore.fetch.mockResolvedValue({ docs: [task] });
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
await taskScheduling.bulkEnableDisable([id], true);
await taskScheduling.bulkDisable([id]);
const bulkUpdatePayload = mockTaskStore.bulkUpdate.mock.calls[0][0];

View file

@ -15,6 +15,7 @@ import { chunk, pick } from 'lodash';
import { Subject } from 'rxjs';
import agent from 'elastic-apm-node';
import { Logger } from '@kbn/core/server';
import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { mustBeAllOf } from './queries/query_clauses';
import { asOk, either, isErr, map, mapErr, promiseResult } from './lib/result_type';
import {
@ -161,59 +162,53 @@ export class TaskScheduling {
return await this.store.bulkSchedule(modifiedTasks);
}
public async bulkEnableDisable(
taskIds: string[],
enabled: boolean
): Promise<BulkUpdateTaskResult> {
const tasks = await pMap(
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: {
bool: {
must: [
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
{
term: {
'task.enabled': !enabled,
},
},
],
},
},
size: BULK_ACTION_SIZE,
}),
{ concurrency: 10 }
);
public async bulkDisable(taskIds: string[]) {
const enabledTasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.enabled': true,
},
});
const updatedTasks = tasks
const updatedTasks = enabledTasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task is not enabled, no need to update it
if (enabled === task.enabled) {
if (!task.enabled) {
return acc;
}
acc.push({ ...task, enabled });
acc.push({ ...task, enabled: false });
return acc;
}, []);
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {
acc.tasks.push(task.value);
return await this.bulkUpdateTasksHelper(updatedTasks);
}
public async bulkEnable(taskIds: string[], runSoon: boolean = true) {
const disabledTasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.enabled': false,
},
});
const updatedTasks = disabledTasks
.flatMap(({ docs }) => docs)
.reduce<ConcreteTaskInstance[]>((acc, task) => {
// if task is enabled, no need to update it
if (task.enabled) {
return acc;
}
if (runSoon) {
acc.push({ ...task, enabled: true, scheduledAt: new Date(), runAt: new Date() });
} else {
acc.errors.push({ error: task.error.error, task: task.error.entity });
acc.push({ ...task, enabled: true });
}
return acc;
},
{ tasks: [], errors: [] }
);
}, []);
return await this.bulkUpdateTasksHelper(updatedTasks);
}
/**
@ -229,26 +224,11 @@ export class TaskScheduling {
taskIds: string[],
schedule: IntervalSchedule
): Promise<BulkUpdateTaskResult> {
const tasks = await pMap(
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: mustBeAllOf(
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
{
term: {
'task.status': 'idle',
},
}
),
size: BULK_ACTION_SIZE,
}),
{ concurrency: 10 }
);
const tasks = await this.bulkGetTasksHelper(taskIds, {
term: {
'task.status': 'idle',
},
});
const updatedTasks = tasks
.flatMap(({ docs }) => docs)
@ -271,6 +251,29 @@ export class TaskScheduling {
return acc;
}, []);
return await this.bulkUpdateTasksHelper(updatedTasks);
}
private async bulkGetTasksHelper(taskIds: string[], ...must: QueryDslQueryContainer[]) {
return await pMap(
chunk(taskIds, BULK_ACTION_SIZE),
async (taskIdsChunk) =>
this.store.fetch({
query: mustBeAllOf(
{
terms: {
_id: taskIdsChunk.map((taskId) => `task:${taskId}`),
},
},
...must
),
size: BULK_ACTION_SIZE,
}),
{ concurrency: 10 }
);
}
private async bulkUpdateTasksHelper(updatedTasks: ConcreteTaskInstance[]) {
return (await this.store.bulkUpdate(updatedTasks)).reduce<BulkUpdateTaskResult>(
(acc, task) => {
if (task.tag === 'ok') {

View file

@ -34,7 +34,7 @@ export default function alertTests({ getService }: FtrProviderContext) {
const esTestIndexTool = new ESTestIndexTool(es, retry);
const taskManagerUtils = new TaskManagerUtils(es, retry);
describe('alerts test me', () => {
describe('alerts', () => {
const authorizationIndex = '.kibana-test-authorization';
const objectRemover = new ObjectRemover(supertest);

View file

@ -111,6 +111,55 @@ export function initRoutes(
}
);
router.post(
{
path: `/api/sample_tasks/bulk_enable`,
validate: {
body: schema.object({
taskIds: schema.arrayOf(schema.string()),
runSoon: schema.boolean({ defaultValue: true }),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
) {
const { taskIds, runSoon } = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.bulkEnable(taskIds, runSoon) });
} catch (err) {
return res.ok({ body: { taskIds, error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/bulk_disable`,
validate: {
body: schema.object({
taskIds: schema.arrayOf(schema.string()),
}),
},
},
async function (
context: RequestHandlerContext,
req: KibanaRequest<any, any, any, any>,
res: KibanaResponseFactory
) {
const { taskIds } = req.body;
try {
const taskManager = await taskManagerStart;
return res.ok({ body: await taskManager.bulkDisable(taskIds) });
} catch (err) {
return res.ok({ body: { taskIds, error: `${err}` } });
}
}
);
router.post(
{
path: `/api/sample_tasks/bulk_update_schedules`,

View file

@ -178,6 +178,24 @@ export default function ({ getService }: FtrProviderContext) {
.then((response) => response.body);
}
function bulkEnable(taskIds: string[], runSoon: boolean) {
return supertest
.post('/api/sample_tasks/bulk_enable')
.set('kbn-xsrf', 'xxx')
.send({ taskIds, runSoon })
.expect(200)
.then((response) => response.body);
}
function bulkDisable(taskIds: string[]) {
return supertest
.post('/api/sample_tasks/bulk_disable')
.set('kbn-xsrf', 'xxx')
.send({ taskIds })
.expect(200)
.then((response) => response.body);
}
function bulkUpdateSchedules(taskIds: string[], schedule: { interval: string }) {
return supertest
.post('/api/sample_tasks/bulk_update_schedules')
@ -623,6 +641,84 @@ export default function ({ getService }: FtrProviderContext) {
expect(await successfulRunSoonResult).to.eql({ id: longRunningTask.id });
});
it('should disable and reenable task and run it when runSoon = true', async () => {
const historyItem = random(1, 100);
const scheduledTask = await scheduleTask({
taskType: 'sampleTask',
schedule: { interval: '30m' },
params: { historyItem },
});
await retry.try(async () => {
expect((await historyDocs()).length).to.eql(1);
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, scheduledTask.id).enabled).to.eql(true);
});
// disable the task
await bulkDisable([scheduledTask.id]);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, scheduledTask.id).enabled).to.eql(false);
});
// re-enable the task
await bulkEnable([scheduledTask.id], true);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, scheduledTask.id).enabled).to.eql(true);
// should get a new document even tho original schedule interval was 30m
expect((await historyDocs()).length).to.eql(2);
});
});
it('should disable and reenable task and not run it when runSoon = false', async () => {
const historyItem = random(1, 100);
const scheduledTask = await scheduleTask({
taskType: 'sampleTask',
schedule: { interval: '30m' },
params: { historyItem },
});
await retry.try(async () => {
expect((await historyDocs()).length).to.eql(1);
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, scheduledTask.id).enabled).to.eql(true);
});
// disable the task
await bulkDisable([scheduledTask.id]);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
expect(getTaskById(tasks, scheduledTask.id).enabled).to.eql(false);
});
// re-enable the task
await bulkEnable([scheduledTask.id], false);
await retry.try(async () => {
const tasks = (await currentTasks()).docs;
const task = getTaskById(tasks, scheduledTask.id);
expect(task.enabled).to.eql(true);
// task runAt should be set in the future by greater than 20 minutes
// this assumes it takes less than 10 minutes to disable and renable the task
// since the schedule interval is 30 minutes
expect(Date.parse(task.runAt) - Date.now()).to.be.greaterThan(10 * 60 * 1000);
});
});
function expectReschedule(
originalRunAt: number,
task: SerializedConcreteTaskInstance<any, any>,